Rabbit 實戰指南-學習筆記

第 4 章?RabbitMQ 進階

mandatory 參數?Returning | RabbitMQ

?????????當 mandatory 參數設為 true 時,交換器無法根據自身的類型和路由鍵找到一個符合條件的隊列,那么RabbitMQ 會調用 Basic.Return 命令將消息返回給生產者,通過調用channel.addReturnListener 來添加 ReturnListener 監聽器實現。當 mandatory 參數設置為 false 時,出現上述情形,則消息直接被丟棄。

        channel.basicPublish(EXCHANGE_NAME, "", true, MessageProperties.PERSISTENT_TEXT_PLAIN, "mandatory test".getBytes());channel.addReturnListener(new ReturnListener() {public void handleReturn(int replyCode, String replyText,String exchange, String routingKey,AMQP.BasicProperties basicProperties,byte[] body) throws IOException {String message = new String(body);System.out.println("Basic.Return返回的結果是:" + message);}});

備份交換器?Alternate Exchanges | RabbitMQ

        // 創建備份交換機、隊列、聲明綁定關系String myAeExchange = "myAe";channel.exchangeDeclare(myAeExchange, "fanout", true, false, null);channel.queueDeclare("unroutedQueue", true, false, false, null);channel.queueBind("unroutedQueue", myAeExchange, "");// 創建普通交換機, 并設置 alternate-exchange,隊列、聲明綁定關系Map<String, Object> args = new HashMap<String, Object>();args.put("alternate-exchange", myAeExchange);channel.exchangeDeclare("normalExchange", "direct", true, false, args);channel.queueDeclare("normalQueue", true, false, false, null);channel.queueBind("normalQueue", "normalExchange", "normalKey");

????????如果備份交換器和 mandatory 參數一起使用,那么 mandatory 參數無效。?

4.2過期時間 TTL?Time-To-Live and Expiration | RabbitMQ

????????第一種方法是通過隊列屬性設置,隊列中所有消息都有相同的過期時間。channel.queueDeclare 方法中加入x-message-ttl 參數實現的,這個參數的單位是毫秒。

????????第二種方法是對消息本身進行單獨設置

每條消息的TTL 可以不同。如果兩種方法一起使用,則消息的 TTL 以兩者之間較小的那個數值為準。消息在隊列中的生存時間一旦超過設置的 TTL 值時,就會變成“死信”(Dead Message),消費者將無法再收到該消息(這點不是絕對的,可以參考 4.3 節)。?

            // 聲明帶TTL的隊列Map<String, Object> queueArgs = new HashMap<>();queueArgs.put("x-message-ttl", 30000);channel.queueDeclare("ttl.queue", true, false, false, queueArgs);// 發送帶TTL屬性的消息AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().expiration("5000").build();channel.basicPublish("", "ttl.queue", props, "Test TTL Message".getBytes())

4.3 死信&延時隊列

????????DLX,全稱為 Dead-Letter-Exchange,可以稱之為死信交換器,也有人稱之為死信郵箱。當消息在一個隊列中變成死信(dead message)之后,它能被重新被發送到另一個交換器中,這個交換器就是DLX,綁定DLX 的隊列就稱之為死信隊列。

出現的場景

  • 消息被拒絕(Basic.Reject/Basic.Nack),并且設置 requeue 參數為 false;?
  • 消息過期;?
  • 隊列達到最大長度。
// 創建一個持久化、非排他的、非自動刪除的隊列channel.exchangeDeclare("exchange.dlx", "direct", true);channel.queueDeclare("queue.dlx", true, false, false, null);channel.queueBind("queue.dlx", "exchange.dlx", "routingkey");// 創建一個不用隊列 并設置 TTL 以及 DLX、DLXroutingkeychannel.exchangeDeclare("exchange.normal", "fanout", true);Map<String, Object> args = new HashMap<>();args.put("x-message-ttl", 10000);args.put("x-dead-letter-exchange", "exchange.dlx");args.put("x-dead-letter-routing-key", "routingkey");channel.queueDeclare("queue.normal", true, false, false, args);channel.queueBind("queue.normal", "exchange.normal", "");channel.basicPublish("exchange.normal", "rk",MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx".getBytes());

