RabbitMQ 的經典問題

文章目錄

  • 前言
  • 一、防止消息丟失
    • 1.1 ConfirmCallback/ReturnCallback
    • 1.2 持久化
    • 1.3 消費者確認消息
  • 二、防止重復消費
  • 三、處理消息堆積
  • 四、有序消費消息
  • 五、實現延時隊列
  • 六、小結
  • 推薦閱讀

前言

當設計和運維消息隊列系統時,如 RabbitMQ,有幾個關鍵問題需要特別關注:消息丟失、重復消費、消息堆積、有序消費和延時隊列。這些問題直接影響系統的可靠性、性能和數據完整性。本文將深入探討如何在使用 RabbitMQ 時有效地解決這些問題。

一、防止消息丟失

在 RabbitMQ 中,消息從生產者到消費者需要經歷多個階段。以下是消息傳遞的流程:

  1. 生產者創建消息:生產者(Producer)創建要發送的消息。
  2. 消息發送到交換機:生產者將消息發送到 RabbitMQ 服務器上的交換機(Exchange)。
  3. 消息進入隊列:交換機將消息路由到一個或多個隊列(Queue)。
  4. 消息從隊列投遞到消費者:RabbitMQ 將消息從隊列中取出并投遞給訂閱的消費者。消費者接收到消息后,可以開始處理消息。

在這里插入圖片描述

在這個流程中,可能會發生以下幾個問題:

  1. 生產者無法確認消息是否發送到 RabbitMQ 服務器,若中途出現意外,則可能丟失消息
  2. 消息在 RabbitMQ 服務器內存中,若服務器出現異常,會丟失消息
  3. RabbitMQ 服務器無法確定消費者是否正常消費消息,中途出現異常,會丟失消息

1.1 ConfirmCallback/ReturnCallback

為了解決第一個問題,生產者無法確認消息是否正確發送到 RabbitMQ 服務器,RabbitMQ 給出的解決方案是:

  1. ConfirmCallback: 用來確認消息是否被 RabbitMQ 服務器成功接收的回調接口。
    • 當消息成功發送到交換機時,會調用 ConfirmCallback 的 confirm 方法。
    • 如果消息發送到交換機失敗(比如交換機不存在),也會調用 ConfirmCallback 的 confirm 方法,此時 ack 參數為 false。
    • 通常情況下,ConfirmCallback 用來確認消息是否成功發送到交換機
  2. ReturnCallback: 用來處理未被路由到合適 Queue 的消息的回調接口。
    • 當消息從交換機發送到隊列失敗時,如果設置了 mandatory 為 true,則會調用 ReturnCallback。
    • 如果消息成功路由到隊列,則不會調用 ReturnCallback。
    • 在消息無法被路由到隊列時,會調用 ReturnCallback 的 returnedMessage 方法,可以在該方法中處理未路由消息的相關邏輯,比如重新發送或記錄日志等。

1.2 持久化

針對第二個問題:RabbitMQ 服務器異常,內存中的內容無法持久化導致消息丟失。RabbitMQ 提供了相應的持久化方案:

  1. 交換機持久化:當聲明一個交換機時,可以選擇將其標記為持久化。持久化的交換機將會存儲在磁盤上,即使 RabbitMQ 服務器重啟,也不會丟失。
  2. 隊列持久化:類似于交換機,可以聲明一個隊列為持久化。持久化的隊列會在 RabbitMQ 服務器重啟后保留,并且其中的消息也會被保留。這確保了即使發生了服務中斷或者重啟,消息也不會丟失。
  3. 消息持久化:當發布一條消息到 RabbitMQ 時,可以選擇使消息持久化。持久化的消息將會寫入磁盤,這樣即使 RabbitMQ 服務器在消息到達消費者之前崩潰,消息也不會丟失。持久化消息比非持久化消息更加安全,但是也會有性能開銷,因為需要寫入磁盤。

