北京JAVA基礎面試30天打卡08

RocketMQ、RabbitMQ與Kafka對比及常見問題解決方案

一、概述

消息隊列(Message Queue, MQ)是企業IT系統內部通信的核心手段,用于提升性能、實現系統解耦和流量削峰。它具有低耦合、可靠投遞、廣播、流量控制、最終一致性等功能,是異步RPC的主要手段之一。當前主流消息中間件包括ActiveMQ、RabbitMQ、Kafka和RocketMQ等。本文對比RocketMQ、RabbitMQ和Kafka,并總結常見問題的解決方案。

Kafka是一種分布式流處理平臺,最初由LinkedIn開發,現由Apache維護,專注于高吞吐量、持久化和實時數據流處理,廣泛用于大數據和日志聚合場景。


二、特性對比

1. RocketMQ

RocketMQ是阿里巴巴自主研發的分布式消息中間件,具有高性能、高可靠性和高可擴展性,廣泛應用于高并發場景。

  • NameServer:輕量級服務協調與治理組件,負責記錄和維護Topic、Broker信息,監控Broker運行狀態。NameServer幾乎無狀態,可集群部署,節點間無信息同步,類似注冊中心。
  • Broker:消息服務器,提供核心消息服務。每個Broker與NameServer集群中的所有節點保持長連接,定時注冊Topic信息。
  • Producer:消息生產者,負責生成消息并發送至Broker。
  • Consumer:消息消費者,從Broker獲取消息并處理業務邏輯。

2. RabbitMQ

RabbitMQ是基于AMQP協議的開源消息中間件,注重靈活的路由機制和易用性,適合中小型企業或復雜路由場景。

  • Exchange:交換機,根據路由規則將消息轉發到對應隊列。
  • Broker:消息服務器,提供核心消息服務。
  • Channel:基于TCP連接的虛擬連接,用于消息傳輸。
  • Routing Key:生產者發送消息時攜帶的鍵,指定消息路由規則。
  • Binding Key:綁定Exchange與Queue時指定的鍵,Routing Key與Binding Key匹配時,消息被路由到對應Queue。

3. Kafka

Kafka是分布式、持久化的消息系統,設計為高吞吐量的日志流處理平臺,支持分區和副本機制,適合大數據管道和實時分析。

  • Topic:消息主題,可分為多個分區(Partition),每個分區是一個有序的日志序列,支持并行消費。
  • Broker:Kafka服務器節點,負責存儲和轉發消息。多個Broker組成集群。
  • Producer:消息生產者,將消息發送到指定Topic的分區,支持分區策略(如輪詢、鍵哈希)。
  • Consumer:消息消費者,通過Consumer Group實現負載均衡,每個消費者訂閱Topic并消費分區消息。
  • Zookeeper/KRaft:早期依賴Zookeeper進行元數據管理和選舉,新版使用KRaft(Kafka Raft)模式實現內置共識,無需Zookeeper。

三、常見問題及解決方案

1. 重復消費

問題描述:消費者消費消息后需發送確認消息(ACK)給消息隊列,通知消息已被消費。若確認消息因網絡故障等原因未送達,消息隊列可能重復分發消息給其他消費者。

解決方案

  • 保證消息冪等性

    :確保消息多次消費不影響結果。常見方法:

    • 使用唯一消息ID,消費者檢查是否已處理。
    • 數據庫操作使用唯一約束或版本號控制。
  • RocketMQ

    • 消費者在業務邏輯處理完成后發送ACK,確保消息被正確消費。
    • 使用事務性消息或本地事務狀態表,防止重復消費影響業務。
  • RabbitMQ

    • 采用手動確認模式(Manual ACK),處理消息成功后再回復確認。
    • 消費者通過檢查消息的唯一標識(如Message ID)避免重復處理。
  • Kafka

    • 消費者管理Offset(消費偏移量),手動提交Offset(disable auto-commit)。
    • 如果消費者崩潰未提交Offset,重啟后從上次Offset消費,可能重復;通過冪等操作或Exactly-Once語義(啟用idempotence)處理。
    • Consumer Group中,Rebalance可能導致重復消費,使用唯一ID或狀態存儲(如數據庫)確保冪等。

2. 數據丟失

問題描述:消息可能在生產者、消息隊列或消費者端丟失,導致業務異常。

