RocketMQ學習系列之——客戶端消息確認機制

一、客戶端使用MQ基本代碼示例

1、添加maven依賴

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.3.0</version>
</dependency>

2、生產者代碼示例

public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {//初始化一個消息生產者DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 指定nameserver地址producer.setNamesrvAddr("192.168.65.112:9876");// 啟動消息生產者服務producer.start();for (int i = 0; i < 2; i++) {try {// 創建消息。消息由Topic,Tag和body三個屬性組成,其中Body就是消息內容Message msg = new Message("TopicTest","TagA",("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));//發送消息,獲取發送結果SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}//消息發送完后,停止消息生產者服務。producer.shutdown();}
}

3、消費者代碼示例

public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {//構建一個消息消費者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");//指定nameserver地址consumer.setNamesrvAddr("192.168.65.112:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// 訂閱一個感興趣的話題,這個話題需要與消息的topic一致consumer.subscribe("TopicTest", "*");// 注冊一個消息回調函數,消費到消息后就會觸發回調。consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {msgs.forEach(messageExt -> {try {System.out.println("收到消息:"+new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {}});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//啟動消費者服務consumer.start();System.out.print("Consumer Started");}
}

4、代碼邏輯解讀

生產者:

1. 創建消息生產者producer,并指定生產者組名
2. 指定Nameserver地址
3. 啟動producer。可以認為這是消息生產者與服務端建立連接的過程。
4. 創建消息對象,指定Topic、Tag和消息體
5. 發送消息
6. 關閉生產者producer,釋放資源。

消費者:

1. 創建消費者Consumer,必須指定消費者組名
2. 指定Nameserver地址
3. 訂閱主題Topic和Tag
4. 設置回調函數,處理消息
5. 啟動消費者consumer。消費者會一直掛起,持續處理消息。

二、消息確認機制

1、生產者確認機制

? ? ? ? 生產者發送消息的方式有三種:

(1)單向發送:消息生產者只管往Broker發送消息,而全然不關心Broker端有沒有成功接收到消息。

public class OnewayProducer {public static void main(String[] args)throws Exception{DefaultMQProducer producer = new DefaultMQProducer("producerGroup");producer.start();Message message = new Message("Order","tag","order info : orderId = xxx".getBytes(StandardCharsets.UTF_8));producer.sendOneway(message);Thread.sleep(50000);producer.shutdown();}
}

????????sendOneway方法沒有返回值,如果發送失敗,生產者無法補救。

????????單向發送有一個好處,就是發送消息的效率更高。適用于一些追求消息發送效率,而允許消息丟失的業務場景。比如日志。

(2)同步發送:消息生產者在往Broker端發送消息后,會阻塞當前線程,等待Broker端的相應結果。

 SendResult sendResult = producer.send(msg);

????????SendResult來自于Broker的反饋。producer在send發出消息,到Broker返回SendResult的過程中,無法做其他的事情。在SendResult中有一個SendStatus屬性,這個SendStatus是一個枚舉類型,其中包含了Broker端的各種情況。

public enum SendStatus {SEND_OK,FLUSH_DISK_TIMEOUT,FLUSH_SLAVE_TIMEOUT,SLAVE_NOT_AVAILABLE,
}

????????在這幾種枚舉值中,SEND_OK表示消息已經成功發送到Broker上。至于其他幾種枚舉值,都是表示消息在Broker端處理失敗了。使用同步發送的機制,我們就可以在消息生產者發送完消息后,對發送失敗的消息進行補救。例如重新發送。

????????但是此時要注意,如果Broker端返回的SendStatus不是SEND_OK,也并不表示消息就一定不會推送給下游的消費者。僅僅只是表示Broker端并沒有完全正確的處理這些消息。因此,如果要重新發送消息,最好要帶上唯一的系統標識,這樣在消費者端,才能自行做冪等判斷。也就是用具有業務含義的OrderID這樣的字段來判斷消息有沒有被重復處理。

????????這種同步發送的機制能夠很大程度上保證消息發送的安全性。但是,這種同步發送機制的發送效率比較低。畢竟,send方法需要消息在生產者和Broker之間傳輸一個來回后才能結束。如果網速比較慢,同步發送的耗時就會很長。

(3)異步發送:生產者在向Broker發送消息時,會同時注冊一個回調函數。接下來生產者并不等待Broker的響應。當Broker端有響應數據過來時,自動觸發回調函數進行對應的處理。

	producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});

????????在SendCallback接口中有兩個方法,onSuccess和onException。當Broker端返回消息處理成功的響應信息SendResult時,就會調用onSuccess方法。當Broker端處理消息超時或者失敗時,就會調用onExcetion方法,生產者就可以在onException方法中進行補救措施。