1.3 消費者確認消息

RabbitMQ 是閱后即焚機制,即 RabbitMQ 將消息發送到消費者后會立刻刪除。如果 RabbitMQ 將消息投遞給消費者后,RabbitMQ 將消息進行了刪除。然而,消費者出現了異常,并沒有正常處理消息。就會導致消息丟失。

在 RabbitMQ 中,消費者可以設置不同的確認模式(acknowledgement mode),以確定當消費者收到消息時如何向 RabbitMQ 確認消息已經被處理。這些確認模式通常稱為手動確認(manual acknowledgment)、自動確認(automatic acknowledgment)和無確認(no acknowledgment),也可以簡寫為 manual、auto 和 none。

  1. Manual Acknowledgment (manual):
    • 在手動確認模式下,消費者收到消息后,必須顯式地向 RabbitMQ 發送一個確認(ack)來告知它已經處理了該消息。
    • 這種方式可以確保消息只有在消費者成功處理后才被標記為已傳遞(delivered),避免消息在處理過程中丟失。
  2. Automatic Acknowledgment (auto):
    • 在自動確認模式下,當消費者收到消息并且 RabbitMQ 將其成功傳遞給消費者時,它會自動向 RabbitMQ 發送一個確認。這意味著一旦消息傳遞給消費者,RabbitMQ 就會將其標記為已傳遞,而無需消費者顯式確認。
    • 這種模式適合于那些允許偶爾丟失一些消息的應用場景,因為在消息傳遞給消費者后,就無法確保消費者是否成功處理了消息。
  3. No Acknowledgment (none):
    • 在無確認模式下,消費者在接收到消息后,RabbitMQ 不會等待消費者的確認,也不會嘗試重新傳遞消息,而是將消息傳遞給消費者并將其標記為已傳遞,然后立即將其視為已處理。
    • 這種模式通常用于那些不需要保證每條消息都被處理的場景,例如日志處理等。

二、防止重復消費

RabbitMQ 的重復消費問題指的是在消息隊列中,消費者可能會多次處理相同的消息,從而導致業務邏輯出現問題或者數據處理不一致的情況。這種問題通常發生在以下幾種情況下:

  1. 消息重新投遞
    • 當消費者處理消息時發生了異常或者消費者未能確認消息的處理完成(Ack),RabbitMQ 可能會將消息重新投遞到同一個消費者或者其他消費者,導致消息被重復消費。
    • 這種情況下,如果消費者處理消息的過程中出現了異常或者無法確認消息處理結果,RabbitMQ 會將消息重新發送給消費者,以確保消息不會丟失。
  2. 消費者處理時間過長
    • 如果消息處理需要較長時間,而消費者在處理過程中并未確認消息的完成(Ack),則 RabbitMQ 可能會誤以為消息未能成功處理,再次將消息投遞給消費者。這樣可能導致消息被重復處理。

在 RabbitMQ 中,防止重復消費的方法通常涉及以下幾種策略和技術:

  1. 消息去重(Message Deduplication)
    • 使用唯一標識符(如消息的 ID 或者業務相關的唯一鍵)來標記每條消息。在消費者端,在處理消息之前,可以通過維護一個已處理消息的列表或者使用緩存(如 Redis)來檢查這個唯一標識符是否已經被處理過。如果已經處理過,則可以選擇性地丟棄這條消息或者執行相應的處理邏輯。
    • 如果消息具有全局唯一性要求,可以在生產者端確保生成的消息 ID 或者唯一鍵的唯一性,以確保在消費者端能夠準確判斷消息是否已經處理過。
  2. 冪等性操作(Idempotent Consumers)
    • 在設計消費者應用程序時,盡量確保消費者的處理邏輯具有冪等性。即使同一條消息被消費多次,也不會引起意外的副作用或者數據不一致的情況。例如,數據庫操作中的冪等性可以通過使用唯一鍵約束、UPSERT(如果支持)、樂觀鎖等技術來實現。
    • 這種方式適用于那些無法避免消息重復投遞的場景,或者確保消費者的處理邏輯具備強大的魯棒性和數據一致性。
  3. 消息確認和消息預取(Message Acknowledgment and Prefetching)
    • RabbitMQ 提供了消息確認機制,即消費者在處理完消息后需要顯式地確認消息。確保消費者確認消息后再進行后續的處理操作,可以防止因為消費者未能成功處理消息而導致消息重復消費的問題。
    • 同時,通過合理設置消費者的預取數(Prefetch Count),可以控制消費者從隊列中預先獲取的消息數量。這可以幫助在一定程度上減少因消息處理過長或者消費者出現故障而導致的消息重復消費。