RocketMQ
  • 生產者丟數據
    • 使用同步發送(send()),同步感知發送結果,失敗可重試(默認重試3次)。
    • 失敗消息存儲在CommitLog中,支持后續重試。
  • 消息隊列丟數據
    • 消息持久化到CommitLog,即使Broker宕機后重啟,未消費消息可恢復。
    • 支持同步刷盤(確保消息寫入磁盤)和異步刷盤(高性能但可能丟失少量數據)。
    • Broker集群支持1主N從,同步復制確保主節點磁盤故障不丟失消息,異步復制性能更高但有毫秒級延遲。
  • 消費者丟數據
    • 完全消費成功后發送ACK。
    • 維護持久化的Offset記錄消費進度,防止因故障丟失消費狀態。
RabbitMQ
  • 生產者丟數據
    • 使用事務模式(支持回滾)或Confirm模式(ACK確認),確保生產者可靠發送。
  • 消息隊列丟數據
    • 開啟消息持久化,消息寫入磁盤后通知生產者ACK。
    • 配合Confirm機制,確保消息持久化到磁盤。
  • 消費者丟數據
    • 禁用自動確認模式,改為手動確認(Manual ACK),確保消息處理成功后再確認。
    • 消費者維護消費狀態,避免因故障重復消費或丟失。
Kafka
  • 生產者丟數據
    • 配置acks參數:acks=0(不確認,高性能可能丟失)、acks=1(Leader確認)、acks=all(所有副本確認,確保不丟失)。
    • 啟用重試和冪等生產者(enable.idempotence=true),防止重復發送。
  • 消息隊列丟數據
    • 消息持久化到日志文件(Log Segments),支持配置保留策略(時間或大小)。
    • 通過Replication Factor(副本因子)設置分區副本數,Leader-Follower機制確保高可用;min.insync.replicas配置最小同步副本數。
    • Broker宕機時,副本可選舉新Leader,消息不丟失(視副本配置)。
  • 消費者丟數據
    • 禁用自動提交Offset(enable.auto.commit=false),手動提交確保處理成功。
    • 如果自動提交,處理中崩潰可能丟失消息;使用Exactly-Once語義結合事務處理。

3. 消費順序

問題描述:某些業務場景要求消息按順序消費,但分布式系統或多線程消費可能導致亂序。

解決方案

  • 單線程消費:保證隊列內消息按順序處理,但可能影響性能。
  • 消息編號:為消息分配序列號,消費者根據編號判斷順序。
  • Queue有序性
    • 消息隊列內部數據天然有序。
    • 消費者端通過單線程消費或內存隊列排序,確保順序處理。
  • RocketMQ
    • 使用順序消息(Sequential Message),將相關消息發送到同一分區,保證分區內順序。
    • 消費者單線程拉取并處理分區消息。
  • RabbitMQ
    • 利用Queue的FIFO特性,單線程消費確保順序。
    • 多線程消費時,消費者內部維護內存隊列進行排序。
  • Kafka
    • 分區內消息嚴格有序(append-only日志),但多分區無全局順序。
    • 對于順序需求,將相關消息發送到同一分區(基于鍵哈希)。
    • 消費者組中,每個分區分配給單一消費者,確保分區內順序;多線程消費需消費者內部協調。

4. 高可用

問題描述:消息隊列需保證高可用,防止單點故障導致服務不可用。

RocketMQ
  • 多Master模式
    • 配置簡單,性能最高。
    • 單機宕機或重啟期間,該機器未消費消息不可用,影響實時性。
    • 可能有少量消息丟失(視配置)。
  • 多Master多Slave異步模式
    • 每Master配一個Slave,消息寫入Master,異步復制到Slave。
    • 性能接近多Master,實時性高,主備切換對應用透明。
    • Master宕機或磁盤損壞可能丟失少量消息(毫秒級延遲)。
  • 多Master多Slave同步模式
    • 每Master配一個Slave,消息同步寫入主備,成功后返回。
    • 服務和數據可用性高,但性能略低于異步模式。
    • 主節點宕機后,備節點無法自動切換為主,需人工干預。
