Spring Boot 集成 RocketMQ 全流程指南:從依賴引入到消息收發

前言

在分布式系統中,消息中間件是解耦服務、實現異步通信的核心組件。RocketMQ 作為阿里巴巴開源的高性能分布式消息中間件,憑借其高吞吐、低延遲、高可靠等特性,成為企業級應用的首選。而 Spring Boot 通過其“約定優于配置”的設計理念,極大簡化了項目開發的復雜度。本文將通過 手動連接配置連接 兩種方式,詳細講解如何在 Spring Boot 中集成 RocketMQ,實現消息的同步與異步發送,并提供完整示例代碼。

微信圖片_20250414010059?

?

一、環境準備

在開始前,請確保:

  1. JDK 17、Maven 3.6+、Spring Boot 2.7+。
  2. 安裝RocketMQ服務(本地或遠程),推薦使用RocketMQ Docker鏡像快速搭建(可參考之前文章)。

?

二、示例—Springboot集成mq(手動連接)

通過編碼方式初始化生產者,適用于需要動態控制資源的場景。

2.1 新建項目

image-20230727182626779?

image-20230727182647022?

?

image?

?

2.2 引入依賴

       <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.4</version></dependency>

image?

?

2.3 生產者發送消息

  • 構建一個消息生產者DefaultMQProducer實例,然后指定生產者組為jihaiProducer;
  • 指定NameServer的地址:服務器的ip:9876,因為需要從NameServer拉取Broker的信息
  • producer.start() 啟動生產者
  • 構建一個內容為:技海拾貝的消息1,然后指定這個消息往jihaishibei這個topic發送
  • producer.send(msg):發送消息,打印結果
  • 關閉生產者
public class Producer {public static void main(String[] args) throws Exception {//創建一個生產者,指定生產者組為jihaiProducerDefaultMQProducer producer = new DefaultMQProducer("jihaiProducer");// 指定NameServer的地址producer.setNamesrvAddr("localhost:9876");// 第一次發送可能會超時,設置的比較大producer.setSendMsgTimeout(60000);// 啟動生產者producer.start();// 創建一條消息// topic為 jihaishibei// 消息內容為 技海拾貝的消息1// tags 為 TagAMessage msg = new Message("jihaishibei", "TagA", "技海拾貝的消息1 ".getBytes(RemotingHelper.DEFAULT_CHARSET));// 發送消息并得到消息的發送結果,然后打印SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);// 關閉生產者producer.shutdown();}}

?

image?

?

?

啟動,發送消息

image?

?

在控制臺可以看到這條消息

image?

?

?

image?

?

這里就能看到發送消息的詳細信息。

左下角消息的消費的消費,因為我們還沒有消費者訂閱這個topic,所以左下角沒數據。

?

2.4 消費者消費消息

  • 創建一個消費者實例對象,指定消費者組為jihaiConsumer
  • 指定NameServer的地址:服務器的ip:9876
  • 訂閱 jihaishibei這個topic的所有信息
  • consumer.registerMessageListener ,這個很重要,是注冊一個監聽器,這個監聽器是當有消息的時候就會回調這個監聽器,處理消息,所以需要用戶實現這個接口,然后處理消息。
  • 啟動消費者

?

public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {// 通過push模式消費消息,指定消費者組DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("jihaiConsumer");// 指定NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 訂閱這個topic下的所有的消息consumer.subscribe("jihaishibei", "*");// 注冊一個消費的監聽器,當有消息的時候,會回調這個監聽器來消費消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.printf("消費消息:%s", new String(msg.getBody()) + "\n");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 啟動消費者consumer.start();System.out.printf("Consumer Started.%n");}
}

?

image?

?

?

啟動服務,進行消費

image?

?

在控制臺,發現被jihaiConsumer這個消費者組給消費了。

image?

?

?

三、示例2—Springboot集成mq(配置連接)

在 Spring Boot 中,可以通過配置文件簡化 RocketMQ 的連接配置。以下是在 application.yml? 文件中進行的配置:

3.1 配置文件修改

image?

?

?

image?

?

image?

?

spring:application:name: rocket-mq-demorocketmq:name-server: 127.0.0.1:9876producer:group: rocket-mq-demo-producersend-message-timeout: 10000comsumer:group: rocket-mq-demo-comsumersend-message-timeout: 10000

?

image?

?

3.2 添加依賴

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.0</version>  
</dependency>

根據需要選擇最新版本,從中央倉庫可以查看?https://central.sonatype.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter?

image?

?

image?

?

備注:如果添加rocketmq-client依賴,先注釋這個依賴

?

3.3 消費者service類

import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;/*** messageModel=MessageModel.CLUSTERING* 監聽模式,有消息就會消費*/
@Service
@RocketMQMessageListener(topic = "jihaishibei-topic", consumerGroup = "rocket-mq-demo-comsumer", messageModel = MessageModel.CLUSTERING)
public class RocketMQConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.printf("收到消息: %s\n", s);}
}

