延遲 隊列

概念

延遲隊列顧名思義就是消息不立即發送給消費者消費,而是延遲一段時間再交給消費者。

RabbitMQ本身沒有直接支持延遲隊列的的功能,但是可以通過前面所介紹的TTL+死信隊列的方式組合
模擬出延遲隊列的功能.

RabbitMQ 有些版本還支持延遲隊列的插件安裝,我們也可以通過安裝這個插件實現延遲隊列的功能。

TTL + 死信隊列

實現思路:

假設一個應用中需要將每條消息都設置為10秒的延遲,生產者通過normal_exchange這個交換器將發送的消息存儲在normal_queue這個隊列中.消費者訂閱的并非是normal_queue這個隊列,而是dlx_queue這個隊列.當消息從normal_queue這個隊列中過期之后被存入dlx_queue這個隊列中,消費者就恰巧消費到了延遲10秒的這條消息。

在這里插入圖片描述

代碼演示:

常量設置:

    //死信隊列public static final String DL_QUEUE = "DL_QUEUE";public static final String DL_EXCHANGE = "DL_EXCHANGE";public static final String DL_KEY = "DL_KEY";//普通隊列public static final String NORMAL_QUEUE = "NORMAL_QUEUE";public static final String NORMAL_EXCHANGE = "NORMAL_EXCHANGE";public static final String NORMAL_KEY = "NORMAL_KEY";

聲明隊列、交換機、綁定關系:

    //普通隊列@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(MQConstants.NORMAL_QUEUE).deadLetterExchange(MQConstants.DL_EXCHANGE).deadLetterRoutingKey(MQConstants.DL_KEY).build();}@Bean("normalExchange")public Exchange normalExchange() {return ExchangeBuilder.directExchange(MQConstants.NORMAL_EXCHANGE).durable(true).build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalExchange") Exchange exchange, @Qualifier("normalQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.NORMAL_KEY).noargs();}//死信隊列@Bean("dlQueue")public Queue dlQueue() {return QueueBuilder.durable(MQConstants.DL_QUEUE).build();}@Bean("dlExchange")public Exchange dlExchange() {return ExchangeBuilder.directExchange(MQConstants.DL_EXCHANGE).durable(true).build();}@Bean("dlBinding")public Binding dlBinding(@Qualifier("dlExchange") Exchange exchange, @Qualifier("dlQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.DL_KEY).noargs();}

生產者:將消息過期時間設置為 10 s

    @RequestMapping("/dl")public String dl() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(MQConstants.NORMAL_EXCHANGE, MQConstants.NORMAL_KEY, "dl" + i, message -> {message.getMessageProperties().setExpiration("10000");return message;});}return "消息發送成功";}

消費者需要消費的隊列是死信隊列:

@Component
@RabbitListener(queues = MQConstants.DL_QUEUE)
public class DLListener {@RabbitHandlerpublic void handle(String messageContent, Channel channel, Message message) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {channel.basicAck(deliveryTag, false);System.out.println("消息成功消費:" + messageContent);} catch (Exception e) {channel.basicNack(deliveryTag, false, false);}}
}

存在的問題

當我們先發送一條延遲時間長的消息,然后再發送一條延遲時間短的消息,我們會發現,短的消息并沒有被即使消費,而是等到長的消息時間一到,才被消費了

    @RequestMapping("/dl")public String dl() {rabbitTemplate.convertAndSend(MQConstants.NORMAL_EXCHANGE, MQConstants.NORMAL_KEY, "30s ",message -> {message.getMessageProperties().setExpiration("30000");return message;});rabbitTemplate.convertAndSend(MQConstants.NORMAL_EXCHANGE, MQConstants.NORMAL_KEY, "10s ",message -> {message.getMessageProperties().setExpiration("10000");return message;});return "消息發送成功";}

在這里插入圖片描述