RabbitMQ
  • 普通集群模式
    • 多臺機器運行RabbitMQ實例,Queue僅存儲在一個實例上,其他實例同步元數據。
    • 消費時,若連接到非Queue所在實例,會從Queue所在實例拉取數據。
    • 若Queue所在實例宕機,需等待其恢復(持久化消息不丟失),影響實時性。
  • 鏡像集群模式
    • Queue的元數據和消息同步到多個實例,寫入消息時自動同步到所有實例的Queue。
    • 優點:高可用,單節點宕機不影響服務。
    • 缺點:
      • 數據同步導致性能開銷大。
      • 無法線性擴容,因每個節點存儲全量數據,單節點容量受限。
Kafka
  • 分布式Broker集群
    • 通過分區和副本機制實現高可用,每個分區有多個副本分布在不同Broker。
    • Leader選舉由Controller(基于Zookeeper或KRaft)管理,Broker宕機時自動切換到Follower副本。
    • 支持水平擴展,添加Broker可重新分配分區,提高吞吐量。
  • 優點:高吞吐(百萬級TPS),數據持久化強,適合大規模數據流。
  • 缺點:配置復雜,依賴外部協調(如Zookeeper,KRaft緩解);實時性不如RabbitMQ,但延遲低(毫秒級)。

四、總結

  • RocketMQ:適合高并發交易場景,強調性能和分布式架構,NameServer和Broker設計支持大規模集群。數據丟失防護完善,適合對實時性要求高的場景,但在同步模式下主備切換需人工干預。
  • RabbitMQ:基于AMQP協議,靈活的路由機制適合復雜路由場景,易用性強。但鏡像集群性能開銷大,擴展性受限,適合中小規模應用。
  • Kafka:專注于高吞吐量和數據流處理,分區機制支持并行消費和擴展,適合日志、大數據管道。但不原生支持復雜路由,順序消費限于分區內,配置較復雜。
  • 常見問題解決方案
    • 重復消費:三者均通過冪等性和手動確認/提交Offset解決。
    • 數據丟失:RocketMQ和Kafka通過主從/副本復制,RabbitMQ通過持久化和Confirm。
    • 消費順序:利用分區/隊列有序性,結合單線程或鍵分區。
    • 高可用:Kafka的分布式副本最強擴展性,RabbitMQ鏡像集群數據一致性高,RocketMQ平衡性能與可靠性。

市場上幾大消息隊列對比如下:

對比項RabbitMQActiveMQRocketMQKafka
公司RabbitApache阿里Apache
語言ErlangJavaJavaScala & Java
協議支持AMQPOpenWire、STOMP、REST、XMPP、AMQP自定義自定義協議,社區封裝了 HTTP 協議支持
客戶端支持語言官方支持 Erlang、Java、Ruby 等,社區產出多種 API,幾乎支持所有語言Java、C、C++、Python、PHP、Perl、.NET 等Java、C++(不成熟)官方支持 Java,社區產出多種 API,如 PHP、Python 等
單機吞吐量萬級(約 3 萬)萬級(約 4 萬)十萬級(約 10 萬)十萬級(約 10 萬)
消息延遲微秒級毫秒級毫秒級毫秒以內
可用性高,基于主從架構實現可用性高,基于主從架構實現可用性非常高,分布式架構非常高,分布式架構,一個數據多副本
消息可靠性-有較低概率丟失數據經過參數優化配置,可做到零丟失經過參數配置,可做到零丟失
功能支持基于 Erlang 開發,并發性能極強,性能極好,延時低MQ 領域功能極其完備MQ 功能較為完備,分布式擴展性好功能較為簡單,主要支持基本 MQ 功能
優勢Erlang 開發,性能極好、延時低,吞吐量萬級,功能完備,管理界面優秀,社區活躍,互聯網公司使用多成熟穩定,功能強大,業內大量應用接口簡單易用,阿里出品,吞吐量大,分布式擴展方便,社區活躍,支持大規模 Topic 和復雜業務場景,可定制開發超高吞吐量,毫秒級延時,極高可用性和可靠性,分布式擴展方便
劣勢吞吐量較低,Erlang 開發不易定制,集群動態擴展麻煩偶爾有低概率消息丟失,社區活躍度不高不遵循 JMS 規范,系統遷移需改大量代碼,存在被替代風險可能發生消息重復消費
應用各類場景均有使用主要用于解耦和異步,較少用于大規模吞吐適用于大規模吞吐、復雜業務場景大數據實時計算、日志采集等場景的業界標準

選擇中間件的可以從這些維度來考慮:可靠性,性能,功能,可運維行,可拓展性,社區活躍度。目前常用的幾個中間件,ActiveMQ作為“老古董”,市面上用的已經不多,其它幾種:

RabbitMQ:

優點:輕量,迅捷,容易部署和使用,擁有靈活的路由配置
缺點:性能和吞吐量不太理想,不易進行二次開發
RocketMQ:

優點:性能好,高吞吐量,穩定可靠,有活躍的中文社區
缺點:兼容性上不是太好
Kafka:

優點:擁有強大的性能及吞吐量,兼容性很好
缺點:由于“攢一波再處理”導致延遲比較高

RocketMQ專欄

1. 推模式(Push)與拉模式(Pull)的區別與實現
推模式:RocketMQ 的 PushConsumer 實際基于長輪詢(Long Polling)實現,Broker 收到請求后若隊列無消息,會掛起請求并在新消息到達時立即響應。
拉模式:消費者主動拉取,需自行控制頻率(如 DefaultLitePullConsumer),適用于需精準控制消費速率的場景。
對比:
推模式實時性高,但需 Broker 維護連接狀態,可能因消費能力不足導致積壓。
拉模式靈活性高,但需處理消息延遲與空輪詢問題。

2. 如何保證消息順序性?
生產者:通過 MessageQueueSelector 將同一業務 ID 的消息發送至固定隊列(如哈希取模)。
消費者:使用 MessageListenerOrderly 監聽器,鎖定隊列并單線程消費。
源碼關鍵點:RebalanceLockManager 管理隊列鎖,確保同一隊列僅被一個線程消費。

3. 事務消息的實現機制
兩階段提交:
發送 Half 消息(預提交),Broker 存儲但暫不投遞。
執行本地事務,返回 Commit/Rollback 狀態。
Broker 根據狀態投遞或刪除消息,若未收到確認則發起事務回查。
應用場景:跨系統分布式事務(如訂單創建與庫存扣減)。

4. 消息積壓的解決方案
臨時擴容:增加 Consumer 實例或線程數,提升消費能力。
批量消費:調整 consumeMessageBatchMaxSize 參數,一次處理多條消息。
跳過非關鍵消息:若允許部分消息丟失,可重置消費位點(resetOffsetByTime)。
異步處理:將耗時操作(如 DB 寫入)異步化,減少消費阻塞。

5. 消息的存儲結構是怎樣的?CommitLog 和 ConsumeQueue 的關系?
CommitLog 存儲原始消息,ConsumeQueue 存儲邏輯隊列的偏移量,通過偏移量快速定位消息。

6. Consumer 的負載均衡策略是什么?
平均分配、一致性 Hash 等,通過 RebalanceService 定時調整隊列分配。

7. 如何實現消息的精準一次投遞?
RocketMQ 不保證,需業務端結合事務消息 + 冪等性實現。

8. Broker 的刷盤機制如何選擇?
高可靠性場景用 SYNC_FLUSH,高性能場景用 ASYNC_FLUSH。

9. NameServer 宕機后,Producer 和 Consumer 還能工作嗎?
可以,客戶端會緩存路由信息,但無法感知新 Broker 或 Topic 變化。

10. 性能調優
Broker 參數:
sendMessageThreadPoolNums:發送線程數。
pullMessageThreadPoolNums:拉取線程數。
零拷貝技術:通過 MappedFile 內存映射文件減少數據拷貝。

11. Broker 如何處理拉取請求?
長輪詢機制:Consumer 拉取請求無消息時,Broker 掛起請求(默認 30s),新消息到達后立即響應。
源碼關鍵點:PullRequestHoldService 管理掛起請求,通過 checkHoldRequest 周期性檢查消息到達。

12. RocketMQ 消息存儲結構:CommitLog 與 ConsumeQueue 的關系
CommitLog:所有 Topic 的消息按順序追加寫入,文件名格式為 {文件起始偏移量}.log,固定大小 1GB(可配置)。
ConsumeQueue:邏輯隊列索引,存儲消息在 CommitLog 中的偏移量、大小、Tag HashCode,文件名格式為 {Topic}/{QueueId}/{ConsumeQueueOffset}。
關系:消費者通過 ConsumeQueue 快速定位 CommitLog 中的消息,實現高效檢索。

13. 主從同步機制(SYNC/ASYNC)的區別與選型
SYNC_MASTER:
生產者收到 Slave 寫入成功 ACK 后才返回,保證數據強一致。
適用場景:金融交易、資金扣減。
ASYNC_MASTER:
主節點寫入成功即返回,Slave 異步復制,性能更高。
適用場景:日志傳輸、允許短暫不一致。

