RocketMQ 詳細教程(Spring Boot Spring Cloud Alibaba)

1. RocketMQ 簡介

RocketMQ 是阿里巴巴開源的一款分布式消息隊列,具有高吞吐量、低延遲、可靠性等特點,廣泛應用于金融、電商、物聯網等領域。

  • RocketMQ 的核心特性:
    • 高可靠性:支持消息存儲、重復消費、失敗重試等
    • 高可用性:分布式架構,支持主從復制
    • 高性能:高吞吐量、低延遲

2. 引入依賴

首先,在 Spring Boot 項目 中引入 RocketMQ 和 Spring Cloud Alibaba 的依賴。

2.1 配置依賴

pom.xml 中添加以下依賴:

<dependencies><!-- Spring Boot --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- Spring Cloud Alibaba RocketMQ --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-rocketmq</artifactId></dependency><!-- Spring Cloud Alibaba --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-alibaba</artifactId></dependency>
</dependencies>

2.2 配置 RocketMQ

application.yml 中配置 RocketMQ 連接信息:

spring:cloud:alibaba:rocketmq:name-server: 127.0.0.1:9876  # RocketMQ NameServer 地址producer:group: my-producer-group   # 生產者組consumer:group: my-consumer-group   # 消費者組

3. 生產者代碼實現

3.1 創建消息生產者

我們可以通過注解 @RocketMQMessageListener 來創建 RocketMQ 生產者,消息可以通過 @Value 傳遞或直接通過 Bean 注入。

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class RocketMQProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;// 發送普通消息public void sendMessage(String topic, String message) {rocketMQTemplate.convertAndSend(topic, message);}
}

3.2 發送消息

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class ProducerController {@Autowiredprivate RocketMQProducer rocketMQProducer;@GetMapping("/send")public String sendMessage() {String message = "Hello, RocketMQ!";rocketMQProducer.sendMessage("my-topic", message);return "Message sent: " + message;}
}

訪問 http://localhost:8080/send 發送消息,RocketMQ 將開始處理消息。


4. 消費者代碼實現

4.1 創建消息消費者

在消費者端,我們需要創建一個消息監聽器,利用 @RocketMQMessageListener 注解監聽 RocketMQ 消息。

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;@Service
@RocketMQMessageListener(topic = "my-topic", consumerGroup = "my-consumer-group")
public class RocketMQConsumer {@org.springframework.messaging.handler.annotation.MessageMappingpublic void listen(String message) {System.out.println("Received message: " + message);}
}

4.2 消費消息

每次生產者發送的消息,消費者都會通過 listen 方法進行接收。控制臺會打印出收到的消息。


5. 順序消息

RocketMQ 支持順序消息,可以通過設置 MessageQueueSelector 來保證消息的順序性。

5.1 發送順序消息

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class OrderProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;// 發送順序消息public void sendOrderedMessage(String topic, String message, int orderId) {rocketMQTemplate.convertAndSend(topic, message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// 根據orderId選擇隊列return mqs.get(orderId % mqs.size());}}, orderId);}
}

5.2 接收順序消息

順序消息的消費邏輯和普通消息相似,只不過要保證順序消息的消費順序。


6. 事務消息

RocketMQ 提供了事務消息功能,能夠保證消息在分布式事務中的可靠性。我們通過 @RocketMQTransactionListener 來實現。

6.1 配置事務消息

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransaction;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
@RocketMQTransactionListener
public class TransactionProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendTransactionMessage(String topic, String message) {rocketMQTemplate.sendMessageInTransaction(topic, message, null);}public RocketMQLocalTransaction executeLocalTransaction(String message, Object arg) {// 事務操作try {// 業務操作return RocketMQLocalTransaction.SUCCESS;} catch (Exception e) {return RocketMQLocalTransaction.ROLLBACK;}}
}

6.2 事務回查

RocketMQ 支持事務回查機制,如果事務消息發送后沒有明確的提交或回滾,RocketMQ 會通過回查接口查詢事務狀態。


7. Spring Cloud Alibaba 集成

7.1 配置 Spring Cloud RocketMQ

application.yml 配置 Spring Cloud Alibaba 與 RocketMQ 集成。

spring:cloud:alibaba:rocketmq:name-server: 127.0.0.1:9876producer:group: my-producer-groupconsumer:group: my-consumer-group

7.2 集成 OpenFeign 與 RocketMQ

通過 OpenFeign 實現遠程服務調用,可以和 RocketMQ 一起工作。例如,將 RocketMQ 生產者集成到一個微服務中,使用 OpenFeign 調用。

@FeignClient("rocketmq-producer-service")
public interface RocketMQFeignClient {@PostMapping("/send")void sendMessage(@RequestBody String message);
}