????????由Web 管理頁面(圖4-3)可以看出,兩個隊列都被標記了“D”,這個是durable 的縮寫,即設置了隊列持久化。queue.normal 這個隊列還配置了 TTL、DLX 和 DLK,其中 DLX 指的是x-dead-letter-routing-key 這個屬性。?

????????在 AMQP 協議中,或者 RabbitMQ 本身沒有直接支持延遲隊列的功能,但是可以通過前面所介紹的 DLX 和 TTL 模擬出延遲隊列的功能。?

4.7 持久化

????????RabbitMQ的持久化分為三個部分:交換器的持久化、隊列的持久化和消息的持久化。?

deliveryMode(2)  // 設置為持久化

????????將交換器、隊列、消息都設置了持久化之后就能百分之百保證數據不丟失了嗎?答案是否定的。?
????????首先從消費者來說,如果在訂閱消費隊列時將 autoAck 參數設置為true,那么當消費者接收到相關消息之后,還沒來得及處理就宕機了,這樣也算數據丟失。這種情況很好解決,將autoAck 參數設置為 false,并進行手動確認,詳細可以參考3.5 節。?

????????其次,在持久化的消息正確存入 RabbitMQ 之后,還需要有一段時間(雖然很短,但是不可忽視)才能存入磁盤之中。RabbitMQ 并不會為每條消息都進行同步存盤(調用內核的fsync1方法)的處理,可能僅僅保存到操作系統緩存之中而不是物理磁盤之中。如果在這段時間內
RabbitMQ 服務節點發生了宕機、重啟等異常情況,消息保存還沒來得及落盤,那么這些消息將丟失。RabbitMQ 在運行時會根據統計的消息傳送速度定期計算一個當前內存中能夠保存的最大消息數量(target_ram_count),如果alpha 狀態的消息數量大于此值時,就會引起消息的狀態轉換會丟失。?這里可以引入 RabbitMQ 的鏡像隊列機制(詳細參考 9.4 節)

4.8 生產者確認?

????????生產者如何知道消息有沒有正確地到達服務器

4.8.1 事務機制

  • 通過事務機制實現;?
  • 通過發送方確認(publisher confirm)機制實現。
