在 RabbitMQ 的消費者代碼中,Channel 和 tag 參數的存在是為了實現消息確認機制(Acknowledgment)和精細化的消息控制。
Channel
參數
作用
Channel
是 AMQP 協議的核心操作接口,通過它可以直接與 RabbitMQ 交互:
- 手動消息確認:通過
basicAck
/basicNack
顯式告知 RabbitMQ 消息處理結果 - 流量控制:可調用
basicQos
限制預取消息數量(防止消費者過載) - 其他高級操作:如消息重發、隊列綁定等
如果不傳入
Channel
,Spring AMQP 會自動使用默認信道,但會失去對信道的直接控制權。
對應原理
生產者的 Channel
- 當生產者調用
rabbitTemplate.convertAndSend()
時:
rabbitTemplate.convertAndSend("doctor.queue", jsonMessage);
- Spring AMQP 內部會從 連接池 獲取一個
Channel
(信道)。 - 該
Channel
用于將消息發布到指定隊列。 - 發布后自動關閉(如果是非事務模式)或復用。
消費者的 Channel
- 消費者通過
@RabbitListener
監聽隊列時:
@RabbitListener(queues = "doctor.queue")
public void onMessage(String json, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {// 處理邏輯
}
- Spring AMQP 會為每個消費者線程分配一個 獨立的
Channel
。 - 所有消息的確認(ACK/NACK)必須通過 同一個 Channel 操作(否則會報錯)。
tag
參數
作用
@Header(AmqpHeaders.DELIVERY_TAG)
注入的 tag
是消息的唯一標識符:
- 消息指紋:每個投遞給消費者的消息都會獲得唯一的 delivery tag
- 冪等性設計:通過 tag 可以精確確認/拒絕特定消息
- 必須參數:調用
basicAck
/basicNack
時必須指定此 tag
Tag 的數值范圍僅在當前信道內唯一,不同信道的 tag 可能重復。
對應原理
Tag 的生成
- 當 RabbitMQ 將消息推送給消費者時:
- 服務端會為 每條消息 分配一個唯一的
Delivery Tag
(在當前 Channel 內遞增)。 - 例如:第一次推送的 Tag=1,第二次 Tag=2,…(不同 Channel 的 Tag 獨立計數)。
- 服務端會為 每條消息 分配一個唯一的
Tag 的作用
- 唯一標識消息:消費者通過 Tag 告訴 RabbitMQ 要確認/拒絕哪條消息。
channel.basicAck(tag, false); // 確認當前消息channel.basicNack(tag, false, true); // 拒絕并重新入隊
- 嚴格順序性:Tag 在同一個 Channel 內嚴格遞增,確保消息順序處理。
- RabbitMQ 服務端維護了一個 消息投遞狀態表,記錄每個 Channel 的 Tag 對應哪條消息。
- 當消費者發送 ACK/NACK 時,RabbitMQ 根據 Channel + Tag 組合定位到原始消息。
手動確認模式
手動確認模式的優點:
- 可靠性:只有處理成功的消息才會被確認(
basicAck
) - 錯誤恢復:處理失敗時通過
basicNack
讓消息重新入隊 - 業務控制:可以根據業務邏輯決定是否確認(如示例中的
shouldBeProcessed
判斷)
// 成功處理 - 確認刪除
channel.basicAck(tag, false); // 處理失敗 - 拒絕并重新入隊
channel.basicNack(tag, false, true);