14. 消息重試與死信隊列(DLQ)機制
重試隊列:消費失敗的消息進入重試隊列(命名格式:%RETRY%{ConsumerGroup}),按延遲等級(1s, 5s, 10s…)重試。
死信隊列:重試 16 次后仍失敗,消息進入死信隊列(%DLQ%{ConsumerGroup}),需人工處理。
配置參數:maxReconsumeTimes(默認 16 次)。

15. 如何實現消息軌跡(Trace)追蹤?
開啟方式:Broker 配置 traceTopicEnable=true,Producer/Consumer 設置 enableMsgTrace=true。
原理:消息發送/消費時,額外生成軌跡數據寫入內部 Topic RMQ_SYS_TRACE_TOPIC。
查詢工具:RocketMQ Console 或自定義消費者訂閱軌跡 Topic。

16. Rebalance 機制如何工作?
觸發條件:Consumer 數量變化、Broker 上下線、Topic 路由變更。
流程:
客戶端定時向 Broker 發送心跳,上報 Consumer Group 信息。
Broker 通過 RebalanceService 計算隊列分配策略(平均分配、一致性 Hash)。
Consumer 根據新分配結果調整拉取隊列。
源碼入口:RebalanceImpl#rebalanceByTopic。

17. RocketMQ 5.0 新特性(如 Proxy 模式)
Proxy 模式:解耦 Broker 與客戶端協議,支持多語言客戶端(如 HTTP/gRPC),增強云原生兼容性。
事務增強:支持 TCC 模式,提供更靈活的事務解決方案。
輕量級 SDK:簡化客戶端依賴,提升啟動速度。
三、高級特性與源碼原理

18. 零拷貝技術
RocketMQ:使用 mmap 內存映射文件,減少用戶態與內核態數據拷貝。
Kafka:采用 sendfile 系統調用,實現更高吞吐但靈活性較低。

19. DLedger 高可用機制
基于 Raft 協議實現主從選舉,主節點故障時自動切換,保障數據一致性。

20. 消息過濾
Tag 過濾:Broker 端過濾,減少網絡傳輸。
SQL 過濾:需開啟 enablePropertyFilter=true,支持復雜條件匹配。

21. 事務消息實現細節
兩階段提交:
發送 Half 消息(預提交),Broker 存儲但暫不投遞。
執行本地事務,返回 Commit/Rollback 狀態。
Broker 根據狀態投遞或刪除消息,若未收到確認則發起事務回查。
源碼分析:TransactionMQProducer 處理本地事務回調,TransactionalMessageService 管理事務狀態。

22. 消息索引文件(IndexFile)的作用
存儲結構:哈希索引(Key: Message Key, Value: CommitLog Offset)。
用途:通過 Message Key 或 Unique Key 快速查詢消息,支持按時間范圍檢索。
源碼類:IndexService, IndexFile。

23. PageCache 與 Mmap 如何提升性能?
PageCache:利用操作系統緩存,將磁盤文件映射到內存,加速讀寫。
Mmap:通過內存映射文件,避免 read()/write() 系統調用的數據拷貝,提升 CommitLog 寫入效率。
刷盤策略:SYNC_FLUSH(同步刷盤)依賴 FileChannel.force(),ASYNC_FLUSH 使用后臺線程批量刷盤。

24. 消息消費位點(Offset)管理機制
本地存儲:Consumer 默認將 Offset 存儲在本地文件(~/.rocketmq_offsets)。
遠程存儲:集群模式下,Offset 上報至 Broker(ConsumerOffsetManager)。
重置方式:
CONSUME_FROM_LAST_OFFSET:從最大位點開始消費。
CONSUME_FROM_FIRST_OFFSET:從最小位點開始消費。

25. 消息索引文件(IndexFile)的作用
存儲結構:哈希索引(Key: Message Key, Value: CommitLog Offset)。
用途:通過 Message Key 或 Unique Key 快速查詢消息,支持按時間范圍檢索。
源碼類:IndexService, IndexFile。