三、處理消息堆積

RabbitMQ 消息堆積問題指的是在消息隊列中積累大量未被消費的消息,導致隊列中的消息數量急劇增加,可能會對系統的性能和穩定性產生負面影響。這種問題通常由以下幾個原因引起:

  1. 生產者速度快于消費者速度
    • 如果生產者生產消息的速度遠快于消費者處理消息的速度,未被消費的消息會不斷積累在隊列中,最終導致隊列中消息數量急劇增加,形成消息堆積。
    • 這種情況可能發生在消費者處理能力不足、消費者出現故障或者網絡延遲等情況下。
  2. 消費者處理能力不足
    • 當消費者處理消息的速度跟不上生產者發送消息的速度時,未處理的消息會在隊列中積累,最終導致消息堆積問題。
    • 消費者處理能力不足可能是因為消費者數量不足、消費者處理邏輯復雜或者消費者出現瓶頸等原因引起的。
  3. 隊列設置不當
    • 隊列的容量設置不當,例如隊列的最大長度設置過小,無法應對高峰期的消息積壓。

解決 RabbitMQ 消息堆積問題需要綜合考慮生產者和消費者之間的消息流量控制、隊列的監控與管理以及系統的容錯處理。以下是一些常見的解決方法和策略:

  1. 增加消費者數量
    • 最直接的方法是增加消費者的數量,以提升消息處理的能力。通過增加消費者,可以分擔隊列中消息的處理壓力,減少消息堆積的可能性。
    • 可以動態地根據系統負載情況或者隊列中消息數量來調整消費者的數量。
  2. 優化消費者處理能力
    • 優化消費者的處理邏輯,確保消費者能夠高效地處理消息。這包括優化數據庫訪問、提升算法效率、避免長時間阻塞操作等。
    • 使用并發處理和異步處理技術,充分利用多線程或者異步框架來提高消費者的并發處理能力。
  3. 調整隊列的配置參數
    • 根據實際情況調整隊列的容量限制、消息過期時間等參數,避免隊列過于擁擠或者消息長時間積壓。

四、有序消費消息

在消息隊列系統中,通常情況下,消息在生產者發送到隊列后,會按照 FIFO(先進先出)的原則被消費者處理。但是,當存在多個消費者同時消費同一個隊列時,RabbitMQ 無法保證消息的嚴格順序性,因為不同的消費者可能以不同的速度處理消息,導致消息的處理順序可能被打亂。

在 RabbitMQ 中實現有序消費消息通常涉及以下幾種方法和技術:

  1. 單隊列單消費者
    • 最簡單的方式是使用單隊列和單消費者。RabbitMQ 會確保對于同一個隊列,消息的投遞順序與其被消費的順序一致。這意味著當消費者從隊列中接收消息時,它們會按照它們被發送的順序進行處理。
    • 在這種情況下,RabbitMQ 的默認行為會保證消息的有序性,只要消費者處理消息的速度足夠快,就能保證消息的有序消費。
  2. 使用消息的順序屬性
    • 在生產者端,可以為每條消息添加一個標識其順序的屬性,例如序號或者時間戳。然后在消費者端,通過這些屬性來排序和處理消息。
    • 消費者可以在消費消息之前先對消息進行排序,或者根據屬性來判斷是否需要延遲處理某些消息,以保證消息的有序性。
  3. 使用全局順序化插件(RabbitMQ Global Ordered Queue)
    • RabbitMQ 提供了全局順序化插件,它可以確保所有隊列中的消息都按照一定的順序被投遞和消費。這個插件適用于一些需要強有序性的場景,但需要注意它可能會引入一些性能上的限制和開銷。

