rabbitmq 03

一、mq的作用和使用場景

MQ的基本作用

MQ(Message Queue,消息隊列)是一種應用程序對應用程序的通信方法,主要作用包括:

  1. 異步處理:解耦生產者和消費者,允許生產者發送消息后立即返回,消費者異步處理

  2. 應用解耦:降低系統間的直接依賴,通過消息進行間接通信

  3. 流量削峰:緩沖突發流量,避免系統被壓垮

  4. 消息通信:實現系統間的可靠消息傳遞

  5. 最終一致性:支持分布式事務的最終一致性方案

主要使用場景

1. 異步處理

  • 用戶注冊后發送郵件/短信:注冊流程快速完成,通知類操作異步處理

  • 日志收集:應用將日志發送到MQ,由專門服務異步處理

2. 應用解耦

  • 電商訂單系統:訂單服務生成訂單后通過MQ通知庫存、物流、支付等系統

  • 微服務架構:服務間通過MQ通信而非直接調用

3. 流量削峰

  • 秒殺系統:將大量請求先放入MQ,按系統能力逐步處理

  • 突發流量處理:應對促銷活動等流量高峰

4. 日志處理

  • 大數據分析:收集各系統日志到MQ,由大數據平臺統一處理

  • 實時監控:系統指標通過MQ傳輸到監控平臺

5. 消息通信

  • 聊天系統:用戶消息通過MQ傳遞

  • 通知系統:系統間的事件通知

適用場景總結表

場景關鍵技術優勢
應用解耦消息隊列減少系統間直接依賴
異步處理生產者-消費者模型提升響應速度
流量削峰隊列積壓+限速消費保護后端系統
跨語言通信AMQP 多語言支持統一通信協議
發布/訂閱Exchange(fanout/topic)一對多消息廣播
延遲隊列TTL + 死信隊列實現定時任務

二、mq的優點

1. 解耦系統組件

  • 生產者和消費者無需相互感知對方的存在

  • 系統間通過消息通信而非直接調用,降低耦合度

  • 新增消費者不會影響生產者代碼

2. 異步處理提升性能

  • 生產者發送消息后無需等待消費者處理完成

  • 非關鍵路徑操作可異步執行(如發送通知、記錄日志)

  • 顯著減少系統響應時間,提高吞吐量

3. 流量削峰與過載保護

  • 緩沖突發流量,避免系統被瞬間高峰壓垮

  • 消費者可按自身處理能力從隊列獲取消息

  • 特別適合秒殺、促銷等瞬時高并發場景

4. 提高系統可靠性

  • 消息持久化確保重要數據不丟失

  • 重試機制和死信隊列處理失敗消息

  • 網絡波動時仍能保證消息最終送達

5. 擴展性強

  • 可輕松增加消費者實例提高處理能力

  • 天然支持分布式系統架構

  • 各組件可獨立擴展(生產者、MQ本身、消費者)

6. 順序保證

  • 某些MQ(如Kafka)可保證消息順序性

  • 對需要嚴格順序的業務場景非常重要

7. 最終一致性支持

  • 實現分布式事務的最終一致性方案

  • 通過消息驅動的方式同步系統狀態

  • 比強一致性方案性能更高

8. 靈活的通信模式

  • 支持點對點、發布/訂閱等多種模式

  • 可實現廣播、組播等不同消息分發方式

  • 適應各種業務場景需求

9. 系統恢復能力

  • 消費者宕機恢復后可從斷點繼續消費

  • 避免數據丟失或重復處理

  • 支持消息回溯重新消費

10. 平衡資源利用率

  • 平滑系統負載,避免資源閑置或過載

  • 提高整體資源使用效率

  • 降低系統建設成本(無需按峰值配置資源)

三、mq的缺點

1. 系統復雜度增加

  • 引入MQ后,系統架構變得更加復雜,需要額外維護MQ集群

  • 需要處理消息的發送、接收、確認、重試等邏輯

  • 增加了調試和問題排查的難度(如消息丟失、重復消費等)

2. 消息一致性問題

  • 消息丟失:生產者發送失敗、MQ宕機、消費者處理失敗都可能導致消息丟失

  • 消息重復:網絡問題或消費者超時可能導致消息被重復消費(需業務層做冪等處理)

  • 順序問題:某些MQ(如Kafka)只能保證分區內有序,全局有序需要額外設計

3. 延遲問題

  • 異步處理導致延遲:消息隊列的消費通常是異步的,不適合實時性要求極高的場景(如支付交易)

  • 堆積時延遲加劇:如果消費者處理速度跟不上,消息堆積會導致延遲越來越高

4. 運維成本高

  • 集群管理:MQ本身需要高可用部署(如Kafka的ZooKeeper依賴、RabbitMQ的鏡像隊列)

  • 監控與告警:需監控消息積壓、消費延遲、錯誤率等指標

  • 資源占用:MQ集群可能占用較多CPU、內存和磁盤IO

5. 數據一致性與事務問題

  • 分布式事務挑戰:如果業務涉及數據庫和MQ的協同(如扣庫存+發消息),需要引入事務消息或本地消息表等方案

  • 最終一致性:MQ通常只保證最終一致性,不適合強一致性要求的場景

6. 依賴風險

  • MQ成為單點故障:如果MQ集群崩潰,可能導致整個系統不可用

  • 版本兼容性問題:MQ升級可能影響生產者和消費者的兼容性

7. 消息積壓風險

  • 消費者處理能力不足:如果消費者宕機或處理緩慢,消息會堆積,可能導致MQ存儲爆滿

  • 影響新消息處理:積壓嚴重時,新消息可能被阻塞或丟棄