try { channel.txSelect(); channel.basicPublish(exchange, routingKey,  MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); int result = 1 / 0; channel.txCommit(); 
} catch (Exception e) { e.printStackTrace(); channel.txRollback(); 
} 

    4.8.2 發送方確認機制

    ??采用事務機制實現會嚴重降低 RabbitMQ 的消息吞吐量,這里就引入了一種輕量級的方式——發送方確認(publisher confirm)機制。?

    ????????生產者通過調用 channel.confirmSelect 方法(即 Confirm.Select 命令)將信道設置為 confirm 模式,之后 RabbitMQ 會返回 Confirm.Select-Ok 命令表示同意生產者將當前信道設置為 confirm 模式。所有被發送的后續消息都被 ack 或者 nack 一次,不會出現一條消
    息既被 ack 又被 nack 的情況,并且 RabbitMQ 也并沒有對消息被 confirm 的快慢做任何保證

            try {channel.confirmSelect();//將信道置為publisher confirm模式//之后正常發送消息channel.basicPublish("exchange", "routingKey", null,"publisher confirm test".getBytes());if (!channel.waitForConfirms()) {// 當消息發送不成功時候進入 if 代碼塊 do something else....System.out.println("send message failed");}} catch (InterruptedException e) {e.printStackTrace();}

    注意要點:?
    (1)事務機制和 publisher confirm 機制兩者是互斥的,不能共存。如果企圖將已開啟事務模式的信道再設置為 publisher confirm 模式,RabbitMQ 會報錯:{amqp_error, precondition_?failed, "cannot switch from tx to confirm mode", 'confirm.select'};或者如果企圖將已開啟 publisher confirm 模式的信道再設置為事務模式,RabbitMQ 也會報錯:{amqp_error, precondition_failed, "cannot switch from confirm to tx?
    mode", 'tx.select' }。?
    (2)事務機制和 publisher confirm 機制確保的是消息能夠正確地發送至 RabbitMQ,這里的“發送至 RabbitMQ”的含義是指消息被正確地發往至 RabbitMQ 的交換器,如果此交換器沒有匹配的隊列,那么消息也會丟失。所以在使用這兩種機制的時候要確保所涉及的交換器能夠有匹配的隊列。更進一步地講,發送方要配合 mandatory 參數或者備份交換器一起使用來提高消息傳輸的可靠性。?

    ????????publisher confirm 的優勢在于并不一定需要同步確認。這里我們改進了一下使用方式,總結有如下兩種:

    • 批量 confirm 方法:每發送一批消息后,調用 channel.waitForConfirms 方法,等待服務器的確認返回。?

    ????????相比于前面示例中的普通 confirm 方法,批量極大地提升了 confirm 的效率,但是問題在于出現返回 Basic.Nack 或者超時情況時,客戶端需要將這一批次的消息全部重發,這會帶來明顯的重復消息數量,并且當消息經常丟失時,批量 confirm 的性能應該是不升反降的。?

    channel.confirmSelect(); // 開啟確認模式
    for(int i=0; i<100; i++){channel.basicPublish("", "queue", null, message.getBytes());
    }
    // 批量確認所有未確認消息
    channel.waitForConfirmsOrDie(5000); // 超時5秒// 缺點:簡單但會阻塞生產者線程,批量失敗需重發全部消息
    ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
    channel.addConfirmListener((sequenceNumber, multiple) -> {if(multiple) {outstandingConfirms.headMap(sequenceNumber, true).clear();} else {outstandingConfirms.remove(sequenceNumber);}
    }, (sequenceNumber, multiple) -> {// NACK處理邏輯
    });// 批量發送100條消息for (int i = 0; i < 100; i++) {String message = "Msg-" + i;long seqNo = channel.getNextPublishSeqNo();outstandingConfirms.put(seqNo, message);channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());}

    4.9 消費端要點介紹

    ????????消費者客戶端可以通過推模式或者拉模式(推薦方式)的方式來獲取并消費消息,當消費者處理完業務邏輯需要手動確認消息已被接收,這樣 RabbitMQ才能把當前消息從隊列中標記清除。當然如果消費者由于某些原因無法處理當前接收到的消息,可以通過 channel.basicNack 或者 channel.basicReject 來拒絕掉。?

    • 消息分發;?
    • 消息順序性;?
    • 棄用 QueueingConsumer。?

    消息順序

    在RabbitMQ中保證消息順序性需結合隊列特性和業務設計,以下是核心方案:

    一、基礎保障機制

    1. ?單隊列單消費者模式?

      • 利用隊列FIFO特性,僅允許一個消費者處理隊列,避免并發消費導致亂序
      • 缺點:吞吐量受限,需配合消息持久化和手動ACK確保可靠性
    2. ?分區消費策略?

      • 通過路由鍵將關聯消息(如相同訂單ID)固定路由到同一隊列,每個隊列對應獨立消費者
      • 示例:使用Direct交換機按業務ID路由,實現"局部順序性"

    二、增強控制手

    1. ?消息序列化標記?

      • 在消息體中嵌入序列號,消費者端通過緩存排序實現邏輯順序控制
      • 需配合冪等處理避免重復消息干擾
    2. ?單活消費者模式?

      • 通過x-single-active-consumer參數確保隊列同一時間僅有一個活躍消費者,故障時自動切換
      // Spring AMQP配置示例 
      Map<String, Object> args = new HashMap<>(); 
      args.put("x-single-active-consumer", true); 
      new Queue("seq_queue", true, false, false, args);

    三、高級方案

    1. ?事務與發布確認?

      • 生產者啟用事務或發布確認機制,確保消息按發送順序持久化到隊列
      • 事務適用于批量消息,確認機制適合單條消息
    2. ?死信隊列重試?

      • 對處理失敗的消息進入死信隊列延時重試,避免立即重入破壞順序

      4.10 消息傳輸保障

      第九章 RabbitMQ 高階

      9.1 存儲機制?

      ????????不管是持久化的消息還是非持久化的消息都可以被寫入到磁盤。這兩種類型的消息的落盤處理都在RabbitMQ 的“持久層”中完成。?

      • 持久化的消息也會在內存中保存一份備份,這樣可以提高一定的性能,當內存吃緊的時候會從內存中清除。
      • 非持久化的消息一般只保存在內存中,在內存吃緊的時候會被換入到磁盤中,以節省內存空間。

      持久層

      • 隊列索引(rabbit_queue_index):負責維護隊列中落盤消息的信息,包括消息的存儲地點、是否已被交付給消費者、是否已被消費者ack 等。每個隊列都有與之對應的一個rabbit_queue_index
      • 消息存儲(rabbit_msg_store):以鍵值對的形式存儲消息,它被所有隊列共享,在每個節點中有且只有一個。從技術層面上來說,rabbit_msg_store 具體還可以分為msg_store_persistent 和 msg_store_transient
      • ????????msg_store_persistent 負責持久化消息的持久化,重啟后消息不會丟失;
      • ????????msg_store_transient 負責非持久化消息的持久化,重啟后消息會丟失。

      結構查看??/opt/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@node1??

      ?隊列的結構

      • rabbit_amqqueue_process 負責協議相關的消息處理,即接收生產者發布的消息、向消費者交付消息、處理消息的確認(包括生產端的 confirm 和消費端的 ack)等。
      • backing_queue 是消息存儲的具體形式和引擎,并向rabbit_amqqueue_process提供相關的接口以供調用。

      隊列消息狀態

      • alpha:消息內容(包括消息體、屬性和 headers)和消息索引都存儲在內存中。 (alpha狀態最耗內存,但很少消耗CPU)
      • beta:消息內容保存在磁盤中,消息索引保存在內存中(只需要一次I/O 操作就可以讀取到消息(從 rabbit_msg_store 中))。?
      • gamma:消息內容保存在磁盤中,消息索引在磁盤和內存中都有(只需要一次I/O 操作就可以讀取到消息(從 rabbit_msg_store 中))。?
      • delta:消息內容和索引都在磁盤中。(狀態基本不消耗內存,但是需要消耗更多的 CPU 和磁盤 I/O 操作,delta 狀態需要執行兩次I/O 操作才能讀取到消息,一次是讀消息索引(從 rabbit_queue_index 中),一次是讀消息
        內容(從 rabbit_msg_store 中))

      本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
      如若轉載,請注明出處:http://www.pswp.cn/web/94185.shtml
      繁體地址,請注明出處:http://hk.pswp.cn/web/94185.shtml
      英文地址,請注明出處:http://en.pswp.cn/web/94185.shtml

      如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

      相關文章

      BEVDet4D

      1. BEVDet4D算法動機及開創性思路 1&#xff09;BEVDet算法概述輸入輸出&#xff1a;輸入為6視角圖像&#xff08;NuScenes數據集&#xff09;&#xff0c;輸出為3D檢測結果核心模塊&#xff1a; 圖像編碼器&#xff1a;由Backbone網絡和多尺度特征融合網絡組成&#xff0c;處理…

      當 AI 學會 “理解” 人類:自然語言處理的進化與倫理邊界

      大家可以去我的資源看看&#xff0c;有很多關于AI的免費資源可以下載&#xff0c;不下載也可以看看&#xff0c;真的對你有用引言&#xff1a;從 “對話” 到 “理解”——AI 語言能力的時代躍遷現實錨點&#xff1a;以日常場景切入&#xff08;如 ChatGPT 流暢回應復雜問題、A…

      WPF控件隨窗體大寬度高度改變而改變

      前臺控件中&#xff1a;Width"{Binding RelativeSource{RelativeSource AncestorTypeWindow}, PathWidth}"后臺代碼&#xff1a;定義在加載事件里面this.SizeChanged ProductData_SizeChanged;private void ProductData_SizeChanged(object sender, SizeChangedEven…

      E10 通過RPC實現賬號批量鎖定與解鎖

      需求背景&#xff1a;賬號信息由三方系統管理&#xff0c;包含賬號狀態&#xff0c;所以需要通過提供給三方的 Rest 接口中&#xff0c;實現賬號鎖定與解鎖。參考基線版本&#xff1a;10.0.2506.01&#xff0c;過低的版本可能無法使用。 鎖定分為兩種&#xff1a; &#xff08;…

      什么是AI寵物

      什么是AI寵物AI寵物是由AI大腦驅動的生命體AI產品。它能主動產生情緒和意圖&#xff0c;并通過情緒和意圖去驅動自己的動作和行為。它根據自己的意愿和用戶互動&#xff0c;不受用戶控制。從一定意義上講&#xff0c;它擁有了人工生命和自由意志。它有自己的行為邏輯&#xff0…

      簡單AI:搜狐公司旗下AI繪畫產品

      本文轉載自&#xff1a;簡單AI&#xff1a;搜狐公司旗下AI繪畫產品 - Hello123工具導航 ** 一、平臺定位與技術特性 搜狐簡單 AI 是搜狐推出的多模態 AI 創作平臺&#xff0c;基于自研大模型提供文生圖、文生文等能力。它專注于零門檻內容生成&#xff0c;用戶無需專業技能即…

      vue3 3d餅圖

      完整3D餅圖項目下載 https://download.csdn.net/download/weixin_54645059/91716476 只有一個vue文件 直接下滑到完整代碼就闊以 本文介紹了如何使用ECharts和ECharts-GL插件實現3D餅圖效果&#xff0c;并提出了數值顯示未解決的問題。主要包含以下內容&#xff1a; 安裝所需…

      全球電商業財一體化趨勢加速,巨益科技助力品牌出海精細化運營

      行業背景&#xff1a;跨境電商進入品牌化發展新階段隨著國內電商市場競爭日趨激烈&#xff0c;跨境電商已成為中國品牌尋求增長突破的重要賽道&#xff0c;在TikTok、Temu等平臺出海浪潮推動下&#xff0c;越來越多的中國品牌開始布局全球市場。然而&#xff0c;從單一市場的鋪…

      【序列晉升】13 Spring Cloud Bus微服務架構中的消息總線

      Spring Cloud Bus作為微服務架構中的關鍵組件&#xff0c;通過消息代理實現分布式系統中各節點的事件廣播與狀態同步&#xff0c;解決了傳統微服務架構中配置刷新效率低下、系統級事件傳播復雜等問題。它本質上是一個輕量級的事件總線&#xff0c;將Spring Boot Actuator的端點…

      [激光原理與應用-314]:光學設計 - 光學系統設計與電子電路設計的相似或相同點

      光學系統設計與電子電路設計雖分屬不同工程領域&#xff0c;但在設計理念、方法論和工程實踐中存在諸多相似或相同點。這些共性源于兩者均需解決復雜系統的優化問題&#xff0c;并遵循工程設計的通用規律。以下是具體分析&#xff1a;一、設計流程的相似性需求分析與規格定義光…

      Linux學習:信號的保存

      目錄1. 進程的異常終止與core dump標志位1.1 進程終止的方式1.2 core方案的作用與使用方式2. 信號的保存2.1 信號的阻塞2.2 操作系統中的sigset_t信號集類型2.3 進程PCB中修改block表的系統調用接口2.4 信號阻塞的相關問題驗證1. 進程的異常終止與core dump標志位 1.1 進程終止…

      數據分析編程第二步: 最簡單的數據分析嘗試

      2.1 數據介紹有某公司的銷售數據表 sales.csv 如下:第一行是標題&#xff0c;解釋每一列存了什么東西。第二行開始每一行是一條數據&#xff0c;對應一個訂單。這種數據有個專業的術語&#xff0c;叫結構化數據。這是現代數據處理中最常見的數據類型。整個表格的數據統稱為一個…

      UDP報文的數據結構

      主要內容參照https://doc.embedfire.com/net/lwip/zh/latest/doc/chapter14/chapter14.html#id6&#xff0c;整理出來自用。 1. UDP 報文首部結構體&#xff08;udp_hdr&#xff09; 為清晰定義 UDP 報文首部的各個字段&#xff0c;LwIP 設計了udp_hdr結構體&#xff0c;其包含…

      圖論與最短路學習筆記

      圖論與最短路在數學建模中的應用 一、圖論模型圖 G(V,E)G(V,E)G(V,E) VVV&#xff1a;頂點集合EEE&#xff1a;邊集合每條邊 (u,v)(u,v)(u,v) 賦予權值 w(u,v)w(u,v)w(u,v)&#xff0c;可用 鄰接矩陣 或 鄰接表 表示。二、最短路問題的數學形式 目標&#xff1a;尋找從源點 sss…

      第九節 Spring 基于構造函數的依賴注入

      當容器調用帶有一組參數的類構造函數時&#xff0c;基于構造函數的 DI 就完成了&#xff0c;其中每個參數代表一個對其他類的依賴。接下來&#xff0c;我們將通過示例來理解 Spring 基于構造函數的依賴注入。示例&#xff1a;下面的例子顯示了一個類 TextEditor&#xff0c;只能…

      【數據庫】PostgreSQL詳解:企業級關系型數據庫

      文章目錄什么是PostgreSQL&#xff1f;核心特性1. 標準兼容性2. 擴展性3. 高級功能4. 可靠性數據類型1. 基本數據類型2. 高級數據類型基本操作1. 數據庫操作2. 表操作3. 數據操作高級查詢1. 連接查詢2. 子查詢3. 窗口函數JSON操作1. JSON數據類型2. JSON查詢3. JSON索引全文搜索…

      FFMPEG相關解密,打水印,合并,推流,

      1&#xff1a;ffmepg進行打水印解密 前提ffmepg安裝利用靜態版就可以這個什么都有&#xff0c;不用再配置其他信息&#xff1a;&#xff08;這個利用ffmpeg終端命令是沒問題的&#xff0c;但是如果要是再C中調用ffmpeg庫那么還需要從新編譯安裝下&#xff09; 各個版本 Inde…

      MySql知識梳理之DML語句

      注意: 插入數據時&#xff0c;指定的字段順序需要與值的順序是一一對應的。 字符串和日期型數據應該包含在引號中。 插入的數據大小&#xff0c;應該在字段的規定范圍內注意:修改語句的條件可以有&#xff0c;也可以沒有&#xff0c;如果沒有條件&#xff0c;則會修改整張表的所…

      GaussDB GaussDB 數據庫架構師修煉(十八)SQL引擎-SQL執行流程

      1 SQL執行流程查詢解析&#xff1a;詞法分析、語法分析、 語義分析 查詢重寫&#xff1a;視圖和規則展開、基于規則的查詢優化 計劃生成&#xff1a;路徑搜索和枚舉、選出最優執行計劃 查詢執行&#xff1a;基于優化器生成的物理執行計劃對數據進行獲取和計算2 解析器和優化器S…

      grpc 1.45.2 在ubuntu中的編譯

      要在 Ubuntu 上編譯 gRPC 1.45.2&#xff0c;需要按照以下步驟操作。以下指南基于 gRPC 官方文檔和相關資源&#xff0c;確保環境配置正確并成功編譯。請確保你有管理員權限&#xff08;sudo&#xff09;以安裝依賴項和執行相關命令。 1. 準備環境 確保你的 Ubuntu 系統已安裝…