?

3.4 生產者service類

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;@Service
public class RocketMQProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;private final String topic = "jihaishibei-topic";// 1.同步發送消息// 同步發送是指發送方發送一條消息后,會等待服務器返回確認信息后再進行后續操作。這種方式適用于需要可靠性保證的場景。public void createAndSend(String message){rocketMQTemplate.convertAndSend(topic, message);System.out.printf("同步發送結果: %s\n", message);}// 1.同步發送消息// 同步發送是指發送方發送一條消息后,會等待服務器返回確認信息后再進行后續操作。這種方式適用于需要可靠性保證的場景。public void sendSyncMessage(String message){SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message).build());System.out.println(sendResult.getMsgId());System.out.printf("同步發送結果: %s\n", message);}// 2.異步發送消息// 異步發送是指發送方發送消息后,不等待服務器返回確認信息,而是通過回調接口處理返回結果。這種方式適用于對響應時間要求較高的場景。public void sendAsyncMessage(String message){rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(message).build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.printf("異步發送成功: %s\n", sendResult);}@Overridepublic void onException(Throwable throwable) {System.out.printf("異步發送失敗: %s\n", throwable.getMessage());}});}// 3.單向發送消息// 單向發送是指發送方只負責發送消息,不關心服務器的響應。該方式適用于對可靠性要求不高的場景,如日志收集。public void sendOneWayMessage(String message){rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(message).build());System.out.println("單向消息發送成功");}
}

?

3.5 測試controller類

@RequestMapping("api")
@RestController
public class RocketController {@Autowiredprivate RocketMQProducer rocketMQProducer;@GetMapping("/createAndSend")public String createAndSend(@RequestParam String message) {rocketMQProducer.createAndSend(message);return "同步消息發送成功";}@GetMapping("/sendSync")public String sendSync(@RequestParam String message) {rocketMQProducer.sendSyncMessage(message);return "同步消息發送成功";}@GetMapping("/sendAsync")public String sendAsync(@RequestParam String message) {rocketMQProducer.sendAsyncMessage(message);return "異步消息發送中";}@GetMapping("/sendOneWay")public String sendOneWay(@RequestParam String message) {rocketMQProducer.sendOneWayMessage(message);return "單向消息發送成功";}
}

?

3.6 啟動服務

image?

?

3.7 測試

同步消息1

image?

?

?

image?

?

image?

?

同步消息2

image?

?

image?

?

image?

?

異步消息

image?

?

?

image?

?

image?

?

?

?

單向發送消息

image?

image?

?

?

image?

?

?

四、結束語

本文通過手動連接與配置連接兩種方式,展示了Spring Boot與RocketMQ的集成實踐。手動連接幫助開發者理解底層API邏輯,而Spring Boot的配置化集成則極大簡化了開發流程。無論是同步消息的可靠性保障,還是異步消息的性能優化,RocketMQ均能與Spring Boot無縫協作,為分布式系統提供高效的消息通信能力。

未來可進一步探索集群部署、消息重試機制及監控告警,以實現更健壯的消息服務。希望本文能為開發者快速構建高可用的消息系統提供參考!

求點關注-gif動圖 138_愛給網_aigei_com?

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/76836.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/76836.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/76836.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