原因如下:
消息過期之后,不一定會被馬上丟棄因為RabbitMQ只會檢查隊首消息是否過期,如果過期則丟到死信隊列,此時就會造成一個問題,如果第一個消息的延時時間很長,第二個消息的延時時間很短,那第二個
消息并不會優先得到執行。

所以在考慮使用TTL+死信隊列實現延遲任務隊列的時候,需要確認業務上每個任務的延遲時間是一致的,如果遇到不同的任務類型需要不同的延遲的話,需要為每一種不同延遲時間的消息建立單獨的消息隊列。

延遲隊列的插件

安裝

官方文檔:Scheduling Messages with RabbitMQ

下載鏈接:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

下載的插件需要存放到哪個目錄:https://www.rabbitmq.com/docs/installing-plugins

根據你不同的環境去選擇不同的目錄:
在這里插入圖片描述

Linux命令:

#查看插件列表
rabbitmq-plugins list#啟動插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange#重啟服務
service rabbitmq-server restart

我們去到 rabbitmq 管理界面查看 exchange 有沒有延遲類型 “x-delayed-messge” ,如果存在這一類型說明我們的插件安裝成功了

在這里插入圖片描述

代碼演示

常量類:

    //延遲隊列public static final String DELAY_QUEUE = "DELAY_QUEUE";public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";public static final String DELAY_KEY = "DELAY_KEY";

聲明:

    //延遲隊列@Bean("delayQueue")public Queue delayQueue() {return QueueBuilder.durable(MQConstants.DELAY_QUEUE).build();}@Bean("delayExchange")public Exchange delayExchange() {return ExchangeBuilder.directExchange(MQConstants.DL_EXCHANGE).durable(true).delayed().build();}@Bean("delayBinding")public Binding delayBinding(@Qualifier("delayExchange") Exchange exchange, @Qualifier("delayQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.DELAY_KEY).noargs();}

生產者:這里我們發送三條不同過期時間的消息來進行演示:
通過setDelayLong() 方法設置延遲時間

    @RequestMapping("/delay")public String delay() {rabbitTemplate.convertAndSend(MQConstants.DELAY_EXCHANGE, MQConstants.DELAY_KEY, "30s ",message -> {message.getMessageProperties().setDelayLong(30000L);return message;});rabbitTemplate.convertAndSend(MQConstants.DELAY_EXCHANGE, MQConstants.DELAY_KEY, "10s ",message -> {message.getMessageProperties().setDelayLong(10000L);return message;});rabbitTemplate.convertAndSend(MQConstants.DELAY_EXCHANGE, MQConstants.DELAY_KEY, "40s ", message -> {message.getMessageProperties().setDelayLong(40000L);return message;});return "消息發送成功";}

這里我們將確認模式設置為自動模式,不進行手動確認,便于我們書寫代碼:

@Component
@RabbitListener(queues = MQConstants.DELAY_QUEUE)
public class DelayListener {@RabbitHandlerpublic void handle(String message) {System.out.printf("%tc 接收到的消息為:%s\n", new Date(), message);}
}

最終效果:
在這里插入圖片描述

總結

1.基于死信實現的延遲隊列
a優點:1)靈活不需要額外的插件支持
b.缺點: 1) 存在消息順序問題 2)需要額外的邏輯來處理死信隊列的消息,增加了系統的復雜性
2.基于插件實現的延遲隊列

a.優點:1)通過插件可以直接創建延遲隊列,簡化延遲消息的實現. 2)避免了DLX的時序問題
b.缺點:1)需要依賴特定的插件,有運維工作2)只適用特定版本

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/98365.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/98365.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/98365.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Windows+Docker一鍵部署CozeStudio私有化,保姆級

在 ?Windows環境? 下&#xff0c;通過docker&#xff0c;使用 ?火山引擎Doubao-Seed-1.6模型&#xff0c;面向 ?小白新手? 的 ?Coze Studio私有化部署詳細步驟。整個過程分為四大階段&#xff0c;包含每一步的指令、成功標志。 Coze Studio 私有化部署指南&#xff08;W…

