Spring Boot 整合 RabbitMQ

Spring Boot 整合 RabbitMQ

一、概述:RabbitMQ 是什么?

你可以把 RabbitMQ 想象成一個「快遞中轉站」。
比如你在網上買了一本書,賣家(生產者)把包裹(消息)交給快遞站(RabbitMQ),快遞站根據包裹上的地址(規則)把包裹分給不同的快遞員(消費者),最后送到你家(業務系統)。

RabbitMQ 是一個專門用來「傳遞消息」的軟件(專業叫「消息中間件」),它能讓不同的程序、不同的電腦之間高效地「傳小紙條」。


二、RabbitMQ 的「快遞分類方式」(交換機類型)

快遞站分包裹時,可能按「地址」「重量」「緊急程度」分類。RabbitMQ 也有類似的「分類規則」,叫 交換機(Exchange)。常用的有 4 種:

1. 直連交換機(Direct Exchange)

規則:包裹上必須寫「精確地址」(路由鍵 Routing Key),只有地址完全匹配的快遞員才能收到。
例子:賣家給「北京-朝陽區」的包裹,只有負責朝陽區的快遞員能接。

2. 扇形交換機(Fanout Exchange)

規則:不管地址,「所有快遞員」都能收到包裹(廣播模式)。
例子:賣家發「雙11大促通知」,所有快遞員都要知道,一起準備加班。

3. 主題交換機(Topic Exchange)

規則:地址可以用「通配符」(比如 * 代表一個詞,# 代表多個詞)。
例子:賣家發「北京.*」的包裹,所有地址以「北京」開頭的快遞員(如北京-朝陽、北京-海淀)都能收到。

4. 頭交換機(Headers Exchange)

規則:不看地址,看包裹上的「標簽」(Headers 頭信息,比如「優先級=高」)。
例子:賣家標「緊急」的包裹,只有關注「緊急」標簽的快遞員能接。


三、RabbitMQ 的使用場景(為什么需要它?)

1. 異步處理:省時間!

比如你在淘寶下單,系統需要「扣庫存+發短信+更新積分」。如果一步步做,可能要等 5 秒;用 RabbitMQ 可以把「發短信」和「更新積分」的任務丟給 RabbitMQ,主流程只需要 1 秒完成下單,剩下的由其他程序慢慢處理。

2. 流量削峰:防崩潰!

雙11時,訂單像洪水一樣涌來,系統直接處理可能被沖垮。RabbitMQ 像「水庫」,把訂單暫時存起來,系統按自己的速度慢慢處理(比如每秒處理 1000 單),避免被瞬間的高流量沖垮。

3. 系統解耦:不互相拖累!

比如電商系統有「訂單模塊」「庫存模塊」「短信模塊」。如果訂單模塊直接調用庫存和短信模塊,一旦短信模塊崩潰,訂單也會失敗。用 RabbitMQ 后,訂單模塊只需要把消息發給 RabbitMQ,其他模塊自己來取,互不影響。

四、整合Springboot

1. 配置 RabbitMQ 連接

1.Maven

<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId>
</dependency>

2.配置文件,yml和properties選擇一個

spring:rabbitmq:host: 117.185.165.187port: 5672username: rabbitmqpassword: j8iG3KYs7Wmxxx
# RabbitMQ 服務器地址(默認 localhost:5672)
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
# 登錄賬號密碼(默認 guest/guest,注意:遠程連接需要改密碼!)
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

2、定義「快遞規則」:交換機和隊列

RabbitMQ 的消息需要通過「交換機(Exchange)」和「隊列(Queue)」傳遞。我們需要先告訴 Spring Boot 要創建哪些交換機和隊列。

新建 RabbitMQConfig.java,用 @Bean 聲明交換機、隊列和綁定關系。

做一個「電商下單后發通知」的功能,需要:

  • 一個直連交換機(order_exchange)。
  • 一個隊列(sms_queue),專門存「需要發短信的訂單」。
  • 把隊列和交換機綁定,路由鍵是 send_sms(只有路由鍵匹配的消息才會進這個隊列)。
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 1. 聲明直連交換機(名字叫 order_exchange)@Beanpublic DirectExchange orderExchange() {return new DirectExchange("order_exchange");}// 2. 聲明隊列(名字叫 sms_queue,存需要發短信的訂單)@Beanpublic Queue smsQueue() {return new Queue("sms_queue");}// 3. 把隊列和交換機綁定,路由鍵是 send_sms(只有路由鍵匹配的消息才會進這個隊列)@Beanpublic Binding smsBinding(Queue smsQueue, DirectExchange orderExchange) {return BindingBuilder.bind(smsQueue).to(orderExchange).with("send_sms");  // 路由鍵必須和生產者發送時一致}
}

