MessageQueue --- RabbitMQ WorkQueue and Prefetch

MessageQueue --- RabbitMQ WorkQueue and Prefetch

  • 什么是WorkQueue
  • 分發機制 --- RoundRobin
  • 分發機制 --- Prefetch
    • Spring example use prefetch --- Fair Dispatch

什么是WorkQueue

  • Work queues,任務模型。簡單來說就是讓多個消費者綁定到一個隊列,共同消費隊列中的消息。
  • 當消息處理比較耗時的時候,可能生產消息的速度會遠遠大于消息的消費速度。長此以往,消息就會堆積越來越多,無法及時處理。此時就可以使用workqueu模型,多個消費者共同處理消息處理,消息處理的速度就能大大提高了。

在這里插入圖片描述

分發機制 — RoundRobin

工作機制:

  • 默認模式:當多個消費者訂閱同一個隊列時,RabbitMQ 會依次將消息分發給每個消費者,按順序循環分配。
  • 示例:
    隊列中有消息 M1, M2, M3, M4,消費者 C1 和 C2 同時訂閱。
    分發順序為:M1 → C1,M2 → C2,M3 → C1,M4 → C2。

特點:

  • 簡單高效:無需額外配置,適合消費者處理速度相近的場景。

潛在問題:

  • 若消費者處理速度差異較大,可能導致某些消費者空閑,而其他消費者積壓消息。
  • 例如:C1 處理速度慢,C2 處理速度快,但 C1 仍會分配到一半的消息,造成負載不均衡。

Example

//消息發送
//循環發送,模擬大量消息堆積現象。
@Test
public void testWorkQueue() throws InterruptedException {// 隊列名稱String queueName = "simple.queue";// 消息String message = "hello, message_";for (int i = 0; i < 50; i++) {// 發送消息,每20毫秒發送一次,相當于每秒發送50條消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}
//消息接收
//模擬多個消費者綁定同一個隊列,我們添加2個方法,
//并且設置不同睡眠時間模擬不同性能讀取
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消費者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消費者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);
}

在這里插入圖片描述

  • 消費者1很快完成了自己的25條消息
  • 消費者2卻在緩慢的處理自己的25條消息。
  • 也就是說消息是平均分配給每個消費者,并沒有考慮到消費者的處理能力。導致1個消費者空閑,另一個消費者忙的不可開交。沒有充分利用每一個消費者的能力,最終消息處理的耗時遠遠超過了1秒。這樣顯然是有問題的。

下面我們介紹prefetch機制,可以做到fair dispatch

分發機制 — Prefetch

工作機制:

  • 配置預取計數(Prefetch Count):通過設置 basicQos 參數,限制每個消費者未確認(unacknowledged)的消息數量。
  • 進入prefetch的消息仍會被保留在隊列中,但是同時也會發給消費者等待處理
    在 RabbitMQ 的原始隊列(Queue)中,會被標記為 “Unacked”(未確認)狀態。
    這些消息不會被其他消費者獲取(即使設置了 prefetch 的消費者崩潰)。
    只有消費者顯式發送 ack 或 nack 后,消息才會從隊列中移除(或重新排隊)。

消息狀態變化流程

  • 消息推送給消費者:
    RabbitMQ 將消息標記為 “Unacked”,但仍在隊列中(占用內存或磁盤,取決于隊列持久化配置)。
    此時消息對其他消費者不可見。
  • 消費者處理消息:
    若成功處理并發送 ack → 消息從隊列中物理刪除。
    若發送 nack(requeue=true) → 消息重新變為 “Ready” 狀態,可被其他消費者獲取。
    若發送 nack(requeue=false) 或者超時→ 消息被放入死信隊列,如果沒有配置死信隊列則被丟棄

示例:

Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue", false, consumer);
Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(0); // No limit for this consumer,allowing any number of unacknowledged messages.
channel.basicConsume("my-queue", false, consumer);
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true);  // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);
//這兩個消費者之間總共最多只能有 15 條未確認消息,且每個消費者最多處理 10 條消息。
//由于需要在 Channel 和隊列之間協調全局限制,該模式的性能會低于前述示例(存在額外開銷)

特點:

  • 負載均衡:處理速度快的消費者會獲取更多消息,避免空閑
  • 可以一次性發送多個消息給消費者處理,減少網絡開銷
  • 可靠性:需配合手動確認(ack)機制,確保消息處理成功后才從隊列移除。
  • 適用場景:消費者處理速度差異較大時(如耗時任務),能顯著提升整體吞吐量。
  • Automatic acknowledgement mode or manual acknowledgement mode with unlimited prefetch should be used with care. 通常設為 100~300,平衡吞吐與內存占用。

Note:

  • AMQP 0-9-1 協議是channel level prefetch,通過 basic.qos 方法限制channel上的未確認消息數
  • channel level有很大缺陷,由于單個channel可能從多個queue消費消息,channel與queue之間需要為每條消息進行協調,以確保不超出限制。這種機制在單機環境下效率較低,而在集群消費場景中性能會顯著下降,大多數使用場景也需要consumer level prefetch
  • 所以RabbitMQ支持consumer level prefetch (也就是以上的例子)
    在這里插入圖片描述

