Spring Boot 整合 RabbitMQ
一、概述:RabbitMQ 是什么?
你可以把 RabbitMQ 想象成一個「快遞中轉站」。
比如你在網上買了一本書,賣家(生產者)把包裹(消息)交給快遞站(RabbitMQ),快遞站根據包裹上的地址(規則)把包裹分給不同的快遞員(消費者),最后送到你家(業務系統)。
RabbitMQ 是一個專門用來「傳遞消息」的軟件(專業叫「消息中間件」),它能讓不同的程序、不同的電腦之間高效地「傳小紙條」。
二、RabbitMQ 的「快遞分類方式」(交換機類型)
快遞站分包裹時,可能按「地址」「重量」「緊急程度」分類。RabbitMQ 也有類似的「分類規則」,叫 交換機(Exchange)。常用的有 4 種:
1. 直連交換機(Direct Exchange)
規則:包裹上必須寫「精確地址」(路由鍵 Routing Key),只有地址完全匹配的快遞員才能收到。
例子:賣家給「北京-朝陽區」的包裹,只有負責朝陽區的快遞員能接。
2. 扇形交換機(Fanout Exchange)
規則:不管地址,「所有快遞員」都能收到包裹(廣播模式)。
例子:賣家發「雙11大促通知」,所有快遞員都要知道,一起準備加班。
3. 主題交換機(Topic Exchange)
規則:地址可以用「通配符」(比如 *
代表一個詞,#
代表多個詞)。
例子:賣家發「北京.*」的包裹,所有地址以「北京」開頭的快遞員(如北京-朝陽、北京-海淀)都能收到。
4. 頭交換機(Headers Exchange)
規則:不看地址,看包裹上的「標簽」(Headers 頭信息,比如「優先級=高」)。
例子:賣家標「緊急」的包裹,只有關注「緊急」標簽的快遞員能接。
三、RabbitMQ 的使用場景(為什么需要它?)
1. 異步處理:省時間!
比如你在淘寶下單,系統需要「扣庫存+發短信+更新積分」。如果一步步做,可能要等 5 秒;用 RabbitMQ 可以把「發短信」和「更新積分」的任務丟給 RabbitMQ,主流程只需要 1 秒完成下單,剩下的由其他程序慢慢處理。
2. 流量削峰:防崩潰!
雙11時,訂單像洪水一樣涌來,系統直接處理可能被沖垮。RabbitMQ 像「水庫」,把訂單暫時存起來,系統按自己的速度慢慢處理(比如每秒處理 1000 單),避免被瞬間的高流量沖垮。
3. 系統解耦:不互相拖累!
比如電商系統有「訂單模塊」「庫存模塊」「短信模塊」。如果訂單模塊直接調用庫存和短信模塊,一旦短信模塊崩潰,訂單也會失敗。用 RabbitMQ 后,訂單模塊只需要把消息發給 RabbitMQ,其他模塊自己來取,互不影響。
四、整合Springboot
1. 配置 RabbitMQ 連接
1.Maven
<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId>
</dependency>
2.配置文件,yml和properties選擇一個
spring:rabbitmq:host: 117.185.165.187port: 5672username: rabbitmqpassword: j8iG3KYs7Wmxxx
# RabbitMQ 服務器地址(默認 localhost:5672)
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
# 登錄賬號密碼(默認 guest/guest,注意:遠程連接需要改密碼!)
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
2、定義「快遞規則」:交換機和隊列
RabbitMQ 的消息需要通過「交換機(Exchange)」和「隊列(Queue)」傳遞。我們需要先告訴 Spring Boot 要創建哪些交換機和隊列。
新建 RabbitMQConfig.java
,用 @Bean
聲明交換機、隊列和綁定關系。
做一個「電商下單后發通知」的功能,需要:
- 一個直連交換機(
order_exchange
)。 - 一個隊列(
sms_queue
),專門存「需要發短信的訂單」。 - 把隊列和交換機綁定,路由鍵是
send_sms
(只有路由鍵匹配的消息才會進這個隊列)。
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 1. 聲明直連交換機(名字叫 order_exchange)@Beanpublic DirectExchange orderExchange() {return new DirectExchange("order_exchange");}// 2. 聲明隊列(名字叫 sms_queue,存需要發短信的訂單)@Beanpublic Queue smsQueue() {return new Queue("sms_queue");}// 3. 把隊列和交換機綁定,路由鍵是 send_sms(只有路由鍵匹配的消息才會進這個隊列)@Beanpublic Binding smsBinding(Queue smsQueue, DirectExchange orderExchange) {return BindingBuilder.bind(smsQueue).to(orderExchange).with("send_sms"); // 路由鍵必須和生產者發送時一致}
}
如果說是多個隊列按照下面的
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 1. 聲明直連交換機(名字叫 order_exchange)@Beanpublic DirectExchange orderExchange() {return new DirectExchange("order_exchange");}// 2. 聲明 3 個隊列(短信、積分、日志)@Beanpublic Queue smsQueue() {return new Queue("sms_queue"); // 存需要發短信的訂單}@Beanpublic Queue scoreQueue() {return new Queue("score_queue"); // 存需要更新積分的訂單}@Beanpublic Queue logQueue() {return new Queue("log_queue"); // 存需要記錄日志的訂單}// 3. 綁定 sms_queue(路由鍵 send_sms)@Beanpublic Binding smsBinding(Queue smsQueue, DirectExchange orderExchange) {return BindingBuilder.bind(smsQueue).to(orderExchange).with("send_sms"); // 路由鍵:只有 send_sms 的消息會進 sms_queue}// 4. 綁定 score_queue(路由鍵 update_score)@Beanpublic Binding scoreBinding(Queue scoreQueue, DirectExchange orderExchange) {return BindingBuilder.bind(scoreQueue).to(orderExchange).with("update_score"); // 路由鍵:只有 update_score 的消息會進 score_queue}// 5. 綁定 logQueue(路由鍵 log_order)@Beanpublic Binding logBinding(Queue logQueue, DirectExchange orderExchange) {return BindingBuilder.bind(logQueue).to(orderExchange).with("log_order"); // 路由鍵:只有 log_order 的消息會進 log_queue}}
3、生產者:發送消息(賣家發包裹)
用 RabbitTemplate
(Spring 提供的發消息工具)發送消息到交換機。
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class OrderService {// 注入 RabbitTemplate(Spring 自動幫我們創建好的發消息工具)@Autowiredprivate RabbitTemplate rabbitTemplate;// 用戶下單后,發送消息到 RabbitMQpublic void createOrder(String orderInfo) {// 1. 主流程:扣庫存、保存訂單(這里簡化,直接打印)System.out.println("主流程:訂單已保存,開始扣庫存...");// 2. 異步任務:發送短信通知(把消息發給 RabbitMQ)rabbitTemplate.convertAndSend("order_exchange", // 交換機名字"send_sms", // 路由鍵(和隊列綁定的路由鍵一致)orderInfo // 消息內容(比如訂單詳情));System.out.println("已發送短信通知任務到 RabbitMQ");}
}
4、消費者:接收消息(快遞員收包裹)
用 @RabbitListener
注解監聽隊列,自動接收并處理消息。
新建消費者服務類
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SmsConsumer {// 監聽 sms_queue 隊列,有消息就自動觸發這個方法@RabbitListener(queues = "sms_queue")public void sendSms(String orderInfo) {System.out.println("收到短信任務,正在發送...");// 這里調用短信接口(比如阿里云短信),實際代碼需要替換System.out.println("已給用戶發送短信:" + orderInfo);}
}
如果說是多線程處理就多添加一個配置concurrency = "5"
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SmsConsumer {// 監聽 sms_queue 隊列,有消息就自動觸發這個方法@RabbitListener(queues = "sms_queue",concurrency = "5")public void sendSms(String orderInfo) {System.out.println("收到短信任務,正在發送...");// 這里調用短信接口(比如阿里云短信),實際代碼需要替換System.out.println("已給用戶發送短信:" + orderInfo);}
}
1、如何避免消息被重復處理?
如果你的場景是「多個消費者搶著處理同一條消息」(比如并行加速),需要確保 一條消息只被一個消費者處理。RabbitMQ 默認已經幫你實現了這一點!
2、原理:消息確認機制(ACK)
- 當消費者收到消息后,RabbitMQ 會等待消費者「確認」(ACK)。
- 如果消費者正常處理完消息并返回 ACK,RabbitMQ 會刪除這條消息,不會再發給其他消費者。
- 如果消費者處理失敗(比如崩潰),RabbitMQ 會重新將消息分發給其他消費者。
3、注意事項
1. 消息冪等性(防重復處理)
如果消費者處理消息時,因為網絡問題導致 ACK 未成功返回,RabbitMQ 會重新發送消息,可能導致重復處理。
解決方法:
- 消息里加唯一標識(如訂單號)。
- 處理前檢查是否已處理過(比如查數據庫)。
2. 消費者數量別太多!
concurrency
不是越大越好!如果消費者數量超過服務器 CPU 核心數,反而會因為線程切換浪費資源。
建議:根據業務耗時調整,比如處理耗時 1 秒的任務,消費者數量 = CPU 核心數 × 2 比較合理。
3. 手動確認消息(高級場景)
默認是自動 ACK(auto_ack=true
),但如果處理消息可能失敗(比如調用外部接口超時),建議用手動 ACK。
@RabbitListener(queues = "order_queue", ackMode = "MANUAL") // 手動確認
public void processOrder(String orderInfo, Channel channel, Message message) {try {// 處理消息...channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 手動確認成功} catch (Exception e) {// 處理失敗,重新入隊(或發送到死信隊列)channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}
}
五、常見問題 & 注意事項
1. 消息丟失怎么辦?
- 開啟「消息持久化」:在聲明隊列和交換機時,設置
durable=true
(默認是true
,重啟 RabbitMQ 后消息不丟失)。 - 生產者確認:配置
spring.rabbitmq.publisher-confirm-type=correlated
,確保消息成功發到交換機。 - 消費者確認:默認是
auto_ack=true
(自動確認),如果需要手動確認(比如處理消息時可能失敗),可以設置@RabbitListener(ackMode = "MANUAL")
,處理完再調用channel.basicAck()
。
2. 重復消費怎么辦?
- 消息里加唯一標識(如訂單號),消費者處理前檢查是否已處理過(比如查數據庫)。
3. RabbitMQ 連不上?
- 檢查
application.properties
里的host
、port
、username
、password
是否正確。 - 遠程連接時,RabbitMQ 默認禁止
guest
用戶,需要新建用戶并授權(管理界面操作)。
六、總結
用 Spring Boot 整合 RabbitMQ 超簡單!核心步驟就 4 步:
- 配連接:在
application.properties
里填 RabbitMQ 地址。 - 定義規則:用
@Bean
聲明交換機、隊列和綁定關系。 - 發消息:用
RabbitTemplate.convertAndSend()
發送。 - 收消息:用
@RabbitListener
監聽隊列。
適合用 Spring Boot + RabbitMQ 的場景:
- 電商、物流等需要「異步任務」的系統。
- 高并發場景(如雙11訂單洪峰)。
- 多個模塊需要「松耦合」協作的系統(如訂單、短信、積分模塊)。