8. 不適合所有場景

  • 低延遲場景:如高頻交易、實時游戲,MQ的異步機制可能引入不可接受的延遲

  • 小規模系統:如果系統簡單,直接調用可能比引入MQ更高效

四、mq相關產品,每種產品的特點

1. RabbitMQ

特點

  • 基于AMQP協議,支持多種客戶端語言

  • 輕量級,易于部署和管理

  • 提供靈活的路由機制(直連/主題/扇出/頭交換)

  • 支持消息確認、持久化、優先級隊列

  • 集群部署相對簡單

  • 社區活躍,文檔完善

適用場景

  • 中小規模消息處理

  • 需要復雜路由規則的場景

  • 企業級應用集成

  • 對延遲要求不高的異步任務

2. Kafka

特點

  • 超高吞吐量(百萬級TPS)

  • 分布式、高可用設計

  • 基于發布/訂閱模式

  • 消息持久化存儲(可配置保留時間)

  • 支持消息回溯和批量消費

  • 水平擴展能力強

  • 支持流式處理(Kafka Streams)

適用場景

  • 大數據日志收集與分析

  • 實時流處理

  • 高吞吐量消息系統

  • 事件溯源

  • 監控數據聚合

3. RocketMQ

特點

  • 阿里開源,經受雙11考驗

  • 支持事務消息

  • 嚴格的順序消息

  • 支持消息軌跡查詢

  • 分布式架構,高可用

  • 支持定時/延遲消息

  • 支持消息過濾

適用場景

  • 電商交易系統

  • 金融支付場景

  • 需要嚴格順序的消息處理

  • 分布式事務場景

4. ActiveMQ

特點

  • 支持JMS規范

  • 支持多種協議(STOMP、AMQP、MQTT等)

  • 提供消息持久化和事務支持

  • 支持集群部署

  • 相對輕量級

適用場景

  • 傳統企業應用集成

  • 需要JMS支持的場景

  • 中小型消息系統

  • IoT設備通信

5. Pulsar

特點

  • 云原生設計,計算存儲分離架構

  • 支持多租戶

  • 低延遲和高吞吐并存

  • 支持多種消費模式(獨占/共享/故障轉移)

  • 支持分層存儲(熱數據+冷數據)

  • 內置函數計算能力

適用場景

  • 云原生應用

  • 多租戶SaaS平臺

  • 需要統一消息和流處理的場景

  • 混合云部署

6. ZeroMQ

特點

  • 無中間件,基于庫的方式

  • 極高性能(納秒級延遲)

  • 支持多種通信模式(請求-響應/發布-訂閱等)

  • 輕量級,無消息持久化

  • 無broker架構

適用場景

  • 高性能計算

  • 低延遲通信

  • 進程間通信

  • 不需要持久化的場景

7. NATS

特點

  • 極簡設計,性能優異

  • 無持久化(NATS Streaming提供持久化擴展)

  • 支持請求-響應模式

  • 輕量級,適合云環境

  • 低資源消耗

適用場景

  • IoT設備通信

  • 云原生微服務

  • 不需要持久化的實時消息

  • 服務發現和配置分發

選型建議對比表

特性 \ MQRabbitMQKafkaRocketMQPulsarActiveMQ
吞吐量極高
延遲
順序保證有限分區有序嚴格有序分區有序有限
持久化支持支持支持支持支持
事務支持有限支持支持支持支持
集群擴展中等容易中等容易中等
運維復雜度
適用規模中小超大中大中大中小

五、rabbitmq的搭建過程

Docker安裝方式

# 拉取鏡像
docker pull rabbitmq:management
?
# 運行容器
docker run -d --name rabbitmq \
-p 5672:5672 -p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=password \
rabbitmq:management

Linux安裝方式

# 1. 安裝Erlang
sudo apt-get install erlang
?
# 2. 下載RabbitMQ
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.9/rabbitmq-server_3.8.9-1_all.deb
?
# 3. 安裝
sudo dpkg -i rabbitmq-server_3.8.9-1_all.deb
?
# 4. 啟動服務
sudo systemctl start rabbitmq-server
?
# 5. 啟用管理插件
sudo rabbitmq-plugins enable rabbitmq_management
?
# 6. 創建用戶
sudo rabbitmqctl add_user admin password
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

六、rabbitmq相關角色

1. 生產者 (Publisher/Producer)

  • 職責:創建并發送消息到 RabbitMQ 服務器

  • 特點

    • 不直接將消息發送到隊列,而是發送到交換器 (Exchange)

    • 可以設置消息屬性(如持久化、優先級等)

    • 通常不知道消息最終會被哪些消費者接收

2. 消費者 (Consumer)

  • 職責:接收并處理來自隊列的消息

  • 特點

    • 可以訂閱一個或多個隊列

    • 可以手動或自動確認消息 (ack/nack)

    • 可以設置 QoS(服務質量)控制預取數量

3. 代理服務器 (Broker)

  • 職責:RabbitMQ 服務本身,負責接收、路由和傳遞消息

  • 組成

    • Exchange(交換器)

    • Queue(隊列)

    • Binding(綁定)

4. 交換器 (Exchange)

  • 類型

    類型路由規則典型用途
    Direct精確匹配 Routing Key點對點精確路由
    Fanout忽略 Routing Key,廣播到所有綁定隊列廣播通知
    Topic模糊匹配 Routing Key(支持通配符)多條件路由
    Headers根據消息頭屬性匹配復雜路由條件
  • 特性

    • 接收生產者發送的消息

    • 根據類型和綁定規則將消息路由到隊列

    • 可以持久化(在服務器重啟后仍然存在)