如果說是多個隊列按照下面的

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 1. 聲明直連交換機(名字叫 order_exchange)@Beanpublic DirectExchange orderExchange() {return new DirectExchange("order_exchange");}// 2. 聲明 3 個隊列(短信、積分、日志)@Beanpublic Queue smsQueue() {return new Queue("sms_queue");  // 存需要發短信的訂單}@Beanpublic Queue scoreQueue() {return new Queue("score_queue");  // 存需要更新積分的訂單}@Beanpublic Queue logQueue() {return new Queue("log_queue");  // 存需要記錄日志的訂單}// 3. 綁定 sms_queue(路由鍵 send_sms)@Beanpublic Binding smsBinding(Queue smsQueue, DirectExchange orderExchange) {return BindingBuilder.bind(smsQueue).to(orderExchange).with("send_sms");  // 路由鍵:只有 send_sms 的消息會進 sms_queue}// 4. 綁定 score_queue(路由鍵 update_score)@Beanpublic Binding scoreBinding(Queue scoreQueue, DirectExchange orderExchange) {return BindingBuilder.bind(scoreQueue).to(orderExchange).with("update_score");  // 路由鍵:只有 update_score 的消息會進 score_queue}// 5. 綁定 logQueue(路由鍵 log_order)@Beanpublic Binding logBinding(Queue logQueue, DirectExchange orderExchange) {return BindingBuilder.bind(logQueue).to(orderExchange).with("log_order");  // 路由鍵:只有 log_order 的消息會進 log_queue}}

3、生產者:發送消息(賣家發包裹)

RabbitTemplate(Spring 提供的發消息工具)發送消息到交換機。

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class OrderService {// 注入 RabbitTemplate(Spring 自動幫我們創建好的發消息工具)@Autowiredprivate RabbitTemplate rabbitTemplate;// 用戶下單后,發送消息到 RabbitMQpublic void createOrder(String orderInfo) {// 1. 主流程:扣庫存、保存訂單(這里簡化,直接打印)System.out.println("主流程:訂單已保存,開始扣庫存...");// 2. 異步任務:發送短信通知(把消息發給 RabbitMQ)rabbitTemplate.convertAndSend("order_exchange",  // 交換機名字"send_sms",        // 路由鍵(和隊列綁定的路由鍵一致)orderInfo          // 消息內容(比如訂單詳情));System.out.println("已發送短信通知任務到 RabbitMQ");}
}

4、消費者:接收消息(快遞員收包裹)

@RabbitListener 注解監聽隊列,自動接收并處理消息。

新建消費者服務類

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SmsConsumer {// 監聽 sms_queue 隊列,有消息就自動觸發這個方法@RabbitListener(queues = "sms_queue")public void sendSms(String orderInfo) {System.out.println("收到短信任務,正在發送...");// 這里調用短信接口(比如阿里云短信),實際代碼需要替換System.out.println("已給用戶發送短信:" + orderInfo);}
}