HTTPS實現安全的關鍵方法及技術細節

HTTPS&#xff08;HyperText Transfer Protocol Secure&#xff09;通過多種技術手段實現數據傳輸的安全性&#xff0c;其核心機制基于SSL/TLS協議&#xff0c;并結合數字證書、加密算法等技術。 SSL&#xff1a;Secure Sockets Layer&#xff0c;安全套接字層 TLS&#xff1a;…

Java【多線程】(8)CAS與JUC組件

目錄 1.前言 2.正文 2.1CAS概念 2.2CAS兩種用途 2.2.1實現原子類 2.2.2實現自旋鎖 2.3缺陷&#xff1a;ABA問題 2.4JUC組件 2.4.1Callable接口 2.4.2ReentrantLock&#xff08;與synchronized對比&#xff09; 2.4.3Semaphore信號量 2.4.4CountDownLatch 3.小結 1…

【Docker】離線安裝Docker

背景 離線安裝Docker的必要性&#xff0c;第一&#xff0c;在目前數據安全升級的情況下&#xff0c;很多外網已經基本不好訪問了。第二&#xff0c;如果公司有對外部署的需求&#xff0c;那么難免會存在對方只有內網的情況&#xff0c;那么我們就要做到學會離線安裝。 下載安…

MecAgent Copilot:機械設計師的AI助手,開啟“氛圍建模”新時代

MecAgent Copilot作為機械設計師的AI助手,正通過多項核心技術推動機械設計進入“氛圍建模”新時代。以下從功能特性、技術支撐和應用場景三方面解析其創新價值: 一、核心功能特性 ??智能草圖生成與參數化建模?? 支持自然語言輸入生成設計草圖和3D模型,如輸入“剖面透視…

MCU屏和RGB屏

一、MCU屏 MCU屏?&#xff1a;全稱為單片機控制屏&#xff08;Microcontroller Unit Screen&#xff09;&#xff0c;在顯示屏背后集成了單片機控制器&#xff0c;因此&#xff0c;MCU屏里面有專用的驅動芯片。驅動芯片如&#xff1a;ILI9488、ILI9341、SSD1963等。驅動芯片里…

7.5 使用MobileNet v3進行圖像的區分

MobileNet v3是Google在2019年提出的輕量級卷積神經網絡結構,旨在提高在移動設備上的速度和準確性,廣泛的用于輕量級網絡。 MobileNet v3-Small的網絡結構如下,它的輸入是224x224的3通道彩色圖片。 使用過程如下: 1.創建模型、修改最終分類數量 #1.創建mobilenet_v3_small…

構建面向大模型訓練與部署的一體化架構:從文檔解析到智能調度

作者&#xff1a;汪玉珠&#xff5c;算法架構師 標簽&#xff1a;大模型訓練、數據集構建、GRPO、自監督聚類、指令調度系統、Qwen、LLaMA3 &#x1f9ed; 背景與挑戰 隨著 Qwen、LLaMA3 等開源大模型不斷進化&#xff0c;行業逐漸從“能跑通”邁向“如何高效訓練與部署”的階…

PostgreSQL技術大講堂 - 第86講:數據安全之--data_checksums天使與魔鬼

PostgreSQL技術大講堂 - 第86講&#xff0c;主題&#xff1a;數據安全之--data_checksums天使與魔鬼 1、data_checksums特性 2、避開DML規則&#xff0c;嫁接非法數據并合法化 3、避開約束規則&#xff0c;嫁接非法數據到表中 4、避開數據檢查&#xff0c;讀取壞塊中的數據…

【機器學習】機器學習筆記

1 機器學習定義 計算機程序從經驗E中學習&#xff0c;解決某一任務T&#xff0c;進行某一性能P&#xff0c;通過P測定在T上的表現因經驗E而提高。 eg&#xff1a;跳棋程序 E&#xff1a; 程序自身下的上萬盤棋局 T&#xff1a; 下跳棋 P&#xff1a; 與新對手下跳棋時贏的概率…

Ubuntu20.04 設置開機自啟

參考&#xff1a; Ubuntu20.04 設置開機自啟_ubuntu進bos系統-CSDN博客