5. 隊列 (Queue)

  • 特性

    • 消息存儲的緩沖區

    • 可以有多個消費者(競爭消費模式)

    • 可配置屬性:

      • 持久化(Durable)

      • 自動刪除(Auto-delete)

      • 排他性(Exclusive)

      • 消息 TTL(存活時間)

      • 最大長度等

6. 綁定 (Binding)

  • 作用:連接 Exchange 和 Queue 的規則

  • 組成要素

    • Exchange 名稱

    • Queue 名稱

    • Routing Key(或用于 Headers Exchange 的匹配參數)

7. 通道 (Channel)

  • 特點

    • 在 TCP 連接上建立的虛擬連接

    • 輕量級,減少 TCP 連接開銷

    • 每個 Channel 有獨立 ID

    • 建議每個線程使用獨立的 Channel

8. 虛擬主機 (Virtual Host)

  • 作用:提供邏輯隔離環境

  • 特點

    • 類似于命名空間

    • 每個 vhost 有獨立的 Exchange、Queue 和綁定

    • 需要單獨配置權限

    • 默認 vhost 為 "/"

9. 管理員角色 (Administrator)

  • 權限

    • 管理用戶權限

    • 創建/刪除 vhost

    • 查看所有資源

    • 通常通過 rabbitmqctl 工具或管理界面操作

10. 插件系統 (Plugins)

  • 常見插件

    • rabbitmq_management:提供 Web 管理界面

    • rabbitmq_shovel:跨集群消息轉移

    • rabbitmq_federation:分布式部署支持

    • rabbitmq_delayed_message_exchange:延遲消息

角色交互示意圖

+------------+ ? ? ? +---------+ ? ? ? +-------+ ? ? ? +--------+
| Publisher  | ----> | Exchange| ====> | Queue | <---- | Consumer|
+------------+ ? ? ? +---------+ ? ? ? +-------+ ? ? ? +--------+(Binding)

七、rabbitmq內部組件

1、ConnectionFactory(連接管理器):應用程序與Rabbit之間建立連接的管理器,程序代碼中使用。

2、Channel(信道):消息推送使用的通道。

3、Exchange(交換器):用于接受、分配消息。

4、Queue(隊列):用于存儲生產者的消息。

5、RoutingKey(路由鍵):用于把生成者的數據分配到交換器上。

6、BindingKey(綁定鍵):用于把交換器的消息綁定到隊列上。

八、生產者發送消息的過程?

一、建立連接階段

  1. TCP連接建立

    • 生產者應用通過AMQP客戶端庫發起TCP連接

    • 默認端口5672(帶管理插件時為5672/15672)

    • 三次握手完成后建立物理連接

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("rabbitmq-host");
    factory.setPort(5672);
    Connection connection = factory.newConnection();
  2. 認證與vhost選擇

    • 發送START/START-OK協議幀進行認證

    • 選擇虛擬主機(vhost),默認"/"

    • 認證失敗會收到CONNECTION-CLOSE幀

二、通道創建階段

  1. 通道(Channel)初始化

    • 在TCP連接上創建虛擬通道(Channel)

    • 每個Channel有唯一ID(從1開始遞增)

    • 通道參數協商:

      • Frame Max Size(默認128KB)

      • Channel Max(默認2047)

    Channel channel = connection.createChannel();

  2. 交換器聲明(可選)

    • 檢查目標Exchange是否存在

    • 不存在時根據參數自動創建

    • 關鍵參數:

      • type:exchange類型(direct/fanout/topic/headers)

      • durable:是否持久化

      • autoDelete:無綁定時是否自動刪除

    channel.exchangeDeclare("order.exchange", "direct", true);

