RocketMQ、RabbitMQ與Kafka對比及常見問題解決方案
一、概述
消息隊列(Message Queue, MQ)是企業IT系統內部通信的核心手段,用于提升性能、實現系統解耦和流量削峰。它具有低耦合、可靠投遞、廣播、流量控制、最終一致性等功能,是異步RPC的主要手段之一。當前主流消息中間件包括ActiveMQ、RabbitMQ、Kafka和RocketMQ等。本文對比RocketMQ、RabbitMQ和Kafka,并總結常見問題的解決方案。
Kafka是一種分布式流處理平臺,最初由LinkedIn開發,現由Apache維護,專注于高吞吐量、持久化和實時數據流處理,廣泛用于大數據和日志聚合場景。
二、特性對比
1. RocketMQ
RocketMQ是阿里巴巴自主研發的分布式消息中間件,具有高性能、高可靠性和高可擴展性,廣泛應用于高并發場景。
- NameServer:輕量級服務協調與治理組件,負責記錄和維護Topic、Broker信息,監控Broker運行狀態。NameServer幾乎無狀態,可集群部署,節點間無信息同步,類似注冊中心。
- Broker:消息服務器,提供核心消息服務。每個Broker與NameServer集群中的所有節點保持長連接,定時注冊Topic信息。
- Producer:消息生產者,負責生成消息并發送至Broker。
- Consumer:消息消費者,從Broker獲取消息并處理業務邏輯。
2. RabbitMQ
RabbitMQ是基于AMQP協議的開源消息中間件,注重靈活的路由機制和易用性,適合中小型企業或復雜路由場景。
- Exchange:交換機,根據路由規則將消息轉發到對應隊列。
- Broker:消息服務器,提供核心消息服務。
- Channel:基于TCP連接的虛擬連接,用于消息傳輸。
- Routing Key:生產者發送消息時攜帶的鍵,指定消息路由規則。
- Binding Key:綁定Exchange與Queue時指定的鍵,Routing Key與Binding Key匹配時,消息被路由到對應Queue。
3. Kafka
Kafka是分布式、持久化的消息系統,設計為高吞吐量的日志流處理平臺,支持分區和副本機制,適合大數據管道和實時分析。
- Topic:消息主題,可分為多個分區(Partition),每個分區是一個有序的日志序列,支持并行消費。
- Broker:Kafka服務器節點,負責存儲和轉發消息。多個Broker組成集群。
- Producer:消息生產者,將消息發送到指定Topic的分區,支持分區策略(如輪詢、鍵哈希)。
- Consumer:消息消費者,通過Consumer Group實現負載均衡,每個消費者訂閱Topic并消費分區消息。
- Zookeeper/KRaft:早期依賴Zookeeper進行元數據管理和選舉,新版使用KRaft(Kafka Raft)模式實現內置共識,無需Zookeeper。
三、常見問題及解決方案
1. 重復消費
問題描述:消費者消費消息后需發送確認消息(ACK)給消息隊列,通知消息已被消費。若確認消息因網絡故障等原因未送達,消息隊列可能重復分發消息給其他消費者。
解決方案:
-
保證消息冪等性
:確保消息多次消費不影響結果。常見方法:
- 使用唯一消息ID,消費者檢查是否已處理。
- 數據庫操作使用唯一約束或版本號控制。
-
RocketMQ
- 消費者在業務邏輯處理完成后發送ACK,確保消息被正確消費。
- 使用事務性消息或本地事務狀態表,防止重復消費影響業務。
-
RabbitMQ
- 采用手動確認模式(Manual ACK),處理消息成功后再回復確認。
- 消費者通過檢查消息的唯一標識(如Message ID)避免重復處理。
-
Kafka
- 消費者管理Offset(消費偏移量),手動提交Offset(disable auto-commit)。
- 如果消費者崩潰未提交Offset,重啟后從上次Offset消費,可能重復;通過冪等操作或Exactly-Once語義(啟用idempotence)處理。
- Consumer Group中,Rebalance可能導致重復消費,使用唯一ID或狀態存儲(如數據庫)確保冪等。
2. 數據丟失
問題描述:消息可能在生產者、消息隊列或消費者端丟失,導致業務異常。
RocketMQ
- 生產者丟數據
- 使用同步發送(send()),同步感知發送結果,失敗可重試(默認重試3次)。
- 失敗消息存儲在CommitLog中,支持后續重試。
- 消息隊列丟數據
- 消息持久化到CommitLog,即使Broker宕機后重啟,未消費消息可恢復。
- 支持同步刷盤(確保消息寫入磁盤)和異步刷盤(高性能但可能丟失少量數據)。
- Broker集群支持1主N從,同步復制確保主節點磁盤故障不丟失消息,異步復制性能更高但有毫秒級延遲。
- 消費者丟數據
- 完全消費成功后發送ACK。
- 維護持久化的Offset記錄消費進度,防止因故障丟失消費狀態。
RabbitMQ
- 生產者丟數據
- 使用事務模式(支持回滾)或Confirm模式(ACK確認),確保生產者可靠發送。
- 消息隊列丟數據
- 開啟消息持久化,消息寫入磁盤后通知生產者ACK。
- 配合Confirm機制,確保消息持久化到磁盤。
- 消費者丟數據
- 禁用自動確認模式,改為手動確認(Manual ACK),確保消息處理成功后再確認。
- 消費者維護消費狀態,避免因故障重復消費或丟失。
Kafka
- 生產者丟數據
- 配置acks參數:acks=0(不確認,高性能可能丟失)、acks=1(Leader確認)、acks=all(所有副本確認,確保不丟失)。
- 啟用重試和冪等生產者(enable.idempotence=true),防止重復發送。
- 消息隊列丟數據
- 消息持久化到日志文件(Log Segments),支持配置保留策略(時間或大小)。
- 通過Replication Factor(副本因子)設置分區副本數,Leader-Follower機制確保高可用;min.insync.replicas配置最小同步副本數。
- Broker宕機時,副本可選舉新Leader,消息不丟失(視副本配置)。
- 消費者丟數據
- 禁用自動提交Offset(enable.auto.commit=false),手動提交確保處理成功。
- 如果自動提交,處理中崩潰可能丟失消息;使用Exactly-Once語義結合事務處理。
3. 消費順序
問題描述:某些業務場景要求消息按順序消費,但分布式系統或多線程消費可能導致亂序。
解決方案:
- 單線程消費:保證隊列內消息按順序處理,但可能影響性能。
- 消息編號:為消息分配序列號,消費者根據編號判斷順序。
- Queue有序性
- 消息隊列內部數據天然有序。
- 消費者端通過單線程消費或內存隊列排序,確保順序處理。
- RocketMQ
- 使用順序消息(Sequential Message),將相關消息發送到同一分區,保證分區內順序。
- 消費者單線程拉取并處理分區消息。
- RabbitMQ
- 利用Queue的FIFO特性,單線程消費確保順序。
- 多線程消費時,消費者內部維護內存隊列進行排序。
- Kafka
- 分區內消息嚴格有序(append-only日志),但多分區無全局順序。
- 對于順序需求,將相關消息發送到同一分區(基于鍵哈希)。
- 消費者組中,每個分區分配給單一消費者,確保分區內順序;多線程消費需消費者內部協調。
4. 高可用
問題描述:消息隊列需保證高可用,防止單點故障導致服務不可用。
RocketMQ
- 多Master模式
- 配置簡單,性能最高。
- 單機宕機或重啟期間,該機器未消費消息不可用,影響實時性。
- 可能有少量消息丟失(視配置)。
- 多Master多Slave異步模式
- 每Master配一個Slave,消息寫入Master,異步復制到Slave。
- 性能接近多Master,實時性高,主備切換對應用透明。
- Master宕機或磁盤損壞可能丟失少量消息(毫秒級延遲)。
- 多Master多Slave同步模式
- 每Master配一個Slave,消息同步寫入主備,成功后返回。
- 服務和數據可用性高,但性能略低于異步模式。
- 主節點宕機后,備節點無法自動切換為主,需人工干預。
RabbitMQ
- 普通集群模式
- 多臺機器運行RabbitMQ實例,Queue僅存儲在一個實例上,其他實例同步元數據。
- 消費時,若連接到非Queue所在實例,會從Queue所在實例拉取數據。
- 若Queue所在實例宕機,需等待其恢復(持久化消息不丟失),影響實時性。
- 鏡像集群模式
- Queue的元數據和消息同步到多個實例,寫入消息時自動同步到所有實例的Queue。
- 優點:高可用,單節點宕機不影響服務。
- 缺點:
- 數據同步導致性能開銷大。
- 無法線性擴容,因每個節點存儲全量數據,單節點容量受限。
Kafka
- 分布式Broker集群
- 通過分區和副本機制實現高可用,每個分區有多個副本分布在不同Broker。
- Leader選舉由Controller(基于Zookeeper或KRaft)管理,Broker宕機時自動切換到Follower副本。
- 支持水平擴展,添加Broker可重新分配分區,提高吞吐量。
- 優點:高吞吐(百萬級TPS),數據持久化強,適合大規模數據流。
- 缺點:配置復雜,依賴外部協調(如Zookeeper,KRaft緩解);實時性不如RabbitMQ,但延遲低(毫秒級)。
四、總結
- RocketMQ:適合高并發交易場景,強調性能和分布式架構,NameServer和Broker設計支持大規模集群。數據丟失防護完善,適合對實時性要求高的場景,但在同步模式下主備切換需人工干預。
- RabbitMQ:基于AMQP協議,靈活的路由機制適合復雜路由場景,易用性強。但鏡像集群性能開銷大,擴展性受限,適合中小規模應用。
- Kafka:專注于高吞吐量和數據流處理,分區機制支持并行消費和擴展,適合日志、大數據管道。但不原生支持復雜路由,順序消費限于分區內,配置較復雜。
- 常見問題解決方案
- 重復消費:三者均通過冪等性和手動確認/提交Offset解決。
- 數據丟失:RocketMQ和Kafka通過主從/副本復制,RabbitMQ通過持久化和Confirm。
- 消費順序:利用分區/隊列有序性,結合單線程或鍵分區。
- 高可用:Kafka的分布式副本最強擴展性,RabbitMQ鏡像集群數據一致性高,RocketMQ平衡性能與可靠性。
市場上幾大消息隊列對比如下:
對比項 | RabbitMQ | ActiveMQ | RocketMQ | Kafka |
---|---|---|---|---|
公司 | Rabbit | Apache | 阿里 | Apache |
語言 | Erlang | Java | Java | Scala & Java |
協議支持 | AMQP | OpenWire、STOMP、REST、XMPP、AMQP | 自定義 | 自定義協議,社區封裝了 HTTP 協議支持 |
客戶端支持語言 | 官方支持 Erlang、Java、Ruby 等,社區產出多種 API,幾乎支持所有語言 | Java、C、C++、Python、PHP、Perl、.NET 等 | Java、C++(不成熟) | 官方支持 Java,社區產出多種 API,如 PHP、Python 等 |
單機吞吐量 | 萬級(約 3 萬) | 萬級(約 4 萬) | 十萬級(約 10 萬) | 十萬級(約 10 萬) |
消息延遲 | 微秒級 | 毫秒級 | 毫秒級 | 毫秒以內 |
可用性 | 高,基于主從架構實現可用性 | 高,基于主從架構實現可用性 | 非常高,分布式架構 | 非常高,分布式架構,一個數據多副本 |
消息可靠性 | - | 有較低概率丟失數據 | 經過參數優化配置,可做到零丟失 | 經過參數配置,可做到零丟失 |
功能支持 | 基于 Erlang 開發,并發性能極強,性能極好,延時低 | MQ 領域功能極其完備 | MQ 功能較為完備,分布式擴展性好 | 功能較為簡單,主要支持基本 MQ 功能 |
優勢 | Erlang 開發,性能極好、延時低,吞吐量萬級,功能完備,管理界面優秀,社區活躍,互聯網公司使用多 | 成熟穩定,功能強大,業內大量應用 | 接口簡單易用,阿里出品,吞吐量大,分布式擴展方便,社區活躍,支持大規模 Topic 和復雜業務場景,可定制開發 | 超高吞吐量,毫秒級延時,極高可用性和可靠性,分布式擴展方便 |
劣勢 | 吞吐量較低,Erlang 開發不易定制,集群動態擴展麻煩 | 偶爾有低概率消息丟失,社區活躍度不高 | 不遵循 JMS 規范,系統遷移需改大量代碼,存在被替代風險 | 可能發生消息重復消費 |
應用 | 各類場景均有使用 | 主要用于解耦和異步,較少用于大規模吞吐 | 適用于大規模吞吐、復雜業務場景 | 大數據實時計算、日志采集等場景的業界標準 |
選擇中間件的可以從這些維度來考慮:可靠性,性能,功能,可運維行,可拓展性,社區活躍度。目前常用的幾個中間件,ActiveMQ作為“老古董”,市面上用的已經不多,其它幾種:
RabbitMQ:
優點:輕量,迅捷,容易部署和使用,擁有靈活的路由配置
缺點:性能和吞吐量不太理想,不易進行二次開發
RocketMQ:
優點:性能好,高吞吐量,穩定可靠,有活躍的中文社區
缺點:兼容性上不是太好
Kafka:
優點:擁有強大的性能及吞吐量,兼容性很好
缺點:由于“攢一波再處理”導致延遲比較高
RocketMQ專欄
1. 推模式(Push)與拉模式(Pull)的區別與實現
推模式:RocketMQ 的 PushConsumer 實際基于長輪詢(Long Polling)實現,Broker 收到請求后若隊列無消息,會掛起請求并在新消息到達時立即響應。
拉模式:消費者主動拉取,需自行控制頻率(如 DefaultLitePullConsumer),適用于需精準控制消費速率的場景。
對比:
推模式實時性高,但需 Broker 維護連接狀態,可能因消費能力不足導致積壓。
拉模式靈活性高,但需處理消息延遲與空輪詢問題。
2. 如何保證消息順序性?
生產者:通過 MessageQueueSelector 將同一業務 ID 的消息發送至固定隊列(如哈希取模)。
消費者:使用 MessageListenerOrderly 監聽器,鎖定隊列并單線程消費。
源碼關鍵點:RebalanceLockManager 管理隊列鎖,確保同一隊列僅被一個線程消費。
3. 事務消息的實現機制
兩階段提交:
發送 Half 消息(預提交),Broker 存儲但暫不投遞。
執行本地事務,返回 Commit/Rollback 狀態。
Broker 根據狀態投遞或刪除消息,若未收到確認則發起事務回查。
應用場景:跨系統分布式事務(如訂單創建與庫存扣減)。
4. 消息積壓的解決方案
臨時擴容:增加 Consumer 實例或線程數,提升消費能力。
批量消費:調整 consumeMessageBatchMaxSize 參數,一次處理多條消息。
跳過非關鍵消息:若允許部分消息丟失,可重置消費位點(resetOffsetByTime)。
異步處理:將耗時操作(如 DB 寫入)異步化,減少消費阻塞。
5. 消息的存儲結構是怎樣的?CommitLog 和 ConsumeQueue 的關系?
CommitLog 存儲原始消息,ConsumeQueue 存儲邏輯隊列的偏移量,通過偏移量快速定位消息。
6. Consumer 的負載均衡策略是什么?
平均分配、一致性 Hash 等,通過 RebalanceService 定時調整隊列分配。
7. 如何實現消息的精準一次投遞?
RocketMQ 不保證,需業務端結合事務消息 + 冪等性實現。
8. Broker 的刷盤機制如何選擇?
高可靠性場景用 SYNC_FLUSH,高性能場景用 ASYNC_FLUSH。
9. NameServer 宕機后,Producer 和 Consumer 還能工作嗎?
可以,客戶端會緩存路由信息,但無法感知新 Broker 或 Topic 變化。
10. 性能調優
Broker 參數:
sendMessageThreadPoolNums:發送線程數。
pullMessageThreadPoolNums:拉取線程數。
零拷貝技術:通過 MappedFile 內存映射文件減少數據拷貝。
11. Broker 如何處理拉取請求?
長輪詢機制:Consumer 拉取請求無消息時,Broker 掛起請求(默認 30s),新消息到達后立即響應。
源碼關鍵點:PullRequestHoldService 管理掛起請求,通過 checkHoldRequest 周期性檢查消息到達。
12. RocketMQ 消息存儲結構:CommitLog 與 ConsumeQueue 的關系
CommitLog:所有 Topic 的消息按順序追加寫入,文件名格式為 {文件起始偏移量}.log,固定大小 1GB(可配置)。
ConsumeQueue:邏輯隊列索引,存儲消息在 CommitLog 中的偏移量、大小、Tag HashCode,文件名格式為 {Topic}/{QueueId}/{ConsumeQueueOffset}。
關系:消費者通過 ConsumeQueue 快速定位 CommitLog 中的消息,實現高效檢索。
13. 主從同步機制(SYNC/ASYNC)的區別與選型
SYNC_MASTER:
生產者收到 Slave 寫入成功 ACK 后才返回,保證數據強一致。
適用場景:金融交易、資金扣減。
ASYNC_MASTER:
主節點寫入成功即返回,Slave 異步復制,性能更高。
適用場景:日志傳輸、允許短暫不一致。
14. 消息重試與死信隊列(DLQ)機制
重試隊列:消費失敗的消息進入重試隊列(命名格式:%RETRY%{ConsumerGroup}),按延遲等級(1s, 5s, 10s…)重試。
死信隊列:重試 16 次后仍失敗,消息進入死信隊列(%DLQ%{ConsumerGroup}),需人工處理。
配置參數:maxReconsumeTimes(默認 16 次)。
15. 如何實現消息軌跡(Trace)追蹤?
開啟方式:Broker 配置 traceTopicEnable=true,Producer/Consumer 設置 enableMsgTrace=true。
原理:消息發送/消費時,額外生成軌跡數據寫入內部 Topic RMQ_SYS_TRACE_TOPIC。
查詢工具:RocketMQ Console 或自定義消費者訂閱軌跡 Topic。
16. Rebalance 機制如何工作?
觸發條件:Consumer 數量變化、Broker 上下線、Topic 路由變更。
流程:
客戶端定時向 Broker 發送心跳,上報 Consumer Group 信息。
Broker 通過 RebalanceService 計算隊列分配策略(平均分配、一致性 Hash)。
Consumer 根據新分配結果調整拉取隊列。
源碼入口:RebalanceImpl#rebalanceByTopic。
17. RocketMQ 5.0 新特性(如 Proxy 模式)
Proxy 模式:解耦 Broker 與客戶端協議,支持多語言客戶端(如 HTTP/gRPC),增強云原生兼容性。
事務增強:支持 TCC 模式,提供更靈活的事務解決方案。
輕量級 SDK:簡化客戶端依賴,提升啟動速度。
三、高級特性與源碼原理
18. 零拷貝技術
RocketMQ:使用 mmap 內存映射文件,減少用戶態與內核態數據拷貝。
Kafka:采用 sendfile 系統調用,實現更高吞吐但靈活性較低。
19. DLedger 高可用機制
基于 Raft 協議實現主從選舉,主節點故障時自動切換,保障數據一致性。
20. 消息過濾
Tag 過濾:Broker 端過濾,減少網絡傳輸。
SQL 過濾:需開啟 enablePropertyFilter=true,支持復雜條件匹配。
21. 事務消息實現細節
兩階段提交:
發送 Half 消息(預提交),Broker 存儲但暫不投遞。
執行本地事務,返回 Commit/Rollback 狀態。
Broker 根據狀態投遞或刪除消息,若未收到確認則發起事務回查。
源碼分析:TransactionMQProducer 處理本地事務回調,TransactionalMessageService 管理事務狀態。
22. 消息索引文件(IndexFile)的作用
存儲結構:哈希索引(Key: Message Key, Value: CommitLog Offset)。
用途:通過 Message Key 或 Unique Key 快速查詢消息,支持按時間范圍檢索。
源碼類:IndexService, IndexFile。
23. PageCache 與 Mmap 如何提升性能?
PageCache:利用操作系統緩存,將磁盤文件映射到內存,加速讀寫。
Mmap:通過內存映射文件,避免 read()/write() 系統調用的數據拷貝,提升 CommitLog 寫入效率。
刷盤策略:SYNC_FLUSH(同步刷盤)依賴 FileChannel.force(),ASYNC_FLUSH 使用后臺線程批量刷盤。
24. 消息消費位點(Offset)管理機制
本地存儲:Consumer 默認將 Offset 存儲在本地文件(~/.rocketmq_offsets)。
遠程存儲:集群模式下,Offset 上報至 Broker(ConsumerOffsetManager)。
重置方式:
CONSUME_FROM_LAST_OFFSET:從最大位點開始消費。
CONSUME_FROM_FIRST_OFFSET:從最小位點開始消費。
25. 消息索引文件(IndexFile)的作用
存儲結構:哈希索引(Key: Message Key, Value: CommitLog Offset)。
用途:通過 Message Key 或 Unique Key 快速查詢消息,支持按時間范圍檢索。
源碼類:IndexService, IndexFile。
26. PageCache 與 Mmap 如何提升性能?
PageCache:利用操作系統緩存,將磁盤文件映射到內存,加速讀寫。
Mmap:通過內存映射文件,避免 read()/write() 系統調用的數據拷貝,提升 CommitLog 寫入效率。
刷盤策略:SYNC_FLUSH(同步刷盤)依賴 FileChannel.force(),ASYNC_FLUSH 使用后臺線程批量刷盤。
27. 消息消費位點(Offset)管理機制
本地存儲:Consumer 默認將 Offset 存儲在本地文件(~/.rocketmq_offsets)。
遠程存儲:集群模式下,Offset 上報至 Broker(ConsumerOffsetManager)。
重置方式:
CONSUME_FROM_LAST_OFFSET:從最大位點開始消費。
CONSUME_FROM_FIRST_OFFSET:從最小位點開始消費。
場景設計題
1 .設計一個高并發秒殺系統,如何利用 RocketMQ 優化?
流量削峰:將秒殺請求寫入 RocketMQ 隊列,異步處理訂單創建與庫存扣減。順序消息:使用哈希選擇器將同一用戶請求路由到固定隊列,避免超賣。事務消息:扣減庫存與生成訂單通過事務消息保證最終一致性。動態擴容:根據監控指標(如堆積消息數)自動擴容 Consumer,快速消化積壓
2 . 設計一個秒殺系統,如何用 RocketMQ 解決超賣問題?
消息隊列削峰填谷 + 數據庫樂觀鎖 + 事務消息保證最終庫存一致。
3 . 如何實現分布式事務(訂單扣庫存+生成訂單)?
事務消息:半消息預扣庫存,本地事務生成訂單,失敗則回滾庫存。
4.如何設計一個異地多活消息隊列系統?
跨城同步:Broker 集群分機房部署,通過 Async replication 同步消息。單元化路由:Producer 根據用戶 ID 哈希選擇本地機房 Broker,減少跨城延遲。容災切換:監控機房狀態,自動切換消息路由至可用機房。
5.消息丟失的可能原因與解決方案
生產者丟失:原因:異步發送未處理 SendCallback 異常。解決:使用同步發送 + 重試機制。Broker 丟失:原因:異步刷盤時宕機,PageCache 數據未落盤。解決:SYNC_FLUSH 刷盤 + 主從同步。消費者丟失:原因:消費成功但 Offset 未提交。解決:先處理業務邏輯,再手動提交 Offset。、
Demo實操
- 引入依賴
<dependencies><!-- RocketMQ 客戶端 --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.4.0</version></dependency><!-- Spring Boot Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
</dependencies>
2. 配置文件 application.yml
yamlrocketmq:namesrvAddr: 127.0.0.1:9876producer:group: test-producer-groupconsumer:group: test-consumer-grouptopic: TestTopic
3. 生產者配置類
javapackage com.example.demo.config;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RocketMQProducerConfig {@Value("${rocketmq.namesrvAddr}")private String namesrvAddr;@Value("${rocketmq.producer.group}")private String producerGroup;@Beanpublic DefaultMQProducer mqProducer() throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer(producerGroup);producer.setNamesrvAddr(namesrvAddr);producer.start();System.out.println("🚀 RocketMQ Producer 啟動成功");return producer;}
}
4. 生產者發送接口
javapackage com.example.demo.controller;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;import java.nio.charset.StandardCharsets;@RestController
@RequestMapping("/mq")
public class ProducerController {@Autowiredprivate DefaultMQProducer producer;@Value("${rocketmq.topic}")private String topic;@PostMapping("/send")public String sendMessage(@RequestParam String msg) throws Exception {Message message = new Message(topic, msg.getBytes(StandardCharsets.UTF_8));SendResult result = producer.send(message);return "發送成功: " + result;}
}
5. 消費者配置類
java復制編輯
package com.example.demo.config;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.nio.charset.StandardCharsets;
import java.util.List;@Configuration
public class RocketMQConsumerConfig {@Value("${rocketmq.namesrvAddr}")private String namesrvAddr;@Value("${rocketmq.consumer.group}")private String consumerGroup;@Value("${rocketmq.topic}")private String topic;@Beanpublic DefaultMQPushConsumer mqConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);consumer.setNamesrvAddr(namesrvAddr);consumer.subscribe(topic, "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {String body = new String(msg.getBody(), StandardCharsets.UTF_8);System.out.println("📩 收到消息: " + body);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("? RocketMQ Consumer 啟動成功");return consumer;}
}
最后有興趣可以嘗試自動重試,TraceId 追蹤, 異步發送, 批量發送