一、客戶端使用MQ基本代碼示例
1、添加maven依賴
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.3.0</version>
</dependency>
2、生產者代碼示例
public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {//初始化一個消息生產者DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 指定nameserver地址producer.setNamesrvAddr("192.168.65.112:9876");// 啟動消息生產者服務producer.start();for (int i = 0; i < 2; i++) {try {// 創建消息。消息由Topic,Tag和body三個屬性組成,其中Body就是消息內容Message msg = new Message("TopicTest","TagA",("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));//發送消息,獲取發送結果SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}//消息發送完后,停止消息生產者服務。producer.shutdown();}
}
3、消費者代碼示例
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {//構建一個消息消費者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");//指定nameserver地址consumer.setNamesrvAddr("192.168.65.112:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// 訂閱一個感興趣的話題,這個話題需要與消息的topic一致consumer.subscribe("TopicTest", "*");// 注冊一個消息回調函數,消費到消息后就會觸發回調。consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {msgs.forEach(messageExt -> {try {System.out.println("收到消息:"+new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {}});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//啟動消費者服務consumer.start();System.out.print("Consumer Started");}
}
4、代碼邏輯解讀
生產者:
1. 創建消息生產者producer,并指定生產者組名
2. 指定Nameserver地址
3. 啟動producer。可以認為這是消息生產者與服務端建立連接的過程。
4. 創建消息對象,指定Topic、Tag和消息體
5. 發送消息
6. 關閉生產者producer,釋放資源。
消費者:
1. 創建消費者Consumer,必須指定消費者組名
2. 指定Nameserver地址
3. 訂閱主題Topic和Tag
4. 設置回調函數,處理消息
5. 啟動消費者consumer。消費者會一直掛起,持續處理消息。
二、消息確認機制
1、生產者確認機制
? ? ? ? 生產者發送消息的方式有三種:
(1)單向發送:消息生產者只管往Broker發送消息,而全然不關心Broker端有沒有成功接收到消息。
public class OnewayProducer {public static void main(String[] args)throws Exception{DefaultMQProducer producer = new DefaultMQProducer("producerGroup");producer.start();Message message = new Message("Order","tag","order info : orderId = xxx".getBytes(StandardCharsets.UTF_8));producer.sendOneway(message);Thread.sleep(50000);producer.shutdown();}
}
????????sendOneway方法沒有返回值,如果發送失敗,生產者無法補救。
????????單向發送有一個好處,就是發送消息的效率更高。適用于一些追求消息發送效率,而允許消息丟失的業務場景。比如日志。
(2)同步發送:消息生產者在往Broker端發送消息后,會阻塞當前線程,等待Broker端的相應結果。
SendResult sendResult = producer.send(msg);
????????SendResult來自于Broker的反饋。producer在send發出消息,到Broker返回SendResult的過程中,無法做其他的事情。在SendResult中有一個SendStatus屬性,這個SendStatus是一個枚舉類型,其中包含了Broker端的各種情況。
public enum SendStatus {SEND_OK,FLUSH_DISK_TIMEOUT,FLUSH_SLAVE_TIMEOUT,SLAVE_NOT_AVAILABLE,
}
????????在這幾種枚舉值中,SEND_OK表示消息已經成功發送到Broker上。至于其他幾種枚舉值,都是表示消息在Broker端處理失敗了。使用同步發送的機制,我們就可以在消息生產者發送完消息后,對發送失敗的消息進行補救。例如重新發送。
????????但是此時要注意,如果Broker端返回的SendStatus不是SEND_OK,也并不表示消息就一定不會推送給下游的消費者。僅僅只是表示Broker端并沒有完全正確的處理這些消息。因此,如果要重新發送消息,最好要帶上唯一的系統標識,這樣在消費者端,才能自行做冪等判斷。也就是用具有業務含義的OrderID這樣的字段來判斷消息有沒有被重復處理。
????????這種同步發送的機制能夠很大程度上保證消息發送的安全性。但是,這種同步發送機制的發送效率比較低。畢竟,send方法需要消息在生產者和Broker之間傳輸一個來回后才能結束。如果網速比較慢,同步發送的耗時就會很長。
(3)異步發送:生產者在向Broker發送消息時,會同時注冊一個回調函數。接下來生產者并不等待Broker的響應。當Broker端有響應數據過來時,自動觸發回調函數進行對應的處理。
producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});
????????在SendCallback接口中有兩個方法,onSuccess和onException。當Broker端返回消息處理成功的響應信息SendResult時,就會調用onSuccess方法。當Broker端處理消息超時或者失敗時,就會調用onExcetion方法,生產者就可以在onException方法中進行補救措施。
????????此時同樣有幾個問題需要注意。一是與同步發送機制類似,觸發了SendCallback的onException方法同樣并不一定就表示消息不會向消費者推送,例如:如果Broker端返回響應信息太慢,超過了超時時間,也會觸發onException方法。二是在SendCallback的對應方法被觸發之前,生產者不能調用shutdown()方法。如果消息處理完之前,生產者線程就關閉了,生產者的SendCallback對應方法就不會觸發。這是因為使用異步發送機制后,生產者雖然不用阻塞下來等待Broker端響應,但是SendCallback還是需要附屬于生產者的主線程才能執行。
2、消費者確認機制
? ? ? ? 消費者收到消息后,向 Broker 響應消息來進行確認。
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});
????????這個返回值是一個枚舉值,有兩個選項 CONSUME_SUCCESS和RECONSUME_LATER。如果消費者返回CONSUME_SUCCESS,那么消息自然就處理結束了。但是如果消費者沒有處理成功,返回的是RECONSUME_LATER,Broker就會過一段時間再發起消息重試。
????????為了兼顧重試機制的成功率和性能,RocketMQ設計了一套非常完善的消息重試機制:
(1)失敗重試
? ? ? ? 當消費者沒能正常消費消息時,Broker 會進行消息重試,也就是將消息移到重試Topic中。
????????為了讓這些重試的消息不會影響Topic下其他正常的消息,Broker會給每個消費者組設計對應的重試Topic,名稱為 %RETRY%+ConsumeGroup。這是因為,MessageQueue是一個具有嚴格FIFO特性的數據結構,如果需要重試的這些消息還是放在原來的MessageQueue中,就會對當前MessageQueue產生阻塞,讓其他正常的消息無法處理。
????????RocketMQ默認的最大重試次數是16次。重試的間隔時間如下圖所示,是延遲消息的后16個延遲級別。
????????如果我們修改了重試次數為20次,那么超過16次后每次重試間隔為2小時。同一個消費者組中,如果多個消費者都設置了重試次數,那么后設置的會覆蓋先設置的。
(2)死信隊列
????????Broker不可能無限制的向消費失敗的消費者推送消息,當超過最大重試次數后,消息會移到死信隊列,它相當于windows當中的回收站。我們可以人工介入對死信隊列中的消息進行補救,也可以直接徹底刪除這些消息。
????????死信隊列的 topic 名稱為?%DLQ%+ConsumGroup,一個消費者組只有一個死信隊列,且只有死信消息產生時,才會生成死信隊列。
? ? ? ? 當我們對死信隊列中的消息進行補救時,通常會創建一個新的消費者組獲取死信隊列中的消息,對消息內容進行修正后,重新發送到正常的 topic 中。
? ? ? ? 需要注意的是,死信隊列被創建出來后,它的權限 perm 被設置為 2(2:禁讀,4:禁寫,6:可讀可寫),所以它里面的消息是無法讀取的。在補救前,需要將死信隊列的權限修改為 6。
????????死信隊列的有效期跟正常消息相同,默認3天,對應broker.conf中的fileReservedTime屬性。超過這個最長時間的消息都會被刪除,而不管消息是否消費過。
(3)盡量保證同一消費者組具有相同的邏輯
????????RocketMQ中設定的消費者組都是訂閱主題和消費邏輯相同的服務備份,所以當消息重試時,Broker會往消費者組中任意一個實例推送。因此,我們在編碼時,盡量要保證一個消費者組處理業務的邏輯相同。
(4)消費邏輯盡量避免異步
????????Broker端最終只通過消費者組返回的狀態來確定消息有沒有處理成功。至于消費者組自己的業務執行是否正常,Broker端是沒有辦法知道的。因此,在實現消費者的業務邏輯時,應該要盡量使用同步實現方式,保證在自己業務處理完成之后再向Broker端返回狀態。
3、冪等性保證
? ? ? ? 在 MQ 系統中,冪等性有三種實現語義:
1、at most once:每條消息最多被消費一次。對于 at most once,生產者使用 sendOneWay 發送消息即可。
2、at least once:每條消息至少被消費一次。對于 at least once,利用生產者和消費者的消息確認機制,即可確保消息成功發送和接收。
3、exactly once:每條消息正好被消費一次。對于 exactly once,難以通過 MQ 本身直接實現。通常的方法是利用消息確認機制確保 at least once,再通過對消息設置業務主鍵進行消息去重來確保 at most once,兩者組合實現 exactly once。
? ? ? ? 如何使用消息的業務主鍵去重呢?
????????當消息發送到 RocketMQ 時,RocketMQ 會為消息生成唯一的 msgId,該 msgId 在消息重復生產和消費時都不會發生改變,通常可用于區分每條消息。但這個 msgId 并不能完全確保全局唯一,在對冪等性要求嚴格的場景,可以在發送消息時設置全局唯一的 message key,并在獲取消息時根據 message key 來去重(MQ 會對 message key 進行索引,我們除了可以使用 message key 保證冪等性,還能用它來快速查找消息)。
? ? ? ? 什么時候會出現消息重復呢?
1、發送時重復:生產者客戶端已成功發送消息且消息已在服務端持久化,但由于網絡阻塞或客戶端宕機,導致服務端向客戶端應答失敗。故障恢復后,客戶端由于未收到應答,會認為消息發送失敗而重新發送,服務端就會存在兩條內容相同并且 msgId 也相同的消息。
2、接收時重復:消費者客戶端已成功收到消息并完成業務處理,但由于網絡阻塞或客戶端宕機,導致客戶端向服務端應答失敗。故障恢復后,服務端由于未收到應帶,會認為消息投遞失敗而重新投遞,客戶端就會收到兩條內容相同并且 msgId 也相同的消息。
3、Rebalance 導致重復:當 Broker 或消費者出現重啟、擴容和縮容時,會觸發 Rebalance,此時可能導致消息重復。