三、消息發布階段

  1. 消息構造

    • 組成結構:

      {"body": "消息內容(二進制)","properties": {"delivery_mode": 2,  # 1-非持久化 2-持久化"priority": 0, ? ? ? # 0-9優先級"headers": {}, ? ? ? # 自定義頭"timestamp": 1620000000}
      }
  2. 發布消息到Exchange

    • 通過Basic.Publish命令發送

    • 關鍵參數:

      • exchange:目標交換器名稱

      • routingKey:路由鍵

      • mandatory:是否觸發Return回調

      • immediate:已廢棄參數

    channel.basicPublish("order.exchange", "order.create", MessageProperties.PERSISTENT_TEXT_PLAIN,messageBodyBytes
    );

四、消息路由階段

  1. Exchange路由決策

    • 根據Exchange類型處理:

      • Direct:精確匹配routingKey

      • Fanout:忽略routingKey,廣播到所有綁定隊列

      • Topic:通配符匹配(*匹配一個詞,#匹配零或多個詞)

      • Headers:匹配header鍵值對

  2. 隊列投遞

    • 成功匹配:消息進入隊列內存緩沖區

    • 無匹配時處理:

      • 設置了alternate-exchange:轉到備用交換器

      • 未設置備用交換器且mandatory=true:觸發Return回調

      • 否則丟棄消息

五、確認階段

  1. Confirm模式(可選)

    • 開啟方式:

      channel.confirmSelect();  // 開啟Confirm模式

    • 確認機制:

      • 單條確認:waitForConfirms()

      • 批量確認:waitForConfirmsOrDie()

      • 異步回調:

        channel.addConfirmListener((sequenceNumber, multiple) -> {// 處理ack
        }, (sequenceNumber, multiple) -> {// 處理nack
        });

  2. 事務模式(可選)

    • 事務操作流程:

      channel.txSelect();  // 開啟事務
      try {channel.basicPublish(...);channel.txCommit();  // 提交事務
      } catch (Exception e) {channel.txRollback(); // 回滾事務
      }

六、資源釋放階段

  1. 通道關閉

    • 發送Channel.Close命令

    • 處理未確認消息:

      • 事務模式:回滾未提交消息

      • Confirm模式:未確認消息會觸發nack

  2. 連接關閉

    • 發送Connection.Close命令

    • 服務端釋放相關資源

    • 客戶端等待TCP連接正常關閉

九、消費者接收消息過程?

一、連接建立階段

  1. TCP連接初始化

    • 消費者客戶端與RabbitMQ服務器建立TCP連接(默認端口5672)

    • 完成AMQP協議握手:

  2. 通道創建

    • 在TCP連接上創建虛擬通道(Channel)

    • 每個Channel獨立維護消息流狀態

    • 關鍵參數設置:

      Channel channel = connection.createChannel();
      channel.basicQos(10); // 設置prefetch count

二、隊列訂閱階段

  1. 隊列聲明與檢查

    • 檢查目標隊列是否存在

    • 自動創建隊列(如果不存在且允許):

      channel.queueDeclare("order.queue", true, false, false, null);
    • 隊列參數解析:

      • durable:是否持久化

      • exclusive:是否排他隊列

      • autoDelete:無消費者時是否自動刪除

      • arguments:擴展參數(TTL、死信等)

  2. 消費者注冊

    • 向Broker注冊消費者標簽(consumer tag)

    • 選擇消費模式:

      • 推模式(Push API):服務端主動推送

      • 拉模式(Basic.Get):客戶端主動拉取

三、消息接收階段

  1. 消息推送機制

    • Broker按照QoS設置推送消息:

      while (unacked_count < prefetch_count) and (queue.has_messages):message = queue.next_message()send_to_consumer(message)unacked_count += 1
    • 消息幀結構:

      Basic.Deliver(consumer-tag,delivery-tag,redelivered,exchange,routing-key
      )
      Message Body
  2. 消息處理流程

    • 消費者接收消息后的處理步驟:

      1. 反序列化消息體

      2. 驗證消息完整性

      3. 執行業務邏輯

      4. 發送ack/nack

      5. 處理異常情況

四、確認與反饋階段

  1. 消息確認機制

    • 自動確認(autoAck=true)

      • 消息發出即視為成功

      • 高風險(消息可能處理失敗但已確認)

    • 手動確認(autoAck=false)

      // 成功處理
      channel.basicAck(deliveryTag, false); 
      // 處理失敗(requeue=true重新入隊)
      channel.basicNack(deliveryTag, false, true);
    • 關鍵參數:

      • deliveryTag:消息唯一標識

      • multiple:是否批量操作

      • requeue:是否重新入隊

  2. 拒絕消息處理

    • 三種拒絕方式對比:

      方法是否批量是否重入隊列適用場景
      basicReject可配置單條消息處理失敗
      basicNack可配置批量消息處理異常
      basicRecover-重新投遞未ack消息

五、流量控制機制

  1. QoS預取設置

    • 作用:限制未確認消息數量

    • 全局 vs 通道級:

      // 單通道限制
      channel.basicQos(10); 
      // 全局限制(所有通道總和)
      channel.basicQos(10, true);
    • 最佳實踐值:

      • 高吞吐場景:100-300

      • 高延遲任務:5-10

  2. 流控(Flow Control)

    • 當消費者處理能力不足時:

      1. Broker暫停發送新消息

      2. 觸發Channel.Flow命令

      3. 消費者處理積壓后恢復流動

六、異常處理階段

  1. 連接中斷處理

    • 自動恢復機制:

      factory.setAutomaticRecoveryEnabled(true);
      factory.setNetworkRecoveryInterval(5000);
    • 恢復過程:

      1. 重建TCP連接

      2. 恢復所有Channel

      3. 重新注冊消費者

      4. 恢復未ack消息(根據redelivered標記)

  2. 死信處理

    • 觸發條件:

      • 消息被拒絕且requeue=false

      • 消息TTL過期

      • 隊列達到長度限制

    • 死信隊列配置:

      Map<String, Object> args = new HashMap<>();
      args.put("x-dead-letter-exchange", "dlx.exchange");
      channel.queueDeclare("order.queue", true, false, false, args);

消費者最佳實踐

  1. 冪等性設計

    // 使用消息ID實現冪等
    if (processedMessageIds.contains(messageId)) {channel.basicAck(tag, false);return;
    }
  2. 批量確認優化

    // 每處理100條消息批量確認一次
    if (messageCount % 100 == 0) {channel.basicAck(lastTag, true);
    }
  3. 死信監控

    // 監聽死信隊列
    channel.basicConsume("dlx.queue", false, (tag, msg) -> {log.error("死信消息: {}", msg.getBody());channel.basicAck(tag, false);
    });
  4. 消費者標簽管理

    // 優雅關閉消費者
    void shutdown() {channel.basicCancel(consumerTag);// 等待處理中的消息完成while (inProgressCount > 0) {Thread.sleep(100);}
    }

十、springboot項目中如何使用mq?

十一、如何保障消息不丟失?

1、發送階段:發送階段保障消息到達交換機 事務機制|confirm確認機制

2、存儲階段:持久化機制 交換機持久化、隊列的持久化、消息內容的持久化

3、消費階段:消息的確認機制 自動ack|手動ack

接收方消息確認機制

自動ack|手動ack

spring:rabbitmq:host: 1.94.230.82port: 5672username: adminpassword: 123456virtual-host: /yan3listener:simple:acknowledge-mode: manualdirect:acknowledge-mode: manual
package com.hl.rabbitmq01.web;
?
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
?
import java.io.IOException;
?
@RestController
@RequestMapping("/c")
public class ConsumerController {
?@RabbitListener(queues = {"topicQueue01"})public void receive(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());System.out.println(msg);//業務邏輯 比如傳入訂單id,根據訂單id,減少庫存、支付等,// 如果操作成功,確認消息(從隊列移除),如果操作失敗,手動拒絕消息if(msg.length() >= 5){//確認消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}else{//拒絕消息 not ack// 第三個參數:requeue:重回隊列。如果設置為true,則消息重新回到queue,broker會重新發送該消息給消費端channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
// ? ? ? ? ?  channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}
?
?}
}

消息的持久化機制

交換機的持久化

隊列的持久化

消息內容的持久化

package com.hl.rabbitmq01.direct;
?
import com.hl.rabbitmq01.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
?
import java.io.IOException;
import java.util.concurrent.TimeoutException;
?
/*
生產者  javaSE方式簡單測試
發布訂閱-------direct模型
生產者----消息隊列----消費者*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1、創建連接Connection connection = MQUtil.getConnection();//2、基于連接,創建信道Channel channel = connection.createChannel();//3、基于信道,創建隊列/*參數:1. queue:隊列名稱,如果沒有一個名字叫simpleQueue01的隊列,則會創建該隊列,如果有則不會創建2. durable:是否持久化,當mq重啟之后,消息還在3. exclusive:* 是否獨占。只能有一個消費者監聽這隊列4。當Connection關閉時,是否刪除隊列autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉5. arguments:參數。*/channel.queueDeclare("directQueue01", true, false, false, null);channel.queueDeclare("directQueue02", false, false, false, null);/*聲明交換機參數1:交換機名稱參數2:交換機類型*/channel.exchangeDeclare("directExchange01", BuiltinExchangeType.DIRECT,true);/*綁定交換機和隊列參數1:隊列名參數2:交換機名稱參數3:路由key 廣播模型 不支持路由key  ""*/channel.queueBind("directQueue01","directExchange01","error");channel.queueBind("directQueue02","directExchange01","error");channel.queueBind("directQueue02","directExchange01","info");channel.queueBind("directQueue02","directExchange01","trace");//發送消息到消息隊列/*參數:1. exchange:交換機名稱。簡單模式下交換機會使用默認的 ""2. routingKey:路由名稱,簡單模式下路由名稱使用消息隊列名稱3. props:配置信息4. body:發送消息數據*/
?channel.basicPublish("directExchange01","user", MessageProperties.PERSISTENT_TEXT_PLAIN,("Hello World ").getBytes());
?
?//4、關閉信道,斷開連接channel.close();connection.close();}
}
package com.hl.rabbitmq01.web;
?
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
?
import java.io.IOException;
import java.nio.charset.StandardCharsets;
?
@RestController
@RequestMapping("/p")
public class ProducerController {@Autowiredprivate AmqpTemplate amqpTemplate;@Autowiredprivate RabbitTemplate rabbitTemplate;
?
?@RequestMapping("/send")public void send(@RequestParam(defaultValue = "user") String key,@RequestParam(defaultValue = "hello") String msg) throws IOException {//amqpTemplate.convertAndSend("topicExchange", key, msg);
// ? ? ?  rabbitTemplate.convertAndSend("topicExchange",key,msg);Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false); //false 非事務模式運行 無需手動提交channel.basicPublish("topicExchange", key,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());}
}
?
/*
創建交換機*/
@Bean
public TopicExchange topicExchange(){return ExchangeBuilder.topicExchange("topicExchange").durable(true) ?//是否支持持久化機制.build();
}
/*
創建隊列*/
@Bean
public Queue queue(){return QueueBuilder.durable("topicQueue01").build();
}

發送方的消息確認機制

1、事務機制

消耗資源

RabbitMQ中與事務有關的主要有三個方法:

  • txSelect() 開始事務

  • txCommit() 提交事務

  • txRollback() 回滾事務

txSelect主要用于將當前channel設置成transaction模式,txCommit用于提交事務,txRollback用于回滾事務。

當我們使用txSelect提交開始事務之后,我們就可以發布消息給Broke代理服務器,如果txCommit提交成功了,則消息一定到達了Broke了,如果在txCommit執行之前Broker出現異常崩潰或者由于其他原因拋出異常,這個時候我們便可以捕獲異常通過txRollback方法進行回滾事務了。

示例

@RestController
public class RabbitMQController {
?@Autowiredprivate RabbitTemplate rabbitTemplate;
?@RequestMapping("/send")public String sendMessage(String message){rabbitTemplate.setChannelTransacted(true); //開啟事務操作rabbitTemplate.execute(channel -> {try {channel.txSelect();//開啟事務
?channel.basicPublish("Fanout_Exchange","",null,message.getBytes());
?int i = 5/0;
?channel.txCommit();//沒有問題提交事務}catch (Exception e){e.printStackTrace();channel.txRollback();//有問題回滾事務}
?return null;});
?return "success";}
?
}

消費者沒有任何變化。

通過測試會發現,發送消息時只要Broker出現異常崩潰或者由于其他原因拋出異常,就會捕獲異常通過txRollback方法進行回滾事務了,則消息不會發送,消費者就獲取不到消息。

2、confirm確認機制

推薦

同步通知
channel.confirmSelect(); //開始confirm操作
?
channel.basicPublish("Fanout_Exchange","",null,message.getBytes());
?
if (channel.waitForConfirms()){System.out.println("發送成功");
}else{//進行消息重發System.out.println("消息發送失敗,進行消息重發");
}