26. PageCache 與 Mmap 如何提升性能?
PageCache:利用操作系統緩存,將磁盤文件映射到內存,加速讀寫。
Mmap:通過內存映射文件,避免 read()/write() 系統調用的數據拷貝,提升 CommitLog 寫入效率。
刷盤策略:SYNC_FLUSH(同步刷盤)依賴 FileChannel.force(),ASYNC_FLUSH 使用后臺線程批量刷盤。

27. 消息消費位點(Offset)管理機制
本地存儲:Consumer 默認將 Offset 存儲在本地文件(~/.rocketmq_offsets)。
遠程存儲:集群模式下,Offset 上報至 Broker(ConsumerOffsetManager)。
重置方式:
CONSUME_FROM_LAST_OFFSET:從最大位點開始消費。
CONSUME_FROM_FIRST_OFFSET:從最小位點開始消費。

場景設計題

1 .設計一個高并發秒殺系統,如何利用 RocketMQ 優化?

    流量削峰:將秒殺請求寫入 RocketMQ 隊列,異步處理訂單創建與庫存扣減。順序消息:使用哈希選擇器將同一用戶請求路由到固定隊列,避免超賣。事務消息:扣減庫存與生成訂單通過事務消息保證最終一致性。動態擴容:根據監控指標(如堆積消息數)自動擴容 Consumer,快速消化積壓

2 . 設計一個秒殺系統,如何用 RocketMQ 解決超賣問題?

    消息隊列削峰填谷 + 數據庫樂觀鎖 + 事務消息保證最終庫存一致。

3 . 如何實現分布式事務(訂單扣庫存+生成訂單)?

    事務消息:半消息預扣庫存,本地事務生成訂單,失敗則回滾庫存。

4.如何設計一個異地多活消息隊列系統?

    跨城同步:Broker 集群分機房部署,通過 Async replication 同步消息。單元化路由:Producer 根據用戶 ID 哈希選擇本地機房 Broker,減少跨城延遲。容災切換:監控機房狀態,自動切換消息路由至可用機房。

5.消息丟失的可能原因與解決方案

    生產者丟失:原因:異步發送未處理 SendCallback 異常。解決:使用同步發送 + 重試機制。Broker 丟失:原因:異步刷盤時宕機,PageCache 數據未落盤。解決:SYNC_FLUSH 刷盤 + 主從同步。消費者丟失:原因:消費成功但 Offset 未提交。解決:先處理業務邏輯,再手動提交 Offset。、
Demo實操
  1. 引入依賴
<dependencies><!-- RocketMQ 客戶端 --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.4.0</version></dependency><!-- Spring Boot Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
</dependencies>

2. 配置文件 application.yml

yamlrocketmq:namesrvAddr: 127.0.0.1:9876producer:group: test-producer-groupconsumer:group: test-consumer-grouptopic: TestTopic

3. 生產者配置類

javapackage com.example.demo.config;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RocketMQProducerConfig {@Value("${rocketmq.namesrvAddr}")private String namesrvAddr;@Value("${rocketmq.producer.group}")private String producerGroup;@Beanpublic DefaultMQProducer mqProducer() throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer(producerGroup);producer.setNamesrvAddr(namesrvAddr);producer.start();System.out.println("🚀 RocketMQ Producer 啟動成功");return producer;}
}

4. 生產者發送接口

javapackage com.example.demo.controller;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;import java.nio.charset.StandardCharsets;@RestController
@RequestMapping("/mq")
public class ProducerController {@Autowiredprivate DefaultMQProducer producer;@Value("${rocketmq.topic}")private String topic;@PostMapping("/send")public String sendMessage(@RequestParam String msg) throws Exception {Message message = new Message(topic, msg.getBytes(StandardCharsets.UTF_8));SendResult result = producer.send(message);return "發送成功: " + result;}
}

5. 消費者配置類

java復制編輯
package com.example.demo.config;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.nio.charset.StandardCharsets;
import java.util.List;@Configuration
public class RocketMQConsumerConfig {@Value("${rocketmq.namesrvAddr}")private String namesrvAddr;@Value("${rocketmq.consumer.group}")private String consumerGroup;@Value("${rocketmq.topic}")private String topic;@Beanpublic DefaultMQPushConsumer mqConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);consumer.setNamesrvAddr(namesrvAddr);consumer.subscribe(topic, "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {String body = new String(msg.getBody(), StandardCharsets.UTF_8);System.out.println("📩 收到消息: " + body);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("? RocketMQ Consumer 啟動成功");return consumer;}
}

