RocketMq的消息類型及代碼案例

RocketMQ 提供了多種消息類型,以滿足不同業務場景對?順序性、事務性、時效性?的要求。其核心設計思想是通過解耦 “消息傳遞模式” 與 “業務邏輯”,實現高性能、高可靠的分布式通信。

一、主要類型包括

  1. 普通消息(基礎類型)
  2. 順序消息(保證消費順序)
  3. 定時 / 延遲消息(控制投遞時間)
  4. 事務消息(分布式事務最終一致性)
  5. 批量消息(提升吞吐量)

二、消息類型及代碼示例

1. 普通消息(Normal Message)

描述:最基礎的消息類型,支持異步發送、批量發送等模式,適用于無需嚴格順序和事務保證的場景(如日志收集、通知推送)。
核心優勢:高吞吐量、低延遲,生產端通過負載均衡自動選擇 Broker。

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;public class NormalMessageExample {public static void main(String[] args) throws ClientException, InterruptedException {// 1. 加載服務提供者(支持SPI擴展)ClientServiceProvider provider = ClientServiceProvider.loadService();// 2. 配置認證信息(密鑰管理)String accessKey = "yourAccessKey";String secretKey = "yourSecretKey";StaticSessionCredentialsProvider credentialsProvider =new StaticSessionCredentialsProvider(accessKey, secretKey);// 3. 構建客戶端配置(支持多協議、TLS加密)ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints("localhost:8081") // 支持域名或IP:端口列表.setCredentialProvider(credentialsProvider).setRequestTimeout(Duration.ofSeconds(3)) // 請求超時時間.build();// 4. 創建生產者(支持自動重試、批量發送)String topic = "normal-message-topic";Producer producer = provider.newProducerBuilder().setClientConfiguration(clientConfiguration).setTopics(topic).setMaxAttempts(3) // 發送失敗最大重試次數.build();// 5. 構建并發送消息Message message = provider.newMessageBuilder().setTopic(topic).setBody("Hello RocketMQ 5.0!".getBytes()).setTag("order") // 可選標簽,用于消息過濾.setKeys("key123") // 消息業務鍵,用于查詢.build();// 同步發送(阻塞當前線程直到返回結果)SendReceipt receipt = producer.send(message);System.out.println("消息發送成功: " + receipt.getMessageId());// 異步發送示例/*producer.sendAsync(message).thenAccept(sendReceipt -> {System.out.println("異步發送成功: " + sendReceipt.getMessageId());}).exceptionally(throwable -> {System.out.println("異步發送失敗: " + throwable.getMessage());return null;});*/// 6. 關閉資源(重要!避免內存泄漏)producer.close();}
}
2. 順序消息(Ordered Message)

描述:通過將同一業務主鍵的消息路由到相同隊列,保證消息消費順序與發送順序一致。
應用場景:金融交易流水、訂單狀態變更、時序數據處理。

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;import java.util.List;public class OrderedMessageExample {public static void main(String[] args) throws ClientException, InterruptedException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration config = ClientConfiguration.newBuilder().setEndpoints("localhost:8081").build();String topic = "ordered-message-topic";Producer producer = provider.newProducerBuilder().setClientConfiguration(config).setTopics(topic).build();// 模擬訂單狀態變更(同一訂單ID的消息必須順序處理)String[] orderIds = {"order1001", "order1002", "order1001"};String[] orderStatus = {"CREATED", "PAYED", "SHIPPED"};for (int i = 0; i < orderIds.length; i++) {String orderId = orderIds[i];String status = orderStatus[i % orderStatus.length];// 關鍵:通過MessageGroup確保相同訂單的消息發送到同一隊列Message message = provider.newMessageBuilder().setTopic(topic).setBody(("訂單[" + orderId + "]狀態變更為: " + status).getBytes()).setMessageGroup(orderId) // 消息組決定消息路由的隊列.setKeys(orderId) // 設置業務鍵便于查詢.build();SendReceipt receipt = producer.send(message);System.out.println("發送順序消息: " + receipt.getMessageId() + ", 訂單ID: " + orderId + ", 狀態: " + status);}producer.close();}
}
3. 定時 / 延遲消息(Scheduled/Delay Message)

描述:消息發送后,需等待指定時間(或到達指定時間點)才會被消費者可見。
應用場景:訂單超時自動關閉、任務調度、延遲重試。

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;import java.time.Duration;
import java.time.Instant;public class DelayMessageExample {public static void main(String[] args) throws ClientException, InterruptedException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration config = ClientConfiguration.newBuilder().setEndpoints("localhost:8081").build();String topic = "delay-message-topic";Producer producer = provider.newProducerBuilder().setClientConfiguration(config).setTopics(topic).build();// 方式一:使用絕對時間戳(精確到毫秒)long timestamp = Instant.now().plus(Duration.ofMinutes(5)).toEpochMilli();Message messageByTimestamp = provider.newMessageBuilder().setTopic(topic).setBody("5分鐘后執行的定時消息".getBytes()).setDeliveryTimestamp(timestamp) // 設置投遞時間戳.build();// 方式二:使用預定義延遲級別(需Broker配置支持)Message messageByLevel = provider.newMessageBuilder().setTopic(topic).setBody("延遲30秒的消息".getBytes()).addProperty("DELAY", "3") // 假設3對應30秒(需Broker配置).build();// 發送延遲消息SendReceipt receipt = producer.send(messageByTimestamp);System.out.println("延遲消息發送成功: " + receipt.getMessageId() + ", 將于 " + Instant.ofEpochMilli(timestamp) + " 可見");producer.close();}
}
4. 事務消息(Transactional Message)

描述
通過兩階段提交機制,保證本地事務與消息發送的最終一致性。

核心流程

發送半消息(對消費者不可見)

  1. 執行本地事務
  2. 根據事務結果提交或回滾半消息
  3. 支持事務狀態回查(處理超時情況)
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
import org.apache.rocketmq.client.apis.producer.TransactionalProducer;public class TransactionalMessageExample {public static void main(String[] args) throws ClientException, InterruptedException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration config = ClientConfiguration.newBuilder().setEndpoints("localhost:8081").build();String topic = "transactional-message-topic";TransactionalProducer producer = provider.newTransactionalProducerBuilder().setClientConfiguration(config).setTopics(topic)// 關鍵:設置事務狀態回查處理器(當Broker長時間未收到事務狀態時觸發).setTransactionChecker(messageView -> {System.out.println("回查事務狀態: " + messageView.getBodyAsString());// 根據業務ID查詢本地事務狀態String bizId = messageView.getKeys().iterator().next();boolean transactionStatus = checkLocalTransactionStatus(bizId);return transactionStatus ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;}).build();try {// 1. 開啟事務上下文producer.beginTransaction();// 2. 發送半消息(未提交狀態)Message message = provider.newMessageBuilder().setTopic(topic).setBody("用戶賬戶扣款成功,通知庫存系統扣減".getBytes()).setKeys("order_12345") // 設置業務鍵,用于回查.build();producer.send(message);// 3. 執行本地事務(如數據庫操作)boolean localTransactionResult = executeLocalTransaction();// 4. 根據本地事務結果提交或回滾if (localTransactionResult) {producer.commit(); // 提交事務,消息對消費者可見System.out.println("本地事務執行成功,消息提交");} else {producer.rollback(); // 回滾事務,消息被丟棄System.out.println("本地事務執行失敗,消息回滾");}} catch (Exception e) {producer.rollback(); // 異常時回滾e.printStackTrace();} finally {producer.close();}}private static boolean executeLocalTransaction() {// 模擬本地事務:如用戶賬戶扣款System.out.println("執行本地事務...");return true; // 返回事務執行結果}private static boolean checkLocalTransactionStatus(String bizId) {// 模擬查詢本地事務狀態(如查詢數據庫訂單狀態)System.out.println("查詢本地事務狀態: " + bizId);return true; // 實際應根據業務ID查詢真實狀態}
}
5. 批量消息(Batch Message)

描述:將多條消息打包為一個批次發送,減少網絡開銷,提升吞吐量。
注意事項

  • 所有消息必須屬于同一 Topic
  • 總大小不能超過 4MB(默認限制,可配置)
  • 不支持事務和延遲屬性
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;import java.util.ArrayList;
import java.util.List;public class BatchMessageExample {public static void main(String[] args) throws ClientException, InterruptedException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration config = ClientConfiguration.newBuilder().setEndpoints("localhost:8081").build();String topic = "batch-message-topic";Producer producer = provider.newProducerBuilder().setClientConfiguration(config).setTopics(topic).build();// 創建批量消息集合List<Message> messages = new ArrayList<>();for (int i = 0; i < 100; i++) { // 示例:批量發送100條消息Message message = provider.newMessageBuilder().setTopic(topic).setBody(("批量消息-" + i).getBytes()).setKeys("key-" + i).build();messages.add(message);}// 智能拆分大批次(避免超過4MB限制)List<List<Message>> batches = splitMessages(messages);// 發送所有批次for (List<Message> batch : batches) {List<SendReceipt> receipts = producer.send(batch);System.out.println("批量發送成功,共" + receipts.size() + "條消息");}producer.close();}// 智能拆分大批次消息(實際生產中建議實現)private static List<List<Message>> splitMessages(List<Message> messages) {// 簡單實現:實際應根據消息大小動態拆分List<List<Message>> result = new ArrayList<>();result.add(messages);return result;}
}
6. 消費者示例(通用)

描述:RocketMQ 5.0 支持 Push 和 Pull 兩種消費模式,以下是基于長輪詢的 PushConsumer 示例。

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;import java.time.Duration;public class ConsumerExample {public static void main(String[] args) throws ClientException, InterruptedException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration config = ClientConfiguration.newBuilder().setEndpoints("localhost:8081").build();// 訂閱主題和過濾表達式(支持SQL92語法)String topic = "normal-message-topic";FilterExpression filterExpression = new FilterExpression("TAG = 'order' AND age > 18", // 示例SQL過濾條件FilterExpressionType.SQL92);// 創建PushConsumer(基于長輪詢的"偽推"模式)PushConsumer consumer = provider.newPushConsumerBuilder().setClientConfiguration(config).setConsumerGroup("my-consumer-group") // 消費組決定消息負載方式.setSubscriptionExpressions(Map.of(topic, filterExpression)).setMaxPollInterval(Duration.ofSeconds(30)) // 長輪詢超時時間.setConsumptionThreadCount(10) // 消費線程數.setMessageListener(messageView -> {try {// 處理消息邏輯(業務代碼)System.out.println("接收到消息: " + messageView.getBodyAsString());System.out.println("消息屬性: " + messageView.getProperties());// 模擬業務處理耗時Thread.sleep(100);// 返回消費結果(成功/失敗)return ConsumeResult.SUCCESS;} catch (Exception e) {// 消費失敗時返回RETRY,消息將重試消費System.out.println("消息消費失敗: " + e.getMessage());return ConsumeResult.FAILURE;}}).build();// 保持主線程運行,避免消費者立即關閉System.out.println("消費者已啟動,按Ctrl+C退出...");Thread.sleep(Long.MAX_VALUE);}
}

關鍵依賴配置(Maven)

<dependencies><!-- RocketMQ 5.0 Java客戶端 --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.1.0</version></dependency><!-- gRPC依賴 --><dependency><groupId>io.grpc</groupId><artifactId>grpc-netty-shaded</artifactId><version>1.54.0</version></dependency><!-- 序列化依賴 --><dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>3.21.9</version></dependency>
</dependencies>

三、最佳實踐建議

  1. 連接配置

    • 生產環境建議使用域名而非 IP,支持動態擴容
    • 開啟 TLS 加密(通過ClientConfiguration.setSslTrustStorePath
  2. 消息大小

    • 單條消息建議不超過 1MB
    • 批量消息總大小不超過 4MB(可通過producer.setMaxMessageSize調整)
  3. 異常處理

    • 生產者需捕獲ClientException并實現重試邏輯
    • 消費者應避免長時間阻塞,建議使用異步處理
  4. 性能調優

    • 生產者:調整sendMsgTimeoutmaxAttempts參數
    • 消費者:根據業務吞吐量調整consumptionThreadCount
  5. 監控告警

    • 監控 Topic 的 TPS、RT、堆積量等指標
    • 配置告警閾值(如單隊列堆積超過 10 萬條)

四、總結

RocketMQ支持多種消息類型以滿足不同業務需求:普通消息適用于高吞吐場景;順序消息保證消費順序;定時/延遲消息控制投遞時間;事務消息確保分布式事務一致性;批量消息提升吞吐量。每種類型都提供了對應的Java代碼示例,包括生產者配置、消息構建和發送邏輯。最佳實踐建議包括合理配置連接、控制消息大小、完善異常處理、性能調優和監控告警。通過解耦消息傳遞與業務邏輯,RocketMQ實現了高性能、高可靠的分布式通信能力。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/84396.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/84396.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/84396.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

maxkey單點登錄系統

github地址 https://github.com/MaxKeyTop/MaxKey/blob/master/README_zh.md 1、官方鏡像 https://hub.docker.com/u/maxkeytop 2、MaxKey:Docker快速部署 參考地址&#xff1a; Docker部署 | MaxKey單點登錄認證系統 拉取docker腳本MaxKey: Dromara &#x1f5dd;?MaxK…

基于AI生成測試用例的處理過程

基于AI生成測試用例的處理過程是一個結合機器學習、自然語言處理&#xff08;NLP&#xff09;和領域知識的系統性流程。以下是其核心步驟和關鍵技術細節&#xff0c;以幫助理解如何利用AI自動化生成高效、覆蓋全面的測試用例。 1. 輸入分析與需求建模 目標 將用戶需求、系統文…

《Java vs Go vs C++ vs C:四門編程語言的深度對比》

引言?? 從底層硬件操作到云端分布式系統&#xff0c;Java、Go、C 和 C 四門語言各自占據不同生態位。本文從??設計哲學??、??語法范式??、??性能特性??、??應用場景??等維度進行對比&#xff0c;為開發者提供技術選型參考。 一、??設計哲學與歷史定位??…

無損提速黑科技:YOLOv8+OREPA卷積優化方案解析(原理推導/代碼實現/調參技巧三合一)

文章目錄 一、OREPA核心思想與創新突破1.1 傳統重參數化的局限性1.2 OREPA的核心創新二、OREPA實現原理與數學推導2.1 卷積核分解策略2.2 動態融合公式三、YOLOv8集成實戰(完整代碼實現)3.1 OREPA卷積模塊定義3.2 YOLOv8模型集成3.3 訓練與推理配置四、性能對比與實驗分析4.1…

RestTemplate 發送的字段第二個大寫字母變成小寫的問題探究

在使用RestTemplate 發送http 請求的時候&#xff0c;發現nDecisonVar 轉換成了ndecisonVar ,但是打印日志用fastjson 打印的沒有問題&#xff0c;換成jackson 打印就有問題。因為RestTemplate 默認使用的jackson 作為json 序列化方式&#xff0c;導致的問題&#xff0c;但是為…

C#核心概念解析:析構函數、readonly與this關鍵字

&#x1f50d; 析構函數&#xff1a;資源清理的最后防線 核心作用 析構函數&#xff08;~ClassName&#xff09;在對象銷毀前執行&#xff0c;專用于釋放非托管資源&#xff08;如文件句柄、非托管內存&#xff09;。托管資源&#xff08;如.NET對象&#xff09;由GC自動回收…

FFmpeg中使用Android Content協議打開文件設備

引言 隨著Android 10引入的Scoped Storage&#xff08;分區存儲&#xff09;機制&#xff0c;傳統的文件訪問方式發生了重大變化。FFmpeg作為強大的多媒體處理工具&#xff0c;也在不斷適應Android平臺的演進。本文將介紹如何在FFmpeg 7.0版本中使用Android content協議直接訪…

vue——v-pre的使用

&#x1f530; 基礎理解 ? 什么是 v-pre&#xff1f; v-pre 是一個跳過編譯的 Vue 指令。 它告訴 Vue&#xff1a;“這個元素和其子元素中的內容不要被編譯處理&#xff0c;按原樣輸出。” ? 使用場景&#xff1a; 展示原始的 Mustache 插值語法&#xff08;{{ xxx }}&a…

PyTorch中TensorBoardX模塊與torch.utils.tensorboard模塊的對比分析

文章目錄 說明1. 模塊起源與開發背景2. 功能特性對比3. 安裝與依賴關系4. 性能與使用體驗5. 遷移與兼容性策略6. 最佳實踐與建議7. 未來展望8. 結論實際相關信息推薦資源 說明 TensorBoard&#xff1a;獨立工具&#xff0c;只需安裝tensorboard。TensorFlow&#xff1a;非必需…

單片機中斷系統工作原理及定時器中斷應用

文件目錄 main.c #include <REGX52.H> #include "TIMER0.H" #include "KEY.H" #include "DELAY.H"//void Timer0_Init() { // TMOD 0x01; // TL0 64536 % 256; // TH0 64536 / 256; // ET0 1; // EA 1; // TR0 1; //}unsigned char…

Python爬蟲實戰:研究Portia框架相關技術

1. 引言 1.1 研究背景與意義 在大數據時代,網絡數據已成為企業決策、學術研究和社會分析的重要資源。據 Statista 統計,2025 年全球數據總量將達到 175ZB,其中 80% 以上來自非結構化網絡內容。如何高效獲取并結構化這些數據,成為數據科學領域的關鍵挑戰。 傳統爬蟲開發需…

【機器學習基礎】機器學習與深度學習概述 算法入門指南

機器學習與深度學習概述 算法入門指南 一、引言&#xff1a;機器學習與深度學習&#xff08;一&#xff09;定義與區別&#xff08;二&#xff09;發展歷程&#xff08;三&#xff09;應用場景 二、機器學習基礎&#xff08;一&#xff09;監督學習&#xff08;二&#xff09;無…

[C語言初階]掃雷小游戲

目錄 一、原理及問題分析二、代碼實現2.1 分文件結構設計2.2 棋盤初始化與打印2.3 布置雷與排查雷2.4 游戲主流程實現 三、后期優化方向 在上一篇文章中&#xff0c;我們實現了我們的第二個游戲——三子棋小游戲。這次我們繼續結合我們之前所學的所有內容&#xff0c;制作出我們…

ROS云課三分鐘-破壁篇GCompris-一小部分支持Edu應用列表-2025

開啟藍橋云課ROS ROS 機器人操作系統初級教程_ROS - 藍橋云課 安裝和使用GCompris 終端輸入&#xff1a;sudo apt install gcompris sudo apt install gcompris ok&#xff0c;完成即可。 sudo apt install gcompris 如果是平板&#xff0c;秒變兒童學習機。 啟動 流暢運…

Linux系統基礎——是什么、適用在哪里、如何選

一、Linux是什么 Linux最初是由林納斯托瓦茲&#xff08;Linus Torvalds&#xff09;基于個人興趣愛好開發的個人項目&#xff0c;他編寫了最核心的內核&#xff1b;后面為了發展壯大Linux系統他將整個項目開源到GitHub上&#xff0c;可以讓全世界的人都參與到項目的開發維護中…

26、AI 預測性維護 (燃氣輪機軸承) - /安全與維護組件/ai-predictive-maintenance-turbine

76個工業組件庫示例匯總 AI 預測性維護模擬組件 (燃氣輪機軸承) 概述 這是一個交互式的 Web 組件,旨在模擬基于 AI 的預測性維護 (Predictive Maintenance, PdM) 概念,應用于工業燃氣輪機的關鍵部件(例如軸承)。它通過模擬傳感器數據、動態預測剩余使用壽命 (RUL),并根…

el-form 使用el-row el-col對齊 注意事項

1.el-form 使用inline&#xff0c;el-form-item寬度會失效。 2.為了保證el-form-item 和 它內部的el-input 能在一行&#xff0c;要設置el-form-item的label-width <el-form :model"editInspectform"><el-row style"margin-bottom: 20px"><…

mac 安裝 mysql 和 mysqlshell

1. 安裝 mysql https://dev.mysql.com/downloads/mysql/?spma2c6h.12873639.article-detail.4.37474f4dTHdszC 默認mysql未配置環境變量&#xff0c;可以在設置中找到 2. 安裝 mysqlshell https://dev.mysql.com/downloads/shell/ #啟動mysql-shell mysqlsh 3. 使用 mysq…

漏洞檢測與滲透檢驗在功能及范圍上究竟有何顯著差異?

漏洞檢測與滲透檢驗是確保系統安全的重要途徑&#xff0c;這兩種方法各具特色和功效&#xff0c;它們在功能上有著顯著的差異。 目的不同 漏洞掃描的主要任務是揭示系統內已知的安全漏洞和隱患&#xff0c;這就像是對系統進行一次全面的健康檢查&#xff0c;看是否有已知的疾…

機器學習模型度量指標(混淆矩陣、準確率、精確率、召回率、F1分數、ROC曲線、AUC、平均精度均值)

我們研究的是多分類問題&#xff0c;下面所有例子以多分類問題舉例 混淆矩陣&#xff08;Confusion Matrix&#xff09; 混淆矩陣&#xff08; Confusion Matrix &#xff09;是一個表格&#xff0c;用于可視化機器學習模型在分類問題上 的性能。混淆矩陣的行表示實際類別&…