文章目錄
- 1.順序消息的全流程
- 1.1 發送階段:消息分區
- 1.2.存儲階段:順序寫入
- 1.3.消費階段:串行消費
- 2.第三把鎖有什么用?
- 3.順序消費存在的問題
和Kafka只支持同一個Partition內消息的順序性一樣,RocketMQ中也提供了基于隊列(分區)的順序消費。即同個隊列內的消息可以做到有序,但是不同隊列內的消息是無序的!
在RocketMq中,它的順序消息通過客戶端的三把鎖以及同一隊列順序寫入協同工作,確保消息從發送、存儲到消費的全流程嚴格有序。其核心在于:
- 分區一致性(同一業務邏輯的消息發送到同一個分區)
- 存儲順序性(單線程順序寫入)
- 消費串行化(單線程消費分區)
這種設計在需要嚴格順序性的場景(如金融交易、訂單處理)中非常關鍵。
RocketMQ 的順序消息需要從 發送、存儲、消費 三個環節嚴格控制順序性,因此引入了三把鎖:
鎖類型 | 作用階段 | 實現方式 | 目標 |
---|---|---|---|
發送端 | 消息發送階段, 固定 MessageQueue,確保同一業務邏輯的消息發送到同一個 MessageQueue,為后續的存儲和消費順序性奠定基礎 | MessageQueueSelector | 消息分區一致性 |
Broker | 消息存儲階段, 確保同一 MessageQueue的消息按順序寫入磁盤 | CommmitLog順序寫機制+ConsumeQueue 分區索引,每個 MessageQueue 對應一個 ConsumeQueue 文件,記錄消息在 CommitLog 中的位置(偏移量),確保同一 MessageQueue 的消息在 ConsumeQueue 中的順序性 | 消息存儲順序性 |
消費鎖 | 消息消費階段,通過三把鎖確保同一 MessageQueue的消息單線程串行消費 | 分布式鎖 + 本地鎖 + ProcessQueue 鎖 分布式鎖:消費者向 Broker 申請分布式鎖(默認每 20 秒續簽),確保同一消費組內只有一個消費者能消費該 MessageQueue 本地鎖:通過 MessageQueueLock 的 Synchronized 鎖,確保同一消費者線程池中只有一個線程處理該 MessageQueue ProcessQueue 鎖:通過 ProcessQueue 的 ReentrantLock(consumeLock),防止消費過程中因負載均衡或重平衡導致 ProcessQueue 被刪除 通過這三個組合確保同一 MessageQueue 的消息由單線程串行消費,避免多線程并發導致順序錯亂 | 消息消費順序性 |
1.順序消息的全流程
1.1 發送階段:消息分區
生產者通過 MessageQueueSelector(如 ShardingKeySelector)將 同一業務邏輯的消息(如同一訂單 ID)發送到 同一個 MessageQueue。
當我們作為MQ的生產者需要發送順序消息時,需要在Send方法中,傳入一個MessageQueueSelector,
MessageQueueSelector中需要實現一個select方法,這個方法就是用來定義要把消息發送到哪個MessageQueue的,通常可以使用取模法進行路由:
public void send() {SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;//根據參數,計算出一個要接收消息的MessageQueue的下標int index = id % mqs.size();//返回這個MessageQueuereturn mqs.get(index);}}, orderId);}
通過以上形式就可以將需要有序的消息發送到同一個隊列中。需要注意的是,這里需要使用同步發送的方式!
1.2.存儲階段:順序寫入
Broker 接收到消息后,根據 MessageQueue 的物理分區(CommitLog + ConsumeQueue)進行 順序寫入。
同一 MessageQueue 的消息在物理文件中 RocketMQ 通過單線程寫入 CommitLog 保證順序性,按順序追加寫入,保證存儲順序性。
消息按照順序發送的消息隊列中之后,那么,消費者如何按照發送順序進行消費呢?
1.3.消費階段:串行消費
RocketMQ的MessageListener回調函數提供了兩種消費模式:
- 有序消費模式MessageListenerOrderly
- 并發消費模式MessageListenerConcurrently。
所以,想要實現順序消費,需要使用MessageListenerOrderly模式接收消息:
consumer.registerMessagelistener(new MessagelistenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {System.out.printf("Receive order msg:" + new String(msgs.get(0) .getBody( )));return ConsumeOrderlyStatus.SUCCESS;}}, new ConsumeOrderlyContext());
當我們用以上方式注冊一個消費之后,為了保證同一個隊列中的有序消息可以被順序消費,就要保證RocketMQ Broker只會把消息發送到同一個消費者上,這時候就需要加鎖了。
在實現中,ConsumeMessageOrderlyService 初始化的時候,會啟動一個定時任務,會嘗試向 Broker 為當前消費者客戶端申請分布式鎖(第一把鎖)。如果獲取成功,那么后續消息將會只發給這個Consumer。
接下來在消息拉取的過程中,消費者會一次性拉取多條消息的,并且會將拉取到的消息放入 ProcessQueue,同時將消息提交到消費線程池進行執行。
那么拉取之后的消費過程,怎么保證順序消費呢?這里就需要更多的鎖了
RocketMQ在消費的過程中,需要申請 MessageQueue 鎖,消費線程通過 MessageQueueLock 的 synchronized 鎖(第二把鎖)住當前 MessageQueue 的消費過程,確保在同一時間,一個隊列中只有一個線程能處理列中的消息。
獲取到 MessageQueue 的鎖后,就可以從ProcessQueue中依次拉取一批消息處理了,但是這個過程中,為了保證消息不會出現重復消費,還需要對ProcessQueue進行加鎖,通過 ProcessQueue 的 ReentrantLock。然后就可以開始處理業務邏輯了
總結下來就是三次加鎖:
- 首先鎖定Broker上的MessageQueue,確保消息只會投遞到唯一的消費者(分布式鎖)
- 然后對本地的MessageQueue加鎖,確保只有一個線程能處理這個消息隊列。(synchronized)
- 最后對存儲消息的ProcessQueue加鎖,確保在重平衡的過程中不會出現消息的重復消費。(ReentrantLock)
里面有幾個點需要大家注意下:
2.第三把鎖有什么用?
前面介紹客戶端加鎖過程中,一共加了三把鎖,那么,有沒有想過這樣一個問題,第三把鎖如果不加的話,是不是也沒問題?
因為我們已經對MessageQueue加鎖了,為啥還需要對ProcessQueue再次加鎖呢?
這里其實主要考慮的是重平衡(Rebalance)的問題
當我們的消費者集群,新增了一些消費者,發生重平衡的時候,某個隊列可能會原來屬于客戶端A消費的,但是現在要重新分配給客戶端B了。
這時候客戶端A就需要把自己加在Broker上的鎖解掉,而在這個解鎖的過程中,就需要確保消息不能在消費過程中就被移除了,因為如果客戶端A可能正在處理一部分消息,但是位點信息還沒有提交,如果客戶端B立馬去消費隊列中的消息,那存在一部分數據會被重復消費
那么如何判斷消息是否正在消費中呢,就需要通過這個ProcessQueue上面的鎖來判斷了,也就是說在解鎖的線程也需要嘗試對ProcessQueue進行加鎖,加鎖成功才能進行解鎖操作。以避免過程中有消息消費。
3.順序消費存在的問題
通過上面的介紹,我們知道了RocketMQ的順序消費是通過在消費者上多次加鎖實現的,這種方式帶來的問題就是會降低吞吐量,并且如果前面的消息阻寨,會導致更多消息阻塞。所以,順序消息需要慎用。