如果說是多線程處理就多添加一個配置concurrency = "5"

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SmsConsumer {// 監聽 sms_queue 隊列,有消息就自動觸發這個方法@RabbitListener(queues = "sms_queue",concurrency = "5")public void sendSms(String orderInfo) {System.out.println("收到短信任務,正在發送...");// 這里調用短信接口(比如阿里云短信),實際代碼需要替換System.out.println("已給用戶發送短信:" + orderInfo);}
}
1、如何避免消息被重復處理?

如果你的場景是「多個消費者搶著處理同一條消息」(比如并行加速),需要確保 一條消息只被一個消費者處理。RabbitMQ 默認已經幫你實現了這一點!

2、原理:消息確認機制(ACK)
  • 當消費者收到消息后,RabbitMQ 會等待消費者「確認」(ACK)。
  • 如果消費者正常處理完消息并返回 ACK,RabbitMQ 會刪除這條消息,不會再發給其他消費者。
  • 如果消費者處理失敗(比如崩潰),RabbitMQ 會重新將消息分發給其他消費者。
3、注意事項
1. 消息冪等性(防重復處理)

如果消費者處理消息時,因為網絡問題導致 ACK 未成功返回,RabbitMQ 會重新發送消息,可能導致重復處理。
解決方法

  • 消息里加唯一標識(如訂單號)。
  • 處理前檢查是否已處理過(比如查數據庫)。
2. 消費者數量別太多!

concurrency 不是越大越好!如果消費者數量超過服務器 CPU 核心數,反而會因為線程切換浪費資源。
建議:根據業務耗時調整,比如處理耗時 1 秒的任務,消費者數量 = CPU 核心數 × 2 比較合理。

3. 手動確認消息(高級場景)

默認是自動 ACK(auto_ack=true),但如果處理消息可能失敗(比如調用外部接口超時),建議用手動 ACK。