異步通知
channel.confirmSelect();
?
channel.addConfirmListener(new ConfirmListener() {//消息正確到達broker,就會發送一條ack消息@Overridepublic void handleAck(long l, boolean b) throws IOException {System.out.println("發送消息成功");}
?//RabbitMQ因為自身內部錯誤導致消息丟失,就會發送一條nack消息@Overridepublic void handleNack(long l, boolean b) throws IOException {System.out.println("發送消息失敗,重新發送消息");}
});
?
channel.basicPublish("Fanout_Exchange","",null,message.getBytes());
?

十二、死信交換機和死信隊列

在實際開發項目時,在較為重要的業務場景中,要確保未被消費的消息不被丟棄(例如:訂單業務),那為了保證消息數據的不丟失,可以使用RabbitMQ的死信隊列機制,當消息消費發生異常時,將消息投入到死信隊列中進行處理。

死信隊列:RabbitMQ中并不是直接聲明一個公共的死信隊列,然后死信消息就會跑到死信隊列中。而是為每個需要使用死信的消息隊列配置一個死信交換機,當消息成為死信后,可以被重新發送到死信交換機,然后再發送給使用死信的消息隊列。

死信交換機:英文縮寫:DLX 。Dead Letter Exchange(死信交換機),死信交換機其實就是普通的交換機,通過給隊列設置參數: x-dead-letter-exchange 和x-dead-letter-routing-key,來指向死信交換機

RabbitMQ規定消息符合以下某種情況時,將會成為死信

  • 隊列消息長度到達限制(隊列消息個數限制);

  • 消費者拒接消費消息,basicNack/basicReject,并且不把消息重新放入原目標隊列,requeue=false;

  • 原隊列存在消息過期設置,消息到達超時時間未被消費;

死信消息會被RabbitMQ特殊處理,如果配置了死信隊列,則消息會被丟到死信隊列中,如果沒有配置死信隊列,則消息會被丟棄。

Map<String,Object> map = new HashMap<>();map.put("x-dead-letter-exchange","deadExchange");//當前隊列和死信交換機綁定map.put("x-dead-letter-routing-key","user.#");//當前隊列和死信交換機綁定的路由規則
// ? ? ?  map.put("x-max-length",2);//隊列長度map.put("x-message-ttl",10000);//隊列消息過期時間,時間ms
?
// ? ? ?  return QueueBuilder.durable("topicQueue01").build();return QueueBuilder.durable("topicQueue").withArguments(map).build();

十三、延遲隊列簡介

延遲隊列,即消息進入隊列后不會立即被消費,只有到達指定時間后,才會被消費。

RabbitMQ中沒有延遲隊列,但是可以用ttl(time to live)+死信隊列方式和延遲插件兩種方式來實現

ttl+死信隊列代碼在講死信隊列時已經實現,這個不再闡述。

延遲插件

人們一直在尋找用RabbitMQ實現延遲消息的傳遞方法,到目前為止,公認的解決方案是混合使用TTL和DLX。rabbitmq_delayed_message_exchange插件就是基于此來實現的,RabbitMQ延遲消息插件新增了一種新的交換器類型,消息通過這種交換器路由就可以實現延遲發送。

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

十四、RabbitMQ消息重復消費

RabbitMQ消息重復消費問題_rabbitmq重復消費的問題解決-CSDN博客

業務背景 消息隊列在數據傳輸的過程中,為了保證消息傳遞的可靠性,一般會對消息采用ack確認機制,如果消息傳遞失敗,消息隊列會進行重試,此時便可能存在消息重復消費的問題。

比如,用戶到銀行取錢后會收到扣款通知短信,如果用戶收到多條扣款信息通知則會有困惑。

解決方法一:send if not exist 首先將 RabbitMQ 的消息自動確認機制改為手動確認,然后每當有一條消息消費成功了,就把該消息的唯一ID記錄在Redis 上,然后每次發送消息時,都先去 Redis 上查看是否有該消息的 ID,如果有,表示該消息已經消費過了,不再處理,否則再去處理。

2.1 利用數據庫唯一約束實現冪等

解決方法二:insert if not exist 可以通過給消息的某一些屬性設置唯一約束,比如增加唯一uuid,添加的時候查詢是否存對應的uuid,存在不操作,不存在則添加,那樣對于相同的uuid只會存在一條數據

解決方法三:sql的樂觀鎖 比如給用戶發送短信,變成如果該用戶未發送過短信,則給用戶發送短信,此時的操作則是冪等性操作。但在實際上,對于一個問題如何獲取前置條件往往比較復雜,此時可以通過設置版本號version,每修改一次則版本號+1,在更新時則通過判斷兩個數據的版本號是否一致。

十五、RabbitMQ消息積壓

RabbitMq——消息積壓分析和解決思路_rabbitmq消息積壓-CSDN博客

消息積壓產生的原因 正常而言,一般的消息從消息產生到消息消費需要經過以下幾種階段。

以Direct模式為例:

消息由生產者產生,比如新訂單的創建等,經過交換機,將消息發送至指定的隊列中,然后提供給對應的消費者進行消費。

在這個鏈路中,存在消息積壓的原因大致分為以下幾種:

1、消費者宕機,導致消息隊列中的消息無法及時被消費,出現積壓。

2、消費者沒有宕機,但因為本身邏輯處理數據耗時,導致消費者消費能力不足,引起隊列消息積壓。

3、消息生產方單位時間內產生消息過多,比如“雙11大促活動”,導致消費者處理不過來。

消息積壓問題解決 針對上面消息積壓問題的出現,大致進行了分析,那么根據分析則能制定相關的應對方法。如下所示:

1、大促活動等,導致生產者流量過大,引起積壓問題。

提前增加服務器的數量,增加消費者數目,提升消費者針對指定隊列消息處理的效率。

2、上線更多的消費者,處理消息隊列中的數據。(和1中的大致類似)

3、如果成本有限,則可以專門針對這個隊列,編寫一個另類的消費者。

當前另類消費者,不進行復雜邏輯處理,只將消息從隊列中取出,存放至數據庫中,然后basicAck反饋給消息隊列。

十六、消息入庫(消息補償)

如果RabbitMQ收到消息還沒來得及將消息持久化到硬盤時,RabbitMQ掛了,這樣消息還是丟失了,或者RabbitMQ在發送確認消息給生產端的過程中,由于網絡故障而導致生產端沒有收到確認消息,這樣生產端就不知道RabbitMQ到底有沒有收到消息,這樣也不太好進行處理。所以為了避免RabbitMQ持久化失敗而導致數據丟失,我們自己也要做一些消息補償機制,以應對一些極端情況。

在使用消息隊列(Message Queue)時,消息的補償機制是一種處理消息處理失敗或異常情況的方法。當消息消費者無法成功處理消息時,補償機制允許系統將消息重新發送或執行其他操作,以確保消息的可靠傳遞和處理。

補償機制通常涉及以下幾個方面:

  1. 重試機制:當消息處理失敗時,補償機制會嘗試重新發送消息給消費者,以便重新處理。重試間隔和重試次數可以根據具體情況進行配置,以避免重復投遞導致的消息處理失敗。

  2. 延時隊列:補償機制還可以使用延時隊列來處理無法立即處理的消息。當某個消息處理失敗時,可以將該消息放入到延時隊列中,在一定的延時之后再次嘗試發送給消費者進行處理。

  3. 死信隊列:當消息無法被成功處理時,可以將這些無法處理的消息發送到死信隊列(Dead Letter Queue)。死信隊列通常用于存儲無法被消費者處理的消息,以便后續進行排查和處理。

  4. 可視化監控和報警:補償機制還可以包括對消息隊列的監控和報警功能,以便及時發現和處理異常情況。通過可視化監控工具可以實時查看消息隊列的狀態和處理情況,及時發現問題并采取相應的補救措施。

補償機制的設計和實現密切依賴于具體的消息中間件和使用場景,不同的消息隊列系統可能提供不同的補償機制。因此,在選擇和使用消息隊列時,需要根據自身的需求和系統特點來選擇適合的消息補償機制。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/916273.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/916273.shtml
英文地址,請注明出處:http://en.pswp.cn/news/916273.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Ubuntu 24.04 顯示中文+使用中文鍵盤

ubuntu 24.04 中文顯示中文鍵盤Ubuntu中文輸入重啟iBus服務Ubuntu中文輸入 安裝的Ubuntu24.04&#xff0c;一般默認是英文的&#xff0c;要使用中文的話&#xff0c;可以通過命令行設置&#xff0c;也可以使用‘設置’&#xff0c;在圖形化界面中操作。 下面是在‘設置’的圖形…

Docker實戰:Tomcat容器從部署到自定義網頁的完整操作

Docker實戰&#xff1a;Tomcat容器從部署到自定義網頁的完整操作 繼Nginx容器部署后&#xff0c;我們再來實操Tomcat容器的使用——從拉取鏡像、啟動容器&#xff0c;到端口映射、網頁掛載&#xff0c;全程通過實際命令演示&#xff0c;帶你掌握Tomcat在Docker中的核心用法。 一…

使用cherry studio離線搭建私人知識庫流程記錄

本篇文章記錄近期嘗試在個人筆記本上、全離線狀態下搭建知識庫的流程。用到的工具包括&#xff1a;Cherry Studio、ollama。主要過程是&#xff1a;首先下載ollama用于管理大模型&#xff1b;然后&#xff0c;從魔塔社區下載需要的deepseek、千問大模型和bge-m3嵌入模型&#x…

【工具類】Linux 環境利用 uv 安裝多版本 python

文章目錄前置工作環境說明如果kali無法訪問網絡pypi 換源安裝 uvuv 寫入環境變量臨時寫入永久寫入無法打開 github 解決方案&#xff08;注意此方法可能也會失效&#xff09;安裝多版本 python查看已安裝的pythonuv python install到 uv 的 github 主頁&#xff0c;找安裝文件下…

求職招聘小程序源碼招聘小程序開發定制

身份&#xff1a;求職者、企業求職者&#xff1a;完善簡歷&#xff0c;簡歷投遞企業&#xff1a;企業入駐&#xff0c;查看簡歷企業會員&#xff1a;半年 、年度 權益&#xff1a;每日發布條數、刷新條數&#xff0c;簡歷下載數量聊天&#xff1a;求職者可以和企業聊天招聘會…

Git 使用全指南:從配置到免密登錄

Git 使用全指南&#xff1a;從配置到免密登錄一、Git 基礎配置二、Git 代碼提交流程2.1 克隆遠程倉庫2.2 創建并切換分支2.3 暫存文件2.4 提交到本地倉庫2.5 拉取遠程最新代碼2.6 推送本地分支到遠程三、VSCode 服務器免密登錄配置3.1 生成 Windows SSH 密鑰3.2 復制公鑰到服務…

組合期權:領式策略

文章目錄0.簡介1.多頭領式策略&#xff08;Long Collar&#xff09;?1.1 策略構成1.2 適用場景?1.3 損益分析1.4 案例示范2.空頭領式策略&#xff08;Short Collar&#xff09;?2.1 策略構成2.2 適用場景2.3 損益分析2.4 案例示范參考文獻0.簡介 領式策略&#xff08;Colla…

