一、mq的作用和使用場景
MQ的基本作用
MQ(Message Queue,消息隊列)是一種應用程序對應用程序的通信方法,主要作用包括:
-
異步處理:解耦生產者和消費者,允許生產者發送消息后立即返回,消費者異步處理
-
應用解耦:降低系統間的直接依賴,通過消息進行間接通信
-
流量削峰:緩沖突發流量,避免系統被壓垮
-
消息通信:實現系統間的可靠消息傳遞
-
最終一致性:支持分布式事務的最終一致性方案
主要使用場景
1. 異步處理
-
用戶注冊后發送郵件/短信:注冊流程快速完成,通知類操作異步處理
-
日志收集:應用將日志發送到MQ,由專門服務異步處理
2. 應用解耦
-
電商訂單系統:訂單服務生成訂單后通過MQ通知庫存、物流、支付等系統
-
微服務架構:服務間通過MQ通信而非直接調用
3. 流量削峰
-
秒殺系統:將大量請求先放入MQ,按系統能力逐步處理
-
突發流量處理:應對促銷活動等流量高峰
4. 日志處理
-
大數據分析:收集各系統日志到MQ,由大數據平臺統一處理
-
實時監控:系統指標通過MQ傳輸到監控平臺
5. 消息通信
-
聊天系統:用戶消息通過MQ傳遞
-
通知系統:系統間的事件通知
適用場景總結表
場景 | 關鍵技術 | 優勢 |
---|---|---|
應用解耦 | 消息隊列 | 減少系統間直接依賴 |
異步處理 | 生產者-消費者模型 | 提升響應速度 |
流量削峰 | 隊列積壓+限速消費 | 保護后端系統 |
跨語言通信 | AMQP 多語言支持 | 統一通信協議 |
發布/訂閱 | Exchange(fanout/topic) | 一對多消息廣播 |
延遲隊列 | TTL + 死信隊列 | 實現定時任務 |
二、mq的優點
1. 解耦系統組件
-
生產者和消費者無需相互感知對方的存在
-
系統間通過消息通信而非直接調用,降低耦合度
-
新增消費者不會影響生產者代碼
2. 異步處理提升性能
-
生產者發送消息后無需等待消費者處理完成
-
非關鍵路徑操作可異步執行(如發送通知、記錄日志)
-
顯著減少系統響應時間,提高吞吐量
3. 流量削峰與過載保護
-
緩沖突發流量,避免系統被瞬間高峰壓垮
-
消費者可按自身處理能力從隊列獲取消息
-
特別適合秒殺、促銷等瞬時高并發場景
4. 提高系統可靠性
-
消息持久化確保重要數據不丟失
-
重試機制和死信隊列處理失敗消息
-
網絡波動時仍能保證消息最終送達
5. 擴展性強
-
可輕松增加消費者實例提高處理能力
-
天然支持分布式系統架構
-
各組件可獨立擴展(生產者、MQ本身、消費者)
6. 順序保證
-
某些MQ(如Kafka)可保證消息順序性
-
對需要嚴格順序的業務場景非常重要
7. 最終一致性支持
-
實現分布式事務的最終一致性方案
-
通過消息驅動的方式同步系統狀態
-
比強一致性方案性能更高
8. 靈活的通信模式
-
支持點對點、發布/訂閱等多種模式
-
可實現廣播、組播等不同消息分發方式
-
適應各種業務場景需求
9. 系統恢復能力
-
消費者宕機恢復后可從斷點繼續消費
-
避免數據丟失或重復處理
-
支持消息回溯重新消費
10. 平衡資源利用率
-
平滑系統負載,避免資源閑置或過載
-
提高整體資源使用效率
-
降低系統建設成本(無需按峰值配置資源)
三、mq的缺點
1. 系統復雜度增加
-
引入MQ后,系統架構變得更加復雜,需要額外維護MQ集群
-
需要處理消息的發送、接收、確認、重試等邏輯
-
增加了調試和問題排查的難度(如消息丟失、重復消費等)
2. 消息一致性問題
-
消息丟失:生產者發送失敗、MQ宕機、消費者處理失敗都可能導致消息丟失
-
消息重復:網絡問題或消費者超時可能導致消息被重復消費(需業務層做冪等處理)
-
順序問題:某些MQ(如Kafka)只能保證分區內有序,全局有序需要額外設計
3. 延遲問題
-
異步處理導致延遲:消息隊列的消費通常是異步的,不適合實時性要求極高的場景(如支付交易)
-
堆積時延遲加劇:如果消費者處理速度跟不上,消息堆積會導致延遲越來越高
4. 運維成本高
-
集群管理:MQ本身需要高可用部署(如Kafka的ZooKeeper依賴、RabbitMQ的鏡像隊列)
-
監控與告警:需監控消息積壓、消費延遲、錯誤率等指標
-
資源占用:MQ集群可能占用較多CPU、內存和磁盤IO
5. 數據一致性與事務問題
-
分布式事務挑戰:如果業務涉及數據庫和MQ的協同(如扣庫存+發消息),需要引入事務消息或本地消息表等方案
-
最終一致性:MQ通常只保證最終一致性,不適合強一致性要求的場景
6. 依賴風險
-
MQ成為單點故障:如果MQ集群崩潰,可能導致整個系統不可用
-
版本兼容性問題:MQ升級可能影響生產者和消費者的兼容性
7. 消息積壓風險
-
消費者處理能力不足:如果消費者宕機或處理緩慢,消息會堆積,可能導致MQ存儲爆滿
-
影響新消息處理:積壓嚴重時,新消息可能被阻塞或丟棄
8. 不適合所有場景
-
低延遲場景:如高頻交易、實時游戲,MQ的異步機制可能引入不可接受的延遲
-
小規模系統:如果系統簡單,直接調用可能比引入MQ更高效
四、mq相關產品,每種產品的特點
1. RabbitMQ
特點:
-
基于AMQP協議,支持多種客戶端語言
-
輕量級,易于部署和管理
-
提供靈活的路由機制(直連/主題/扇出/頭交換)
-
支持消息確認、持久化、優先級隊列
-
集群部署相對簡單
-
社區活躍,文檔完善
適用場景:
-
中小規模消息處理
-
需要復雜路由規則的場景
-
企業級應用集成
-
對延遲要求不高的異步任務
2. Kafka
特點:
-
超高吞吐量(百萬級TPS)
-
分布式、高可用設計
-
基于發布/訂閱模式
-
消息持久化存儲(可配置保留時間)
-
支持消息回溯和批量消費
-
水平擴展能力強
-
支持流式處理(Kafka Streams)
適用場景:
-
大數據日志收集與分析
-
實時流處理
-
高吞吐量消息系統
-
事件溯源
-
監控數據聚合
3. RocketMQ
特點:
-
阿里開源,經受雙11考驗
-
支持事務消息
-
嚴格的順序消息
-
支持消息軌跡查詢
-
分布式架構,高可用
-
支持定時/延遲消息
-
支持消息過濾
適用場景:
-
電商交易系統
-
金融支付場景
-
需要嚴格順序的消息處理
-
分布式事務場景
4. ActiveMQ
特點:
-
支持JMS規范
-
支持多種協議(STOMP、AMQP、MQTT等)
-
提供消息持久化和事務支持
-
支持集群部署
-
相對輕量級
適用場景:
-
傳統企業應用集成
-
需要JMS支持的場景
-
中小型消息系統
-
IoT設備通信
5. Pulsar
特點:
-
云原生設計,計算存儲分離架構
-
支持多租戶
-
低延遲和高吞吐并存
-
支持多種消費模式(獨占/共享/故障轉移)
-
支持分層存儲(熱數據+冷數據)
-
內置函數計算能力
適用場景:
-
云原生應用
-
多租戶SaaS平臺
-
需要統一消息和流處理的場景
-
混合云部署
6. ZeroMQ
特點:
-
無中間件,基于庫的方式
-
極高性能(納秒級延遲)
-
支持多種通信模式(請求-響應/發布-訂閱等)
-
輕量級,無消息持久化
-
無broker架構
適用場景:
-
高性能計算
-
低延遲通信
-
進程間通信
-
不需要持久化的場景
7. NATS
特點:
-
極簡設計,性能優異
-
無持久化(NATS Streaming提供持久化擴展)
-
支持請求-響應模式
-
輕量級,適合云環境
-
低資源消耗
適用場景:
-
IoT設備通信
-
云原生微服務
-
不需要持久化的實時消息
-
服務發現和配置分發
選型建議對比表
特性 \ MQ | RabbitMQ | Kafka | RocketMQ | Pulsar | ActiveMQ |
---|---|---|---|---|---|
吞吐量 | 中 | 極高 | 高 | 高 | 中 |
延遲 | 低 | 中 | 低 | 低 | 中 |
順序保證 | 有限 | 分區有序 | 嚴格有序 | 分區有序 | 有限 |
持久化 | 支持 | 支持 | 支持 | 支持 | 支持 |
事務支持 | 有限 | 支持 | 支持 | 支持 | 支持 |
集群擴展 | 中等 | 容易 | 中等 | 容易 | 中等 |
運維復雜度 | 低 | 高 | 中 | 中 | 低 |
適用規模 | 中小 | 超大 | 中大 | 中大 | 中小 |
五、rabbitmq的搭建過程
Docker安裝方式:
# 拉取鏡像
docker pull rabbitmq:management
?
# 運行容器
docker run -d --name rabbitmq \
-p 5672:5672 -p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=password \
rabbitmq:management
Linux安裝方式:
# 1. 安裝Erlang
sudo apt-get install erlang
?
# 2. 下載RabbitMQ
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.9/rabbitmq-server_3.8.9-1_all.deb
?
# 3. 安裝
sudo dpkg -i rabbitmq-server_3.8.9-1_all.deb
?
# 4. 啟動服務
sudo systemctl start rabbitmq-server
?
# 5. 啟用管理插件
sudo rabbitmq-plugins enable rabbitmq_management
?
# 6. 創建用戶
sudo rabbitmqctl add_user admin password
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
六、rabbitmq相關角色
1. 生產者 (Publisher/Producer)
-
職責:創建并發送消息到 RabbitMQ 服務器
-
特點:
-
不直接將消息發送到隊列,而是發送到交換器 (Exchange)
-
可以設置消息屬性(如持久化、優先級等)
-
通常不知道消息最終會被哪些消費者接收
-
2. 消費者 (Consumer)
-
職責:接收并處理來自隊列的消息
-
特點:
-
可以訂閱一個或多個隊列
-
可以手動或自動確認消息 (ack/nack)
-
可以設置 QoS(服務質量)控制預取數量
-
3. 代理服務器 (Broker)
-
職責:RabbitMQ 服務本身,負責接收、路由和傳遞消息
-
組成:
-
Exchange(交換器)
-
Queue(隊列)
-
Binding(綁定)
-
4. 交換器 (Exchange)
-
類型:
類型 路由規則 典型用途 Direct 精確匹配 Routing Key 點對點精確路由 Fanout 忽略 Routing Key,廣播到所有綁定隊列 廣播通知 Topic 模糊匹配 Routing Key(支持通配符) 多條件路由 Headers 根據消息頭屬性匹配 復雜路由條件 -
特性:
-
接收生產者發送的消息
-
根據類型和綁定規則將消息路由到隊列
-
可以持久化(在服務器重啟后仍然存在)
-
5. 隊列 (Queue)
-
特性:
-
消息存儲的緩沖區
-
可以有多個消費者(競爭消費模式)
-
可配置屬性:
-
持久化(Durable)
-
自動刪除(Auto-delete)
-
排他性(Exclusive)
-
消息 TTL(存活時間)
-
最大長度等
-
-
6. 綁定 (Binding)
-
作用:連接 Exchange 和 Queue 的規則
-
組成要素:
-
Exchange 名稱
-
Queue 名稱
-
Routing Key(或用于 Headers Exchange 的匹配參數)
-
7. 通道 (Channel)
-
特點:
-
在 TCP 連接上建立的虛擬連接
-
輕量級,減少 TCP 連接開銷
-
每個 Channel 有獨立 ID
-
建議每個線程使用獨立的 Channel
-
8. 虛擬主機 (Virtual Host)
-
作用:提供邏輯隔離環境
-
特點:
-
類似于命名空間
-
每個 vhost 有獨立的 Exchange、Queue 和綁定
-
需要單獨配置權限
-
默認 vhost 為 "/"
-
9. 管理員角色 (Administrator)
-
權限:
-
管理用戶權限
-
創建/刪除 vhost
-
查看所有資源
-
通常通過 rabbitmqctl 工具或管理界面操作
-
10. 插件系統 (Plugins)
-
常見插件:
-
rabbitmq_management
:提供 Web 管理界面 -
rabbitmq_shovel
:跨集群消息轉移 -
rabbitmq_federation
:分布式部署支持 -
rabbitmq_delayed_message_exchange
:延遲消息
-
角色交互示意圖
+------------+ ? ? ? +---------+ ? ? ? +-------+ ? ? ? +--------+
| Publisher | ----> | Exchange| ====> | Queue | <---- | Consumer|
+------------+ ? ? ? +---------+ ? ? ? +-------+ ? ? ? +--------+(Binding)
七、rabbitmq內部組件
1、ConnectionFactory(連接管理器):應用程序與Rabbit之間建立連接的管理器,程序代碼中使用。
2、Channel(信道):消息推送使用的通道。
3、Exchange(交換器):用于接受、分配消息。
4、Queue(隊列):用于存儲生產者的消息。
5、RoutingKey(路由鍵):用于把生成者的數據分配到交換器上。
6、BindingKey(綁定鍵):用于把交換器的消息綁定到隊列上。
八、生產者發送消息的過程?
一、建立連接階段
-
TCP連接建立
-
生產者應用通過AMQP客戶端庫發起TCP連接
-
默認端口5672(帶管理插件時為5672/15672)
-
三次握手完成后建立物理連接
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("rabbitmq-host"); factory.setPort(5672); Connection connection = factory.newConnection();
-
-
認證與vhost選擇
-
發送START/START-OK協議幀進行認證
-
選擇虛擬主機(vhost),默認"/"
-
認證失敗會收到CONNECTION-CLOSE幀
-
二、通道創建階段
-
通道(Channel)初始化
-
在TCP連接上創建虛擬通道(Channel)
-
每個Channel有唯一ID(從1開始遞增)
-
通道參數協商:
-
Frame Max Size(默認128KB)
-
Channel Max(默認2047)
-
Channel channel = connection.createChannel();
-
-
交換器聲明(可選)
-
檢查目標Exchange是否存在
-
不存在時根據參數自動創建
-
關鍵參數:
-
type:exchange類型(direct/fanout/topic/headers)
-
durable:是否持久化
-
autoDelete:無綁定時是否自動刪除
-
channel.exchangeDeclare("order.exchange", "direct", true);
-
三、消息發布階段
-
消息構造
-
組成結構:
{"body": "消息內容(二進制)","properties": {"delivery_mode": 2, # 1-非持久化 2-持久化"priority": 0, ? ? ? # 0-9優先級"headers": {}, ? ? ? # 自定義頭"timestamp": 1620000000} }
-
-
發布消息到Exchange
-
通過Basic.Publish命令發送
-
關鍵參數:
-
exchange:目標交換器名稱
-
routingKey:路由鍵
-
mandatory:是否觸發Return回調
-
immediate:已廢棄參數
-
channel.basicPublish("order.exchange", "order.create", MessageProperties.PERSISTENT_TEXT_PLAIN,messageBodyBytes );
-
四、消息路由階段
-
Exchange路由決策
-
根據Exchange類型處理:
-
Direct:精確匹配routingKey
-
Fanout:忽略routingKey,廣播到所有綁定隊列
-
Topic:通配符匹配(*匹配一個詞,#匹配零或多個詞)
-
Headers:匹配header鍵值對
-
-
-
隊列投遞
-
成功匹配:消息進入隊列內存緩沖區
-
無匹配時處理:
-
設置了alternate-exchange:轉到備用交換器
-
未設置備用交換器且mandatory=true:觸發Return回調
-
否則丟棄消息
-
-
五、確認階段
-
Confirm模式(可選)
-
開啟方式:
channel.confirmSelect(); // 開啟Confirm模式
-
確認機制:
-
單條確認:
waitForConfirms()
-
批量確認:
waitForConfirmsOrDie()
-
異步回調:
channel.addConfirmListener((sequenceNumber, multiple) -> {// 處理ack }, (sequenceNumber, multiple) -> {// 處理nack });
-
-
-
事務模式(可選)
-
事務操作流程:
channel.txSelect(); // 開啟事務 try {channel.basicPublish(...);channel.txCommit(); // 提交事務 } catch (Exception e) {channel.txRollback(); // 回滾事務 }
-
六、資源釋放階段
-
通道關閉
-
發送Channel.Close命令
-
處理未確認消息:
-
事務模式:回滾未提交消息
-
Confirm模式:未確認消息會觸發nack
-
-
-
連接關閉
-
發送Connection.Close命令
-
服務端釋放相關資源
-
客戶端等待TCP連接正常關閉
-
九、消費者接收消息過程?
一、連接建立階段
-
TCP連接初始化
-
消費者客戶端與RabbitMQ服務器建立TCP連接(默認端口5672)
-
完成AMQP協議握手:
-
-
通道創建
-
在TCP連接上創建虛擬通道(Channel)
-
每個Channel獨立維護消息流狀態
-
關鍵參數設置:
Channel channel = connection.createChannel(); channel.basicQos(10); // 設置prefetch count
-
二、隊列訂閱階段
-
隊列聲明與檢查
-
檢查目標隊列是否存在
-
自動創建隊列(如果不存在且允許):
channel.queueDeclare("order.queue", true, false, false, null);
-
隊列參數解析:
-
durable:是否持久化
-
exclusive:是否排他隊列
-
autoDelete:無消費者時是否自動刪除
-
arguments:擴展參數(TTL、死信等)
-
-
-
消費者注冊
-
向Broker注冊消費者標簽(consumer tag)
-
選擇消費模式:
-
推模式(Push API):服務端主動推送
-
拉模式(Basic.Get):客戶端主動拉取
-
-
三、消息接收階段
-
消息推送機制
-
Broker按照QoS設置推送消息:
while (unacked_count < prefetch_count) and (queue.has_messages):message = queue.next_message()send_to_consumer(message)unacked_count += 1
-
消息幀結構:
Basic.Deliver(consumer-tag,delivery-tag,redelivered,exchange,routing-key ) Message Body
-
-
消息處理流程
-
消費者接收消息后的處理步驟:
-
反序列化消息體
-
驗證消息完整性
-
執行業務邏輯
-
發送ack/nack
-
處理異常情況
-
-
四、確認與反饋階段
-
消息確認機制
-
自動確認(autoAck=true):
-
消息發出即視為成功
-
高風險(消息可能處理失敗但已確認)
-
-
手動確認(autoAck=false):
// 成功處理 channel.basicAck(deliveryTag, false); // 處理失敗(requeue=true重新入隊) channel.basicNack(deliveryTag, false, true);
-
關鍵參數:
-
deliveryTag:消息唯一標識
-
multiple:是否批量操作
-
requeue:是否重新入隊
-
-
-
拒絕消息處理
-
三種拒絕方式對比:
方法 是否批量 是否重入隊列 適用場景 basicReject 否 可配置 單條消息處理失敗 basicNack 是 可配置 批量消息處理異常 basicRecover - 是 重新投遞未ack消息
-
五、流量控制機制
-
QoS預取設置
-
作用:限制未確認消息數量
-
全局 vs 通道級:
// 單通道限制 channel.basicQos(10); // 全局限制(所有通道總和) channel.basicQos(10, true);
-
最佳實踐值:
-
高吞吐場景:100-300
-
高延遲任務:5-10
-
-
-
流控(Flow Control)
-
當消費者處理能力不足時:
-
Broker暫停發送新消息
-
觸發Channel.Flow命令
-
消費者處理積壓后恢復流動
-
-
六、異常處理階段
-
連接中斷處理
-
自動恢復機制:
factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(5000);
-
恢復過程:
-
重建TCP連接
-
恢復所有Channel
-
重新注冊消費者
-
恢復未ack消息(根據redelivered標記)
-
-
-
死信處理
-
觸發條件:
-
消息被拒絕且requeue=false
-
消息TTL過期
-
隊列達到長度限制
-
-
死信隊列配置:
Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", "dlx.exchange"); channel.queueDeclare("order.queue", true, false, false, args);
-
消費者最佳實踐
-
冪等性設計
// 使用消息ID實現冪等 if (processedMessageIds.contains(messageId)) {channel.basicAck(tag, false);return; }
-
批量確認優化
// 每處理100條消息批量確認一次 if (messageCount % 100 == 0) {channel.basicAck(lastTag, true); }
-
死信監控
// 監聽死信隊列 channel.basicConsume("dlx.queue", false, (tag, msg) -> {log.error("死信消息: {}", msg.getBody());channel.basicAck(tag, false); });
-
消費者標簽管理
// 優雅關閉消費者 void shutdown() {channel.basicCancel(consumerTag);// 等待處理中的消息完成while (inProgressCount > 0) {Thread.sleep(100);} }
十、springboot項目中如何使用mq?
十一、如何保障消息不丟失?
1、發送階段:發送階段保障消息到達交換機 事務機制|confirm確認機制
2、存儲階段:持久化機制 交換機持久化、隊列的持久化、消息內容的持久化
3、消費階段:消息的確認機制 自動ack|手動ack
接收方消息確認機制
自動ack|手動ack
spring:rabbitmq:host: 1.94.230.82port: 5672username: adminpassword: 123456virtual-host: /yan3listener:simple:acknowledge-mode: manualdirect:acknowledge-mode: manual
package com.hl.rabbitmq01.web;
?
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
?
import java.io.IOException;
?
@RestController
@RequestMapping("/c")
public class ConsumerController {
?@RabbitListener(queues = {"topicQueue01"})public void receive(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());System.out.println(msg);//業務邏輯 比如傳入訂單id,根據訂單id,減少庫存、支付等,// 如果操作成功,確認消息(從隊列移除),如果操作失敗,手動拒絕消息if(msg.length() >= 5){//確認消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}else{//拒絕消息 not ack// 第三個參數:requeue:重回隊列。如果設置為true,則消息重新回到queue,broker會重新發送該消息給消費端channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
// ? ? ? ? ? channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}
?
?}
}
消息的持久化機制
交換機的持久化
隊列的持久化
消息內容的持久化
package com.hl.rabbitmq01.direct;
?
import com.hl.rabbitmq01.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
?
import java.io.IOException;
import java.util.concurrent.TimeoutException;
?
/*
生產者 javaSE方式簡單測試
發布訂閱-------direct模型
生產者----消息隊列----消費者*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1、創建連接Connection connection = MQUtil.getConnection();//2、基于連接,創建信道Channel channel = connection.createChannel();//3、基于信道,創建隊列/*參數:1. queue:隊列名稱,如果沒有一個名字叫simpleQueue01的隊列,則會創建該隊列,如果有則不會創建2. durable:是否持久化,當mq重啟之后,消息還在3. exclusive:* 是否獨占。只能有一個消費者監聽這隊列4。當Connection關閉時,是否刪除隊列autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉5. arguments:參數。*/channel.queueDeclare("directQueue01", true, false, false, null);channel.queueDeclare("directQueue02", false, false, false, null);/*聲明交換機參數1:交換機名稱參數2:交換機類型*/channel.exchangeDeclare("directExchange01", BuiltinExchangeType.DIRECT,true);/*綁定交換機和隊列參數1:隊列名參數2:交換機名稱參數3:路由key 廣播模型 不支持路由key ""*/channel.queueBind("directQueue01","directExchange01","error");channel.queueBind("directQueue02","directExchange01","error");channel.queueBind("directQueue02","directExchange01","info");channel.queueBind("directQueue02","directExchange01","trace");//發送消息到消息隊列/*參數:1. exchange:交換機名稱。簡單模式下交換機會使用默認的 ""2. routingKey:路由名稱,簡單模式下路由名稱使用消息隊列名稱3. props:配置信息4. body:發送消息數據*/
?channel.basicPublish("directExchange01","user", MessageProperties.PERSISTENT_TEXT_PLAIN,("Hello World ").getBytes());
?
?//4、關閉信道,斷開連接channel.close();connection.close();}
}
package com.hl.rabbitmq01.web;
?
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
?
import java.io.IOException;
import java.nio.charset.StandardCharsets;
?
@RestController
@RequestMapping("/p")
public class ProducerController {@Autowiredprivate AmqpTemplate amqpTemplate;@Autowiredprivate RabbitTemplate rabbitTemplate;
?
?@RequestMapping("/send")public void send(@RequestParam(defaultValue = "user") String key,@RequestParam(defaultValue = "hello") String msg) throws IOException {//amqpTemplate.convertAndSend("topicExchange", key, msg);
// ? ? ? rabbitTemplate.convertAndSend("topicExchange",key,msg);Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false); //false 非事務模式運行 無需手動提交channel.basicPublish("topicExchange", key,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());}
}
?
/*
創建交換機*/
@Bean
public TopicExchange topicExchange(){return ExchangeBuilder.topicExchange("topicExchange").durable(true) ?//是否支持持久化機制.build();
}
/*
創建隊列*/
@Bean
public Queue queue(){return QueueBuilder.durable("topicQueue01").build();
}
發送方的消息確認機制
1、事務機制
消耗資源
RabbitMQ中與事務有關的主要有三個方法:
-
txSelect() 開始事務
-
txCommit() 提交事務
-
txRollback() 回滾事務
txSelect主要用于將當前channel設置成transaction模式,txCommit用于提交事務,txRollback用于回滾事務。
當我們使用txSelect提交開始事務之后,我們就可以發布消息給Broke代理服務器,如果txCommit提交成功了,則消息一定到達了Broke了,如果在txCommit執行之前Broker出現異常崩潰或者由于其他原因拋出異常,這個時候我們便可以捕獲異常通過txRollback方法進行回滾事務了。
示例
@RestController
public class RabbitMQController {
?@Autowiredprivate RabbitTemplate rabbitTemplate;
?@RequestMapping("/send")public String sendMessage(String message){rabbitTemplate.setChannelTransacted(true); //開啟事務操作rabbitTemplate.execute(channel -> {try {channel.txSelect();//開啟事務
?channel.basicPublish("Fanout_Exchange","",null,message.getBytes());
?int i = 5/0;
?channel.txCommit();//沒有問題提交事務}catch (Exception e){e.printStackTrace();channel.txRollback();//有問題回滾事務}
?return null;});
?return "success";}
?
}
消費者沒有任何變化。
通過測試會發現,發送消息時只要Broker出現異常崩潰或者由于其他原因拋出異常,就會捕獲異常通過txRollback方法進行回滾事務了,則消息不會發送,消費者就獲取不到消息。
2、confirm確認機制
推薦
同步通知
channel.confirmSelect(); //開始confirm操作
?
channel.basicPublish("Fanout_Exchange","",null,message.getBytes());
?
if (channel.waitForConfirms()){System.out.println("發送成功");
}else{//進行消息重發System.out.println("消息發送失敗,進行消息重發");
}
異步通知
channel.confirmSelect();
?
channel.addConfirmListener(new ConfirmListener() {//消息正確到達broker,就會發送一條ack消息@Overridepublic void handleAck(long l, boolean b) throws IOException {System.out.println("發送消息成功");}
?//RabbitMQ因為自身內部錯誤導致消息丟失,就會發送一條nack消息@Overridepublic void handleNack(long l, boolean b) throws IOException {System.out.println("發送消息失敗,重新發送消息");}
});
?
channel.basicPublish("Fanout_Exchange","",null,message.getBytes());
?
十二、死信交換機和死信隊列
在實際開發項目時,在較為重要的業務場景中,要確保未被消費的消息不被丟棄(例如:訂單業務),那為了保證消息數據的不丟失,可以使用RabbitMQ的死信隊列機制,當消息消費發生異常時,將消息投入到死信隊列中進行處理。
死信隊列:RabbitMQ中并不是直接聲明一個公共的死信隊列,然后死信消息就會跑到死信隊列中。而是為每個需要使用死信的消息隊列配置一個死信交換機,當消息成為死信后,可以被重新發送到死信交換機,然后再發送給使用死信的消息隊列。
死信交換機:英文縮寫:DLX 。Dead Letter Exchange(死信交換機),死信交換機其實就是普通的交換機,通過給隊列設置參數: x-dead-letter-exchange 和x-dead-letter-routing-key,來指向死信交換機
RabbitMQ規定消息符合以下某種情況時,將會成為死信
-
隊列消息長度到達限制(隊列消息個數限制);
-
消費者拒接消費消息,basicNack/basicReject,并且不把消息重新放入原目標隊列,requeue=false;
-
原隊列存在消息過期設置,消息到達超時時間未被消費;
死信消息會被RabbitMQ特殊處理,如果配置了死信隊列,則消息會被丟到死信隊列中,如果沒有配置死信隊列,則消息會被丟棄。
Map<String,Object> map = new HashMap<>();map.put("x-dead-letter-exchange","deadExchange");//當前隊列和死信交換機綁定map.put("x-dead-letter-routing-key","user.#");//當前隊列和死信交換機綁定的路由規則
// ? ? ? map.put("x-max-length",2);//隊列長度map.put("x-message-ttl",10000);//隊列消息過期時間,時間ms
?
// ? ? ? return QueueBuilder.durable("topicQueue01").build();return QueueBuilder.durable("topicQueue").withArguments(map).build();
十三、延遲隊列簡介
延遲隊列,即消息進入隊列后不會立即被消費,只有到達指定時間后,才會被消費。
RabbitMQ中沒有延遲隊列,但是可以用ttl(time to live)+死信隊列方式和延遲插件兩種方式來實現
ttl+死信隊列代碼在講死信隊列時已經實現,這個不再闡述。
延遲插件
人們一直在尋找用RabbitMQ實現延遲消息的傳遞方法,到目前為止,公認的解決方案是混合使用TTL和DLX。rabbitmq_delayed_message_exchange插件就是基于此來實現的,RabbitMQ延遲消息插件新增了一種新的交換器類型,消息通過這種交換器路由就可以實現延遲發送。
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
十四、RabbitMQ消息重復消費
RabbitMQ消息重復消費問題_rabbitmq重復消費的問題解決-CSDN博客
業務背景 消息隊列在數據傳輸的過程中,為了保證消息傳遞的可靠性,一般會對消息采用ack確認機制,如果消息傳遞失敗,消息隊列會進行重試,此時便可能存在消息重復消費的問題。
比如,用戶到銀行取錢后會收到扣款通知短信,如果用戶收到多條扣款信息通知則會有困惑。
解決方法一:send if not exist 首先將 RabbitMQ 的消息自動確認機制改為手動確認,然后每當有一條消息消費成功了,就把該消息的唯一ID記錄在Redis 上,然后每次發送消息時,都先去 Redis 上查看是否有該消息的 ID,如果有,表示該消息已經消費過了,不再處理,否則再去處理。
2.1 利用數據庫唯一約束實現冪等
解決方法二:insert if not exist 可以通過給消息的某一些屬性設置唯一約束,比如增加唯一uuid,添加的時候查詢是否存對應的uuid,存在不操作,不存在則添加,那樣對于相同的uuid只會存在一條數據
解決方法三:sql的樂觀鎖 比如給用戶發送短信,變成如果該用戶未發送過短信,則給用戶發送短信,此時的操作則是冪等性操作。但在實際上,對于一個問題如何獲取前置條件往往比較復雜,此時可以通過設置版本號version,每修改一次則版本號+1,在更新時則通過判斷兩個數據的版本號是否一致。
十五、RabbitMQ消息積壓
RabbitMq——消息積壓分析和解決思路_rabbitmq消息積壓-CSDN博客
消息積壓產生的原因 正常而言,一般的消息從消息產生到消息消費需要經過以下幾種階段。
以Direct模式為例:
消息由生產者產生,比如新訂單的創建等,經過交換機,將消息發送至指定的隊列中,然后提供給對應的消費者進行消費。
在這個鏈路中,存在消息積壓的原因大致分為以下幾種:
1、消費者宕機,導致消息隊列中的消息無法及時被消費,出現積壓。
2、消費者沒有宕機,但因為本身邏輯處理數據耗時,導致消費者消費能力不足,引起隊列消息積壓。
3、消息生產方單位時間內產生消息過多,比如“雙11大促活動”,導致消費者處理不過來。
消息積壓問題解決 針對上面消息積壓問題的出現,大致進行了分析,那么根據分析則能制定相關的應對方法。如下所示:
1、大促活動等,導致生產者流量過大,引起積壓問題。
提前增加服務器的數量,增加消費者數目,提升消費者針對指定隊列消息處理的效率。
2、上線更多的消費者,處理消息隊列中的數據。(和1中的大致類似)
3、如果成本有限,則可以專門針對這個隊列,編寫一個另類的消費者。
當前另類消費者,不進行復雜邏輯處理,只將消息從隊列中取出,存放至數據庫中,然后basicAck反饋給消息隊列。
十六、消息入庫(消息補償)
如果RabbitMQ收到消息還沒來得及將消息持久化到硬盤時,RabbitMQ掛了,這樣消息還是丟失了,或者RabbitMQ在發送確認消息給生產端的過程中,由于網絡故障而導致生產端沒有收到確認消息,這樣生產端就不知道RabbitMQ到底有沒有收到消息,這樣也不太好進行處理。所以為了避免RabbitMQ持久化失敗而導致數據丟失,我們自己也要做一些消息補償機制,以應對一些極端情況。
在使用消息隊列(Message Queue)時,消息的補償機制是一種處理消息處理失敗或異常情況的方法。當消息消費者無法成功處理消息時,補償機制允許系統將消息重新發送或執行其他操作,以確保消息的可靠傳遞和處理。
補償機制通常涉及以下幾個方面:
-
重試機制:當消息處理失敗時,補償機制會嘗試重新發送消息給消費者,以便重新處理。重試間隔和重試次數可以根據具體情況進行配置,以避免重復投遞導致的消息處理失敗。
-
延時隊列:補償機制還可以使用延時隊列來處理無法立即處理的消息。當某個消息處理失敗時,可以將該消息放入到延時隊列中,在一定的延時之后再次嘗試發送給消費者進行處理。
-
死信隊列:當消息無法被成功處理時,可以將這些無法處理的消息發送到死信隊列(Dead Letter Queue)。死信隊列通常用于存儲無法被消費者處理的消息,以便后續進行排查和處理。
-
可視化監控和報警:補償機制還可以包括對消息隊列的監控和報警功能,以便及時發現和處理異常情況。通過可視化監控工具可以實時查看消息隊列的狀態和處理情況,及時發現問題并采取相應的補救措施。
補償機制的設計和實現密切依賴于具體的消息中間件和使用場景,不同的消息隊列系統可能提供不同的補償機制。因此,在選擇和使用消息隊列時,需要根據自身的需求和系統特點來選擇適合的消息補償機制。