@RabbitListener(queues = "order_queue", ackMode = "MANUAL")  // 手動確認
public void processOrder(String orderInfo, Channel channel, Message message) {try {// 處理消息...channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);  // 手動確認成功} catch (Exception e) {// 處理失敗,重新入隊(或發送到死信隊列)channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}
}

五、常見問題 & 注意事項

1. 消息丟失怎么辦?

  • 開啟「消息持久化」:在聲明隊列和交換機時,設置 durable=true(默認是 true,重啟 RabbitMQ 后消息不丟失)。
  • 生產者確認:配置 spring.rabbitmq.publisher-confirm-type=correlated,確保消息成功發到交換機。
  • 消費者確認:默認是 auto_ack=true(自動確認),如果需要手動確認(比如處理消息時可能失敗),可以設置 @RabbitListener(ackMode = "MANUAL"),處理完再調用 channel.basicAck()

2. 重復消費怎么辦?

  • 消息里加唯一標識(如訂單號),消費者處理前檢查是否已處理過(比如查數據庫)。

3. RabbitMQ 連不上?

  • 檢查 application.properties 里的 hostportusernamepassword 是否正確。
  • 遠程連接時,RabbitMQ 默認禁止 guest 用戶,需要新建用戶并授權(管理界面操作)。

六、總結

用 Spring Boot 整合 RabbitMQ 超簡單!核心步驟就 4 步:

  1. 配連接:在 application.properties 里填 RabbitMQ 地址。
  2. 定義規則:用 @Bean 聲明交換機、隊列和綁定關系。
  3. 發消息:用 RabbitTemplate.convertAndSend() 發送。
  4. 收消息:用 @RabbitListener 監聽隊列。

適合用 Spring Boot + RabbitMQ 的場景

  • 電商、物流等需要「異步任務」的系統。
  • 高并發場景(如雙11訂單洪峰)。
  • 多個模塊需要「松耦合」協作的系統(如訂單、短信、積分模塊)。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/88268.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/88268.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/88268.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Unity Demo-3DFarm詳解-其一

我們來拆解一個種田游戲&#xff0c;這個游戲種類內部的功能還是比較模板化的&#xff0c;我們來一點點說。我們大體上分為這么幾個部分&#xff1a;農場運營玩法角色與玩家互動物品與背包存檔和進度管理用戶界面系統農場運營可以大體上分為&#xff1a;種植系統&#xff1a;支…

esp8266驅動下載

問題描述&#xff1a;esp8266插上電腦&#xff0c;設備管理器無法識別&#xff0c;顯示為USB serial&#xff08;黃色感嘆號&#xff09; 首先確認你的esp8266是不是 CH340 系列的 USB 轉串口芯片 CH340驅動下載地址

大語言模型的極限:知識、推理與創造力的邊界探析

大語言模型的極限&#xff1a;知識、推理與創造力的邊界探析 人工智能領域的快速發展推動了大語言模型&#xff08;LLM&#xff09;的廣泛應用&#xff0c;這些模型在文本生成、知識問答和創意表達等方面展現出前所未有的能力。然而&#xff0c;隨著應用場景的深化&#xff0c;…

git中的fork指令解釋

在Git中&#xff0c;Fork 是指將他人的代碼倉庫&#xff08;Repository&#xff09;復制到自己的賬戶下&#xff0c;創建一個完全獨立的副本[1][2]。以下是關于Fork的詳細說明&#xff1a; Fork的定義與核心作用 定義&#xff1a;Fork是代碼托管平臺&#xff08;如GitHub&#…

iPhone 抓包工具有哪些?多工具對比分析優缺點

iOS 平臺一向以安全性著稱&#xff0c;這也使得對其進行網絡調試和抓包變得異常困難。相比安卓&#xff0c;iPhone 抓包難點主要在以下幾點&#xff1a; 系統限制代理設置的靈活性無法自由安裝根證書抓包常涉及 HTTPS 解密與雙向認證破解普通用戶設備無 root 或越獄權限 因此&a…

使用 libcu++ 庫

文章目錄使用 libcu 庫安裝與設置基本組件1. 原子操作2. 內存管理3. 類型特性4. 同步原語編譯選項注意事項使用 libcu 庫 libcu 是 NVIDIA 提供的 CUDA C 標準庫實現&#xff0c;它為 CUDA 開發者提供了類似 C 標準庫的功能和接口。以下是使用 libcu 的基本指南&#xff1a; …

[Leetcode] 預處理 | 多叉樹bfs | 格雷編碼 | static_cast | 矩陣對角線

魔術排列模擬一個特定的洗牌過程&#xff0c;并找到使得經過一系列洗牌和取牌操作后&#xff0c;能夠與給定的目標數組target相匹配的最小k值核心思想: 預處理初始排列&#xff1a;從一個按順序排列的數組&#xff08;例如&#xff0c;{1, 2, 3, ..., n}&#xff09;開始。洗牌…

【技術追蹤】SynPo:基于高質量負提示提升無訓練少樣本醫學圖像分割性能(MICCAI-2025)

SAM 新用法&#xff0c;無需訓練&#xff0c;利用高質量負提示提升分割性能~ 論文&#xff1a;SynPo: Boosting Training-Free Few-Shot Medical Segmentation via High-Quality Negative Prompts 代碼&#xff1a;https://liu-yufei.github.io/synpo-project-page/ 0、摘要 大…

深入理解機器學習

一.前言本章節開始來講解一下機器學習的知識&#xff0c;本期作為一個了解就大概介紹一下&#xff0c;我們不會從機器學習基礎開始介紹&#xff0c;但是后面會來補充&#xff0c;隨著ai的不斷發展&#xff0c;機器學習在ai的領域里面的占比越來約少&#xff0c;我們還是以應用為…

數據結構 順序表(1)

目錄 1.線性表 2.順序表 1.線性表 線性表&#xff08;linear list&#xff09;是n個具有相同特性的數據元素的有限序列。線性表是一種在實際中廣泛使用 的數據結構&#xff0c;常見的線性表&#xff1a;順序表、鏈表、棧、隊列、字符串… 線性表在邏輯上是線性結構&#…

openssl 生成國密證書

openssl生成證書生成CA私鑰 openssl ecparam -genkey -name SM2 -out ca.key.pem -noout證書請求 openssl req -new -key ca.key.pem -out ca.cert.req -subj “/CNrtems-strongswan-CA”生成證書 openssl x509 -req -days 3650 -in ca.cert.req -signkey ca.key.pem -out ca.c…

系統架構設計師論文分享-論分布式事務技術及其應用

我的軟考歷程 摘要 2023年9月&#xff0c;我所在的公司通過了研發紗線MES系統的立項&#xff0c;該系統為國內紗線工廠提供SAAS服務&#xff0c;旨在提高紗線工廠的數字化和智能化水平。我在該項目中擔任系統架構設計師一職&#xff0c;負責該項目的架構設計工作。本文結合我…

東土科技智能塔機系統亮相南京,助力智能建造高質量發展

近日&#xff0c;由南京市城鄉建設委員會、江蘇省土木建筑學會主辦的“無人駕駛智能塔機觀摩會”&#xff0c;在中建三局一公司南京揚子江智慧中心項目現場成功舉辦。作為全國首批智能建造試點城市&#xff0c;南京市已出臺20余項支持政策&#xff0c;落地93個試點項目&#xf…

3D Surface Reconstruction with Enhanced High-Frequency Details

3D Surface Reconstruction with Enhanced High-Frequency Details核心問題&#xff1a;當前基于神經隱式表示&#xff08;如 NeuS&#xff09;的 3D 表面重建方法&#xff0c;通常采用隨機采樣策略。這種隨機采樣難以充分捕捉圖像中的高頻細節區域&#xff08;如紋理、邊緣、光…

Science Robotics 耶魯大學開源視觸覺新范式,看出機器人柔性手的力感知

摘要&#xff1a;在機器人視觸覺傳感領域&#xff0c;如何兼顧成本與性能始終是一大挑戰。耶魯大學在《Science Robotics》上發表最新研究&#xff0c;提出了一種“Forces for Free”&#xff08;F3&#xff09;新范式。該研究通過觀測一個經過特殊優化的開源柔性手&#xff08…

關于java項目中maven的理解

我的理解&#xff1a;maven是java項目的依賴管理工具&#xff0c;通過pom.xml文件配置要下載的依賴&#xff0c;settings.xml配置maven下載的鏡像沒有就默認在maven中央倉庫下載依賴&#xff0c;本地倉庫是存儲下載好的依賴ai:1. 功能定位局限Maven 不只是依賴管理工具&#xf…

緩存三大問題詳解與工業級解決方案

文章目錄緩存三大問題詳解與工業級解決方案概念總覽問題詳解1. 緩存穿透 (Cache Penetration)問題描述典型場景危害2. 緩存擊穿 (Cache Breakdown)問題描述典型場景危害3. 緩存雪崩 (Cache Avalanche)問題描述典型場景危害工業級解決方案緩存穿透解決方案方案1: 布隆過濾器方案…

FreeRTOS 中主函數 while 循環與任務創建的緊密聯系

FreeRTOS 中主函數 while 循環與任務創建的緊密聯系 在嵌入式開發領域&#xff0c;FreeRTOS 是一款被廣泛應用的輕量級實時操作系統&#xff0c;為開發者提供了高效的多任務調度機制。對于初學者來說&#xff0c;理解主函數中的 while 循環與通過 xTaskCreate 創建的任務之間的…

Flutter基礎(前端教程⑦-Http和卡片)

1. 假設后端返回的數據格式{"code": 200,"data": [{"name": "張三","age": 25,"email": "zhangsanexample.com","avatar": "https://picsum.photos/200/200?random1","statu…

pytorch chunk 切塊

目錄 chunk切塊 chunk???????切塊 import torch# 創建一個形狀為 [2, 3, 4] 的張量 x torch.arange(6).reshape(2, 3) print("原始張量形狀:", x.shape) print("x:", x) # 輸出: 原始張量形狀: torch.Size([2, 3, 4])# 沿著最后一個維度分割成 2 …