五、實現延時隊列

在常規的消息隊列中,消息一旦發送到隊列中,就會盡快被消費者獲取和處理。然而,某些業務場景可能需要延遲發送消息,或者延遲消費消息,這時就需要使用延時隊列來實現這一需求。

在 RabbitMQ 中實現延時隊列通常涉及使用以下幾種方法:

  1. 使用 TTL(Time-To-Live)和死信隊列(Dead Letter Exchange)
    • RabbitMQ 支持設置消息的 TTL,即消息的存活時間。通過設置消息的 TTL,可以讓消息在指定的時間段后過期,然后被發送到死信隊列(Dead Letter Queue)。
    • 死信隊列是一個特殊的隊列,它接收所有因某些原因未能成功消費的消息,包括過期的消息。可以通過設置隊列的 x-dead-letter-exchangex-dead-letter-routing-key 參數,將消息發送到指定的交換器和路由鍵,實現延時消息的轉發和消費。
  2. 利用插件實現延時隊列
    • RabbitMQ 社區提供了一些插件,例如 rabbitmq_delayed_message_exchange 插件,它能夠在 RabbitMQ 中實現更精確的延時消息發送和消費。
    • 這種插件通過引入一個特殊的交換器類型(Delayed Message Exchange),可以讓生產者在消息發送時指定一個延遲時間,消息將會在指定的延遲時間之后被交換器路由到相應的隊列,然后被消費者處理。
  3. 使用定時器和消息輪詢
    • 在消費者端,可以使用定時器或者消息輪詢的方式,定期檢查隊列中的消息是否已經到達了處理時間。一旦消息到達處理時間,消費者就可以將其從隊列中取出并處理。
    • 這種方法雖然沒有直接利用 RabbitMQ 的內置功能,但適用于一些簡單的延時消息處理需求,可以通過編碼實現。

六、小結

在使用 RabbitMQ 構建消息隊列系統時,關鍵要點包括確保消息的可靠性和完整性、實現消費者的冪等性、有效管理消息堆積、處理有序消息以及實現延時隊列功能。通過合理配置和實施這些策略,可以提升系統的性能和可靠性,確保消息系統穩定運行。

推薦閱讀

  1. Spring 三級緩存
  2. 深入了解 MyBatis 插件:定制化你的持久層框架
  3. Zookeeper 注冊中心:單機部署
  4. 【JavaScript】探索 JavaScript 中的解構賦值
  5. 深入理解 JavaScript 中的 Promise、async 和 await

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/37393.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/37393.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/37393.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

第100+13步 ChatGPT學習:R實現決策樹分類