????????此時同樣有幾個問題需要注意。一是與同步發送機制類似,觸發了SendCallback的onException方法同樣并不一定就表示消息不會向消費者推送,例如:如果Broker端返回響應信息太慢,超過了超時時間,也會觸發onException方法。二是在SendCallback的對應方法被觸發之前,生產者不能調用shutdown()方法。如果消息處理完之前,生產者線程就關閉了,生產者的SendCallback對應方法就不會觸發。這是因為使用異步發送機制后,生產者雖然不用阻塞下來等待Broker端響應,但是SendCallback還是需要附屬于生產者的主線程才能執行。

2、消費者確認機制

? ? ? ? 消費者收到消息后,向 Broker 響應消息來進行確認。

consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});

????????這個返回值是一個枚舉值,有兩個選項 CONSUME_SUCCESS和RECONSUME_LATER。如果消費者返回CONSUME_SUCCESS,那么消息自然就處理結束了。但是如果消費者沒有處理成功,返回的是RECONSUME_LATER,Broker就會過一段時間再發起消息重試。

????????為了兼顧重試機制的成功率和性能,RocketMQ設計了一套非常完善的消息重試機制:

(1)失敗重試

? ? ? ? 當消費者沒能正常消費消息時,Broker 會進行消息重試,也就是將消息移到重試Topic中。

????????為了讓這些重試的消息不會影響Topic下其他正常的消息,Broker會給每個消費者組設計對應的重試Topic,名稱為 %RETRY%+ConsumeGroup。這是因為,MessageQueue是一個具有嚴格FIFO特性的數據結構,如果需要重試的這些消息還是放在原來的MessageQueue中,就會對當前MessageQueue產生阻塞,讓其他正常的消息無法處理。

????????RocketMQ默認的最大重試次數是16次。重試的間隔時間如下圖所示,是延遲消息的后16個延遲級別。

????????如果我們修改了重試次數為20次,那么超過16次后每次重試間隔為2小時。同一個消費者組中,如果多個消費者都設置了重試次數,那么后設置的會覆蓋先設置的。

(2)死信隊列

????????Broker不可能無限制的向消費失敗的消費者推送消息,當超過最大重試次數后,消息會移到死信隊列,它相當于windows當中的回收站。我們可以人工介入對死信隊列中的消息進行補救,也可以直接徹底刪除這些消息。

????????死信隊列的 topic 名稱為?%DLQ%+ConsumGroup,一個消費者組只有一個死信隊列,且只有死信消息產生時,才會生成死信隊列。

? ? ? ? 當我們對死信隊列中的消息進行補救時,通常會創建一個新的消費者組獲取死信隊列中的消息,對消息內容進行修正后,重新發送到正常的 topic 中。

? ? ? ? 需要注意的是,死信隊列被創建出來后,它的權限 perm 被設置為 2(2:禁讀,4:禁寫,6:可讀可寫),所以它里面的消息是無法讀取的。在補救前,需要將死信隊列的權限修改為 6。

????????死信隊列的有效期跟正常消息相同,默認3天,對應broker.conf中的fileReservedTime屬性。超過這個最長時間的消息都會被刪除,而不管消息是否消費過。

(3)盡量保證同一消費者組具有相同的邏輯

????????RocketMQ中設定的消費者組都是訂閱主題和消費邏輯相同的服務備份,所以當消息重試時,Broker會往消費者組中任意一個實例推送。因此,我們在編碼時,盡量要保證一個消費者組處理業務的邏輯相同。

(4)消費邏輯盡量避免異步

????????Broker端最終只通過消費者組返回的狀態來確定消息有沒有處理成功。至于消費者組自己的業務執行是否正常,Broker端是沒有辦法知道的。因此,在實現消費者的業務邏輯時,應該要盡量使用同步實現方式,保證在自己業務處理完成之后再向Broker端返回狀態。

3、冪等性保證

? ? ? ? 在 MQ 系統中,冪等性有三種實現語義:

1、at most once:每條消息最多被消費一次。對于 at most once,生產者使用 sendOneWay 發送消息即可。

2、at least once:每條消息至少被消費一次。對于 at least once,利用生產者和消費者的消息確認機制,即可確保消息成功發送和接收。

3、exactly once:每條消息正好被消費一次。對于 exactly once,難以通過 MQ 本身直接實現。通常的方法是利用消息確認機制確保 at least once,再通過對消息設置業務主鍵進行消息去重來確保 at most once,兩者組合實現 exactly once。

? ? ? ? 如何使用消息的業務主鍵去重呢?