通過 Feign 客戶端發送請求并觸發 RocketMQ 生產者的消息發送。


8. 總結

功能總結

功能說明
消息生產使用 RocketMQTemplate 發送消息
消息消費使用 @RocketMQMessageListener 監聽消息
順序消息使用 MessageQueueSelector 保證消息順序
事務消息使用 RocketMQTransactionListener 保證事務一致性
集成 Spring Cloud結合 Spring Cloud Alibaba RocketMQ 進行分布式消息通信

集成 RocketMQ 的好處

  • 提高系統解耦,避免直接調用遠程服務。
  • 支持異步、可靠的消息傳遞。
  • 通過順序消息保證業務流程的順序性。
  • 事務消息保證分布式事務的一致性。

這篇教程將幫助你實現 Spring Boot 和 Spring Cloud Alibaba 集成 RocketMQ 的基本功能,之后可以根據業務需求進行擴展。如果這篇教程對你有幫助,記得點贊、收藏哦! 🚀

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/72016.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/72016.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/72016.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Spring(七)AOP-代理模式

目錄 代理模式 一 靜態代理 一、核心作用 二、使用場景 二 動態代理 一、核心作用 二、使用場景 具體實現&#xff1a;&#xff08;初始&#xff09; 具體實現&#xff1a;&#xff08;改進&#xff09; 一、核心業務邏輯 1. 接口 MathCalculator 2. 實現類 MathCa…

Java Lambda表達式:現代編程的簡潔之道

引言 在Java 8中&#xff0c;Lambda表達式的引入標志著Java語言向函數式編程邁出了重要一步。Lambda不僅簡化了代碼結構&#xff0c;還提升了開發效率&#xff0c;使得Java能夠更靈活地應對現代編程需求。本文將深入探討Lambda表達式的核心概念、語法規則、應用場景及其對Java…

BGP分解實驗·21——BGP選路原則之本地優先級

當使用BGP路徑屬性——本地優先級&#xff0c;進行路由優選時&#xff0c;優選“本地優先級”數值較大的那個。&#xff08;eBGP之間更新不攜帶這個屬性&#xff09; 實驗拓撲如下&#xff1a; 在未實現本地優先級策略前&#xff0c;先在各個BGP之間配置完成基本連接。 R1的基…

【redis】應用場景:共享會話和手機驗證碼

文章目錄 共享會話實現思路 手機驗證碼實現思路偽代碼實現生成驗證碼驗證驗證碼 共享會話 實現思路 如果每個應用服務器&#xff0c;維護自己的會話數據&#xff0c;此時彼此之間胡共享&#xff0c;用戶請求訪問到不同的服務器上&#xff0c;就可能會出現一些不能正確處理的情…

通義萬相 2.1 + 藍耘算力,AI 視頻生成的夢幻組合

在這個科技日新月異的時代&#xff0c;人工智能不斷刷新著我們對世界的認知。一次偶然的機會&#xff0c;我借助北京藍耘科技股份有限公司提供的算力支持&#xff0c;踏上了使用通義萬相 2.1 進行 AI 視頻生成的奇妙之旅。 目錄 1.1初遇藍耘科技&#xff1a; 1.2通義萬相 2.1…

【Go萬字洗髓經】Golang內存模型與內存分配管理

本文目錄 1. 操作系統中的虛擬內存分頁與進程管理虛擬內存與內存隔離 2. Golang中的內存模型內存分配流程內存單元mspan線程緩存mcache中心緩存mcentral全局堆緩存mheapheapArena空閑頁索引pageAlloc 3. Go對象分配mallocgc函數tiny對象分配內存 4.結合GMP模型來看內存模型tiny…

33.HarmonyOS NEXT NumberBox 步進器高級技巧與性能優化

HarmonyOS NEXT NumberBox 步進器高級技巧與性能優化 一、高級交互設計 1. 組件聯動控制 // 與Slider雙向綁定 State value: number 50Slider({value: this.value,onChange: (v) > this.value v })NumberBox({value: this.value,onChange: (v) > this.value v })2. …

關于ModbusTCP/RTU協議轉Ethernet/IP(CIP)協議的方案

IGT-DSER智能網關模塊支持西門子、倍福(BECKHOFF)、羅克韋爾AB&#xff0c;以及三菱、歐姆龍等各種品牌的PLC之間通訊&#xff0c;支持Ethernet/IP(CIP)、Profinet(S7)&#xff0c;以及FINS、MC等工業自動化常用協議&#xff0c;同時也支持PLC與Modbus協議的工業機器人、智能儀…

通義萬相2.1 × 藍耘智算:AIGC 界的「黃金搭檔」如何重塑創作未來?