Spring example use prefetch — Fair Dispatch

  • 在spring中有一個prefetch的配置,我們修改consumer服務的application.yml文件,添加配置:
spring:rabbitmq:listener:simple:acknowledge-mode: manual # 設置確認方式為手動確認prefetch: 1 # 每次只能獲取一條消息,處理完成才能獲取下一個消息

在這里插入圖片描述

  • 可以發現,由于消費者1處理速度較快,所以處理了更多的消息;消費者2處理速度較慢,只處理了6條消息。而最終總的執行耗時也在1秒左右,大大提升
  • 還可根據實際情況自定義prefetch count,達到限流的目的
spring:rabbitmq:listener:simple:acknowledge-mode: manual # 設置確認方式為手動確認prefetch: 5 # 限制消費者只能接收5條消息

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/74729.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/74729.shtml
英文地址,請注明出處:http://en.pswp.cn/web/74729.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

RNN模型與NLP應用——(9/9)Self-Attention(自注意力機制)

聲明&#xff1a; 本文基于嗶站博主【Shusenwang】的視頻課程【RNN模型及NLP應用】&#xff0c;結合自身的理解所作&#xff0c;旨在幫助大家了解學習NLP自然語言處理基礎知識。配合著視頻課程學習效果更佳。 材料來源&#xff1a;【Shusenwang】的視頻課程【RNN模型及NLP應用…

詳解AI采集框架Crawl4AI,打造智能網絡爬蟲

大家好&#xff0c;Crawl4AI作為開源Python庫&#xff0c;專門用來簡化網頁爬取和數據提取的工作。它不僅功能強大、靈活&#xff0c;而且全異步的設計讓處理速度更快&#xff0c;穩定性更好。無論是構建AI項目還是提升語言模型的性能&#xff0c;Crawl4AI都能幫您簡化工作流程…

從零開始玩python--python版植物大戰僵尸來襲

大家好呀&#xff0c;小伙伴們&#xff01;今天要給大家介紹一個超有趣的Python項目 - 用pygame制作植物大戰僵尸游戲的進階版本。相信不少小伙伴都玩過這款經典游戲&#xff0c;今天我們就用Python來實現它&#xff0c;讓編程學習變得更加有趣&#xff01;&#x1f31f; 一、…

圖解AUTOSAR_SWS_FlashTest

AUTOSAR Flash Test模塊詳解 基于AUTOSAR 4.4.0規范的Flash測試模塊分析與圖解 目錄 概述 1.1 Flash Test模塊的作用 1.2 工作原理架構設計 2.1 整體架構 2.2 依賴關系狀態管理 3.1 狀態轉換圖 3.2 前臺與后臺測試模式配置結構 4.1 配置類圖 4.2 關鍵配置參數交互流程 5.1 序列…

【mongodb】mongodb的字段類型

目錄 1. 基本數據類型1.1 String1.2 Number1.3 Boolean1.4 Date1.5 Null1.6 ObjectId1.7 Array1.8 Binary Data1.9 Object 2. 特殊數據類型2.1 Regular Expression2.2 JavaScript2.3 Symbol2.4 Decimal1282.5 Timestamp2.6 MinKey/MaxKey2.7 DBPointer 3. 常用字段類型示例4. 注…

MySQL篇(五)MySQL主從同步原理深度剖析

MySQL篇&#xff08;五&#xff09;MySQL主從同步原理深度剖析 MySQL篇&#xff08;五&#xff09;MySQL主從同步原理深度剖析一、引言二、MySQL主從同步基礎概念主庫&#xff08;Master&#xff09;從庫&#xff08;Slave&#xff09;二進制日志&#xff08;Binary Log&#x…

論文學習16:Learning Transferable Visual Models From Natural Language Supervision

代碼來源 Learning Transferable Visual Models From Natural Language Supervisionhttps://arxiv.org/pdf/2103.00020 模塊作用 當前最先進的計算機視覺系統被訓練用于預測一組固定的、預先定義的目標類別。這種受限的監督方式限制了它們的通用性和可用性&#xff0c;因為要…

[MySQL初階]MySQL(9)事務機制

標題&#xff1a;[MySQL初階]MySQL&#xff08;9&#xff09;事物機制 水墨不寫bug 文章目錄 一、認識事務1、多線程訪問數據庫出現的問題2、對CURD的限制是通過事務機制實現的3、事務的四個屬性4、哪些引擎支持事務 二、事務的提交與autocommit設置三、事務的隔離性和隔離級別…

spring-cloud-alibaba-nacos-config使用說明

一、核心功能與定位 Spring Cloud Alibaba Nacos Config 是 Spring Cloud Alibaba 生態中的核心組件之一&#xff0c;專為微服務架構提供動態配置管理能力。它通過整合 Nacos 的配置中心功能&#xff0c;替代傳統的 Spring Cloud Config&#xff0c;提供更高效的配置集中化管理…