????????當消息發送到 RocketMQ 時,RocketMQ 會為消息生成唯一的 msgId,該 msgId 在消息重復生產和消費時都不會發生改變,通常可用于區分每條消息。但這個 msgId 并不能完全確保全局唯一,在對冪等性要求嚴格的場景,可以在發送消息時設置全局唯一的 message key,并在獲取消息時根據 message key 來去重(MQ 會對 message key 進行索引,我們除了可以使用 message key 保證冪等性,還能用它來快速查找消息)。

? ? ? ? 什么時候會出現消息重復呢?

1、發送時重復:生產者客戶端已成功發送消息且消息已在服務端持久化,但由于網絡阻塞或客戶端宕機,導致服務端向客戶端應答失敗。故障恢復后,客戶端由于未收到應答,會認為消息發送失敗而重新發送,服務端就會存在兩條內容相同并且 msgId 也相同的消息。

2、接收時重復:消費者客戶端已成功收到消息并完成業務處理,但由于網絡阻塞或客戶端宕機,導致客戶端向服務端應答失敗。故障恢復后,服務端由于未收到應帶,會認為消息投遞失敗而重新投遞,客戶端就會收到兩條內容相同并且 msgId 也相同的消息。

3、Rebalance 導致重復:當 Broker 或消費者出現重啟、擴容和縮容時,會觸發 Rebalance,此時可能導致消息重復。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/91216.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/91216.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/91216.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

[leetcode] 組合總和

39. 組合總和 - 力扣&#xff08;LeetCode&#xff09; i class Solution {int aim;vector<vector<int>> ret;vector<int> path; public:vector<vector<int>> combinationSum(vector<int>& nums, int target) {aim target;dfs(nums…

新能源行業B端極簡設計:碳中和目標下的交互輕量化實踐

新能源行業B端極簡設計&#xff1a;碳中和目標下的交互輕量化實踐內容摘要在新能源行業&#xff0c;碳中和目標正推動著企業追求更高的運營效率和更低的資源消耗。然而&#xff0c;傳統的B端交互設計往往復雜繁瑣&#xff0c;不僅增加了用戶的操作成本&#xff0c;還可能導致資…

減速機:自動化生產線的“精密傳動心臟”

減速機作為自動化生產線的核心傳動部件&#xff0c;通過調節轉速與扭矩實現設備精準控制&#xff0c;其在自動化生產線中發揮著關鍵作用。以下是其具體應用方式&#xff1a;輸送線驅動在自動化生產線中&#xff0c;輸送線用于運輸物料、半成品或成品&#xff0c;通過減速機可以…

從0到1學PHP(五):PHP 數組:高效存儲與處理數據

目錄一、數組的定義與分類1.1 索引數組1.2 關聯數組1.3 多維數組二、數組的基本操作2.1 數組元素的添加、刪除、修改和訪問2.2 數組指針的操作三、數組處理函數3.1 數組排序函數3.2 數組統計函數3.3 數組過濾與轉換函數一、數組的定義與分類 在 PHP 中&#xff0c;數組是一種非…

vscode 字體的跟換

打開vscode 左下角輸入電腦中已經有的字體&#xff1a;有想要用的可以自己進行安裝刷新這樣就可改變了

墨者:SQL過濾字符后手工注入漏洞測試(第3題)

1. 墨者學院&#xff1a;SQL過濾字符后手工注入漏洞測試(第3題)&#x1f680; 因為練習過太多的sql注入&#xff0c;廢話不多介紹&#xff0c;我會通過圍繞手動注入和工具爆破的方式達到靶場目標&#xff0c;開練&#xff01;&#xff01;&#xff01; 2. 手工注入方式&#x1…

【Spring AI實戰】實現仿DeepSeek頁面對話機器人(支持多模態上傳)

一、前言 二、實現效果 三、代碼實現 3.1 后端代碼 3.2 前端代碼 一、前言 Spring AI詳解&#xff1a;【Spring AI詳解】開啟Java生態的智能應用開發新時代(附不同功能的Spring AI實戰項目)-CSDN博客 二、實現效果 可上傳圖片或音頻數據給大模型分析 三、代碼實現 3.1 后…

Vue 正在熱映模塊

Vue 漸進式JavaScript 框架 基于Vue2的移動端項目&#xff1a;正在熱映模塊 目錄 正在熱映 數據修改 導入axios 配置反向代理 正在熱映渲染 賦值數據 渲染列表 顯示圖片 優化列表 設置列表樣式 主演 定義過濾器 使用過濾器 主演過長處理 無主演情況處理 觀眾評…

阿里云上進行k8s集群的配置

在阿里云容器服務Kubernetes&#xff08;ACK&#xff09;中配置集群的核心步驟可分為以下六大關鍵環節&#xff0c;涵蓋架構設計到運維管理&#xff1a;1. 集群規劃與基礎配置 集群類型選擇 托管版&#xff1a;Master節點由阿里云托管&#xff08;推薦生產環境&#xff09;專有…