在人工智能生成內容&#xff08;AIGC&#xff09;領域&#xff0c;通義萬相2.1與藍耘智算的結合&#xff0c;正以技術協同效應重新定義創作的可能性。這一組合不僅突破了傳統創作工具的效率瓶頸&#xff0c;更通過算法與算力的深度融合&#xff0c;為影視、廣告、游戲、教育等領…

【FreeRTOS】FreeRTOS操作系統在嵌入式單片機上裸機移植

目錄 一 RTOS概述 二 FreeRTOS移植 三 FreeRTOS使用 四 附錄 一 RTOS概述 先了解一些基礎概念&#xff0c;以下內容摘自FreeRTOS官網&#xff08;FreeRTOS? - FreeRTOS?&#xff09;&#xff1a; 【1】RTOS基礎知識 實時操作系統 (RTOS) 是一種體積小巧、確定性強的計算機…

文件包含漏洞第一關

一、什么是文件包含漏洞 1.文件包含漏洞概述 和SQL注入等攻擊方式一樣&#xff0c;文件包含漏洞也是一種注入型漏洞&#xff0c;其本質就是輸入一段用戶能夠控制的腳本或者代碼&#xff0c;并讓服務端執行。 什么叫包含呢&#xff1f;以PHP為例&#xff0c;我們常常把可重復使…

瑞芯微RK3576(1)-硬件設計

過年期間&#xff0c;趁著放假時間做了一款3576的核心板 方案是2G DDR432G emmc 引出所有IO口 關于接口方面&#xff0c;考慮了一段時間&#xff0c;最終決定使用BTB的模式&#xff0c;主要是能夠出更多的IO&#xff0c;方便拆卸&#xff0c;最讓我擔心的是BTB的位置問題 為了…

Java 大視界 -- Java 大數據在智能醫療藥品研發數據分析與決策支持中的應用(126)

&#x1f496;親愛的朋友們&#xff0c;熱烈歡迎來到 青云交的博客&#xff01;能與諸位在此相逢&#xff0c;我倍感榮幸。在這飛速更迭的時代&#xff0c;我們都渴望一方心靈凈土&#xff0c;而 我的博客 正是這樣溫暖的所在。這里為你呈上趣味與實用兼具的知識&#xff0c;也…

JWT的學習

1、HTTP無狀態及解決方案 HTTP一種是無狀態的協議&#xff0c;每次請求都是一次獨立的請求&#xff0c;一次交互之后就是陌生人。 以CSDN為例&#xff0c;先登錄一次&#xff0c;然后瀏覽器退出&#xff0c;這個時候在進入CSDN&#xff0c;按理說服務器是不知道你已經登陸了&…

時序和延時

1、延遲模型的類型 verilog有三種類型的延遲模型&#xff1a;分布延遲 、 集總延遲 、 路徑延遲&#xff08;pin to pin&#xff09; 1.1、 分布延遲 分布延遲是在每個獨立元件的基礎上進行定義的。 module M(output wire out ,input wire a …

SpringBoot基礎Kafka示例

這里將生產者和消費者放在一個應用中 使用的Boot3.4.3 引入Kafka依賴 <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency>yml配置 spring:application:name: kafka-1#kafka…

API調試工具的無解困境:白名單、動態IP與平臺設計問題

引言 你是否曾經在開發中遇到過這樣的尷尬情形&#xff1a;你打開了平臺的API調試工具&#xff0c;準備一番操作&#xff0c;結果卻發現根本無法連接到平臺&#xff1f;別急&#xff0c;問題出在調試工具本身。今天我們要吐槽的就是那些神奇的開放平臺API調試工具&#xff0c;…

多方安全計算(MPC)電子拍賣系統

目錄 一、前言二、多方安全計算(MPC)與電子拍賣系統概述2.1 多方安全計算(MPC)的基本概念2.2 電子拍賣系統背景與需求三、MPC電子拍賣系統設計原理3.1 系統總體架構3.2 電子拍賣中的安全協議3.3 數學與算法證明四、數據加解密模塊設計五、GPU加速與系統性能優化六、GUI設計與系…

【Linux篇】初識Linux指令(上篇)

Linux命令世界&#xff1a;從新手到高手的必備指南 一 Linux發展與歷史1.1 Linux起源與發展1.2 Linux與Windows操作系統對比 二 Linux常用操作指令2.1 ls命令 - “List”&#xff08;列出文件)2.2 pwd指令- "打印當前工作目錄"2.3 cd指令 - “Change Directory”&…

編程視界:C++命名空間

目錄 命名空間 為什么要使用命名空間 什么是命名空間 命名空間的使用方式 關鍵點總結 命名空間的嵌套使用 匿名命名空間 跨模塊調用問題 命名空間可以多次定義 總結 首先從C的hello,world程序入手&#xff0c;來認識一下C語言 #include <iostream> using name…