【HEMCO Reference Guide 參考指南第二期】配置文件的結構和語法

配置文件的結構和語法 HEMCO 配置文件的結構和語法(The HEMCO configuration file) 1. Settings(設置) 2. Extension Switches(擴展模塊開關) 3. Base Emissions(基礎排放配置) 4. Scale Factors(縮放因子) 5. Masks(掩膜區域) 6. Data Collections(數據集合) 參…

01.單例模式基類模塊

一、單例模式的構成1、私有的靜態成員變量2、公共的靜態成員屬性或方法3、私有構造函數using System.Collections; using System.Collections.Generic; using UnityEngine;public class BaseManager : MonoBehaviour {void Start(){}// Update is called once per framevoid Up…

[網絡入侵AI檢測] 深度前饋神經網絡(DNN)模型

第4章&#xff1a;深度前饋神經網絡&#xff08;DNN&#xff09;模型 歡迎回來&#x1f43b;??? 在第1章&#xff1a;分類任務配置&#xff08;二分類 vs. 多分類&#xff09;中&#xff0c;我們學習了如何配置模型以回答不同類型的問題&#xff1b;在第2章&#xff1a;數…

【目錄-多選】鴻蒙HarmonyOS開發者基礎

All look at the answer 針對包含文本元素的組件&#xff0c;例如Text、Button、TextInput等&#xff0c;可以使用下列哪些屬性關于ForEach(arr, itemGenerator, index)組件的描述正確的是下面哪些容器組件是可以滾動的關于Tabs組件和TabContent組件&#xff0c;下列描述正確的…

第一講 Vscode+Python+anaconda 安裝

1、vscode下載和安裝官網下載最新版&#xff1a;https://code.visualstudio.com/Download注&#xff1a;文件夾最好不要出現中文和空格 2、將vscode修改為中文環境注意&#xff1a;右下角彈出提示框&#xff0c;點擊“yes”若不慎關閉了對話框&#xff0c;也不要緊&#xff0c;…

《sklearn機器學習——回歸指標2》

均方對數誤差&#xff08;mean_squared_log_error函數&#xff09; mean_squared_log_error函數計算與平方&#xff08;二次方&#xff09;對數誤差或損失的期望值相一致的風險指標。 Mean Squared Logarithmic Error 參數與返回值 函數簡介 mean_squared_log_error 是用于計算…

當電力設計遇上AI:良策金寶AI如何重構行業效率邊界?

在工程設計行業&#xff0c;我們常說“經驗為王”。一個資深工程師的價值&#xff0c;往往體現在他對規范的熟悉、對計算的把握、對圖紙的掌控。但今天&#xff0c;這個“王座”正在被重新定義。不是經驗不重要了&#xff0c;而是——效率的邊界&#xff0c;正在被AI重構。以良…

【深度學習】重采樣(Resampling)

在深度學習的背景下&#xff0c;重采樣主要涉及兩個方面&#xff1a; 數據層面的重采樣&#xff1a;處理不平衡數據集。模型層面的重采樣&#xff1a;在神經網絡內部進行上采樣&#xff08;UpSampling&#xff09;或下采樣&#xff08;DownSampling&#xff09;&#xff0c;常見…

計算機實現乘法運算的方式---ChatGPT 5 thinking作答

計算機如何實現“乘法” 下面分層次把乘法在數據表示 → 整數硬件/軟件 → 大整數 → 浮點數 → 特殊場景里的主流實現方式講清楚&#xff0c;并給出取舍建議與簡單偽代碼。0&#xff09;前置&#xff1a;數的表示 無符號整數&#xff1a;按二進制位權求值。有符號整數&#xf…

Ubuntu 安裝 / 配置 VNC

一、基礎環境準備 1. 更新 sudo apt update 2. 安裝 VNC 服務器 & 輕量桌面(XFCE) # 安裝 TightVNC 服務器 + XFCE 桌面(推薦輕量方案) sudo apt install tightvncserver xfce4 xfce4-goodies xterm -y二、核心配置:讓 VNC 加載桌面環境 1. 初始化 VNC 密碼(首次…

計算機大數據畢業設計推薦:基于Spark的新能源汽車保有量可視化分析系統

精彩專欄推薦訂閱&#xff1a;在下方主頁&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; &#x1f496;&#x1f525;作者主頁&#xff1a;計算機畢設木哥&#x1f525; &#x1f496; 文章目錄 一、項目介紹二、…

Android Looper源碼閱讀

看下Android Looper源代碼&#xff0c;有助于理解Android系統消息循環流程、handler機制。Looper注釋為class used to run a message loop for a thread&#xff0c; 即用于為一個線程運行消息循環&#xff0c; 或者說循環處理一個線程的消息。 Looper源碼先看下這個類里的變量…

uni-app 和 uni-app x 的區別

差異解析 uni-app 是 DCloud 推出的成熟跨平臺前端框架&#xff0c;基于 Vue.js JavaScript/TypeScript。支持廣泛平臺&#xff1a;iOS、Android、HarmonyOS、Web、小程序等&#xff0c;用一套代碼同時生成多個端應用。渲染方式主要通過 WebView 或小程序原生框架 JS 邏輯&am…

數據結構:深度優先搜索 (Depth-First Search, DFS)

目錄 DFS的誕生——“不撞南墻不回頭” DFS的核心機制——如何實現“回溯”&#xff1f; DFS算法流程圖解&#xff08;遞歸版&#xff09; C/C代碼實現 DFS的應用 上一節我們學習了廣度優先搜索 (BFS)&#xff0c;它像水面的波紋一樣&#xff0c;一層一層地向外探索。今天…

Spring Boot中策略模式結合依賴注入的實現方式

在Spring Boot項目開發中&#xff0c;常常會遇到根據不同的業務場景執行不同邏輯的需求&#xff0c;策略模式就是一種很好的設計模式來應對這種情況。同時&#xff0c;Spring Boot強大的依賴注入機制可以方便地將不同的策略類進行管理和調用。 1. 定義策略接口 定義一個策略接口…

深入剖析Spring Boot中Spring MVC的請求處理流程

對于任何使用Spring Boot進行Web開發的開發者而言&#xff0c;深入理解Spring MVC的執行流程都是至關重要的。這不僅有助于我們編寫更清晰、更高效的代碼&#xff0c;更是我們排查詭異問題、進行高級定制開發的知識基石。今天&#xff0c;我們將一起深入Spring Boot應用的內核&…

X448 算法簽名驗簽流程深度解析及代碼示例

一、引言&#xff1a;X448 算法的定位與價值在橢圓曲線密碼學&#xff08;ECC&#xff09;體系中&#xff0c;X448 是基于蒙哥馬利曲線&#xff08;Curve448&#xff09;的密鑰交換算法&#xff0c;但其底層數學原理也可支撐簽名驗簽功能&#xff08;實際工程中常與 Ed448 簽名…

2025-2026單片機物聯網畢業設計題目推薦(定稿付款)

51.基于單片機的非接觸式防疫自動門系&#xff08;1&#xff09;人員檢測&#xff1a;利用超聲波模塊進行人員檢測&#xff0c;檢測到人員靠近門體時觸發相應的操作&#xff1b;&#xff08;2&#xff09;門控制&#xff1a;通過舵機實現自動門的開閉控制&#xff0c;當檢測到有…

一文詳解大模型強化學習(RLHF)算法:PPO、DPO、GRPO、ORPO、KTO、GSPO

一、 引言 大模型強化學習的核心目標是讓模型的輸出與人類目標、真實場景需求對齊。在工作和學習中&#xff0c;大模型強化學習訓練經常會遇到各種算法&#xff0c;各種O&#xff0c;在強化學習訓練選型過程中經常容易混淆&#xff0c;也分不清各種訓練算法的使用場景和優缺點。…