頁面性能優化

優化點解決方案效果雙向綁定數量過多競對設置單元格內部涉及雙向綁定的輸入組件過多&#xff0c;線上頁面最多有88個該和抽屜中的編輯表格一樣的組件&#xff0c;共計930個&#xff08;按每行最少6個來計算的&#xff09;雙向綁定的組件&#xff0c;嚴重拖累頁面性能。數據計算…

詳細說明零拷貝

詳細說明零拷貝【一】零拷貝介紹【1】說明【2】為什么需要零拷貝&#xff1f;—— 傳統數據傳輸的問題【3】零拷貝的核心優化【4】零拷貝的實現方式&#xff08;1&#xff09;mmap&#xff08;內存映射&#xff09;&#xff08;2&#xff09;sendfile&#xff08;Linux 系統調用…

docker部署自己寫的c++http服務器教程

我用的是ubuntu 22.04環境下 qt c 寫的應用程序&#xff0c;是終端程序&#xff0c;不是界面&#xff0c;然后用linuxdeployqt工具將其打包成了AppImage可執行文件&#xff0c;以上是部署前的準備工作&#xff0c;需要確保AppImage可執行文件在自己的ubuntu上可以運行才能執行以…

Caffeine 緩存庫的常用功能使用介紹

&#x1f9d1; 博主簡介&#xff1a;CSDN博客專家&#xff0c;歷代文學網&#xff08;PC端可以訪問&#xff1a;https://literature.sinhy.com/#/?__c1000&#xff0c;移動端可微信小程序搜索“歷代文學”&#xff09;總架構師&#xff0c;15年工作經驗&#xff0c;精通Java編…

C# _列表(List<T>)_ 字典(Dictionary<TKey, TValue>)

目錄 列表&#xff08;List&#xff09;特點 創建列表 RemoveAll 刪除與之條件相匹配的數據 會返回刪除的個數 Capacity 獲取或設置列表的容量 更多方法可參照上篇文章&#xff1a;C#_ArrayList動態數組 字典&#xff08;Dictionary&#xff09;特點 定義一個字典 向字…

【實時Linux實戰系列】實時網絡控制與調度

在實時控制系統中&#xff0c;網絡調度是確保實時數據流傳輸和處理不受延遲影響的關鍵。實時網絡控制與調度技術對于工業自動化、金融交易、多媒體流等領域至關重要。通過合理設計網絡調度策略&#xff0c;可以顯著提高系統的實時性和可靠性。本文將介紹如何在實時控制系統中實…

Qwen3-Coder:介紹及使用 -- 超強AI編程助手

更多內容&#xff1a;XiaoJ的知識星球 目錄一、Qwen3-Coder模型介紹1.預訓練階段&#xff08;Pre-Training&#xff09;2.后訓練階段&#xff08;Post-Training&#xff09;1&#xff09;Scaling Code RL: Hard to Solve, Easy to Verify2&#xff09;Scaling Long-Horizon RL二…

uniapp 如果進入頁面輸入框自動聚焦,此時快速返回頁面或者跳轉到下一個頁面,輸入法頂上來的頁面出現半屏的黑屏問題。

如果進入頁面輸入框自動聚焦&#xff0c;此時快速返回頁面或者跳轉到下一個頁面&#xff0c;輸入法頂上來的頁面出現半屏的黑屏問題。輸入法出來后&#xff0c;設置了自動將頁面頂上來的配置&#xff1a;pages.json"softinputMode": "adjustResize""g…

深入了解 Kubernetes(k8s):從概念到實踐

目錄 一、k8s 核心概念 二、k8s 的優勢 三、k8s 架構組件 控制平面組件 節點組件 四、k8s docker 運行前后端分離項目的例子 1. 準備前端項目 2. 準備后端項目 3. 創建 k8s 部署配置文件 4. 部署應用到 k8s 集群 在當今云計算和容器化技術飛速發展的時代&#xff0c…

Android User版本默認用test-keys,如何改用release-keys

Android User版本 默認用test-keys&#xff0c; 如何改用release-keys 開發云 - 一站式云服務平臺 --- build/core/Makefile | 5 1 file changed, 5 insertions() diff --git a/build/core/Makefile b/build/core/Makefile index --- a/build/core/Makefile b/build/core…

從零開始學習Dify-數據庫數據可視化(五)

概述上一篇文章我們圍繞 Excel 文件展開數據可視化教學&#xff0c;逐步掌握了數據導入、圖表構建和 AI 智能分析。在實際業務環境中&#xff0c;很多數據并不是保存在表格中&#xff0c;而是存儲于數據庫系統中&#xff0c;尤其是最常見的 MySQL。本篇作為本系列的第五篇&…