ECSPI控制器

目錄 SPI協議簡介 極性與相位 SPI框圖 單字節收發 發送數據流程 接收數據流程 ECSPI控制器 關鍵特性 時鐘源 主機模式 等待狀態 片選控制 單突發傳輸 多突發傳輸 相位控制 ECSPI Memory Map ECSPI寄存器 ECSPIx_RXDATA ECSPIx_TXDATA ?編輯 ECSPIx_CONREG …

HTTP 與 SpringBoot 參數提交與接收協議方式

HTTP 協議支持多種參數提交方式&#xff0c;主要取決于請求方法(Method)和內容類型(Content-Type)。以下是主要的參數提交協議&#xff1a;1. URL 查詢參數 (Query Parameters)請求方法: GET (也可用于其他方法)格式: ?key1value1&key2value2示例: GET /users?id123&…

Lua(數組)

Lua 數組基礎概念Lua 中的數組實際上是用整數索引的 table&#xff0c;是一種特殊形式的表。數組索引通常從 1 開始&#xff08;Lua 慣例&#xff09;&#xff0c;但也可以從其他值開始。創建數組通過表構造器初始化數組&#xff1a;-- 索引從 1 開始的數組 local arr {10, …

【Docker項目實戰】在Docker環境下部署go-file文件分享工具

【Docker項目實戰】在Docker環境下部署go-file文件分享工具一、go-file介紹1.1 go-file簡介1.2 go-file特點1.3 go-file使用場景二、本地環境介紹2.1 本地環境規劃2.2 本次實踐介紹三、本地環境檢查3.1 檢查Docker服務狀態3.2 檢查Docker版本3.3 檢查docker compose 版本四、下…

C++基礎學習——文件操作詳解

一、文件流類概述 C 標準庫提供了三個主要的文件流類&#xff1a; ifstream (輸入文件流)&#xff1a;用于從文件讀取數據ofstream (輸出文件流)&#xff1a;用于向文件寫入數據fstream (文件流)&#xff1a;既可讀又可寫 這些類都繼承自 iostream 類&#xff0c;因此可以使用 …

Android補全計劃 DrawerLayout使用

DrawerLayout其實用了很久了&#xff0c;甚至封裝了一些代碼方便不同項目使用&#xff0c;但重構代碼的時候突然意識到這塊內容很不成體系&#xff0c;因此又參考了些文檔&#xff0c;組建了自己的一個文檔。 toolbardrawerlayout能寫的效果很多&#xff0c;在此我也只是截取了…

人工智能之數學基礎:概率論之韋恩圖的應用

韋恩圖的應用由于事件的計算有時候太過于抽象了&#xff0c;此時我們可以使用韋恩圖的方式來進行驗證&#xff0c;我們下面來舉一個例子&#xff0c;A∪B&#xff09;-CA∪(B-C)是否成立&#xff1f;我們可以通過韋恩圖來完成這個任務&#xff1a;我們通過這種方式來一點一點的…

小白成長之路-部署Zabbix7(二)

文章目錄一、zabbix-自動發現二、自動注冊三、zabbix-網易云郵箱-發送消息二、zabbix-釘釘告警總結一、zabbix-自動發現 1.在準備兩臺虛擬機&#xff0c;我的是192.168.144.12,192.168.144.13 server換成zabbix服務器的ip地址 vim /etc/zabbix/zabbix_agentd.conf 2.創建自動…

CMU15445-2024fall-project4踩坑經歷

project4目錄Task1Task2ReconstructSeqScanTask3InsertCommitTxnMgrDbgGenerateNewUndoLog And GenerateUpdateUndoLogUpdate And Delete垃圾回收Task4Index Insert并發控制Index ScanDelete、Update并發控制主鍵更新Bonus 1Bonus 2處理寫傾斜感謝CMU的教授們給我們分享了如此精…

C++20 協程

摘要&#xff1a;C20 引入的協程機制為異步編程提供了輕量級解決方案&#xff0c;其核心優勢在于通過用戶態調度實現高效的上下文切換&#xff0c;適用于 I/O 密集型任務、生成器模式等場景。本文系統闡述 C20 協程的底層原理與實踐要點&#xff0c;首先解析協程的基本結構&…

《計算機組成原理與匯編語言程序設計》實驗報告二 基本數字邏輯及漢字顯示

目 錄 一、實驗學時 二、實驗目的 三、實驗要求 四、實驗內容 五、實驗步驟 1、打開Logisim軟件&#xff0c;列出并行四位二進制全加器邏輯電路真值表&#xff0c;并使用與、或、非、異或等基本原件實現并行四位二進制全加器邏輯電路&#xff0c;鋪設完成后進行測試進而…

問卷調查小程序的設計與實現

問卷調查小程序的設計與實現&#xff1a;技術與功能全解析在數字化時代&#xff0c;問卷調查成為數據收集的重要工具。一款高效、易用的問卷調查小程序能夠顯著提升用戶體驗和數據質量。本文將深入探討基于現代技術棧的問卷調查小程序的設計與實現&#xff0c;涵蓋核心功能、技…

STM32項目實戰:正弦波

波形發生器對我的錢包不怎么友好&#xff0c;手里面有stm32f103c8t6&#xff0c;于是就想,放在哪兒吃灰也是吃灰&#xff0c;不如做個正弦波發生器。方案 dac沒怎么用過&#xff0c;所以打算使用輸出模擬正弦波。我們決定采用以下方案&#xff1a;1.使用TIM2_CH3&#xff08;PA…