最后有興趣可以嘗試自動重試,TraceId 追蹤, 異步發送, 批量發送

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/93049.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/93049.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/93049.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【CSS 變量】讓你的 CSS “活”起來:深入理解 CSS 自定義屬性與主題切換

【CSS 變量】讓你的 CSS “活”起來&#xff1a;深入理解 CSS 自定義屬性與主題切換 所屬專欄&#xff1a; 《前端小技巧集合&#xff1a;讓你的代碼更優雅高效》 上一篇&#xff1a; 【CSS 視覺】無需JS&#xff0c;純 CSS 實現酷炫視覺效果&#xff08;clip-path, filter, b…

RAG初步實戰:從 PDF 到問答:我的第一個輕量級 RAG 系統(附詳細項目代碼內容與說明)

RAG初步實戰&#xff1a;從 PDF 到問答&#xff1a;我的第一個輕量級 RAG 系統 項目背景與目標 在大模型逐漸普及的今天&#xff0c;Retrieval-Augmented Generation&#xff08;RAG&#xff0c;檢索增強生成&#xff09;作為連接“知識庫”和“大語言模型”的核心范式&#…

自主泊車算法

看我的git 在 open space 空間下規劃出?條??到停?位的?碰撞軌跡 滿?平滑約束 可跟蹤 考慮動態障礙物約束 在路徑不可?的情況下 具備重規劃能? 重規劃時能夠做到?縫切換 即從原路徑?縫切換到重規劃路徑 ?明顯體感 規劃頻率 10HZ

USB 2.0 學習(2)- 連接

上回說到 usb的信號 k 狀態和 j 狀態&#xff0c;補充一下 usb的一些電氣小知識。 1.USB設備有四根線 電源線VBus、 D、 D-、 地線GND 2.USB主機端的 D 和 D-各有1個15k下拉電阻&#xff0c;這是為了準確檢測 D還是D-線上電平的變化 因為USB總線檢測USB設備是低速還是全速設備…

解鎖 Appium Inspector:移動端 UI 自動化定位的利器

? 在移動端 UI 自動化測試中&#xff0c;元素定位是繞不開的核心環節。無論是 Android 還是 iOS 應用&#xff0c;能否精準、高效地定位到界面元素&#xff0c;直接決定了自動化腳本的穩定性和可維護性。而 Appium Inspector 作為 Appium 生態中專門用于元素定位的工具&#…

機器學習概念1

了解機器學習1、什么是機器學習機器學習是一門通過編程讓計算機從數據中進行學習的科學 通用定義&#xff1a;機器學習是一個研究領域讓計算機無須進行明確編程就具備學習能力 工程化定義&#xff1a;一個計算機程序利用經驗E來學習任務T&#xff0c;性能是P&#xff0c;如果針…

前端html學習筆記5:框架、字符實體與 HTML5 新增標簽

本文為個人學習總結&#xff0c;如有謬誤歡迎指正。前端知識眾多&#xff0c;后續將繼續記錄其他知識點&#xff01; 目錄 前言 一、框架標簽 作用&#xff1a; 語法&#xff1a; 屬性&#xff1a; 二、字符實體 作用&#xff1a; 三、html5新增標簽 語義化 狀態 列…

Day05 店鋪營業狀態設置 Redis

Redis 入門 Redis 簡介 Redis 是一個基于內存的 key-value 結構數據庫。 基于內存存儲&#xff0c;讀寫性能高 適合存儲熱點數據&#xff08;熱點商品&#xff0c;資訊&#xff0c;新聞&#xff09; 企業應用廣泛 redis 中文網&#xff1a;Redis中文網 Redis 下載與安裝 R…

Linux驅動開發probe字符設備的完整創建流程

一、 設備號分配1.靜態分配通過register_chrdev_region預先指定設備號&#xff08;需要確保未被占用&#xff09;2.動態分配通過alloc_chrdev_region由內核自動分配主設備號&#xff0c;一般都是動態分配以避免沖突。3316 xxxx_dev.major 0; 3317 3318 if (xx…

生產環境中Spring Cloud Sleuth與Zipkin分布式鏈路追蹤實戰經驗分享

生產環境中Spring Cloud Sleuth與Zipkin分布式鏈路追蹤實戰經驗分享 在復雜的微服務架構中&#xff0c;服務調用鏈路繁雜&#xff0c;單點故障或性能瓶頸往往難以定位。本文結合真實生產環境案例&#xff0c;分享如何基于Spring Cloud Sleuth與Zipkin構建高可用、低開銷的分布…