基于R 4.2.2版本演示 一、寫在前面 有不少大佬問做機器學習分類能不能用R語言,不想學Python咯。 答曰:可!用GPT或者Kimi轉一下就得了唄。 加上最近也沒啥內容寫了,就幫各位搬運一下吧。 二、R代碼實現決策樹分類 (…

【漏洞復現】宏景HCM人力資源信息管理系統——任意文件讀取漏洞

聲明:本文檔或演示材料僅供教育和教學目的使用,任何個人或組織使用本文檔中的信息進行非法活動,均與本文檔的作者或發布者無關。 文章目錄 漏洞描述漏洞復現測試工具 漏洞描述 宏景HCM人力資源信息管理系統是一款全面覆蓋人力資源管理各模塊…

docker pull 鏡像的時候遇到Pulling fs layer問題

最近遇到一個很奇怪的問題,docker pull 鏡像的時候,總是出現Pulling fs layer問題,導致鏡像拉取不成功,以前是安裝好docker,正常拉取鏡像都是沒什么問題的,在這里記錄一下這個問題的解決方法,當然,可能并不通用。 1、進入阿里云容器服務 地址:https://cr.console.aliy…

Spring Boot中的熱部署配置

Spring Boot中的熱部署配置 大家好,我是免費搭建查券返利機器人省錢賺傭金就用微賺淘客系統3.0的小編,也是冬天不穿秋褲,天冷也要風度的程序猿!今天我們將探討如何在Spring Boot項目中實現熱部署配置,提升開發效率和項…

C++實現Qt的信號+槽功能

在 Visual Studio (VS) 上使用 C 實現類似 Qt 的信號和槽機制是完全可能的,但 Qt 的信號和槽系統是基于其特定的元對象系統(Meta-Object System, MOC)的,這需要一些特定的預處理器和代碼生成步驟。 如果你不想使用 Qt,…

vue路由傳參和react 路由傳參

路由跳轉的方式 1、聲明式導航 <router-link to"導航的地址"> 2、編程式導航 編程式導航有三種方法來進行導航 router.push router.replace router.go params傳參和query傳參 1、 params 傳參(不在URL中顯示參數) 在父路由跳轉到子路由時&#xff0c;也可…

【Django】網上蛋糕項目商城-熱銷和新品

概念 本文將完成實現項目的熱銷和新品兩個分類的商品列表進行分頁展示。 熱銷和新品功能實現步驟 在head.html頭部頁面中點擊這兩個超鏈接向服務器發送請求。 在urls.py文件中定義該請求地址 path(goodsrecommend_list/,views.goodsrecommend_list) 在views.py文件中定義g…

JDBC中的批處理是什么?如何使用?

JDBC中的批處理是指將多個關聯的SQL語句組合成一個批處理&#xff0c;并將它們作為一個調用提交給數據庫。這種方法可以減少通信的資源消耗&#xff0c;從而提高性能。以下是關于JDBC批處理的具體使用和步驟&#xff1a; 1. JDBC批處理的基本概念 批處理定義&#xff1a;將多…

英飛凌TC3xx之一起認識GTM(十五)GTM常見配置問題總結

英飛凌TC3xx之一起認識GTM(十五)GTM常見配置問題總結 1 關于TGC/AGC的配置注意事項2 關于HOST_TRIG的使用3 關于SOMC模式中MCS與ARU的合并使用配置4 深入理解SOMP模式中RST_CCU0的配置5 關于CCUx中斷的使用6 TIM如何捕獲ATOM的輸出7 總結前面幾篇關鍵文章信息鏈接匯總如下: …

AV Foundation學習筆記二 - 播放器

ASSets AVFoundation框架的最核心的類是AVAsset&#xff0c;該類是整個AVFoundation框架設計的中心。AVAsset是一個抽象的&#xff08;意味著你不能調用AVAsset的alloc或者new方法來創建一個AVAsset實例對象&#xff0c;而是通過該類的靜態方法來創建實例對象&#xff09;、不…

DevOps CMDB平臺整合Jira工單

背景 在DevOps CMDB平臺建設的過程中&#xff0c;我們可以很容易的將業務應用所涉及的云資源&#xff08;WAF、K8S、虛擬機等&#xff09;、CICD工具鏈&#xff08;Jenkins、ArgoCD&#xff09;、監控、日志等一次性的維護到CMDB平臺&#xff0c;但隨著時間的推移&#xff0c;…

Stirling PDF 部署 - 強大的PDF Web在線編輯工具箱

簡介 這是一個強大的、可本地托管的、基于 Web 的 PDF 操作工具&#xff0c;可使用 Docker部署。它使您能夠對 PDF 文件執行各種操作&#xff0c;包括拆分、合并、轉換、重組、添加圖像、旋轉、壓縮等。這個本地托管的 Web 應用程序已經發展到包含一套全面的功能&#xff0c;可…

PHP爬蟲類的并發與多線程處理技巧

PHP爬蟲類的并發與多線程處理技巧 引言&#xff1a; 隨著互聯網的快速發展&#xff0c;大量的數據信息存儲在各種網站上&#xff0c;獲取這些數據已經成為很多業務場景下的需求。而爬蟲作為一種自動化獲取網絡信息的工具&#xff0c;被廣泛應用于數據采集、搜索引擎、輿情分析…

關于組織赴俄羅斯(莫斯科)第 28 屆國際汽車零部件、汽車維修設備和商品展覽會商務考察的通知

關于組織赴俄羅斯&#xff08;莫斯科&#xff09; 第 28 屆國際汽車零部件、汽車維修設備和商品展覽會商務考察的通知 展會名稱&#xff1a;俄羅斯&#xff08;莫斯科&#xff09;第 28 屆國際汽車零部件、汽車零部件、汽車維修設備和商品展覽會 時間&#xff1a;2024 年 8 月…

Python | Leetcode Python題解之第204題計數質數

題目&#xff1a; 題解&#xff1a; MX5000000 is_prime [1] * MX is_prime[0]is_prime[1]0 for i in range(2, MX):if is_prime[i]:for j in range(i * i, MX, i):#循環每次增加iis_prime[j] 0 class Solution:def countPrimes(self, n: int) -> int:return sum(is_prim…

【MongoDB】分布式數據庫入門級學習

SueWakeup 個人主頁&#xff1a;SueWakeup 系列專欄&#xff1a;為祖國的科技進步添磚Java 個性簽名&#xff1a;保留赤子之心也許是種幸運吧 本文封面由 凱楠&#x1f4f8;友情提供 凱楠&#x1f4f8; - 不夜長安 目錄 MongoDB 相關 數據庫排行榜單 MongoDB 中文官網 菜鳥…

如何把mkv轉成mp4?介紹一下將mkv轉成MP4的幾種方法

如何把mkv轉成mp4&#xff1f;如果你有一個MKV格式的視頻文件&#xff0c;但是需要將其轉換為MP4格式以便更廣泛地在各種設備和平臺上播放和共享&#xff0c;你可以通過進行簡單的文件格式轉換來實現。轉換MKV到MP4格式可以提供更好的兼容性&#xff0c;并確保你的視頻文件能夠…

在預訓練語言模型主流架構

文章目錄 編碼器-解碼器架構因果解碼器架構前綴解碼器架構在預訓練語言模型時代,自然語言處理領域廣泛采用了預訓練 + 微調的范式,并誕生了以 BERT 為代表的編碼器(Encoder-only)架構、以 GPT 為代表的解碼器(Decoder-only)架構和以 T5 為代表的編碼器-解碼器(Encoder-d…

華為OD機試C卷(100分)-執行任務賺積分(c語言)

題目描述 現有N個任務需要處理&#xff0c;同一時間只能處理一個任務&#xff0c;處理每個任務所需要的時間固定為1。 每個任務都有最晚處理時間限制和積分值&#xff0c;在最晚處理時間點之前處理完成任務才可獲得對應的積分獎勵。 可用于處理任務的時間有限&#xff0c;請問…

AI學習指南機器學習篇-隨機森林超參數選擇與調優

AI學習指南機器學習篇-隨機森林超參數選擇與調優 隨機森林是一種強大的機器學習算法&#xff0c;它能夠處理復雜的數據集&#xff0c;并且對于大部分實際問題都表現出色。然而&#xff0c;要充分發揮隨機森林的性能&#xff0c;需要對其超參數進行合理選擇和調優。本文將介紹隨…