數據庫中存儲過程的流程語句講解

一、流程語句講解 二、總結 一、流程語句講解 1.1 if語句講解 語法&#xff1a; IF condition THENstatements; ELSEIF condition THENstatements; ELSEstatements; END IF; 題目示例&#xff1a; # 判斷成績等級 # 輸入學生的編號,取出學生的第一門課&#xff0c;然后判斷…

kubernetes》》k8s》》ConfigMap 、Secret

configmap官網 ConfigMap是一種 API 對象&#xff0c;使用時&#xff0c; Pods 可以將其用作環境變量、命令行參數或者存儲卷中的配置文件。ConfigMap將配置和Pod解耦&#xff0c;更易于配置文件的更改和管理。ConfigMap 并不提供保密或者加密功能。 如果你想存儲的數據是機密的…

git在IDEA中使用技巧

git在IDEA中使用技巧 merge和rebase 參考&#xff1a;IDEA小技巧-Git的使用 git回滾、強推、代碼找回 參考&#xff1a;https://www.bilibili.com/video/BV1Wa411a7Ek?spm_id_from333.788.videopod.sections&vd_source2f73252e51731cad48853e9c70337d8e cherry pick …

Spring 事務失效的原因及解決方案全解析,來復習了

Spring 事務失效是指在使用 Spring 聲明式事務管理時&#xff0c;預期的事務行為&#xff08;如事務的開啟、提交、回滾等&#xff09;未按預期執行&#xff0c;導致數據操作未滿足 ACID 特性&#xff08;原子性、一致性、隔離性、持久性&#xff09;&#xff0c;從而引發數據不…

「出海匠」借助CloudPilot AI實現AWS降本60%,支撐AI電商高速增長

&#x1f50e;公司簡介 「出海匠」&#xff08;chuhaijiang.com&#xff09;是「數繪星云」公司打造的社交內容電商服務平臺&#xff0c;專注于為跨境生態參與者提供數據支持與智能化工作流。平臺基于大數據與 AI 技術&#xff0c;幫助商家精準分析市場趨勢、優化運營策略&…

python每日一練

題目一 輸入10個整數,輸出其中不同的數,即如果一個數出現了多次,只輸出一次(要求按照每一個不同的數第一次出現的順序輸出)。 解題 錯誤題解 a list(map(int,input().split())) b [] b.append(a[i]) for i in range(2,11):if a[i] not in b:b.append(a[i]) print(b)但是會…

Docker實戰:從零構建高可用的MySQL主從集群與Redis集群

在分布式系統架構中&#xff0c;數據庫集群是保障數據高可用和性能的關鍵組件。本文將通過Docker技術&#xff0c;手把手教你搭建MySQL主從集群和Redis Cluster&#xff0c;并分享獨創的優化技巧與運維實戰經驗。 一、為什么選擇Docker部署集群&#xff1f; 傳統數據庫集群搭…

STM32電機庫 電機控制特性

ST MC FW庫提供FOC和六步法兩種電機控制方式。這使得它能夠驅動永磁同步電機 (PMSM) 和無刷直流電機 (BLDC)。FOC 更適合 PMSM,而六步法更適合 BLDC 電機。該固件可以驅動內嵌式PMSM 和標貼式PMSM。 ST Motor Control 固件庫提供以下功能: FOC SVPWM 生成: 可配置的 PW…

Go:方法

方法聲明 type point struct { X, Y float64 }// 普通函數 func Distance(p, q Point) float64 {return math.Hypot(q.x - p.x, q.y - p.Y) }// Point類型的方法 func (p Point) Distance(q Point) float64 {return math.Hypot(q.x - p.x, q.y - p.Y) }方法聲明與普通函數聲…

前端基礎之《Vue(4)—響應式原理》

一、什么是響應式 1、響應式英文reactive 當你get/set一個變量時&#xff0c;你有辦法可以“捕獲到”這種行為。 2、一個普通對象和一個響應式對象對比 &#xff08;1&#xff09;普通對象 <script>// 這種普通對象不具備響應式var obj1 {a: 1,b: 2} </script>…