RabbitMQ 高級特性之消息分發

1. 為什么要消息分發

當 broker 擁有多個消費者時,就會將消息分發給不同的消費者,消費者之間的消息不會重復,RabbitMQ 默認的消息分發機制是輪詢,但會無論消費者是否發送了 ack,broker 都會繼續發送消息至消費者,這就會造成消費者壓力增大。于是,可以限制消費者每一次接收到的消息的數量,當消息達到該數量時,broker 就不會繼續給這個消費者發送消息,而是會給其他的消費者派送消息。

所消費者消費了一條消息,并且給 broker 發送了 ack,那么此時消費者所未消耗的消息就沒有達到最大消息數量,于是 broker 就會繼續給消費者分配消息,直到消費者未消費的消息數量達到上限。

對于這種特性,有下面兩個應用場景:

1.1 限流

在某些秒殺場景中,每一秒消費者接收到的消息都會非常大,那么就會造成消費者壓力過大。于是我們就可以限制消費者所能接收的最大消息數量。

配置代碼如下:

spring:rabbitmq:listener:simple:acknowledge-mode: manualprefetch: 5 #每個隊列最多接收五條消息

在配置中,prefetch 表示隊列中的消息數量上限為 5 條,若隊列中未確認的消息數量達到 5 條,此時 broker?就不會繼續給該隊列分配消息,而是給其它的未達到上限的隊列分配消息。

并且此處要設置為手動確認,若使用 auto 或 none,可能業務邏輯還沒有開始消息就已經被簽收,這就無法發揮限流的作用。

隊列、交換機聲明代碼如下:

    @Bean("qosQueue")public Queue qosQueue() {return QueueBuilder.durable(Constants.QOS_QUEUE).build();}@Bean("qoeExchange")public DirectExchange qoeExchange() {return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).build();}@Bean("qosBind")public Binding qosBind(@Qualifier("qoeExchange") DirectExchange directExchange,@Qualifier("qosQueue") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with(Constants.QOS_ROUTINGKEY);}

生產者代碼如下:

    @RequestMapping("/qos")public String qos() {for (int i = 0; i < 20; i++) {String messageInfo = "qos... " + i;rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, Constants.QOS_ROUTINGKEY, messageInfo);}return "消息發送成功";}

消費者代碼如下:

@Component
@Slf4j
public class QosListener {@RabbitListener(queues = Constants.QOS_QUEUE)public void listener1(Message message, Channel channel) throws IOException {String messageInfo = new String(message.getBody());long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收到消息: {}, deliveryTag: {}", messageInfo, deliveryTag);}}

此處,消費者沒有給 broker 發送 ack,那么隊列中的消息就會一直存在。

代碼運行結果如下:

這里我們可以看到,一共有 5 條未確認的消息, 已經達到了上限,于是就不會繼續向消費者發送消息。

1.2 負載均衡

使用消息分發,也可以實現負載均衡。

現有兩個消費者 A、B,A 處理消息的速度慢,B處理消息的速度快。若不設置負載均衡,那么就會出現 A 積壓的消息過多,而 B 幾乎沒有什么消息擠壓,這就沒有充分地利用資源。

于是我們可以將?prefetch 設置為 1,那么每次消費者只會接收到一條消息,當 A、B 接收到消息后,在 B 處理完成時 A 還沒有處理完,于是 broker 就會給 B 繼續推送消息,直到 A 處理完成后才會繼續給 A 推送消息。

消費者代碼如下:

@Component
@Slf4j
public class QosListener {/*** 消費者1* @param message* @param channel* @throws IOException*/@RabbitListener(queues = Constants.QOS_QUEUE)public void listener1(Message message, Channel channel) throws IOException {String messageInfo = new String(message.getBody());long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("消費者 1 接收到消息: {}, deliveryTag: {}", messageInfo, deliveryTag);try {Thread.sleep(2000);channel.basicAck(deliveryTag, false);} catch (IOException e) {channel.basicNack(deliveryTag, false, true);} catch (InterruptedException e) {throw new RuntimeException(e);}}/*** 消費者2* @param message* @param channel* @throws IOException*/@RabbitListener(queues = Constants.QOS_QUEUE)public void listener2(Message message, Channel channel) throws IOException {String messageInfo = new String(message.getBody());long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("消費者 2 接收到消息: {}, deliveryTag: {}", messageInfo, deliveryTag);try {Thread.sleep(1000);channel.basicAck(deliveryTag, false);} catch (IOException e) {channel.basicNack(deliveryTag, false, true);} catch (InterruptedException e) {throw new RuntimeException(e);}}
}

上述代碼,將消費者 2 的處理速度是消費者 1 的兩倍,代碼運行結果如下:

可以看出,消費者? 2 每消耗 2 條數據,消費者 1 才消耗 1 條數據。,也就達到了負載均衡的作用。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/90686.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/90686.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/90686.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Linux操作系統從入門到實戰:怎么查看,刪除,更新本地的軟件鏡像源

Linux操作系統從入門到實戰&#xff1a;怎么查看&#xff0c;刪除&#xff0c;更新本地的軟件鏡像源前言一、 查看當前鏡像源二、刪除當前鏡像源三、更新鏡像源四、驗證前言 我的Linux版本是CentOS 9 stream本篇博客我們來講解怎么查看&#xff0c;刪除&#xff0c;更新國內本…

兩臺電腦通過網線直連形成局域網,共享一臺wifi網絡實現上網

文章目錄一、背景二、實現方式1、電腦A&#xff08;主&#xff09;2、電腦B3、防火墻4、驗證三、踩坑1、有時候B上不了網一、背景 兩臺windows電腦A和B&#xff0c;想通過**微軟無界鼠標&#xff08;Mouse without Borders&#xff09;**實現一套鍵盤鼠標控制兩臺電腦&#xf…

Java Reference類及其實現類深度解析:原理、源碼與性能優化實踐

1. 引言&#xff1a;Java引用機制的核心地位在JVM內存管理體系中&#xff0c;Java的四種引用類型&#xff08;強、軟、弱、虛&#xff09;構成了一個精巧的內存控制工具箱。它們不僅決定了對象的生命周期&#xff0c;還為緩存設計、資源釋放和內存泄漏排查提供了基礎設施支持。…

華為云對碳管理系統的全生命周期數據處理流程

碳管理系統的全生命周期數據處理流程包含完整的數據采集、處理、治理、分析和應用的流程架構,可以理解為是一個核心是圍繞數據的“采集-傳輸-處理-存儲-治理-分析-應用”鏈路展開。以下是對每個階段的解釋,以及它們與數據模型、算法等的關系: 1. 設備接入(IoTDA) 功能: …

大模型安全風險與防護產品綜述 —— 以 Otter LLM Guard 為例

大模型安全風險與防護產品綜述 —— 以 Otter LLM Guard 為例 一、背景與安全風險 近年來&#xff0c;隨著大規模預訓練語言模型&#xff08;LLM&#xff09;的廣泛應用&#xff0c;人工智能已成為推動文檔處理、代碼輔助、內容審核等多領域創新的重要技術。然而&#xff0c;…

1.2.2 計算機網絡分層結構(下)

繼續來看計算機網絡的分層結構&#xff0c;在之前的學習中&#xff0c;我們介紹了計算機網絡的分層結構&#xff0c;以及各層之間的關系。我們把工作在某一層的軟件和硬件模塊稱為這一層的實體&#xff0c;為了完成這一層的某些功能&#xff0c;同一層的實體和實體之間需要遵循…

實訓八——路由器與交換機與網線

補充——基本功能路由器&#xff1a;用于不同邏輯網段通信的交換機&#xff1a;用于相同邏輯網段通信的1.網段邏輯網段&#xff08;IP地址網段&#xff09;&#xff1a;IP地址的前三組數字代表不同的邏輯網段&#xff08;有限條件下&#xff09;&#xff1b;IP地址的后一組數字…

C++——構造函數的補充:初始化列表

C中&#xff0c;構造函數為成員變量賦值的方法有兩種&#xff1a;構造函數體賦值和初始化列表。構造函數體賦值是在構造函數里面為成員變量賦值&#xff0c;如&#xff1a;class Data { public://構造函數體賦值Data(int year,int month,int day){_year year;_month month;_d…

代碼隨想錄|圖論|12島嶼周長

leetcode:106. 島嶼的周長 題目 題目描述 給定一個由 1&#xff08;陸地&#xff09;和 0&#xff08;水&#xff09;組成的矩陣&#xff0c;島嶼是被水包圍&#xff0c;并且通過水平方向或垂直方向上相鄰的陸地連接而成的。 你可以假設矩陣外均被水包圍。在矩陣中恰好擁有…

開發制作模仿參考抄別人的小程序系統

很多老板看見別人公司的小程序系統界面好看&#xff0c;功能強大&#xff0c;使用人數多。就想要抄襲模仿參考別人家的小程序系統。想要了解一下有沒有侵權風險&#xff0c;以及怎么開發制作開發制作模仿參考抄別人的小程序系統。首先回答第一個問題&#xff0c;只要你的小程序…

c語言中的數組IV

數組的集成初始化 集成初始化的定位 數組的大小 數組的賦值 不能直接將一個數組a賦值給數組b&#xff0c;只能通過遍歷來實現 遍歷數組 示例——檢索元素在數組中的位置#include <stdio.h> int search(int key,int a[],int length); int main(void){int a[] {2,4,6,7,1,…

LDO選型

目錄 一、最大輸出電流 二、最大輸入電壓 三、最大功率&#xff1a;Pmax 四、負載動態調整率 五、輸入電源紋波抑制比&#xff1a;PSRR 一、最大輸出電流 參考TI LM1117IMPX-3.3/NOPB數據手冊 由于LDO轉換效率很低&#xff0c;LDO的標稱最大電流 ≥ 實際最大負載電流 1…

飛算JavaAI:重構Java開發的“人機協同”新范式

目錄一、從需求到架構&#xff1a;AI深度參與開發“頂層設計”1.1 需求結構化&#xff1a;自然語言到技術要素的準確轉換1.2 架構方案生成&#xff1a;基于最佳實踐的動態匹配二、編碼全流程&#xff1a;從“手寫代碼”到“人機協同創作”2.1 復雜業務邏輯生成&#xff1a;以“…

解決SQL Server SQL語句性能問題(9)——SQL語句改寫(7)

9.4.15. 消除join場景一 與Oracle等其他關系庫類似,SQL Server中,join作為基本語法用于SQL語句中相關表之間的連接,有些場景中,join既可以增強SQL語句的可讀性,同時,又可以提升SQL語句的性能,但有些場景中,join會導致CBO為SQL語句產生次優的查詢計劃,進而出現SQL語句…

深度學習-數據準備

一、數據準備 1.1定義 數據準備&#xff08;Data Preparation&#xff09; 是數據分析與機器學習流程中的核心環節&#xff0c;指將原始數據轉換為適合分析或建模的結構化格式的過程。 1.2組成 數據準備主要由兩個部分組成&#xff0c;一個是劃分數據集&#xff0c;一個是構建…

IPA軟件源預覽系統源碼(源碼下載)

這是一款IPA軟件源預覽系統源碼&#xff0c;搭建這個源碼一定記住沒有軟件源的別搭建&#xff0c;因為你玩不明白&#xff0c;不是做IPA軟件源的不要下載這套源碼&#xff0c;簡單的測試了&#xff0c;UI很舒服&#xff0c;喜歡的自行部署&#xff01; 源碼下載&#xff1a;htt…

python 數據分析 單細胞測序數據分析 相關的圖表,常見于腫瘤免疫微環境、細胞亞群功能研究 ,各圖表類型及邏輯關系如下

這是一組 單細胞測序數據分析 相關的圖表&#xff0c;常見于腫瘤免疫微環境、細胞亞群功能研究 &#xff0c;各圖表類型及邏輯關系如下&#xff1a;使用kimi doubao 和deepseek &#xff0c;分析圖標和pdf 豆包最好&#xff0c;用豆包分析| 圖表類型 A、E&#xff08;堆疊柱狀…

表達式索引海外云持久化實踐:關鍵技術解析與性能優化

隨著全球數字化轉型加速&#xff0c;表達式索引技術正成為海外云服務商提升數據庫性能的核心方案。本文將深度解析如何通過云原生架構實現索引持久化&#xff0c;對比主流云平臺的技術實現差異&#xff0c;并給出跨國業務場景下的優化建議。 表達式索引海外云持久化實踐&#x…

sprinboot團隊任務管理系統 計算機畢業設計源碼32322

摘 要 隨著團隊協作模式的日益多樣化&#xff0c;傳統的任務管理方法已無法滿足現代團隊對高效協作和任務分配的需求。因此&#xff0c;本研究通過引入信息化設計并實現了一套團隊任務管理系統&#xff0c;旨在為管理員、成員用戶和團長用戶等用戶提供高效、靈活的任務管理…

單鏈表,咕咕咕

1.引入單鏈表順序表對于中間或者頭部的刪除&#xff0c;時間復雜度為O(N)&#xff0c;增容需要申請新的空間&#xff0c;拷貝數據&#xff0c;釋放就空間&#xff0c;消耗。增容一般是2倍的增長&#xff0c;會有空間的浪費。為了解決這些問題&#xff0c;引入了單鏈表。2.單鏈表…