SonarQube數據庫配置

SonarQube部署完成后&#xff0c;在瀏覽器地址欄輸入http://IP:9000可以進入登錄頁面&#xff0c;以本機運行為例&#xff0c;地址為http://127.0.0.1:9000/&#xff0c;默認登錄名&#xff1a;admin&#xff0c;登錄密碼也是admin。登錄后會要求設置密碼&#xff1a; 按要求設…

醫藥檔案區塊鏈系統

1. 醫生用戶模塊?? ??目標用戶??&#xff1a;醫護人員 ??核心功能??&#xff1a; ??檢索檔案??&#xff1a;通過關鍵詞或篩選條件快速定位患者健康檔案。??請求授權??&#xff1a;向個人用戶發起檔案訪問權限申請&#xff0c;需經對方確認。??查看檔案?…

CSS3學習教程,從入門到精通, 化妝品網站 HTML5 + CSS3 完整項目(26)

化妝品網站 HTML5 CSS3 完整項目 下面是一個完整的化妝品網站項目&#xff0c;包含主頁、登錄頁面和注冊頁面。我將按照您的要求提供詳細的代碼和注釋。 1. 網站規劃與需求分析 需求分析 展示化妝品產品信息提供用戶注冊和登錄功能響應式設計&#xff0c;適配不同設備美觀…

ROS2 多機時間同步(Chrony配置簡明指南)

適用場景&#xff1a; 主機運行 ROS2 Humble&#xff08;發布 /scan 等&#xff09;&#xff0c;板子運行 ROS2 Foxy&#xff08;發布 /tf 等&#xff09;&#xff0c;兩邊通過 ROS_DOMAIN_ID 跨平臺通訊。需要保證系統時間對齊&#xff0c;避免 TF 插值失敗、建圖抖動等問題。…

Nginx配置偽靜態,URL重寫

Nginx配置偽靜態&#xff0c;URL重寫 [ Nginx ] 在Nginx低版本中&#xff0c;是不支持PATHINFO的&#xff0c;但是可以通過在Nginx.conf中配置轉發規則實現&#xff1a; location / { // …..省略部分代碼if (!-e $request_filename) {rewrite ^(.*)$ /index.php?s/$1 l…

電路筆記(元器件):ADC LTC系列模數轉換器的輸出范圍+滿量程和偏移調整

LTC1740(LTC1740官方文檔)是Analog Devices&#xff08;原Linear Technology&#xff09;公司生產的一款高性能、低功耗的14位模數轉換器(ADC)。它通常用于需要高精度和快速采樣率的應用中&#xff0c;如通信系統、數據采集設備等。同類產品 LTC1746&#xff1a;一款14位、40Ms…

續-算法-數學知識

3、歐拉函數 1、定義&#xff1a; 1~n 中與 n 互質的數的個數 例如&#xff1a;6 的有 1 2 3 4 5 6 其中&#xff0c;與 n 互質 的 數的個數為 2個分別是&#xff1a;1、5 2、計算&#xff1a; $ N p_1^{a1} p_2^{a2} p_3^{a3} … p_k^{ak} $&#xff08;例如&#x…

C/C++測試框架googletest使用示例

文章目錄 文檔編譯安裝示例參考文章 文檔 https://github.com/google/googletest https://google.github.io/googletest/ 編譯安裝 googletest是cmake項目&#xff0c;可以用cmake指令編譯 cmake -B build && cmake --build build將編譯產物lib和include 兩個文件夾…

LintCode第974題-求矩陣各節點的最短路徑(以0為標準)

描述 給定一個由0和1組成的矩陣&#xff0c;求每個單元格最近的0的距離。 兩個相鄰細胞之間的距離是1。 給定矩陣的元素數不超過10,000。 在給定的矩陣中至少有一個0。 單元格在四個方向上相鄰:上&#xff0c;下&#xff0c;左和右。 樣例 例1: 輸入: [[0,0,0],[0,0,0],[0…

Redis核心機制-緩存、分布式鎖

目錄 緩存 緩存更新策略 定期生成 實時生成 緩存問題 緩存預熱&#xff08;Cache preheating&#xff09; 緩存穿透&#xff08;Cache penetration&#xff09; 緩存雪崩&#xff08;Cache avalanche&#xff09; 緩存擊穿&#xff08;Cache breakdown&#xff09; 分…

CF每日5題(1300-1500)

最近急速補練藍橋杯中&#xff0c;疏于cf練習。 感覺自己過題還是太慢了。 今日水題&#xff0c;我水水水水。 1- 1979C lcm 水 1400 第 i i i局贏了&#xff0c;1個硬幣頂 k [ i ] k[i] k[i]個貢獻&#xff0c;所以每局分硬幣 x i 1 k [ i ] x_i{1\over k[i]} xi?k[i]1?個…