基于Python的《紅樓夢》文本分析與機器學習應用

本文將詳細介紹如何使用Python和機器學習技術對《紅樓夢》進行深入的文本分析和處理&#xff0c;包括文本分卷、分詞、停用詞處理、TF-IDF特征提取以及文本可視化等關鍵技術。一、項目概述本項目的目標是對中國古典文學名著《紅樓夢》進行全面的自動化處理和分析&#xff0c;主…

Bevy渲染引擎核心技術深度解析:架構、體積霧與Meshlet渲染

本文將深入探討Bevy游戲引擎的渲染架構&#xff0c;重點分析其體積霧實現原理、Meshlet渲染技術以及基于物理的渲染&#xff08;PBR&#xff09;系統。內容嚴格基于技術實現細節&#xff0c;覆蓋從底層渲染管線到高級特效的全套解決方案。一、Bevy渲染架構深度解析1.1 核心架構…

CASS11計算斜面面積

1.生成三角網2.工程應用--計算表面積--根據三角網

借助Rclone快速從阿里云OSS遷移到AWS S3

本文作者: 封磊 Eclicktech SA | AWS Community Builder DevTool | AWS UGL | 亞馬遜云科技云博主 阿里云&InfoQ&CSDN簽約作者 概述 隨著企業云戰略的調整和多云架構的普及&#xff0c;數據遷移成為了一個常見需求。本文將詳細介紹如何使用Rclone工具&#xff0c;高效…

【入門系列】圖像算法工程師如何入門計算機圖形學?

作為圖像算法工程師&#xff0c;入門計算機圖形學&#xff08;CG&#xff09;有天然優勢——你熟悉圖像處理的像素級操作、數學工具&#xff08;如矩陣運算&#xff09;和優化思維&#xff0c;而圖形學的核心目標&#xff08;從3D信息生成2D圖像&#xff09;與圖像處理有很強的…

淘寶API列表:高效獲取商品詳情圖主圖商品視頻參數item_get

淘寶商品詳情信息基本都是用圖片展示的&#xff0c;制作精美&#xff0c;能更好的展示商品信息。如何通過API實現批量獲取商品詳情信息呢&#xff1f;1、在API平臺注冊賬號&#xff0c;獲取調用API的key和密鑰。2、查看API文檔&#xff0c;了解相關請求參數和返回參數。item_ge…

第23章,景深:技術綜述

一&#xff0c;定義&#xff1a; 中景&#xff1a;物體聚焦的范圍&#xff08;即清晰成像的范圍&#xff09;。 景深&#xff1a;在中景之外&#xff0c;都會成像模糊&#xff0c;即景深。景深通常用來指示對場景的注意范圍&#xff0c;并提供場景深度的感覺。 背景&#xff1a…

飛算 JavaAI -智慧城市項目實踐:從交通協同到應急響應的全鏈路技術革新

免責聲明&#xff1a;此篇文章所有內容都是本人實驗&#xff0c;并非廣告推廣&#xff0c;并非抄襲&#xff0c;如有侵權&#xff0c;請聯系。 目錄 一、智慧城市核心場景的技術攻堅 1.1 交通信號智能優化系統的實時決策 1.1.1 實時車流數據處理與分析 1.1.2 動態信號配時…

GM3568JHF快速入門教程【二】FPGA+ARM異構開發板環境編譯教程

SDK 可通過搭建好的 Docker 鏡像環境進行編譯。 具體參可考該部分文檔內容。1 Docker鏡像環境編譯SDK1.1 SDK 自動編譯命令切換到 Docker 內需要編譯的 SDK 根目錄&#xff0c;全自動編譯默認是 Buildroot&#xff0c; 可以通過設置環境變量 RK_ROOTFS_SYSTEM 指定不同 rootfs.…

Vue3 整合高德地圖完成搜索、定位、選址功能,已封裝為組件開箱即用(最新)

Vue3 整合高德地圖完成搜索、定位、選址功能&#xff08;最新&#xff09;1、效果演示2、前端代碼2.1 .env.development2.2 GaodeMap.vue2.3使用示例1、效果演示 2、前端代碼 2.1 .env.development https://console.amap.com/dev/key/app# 地圖配置 VITE_AMAP_KEY "您的…