RabbitMQ 高級特性之延遲隊列

1. 簡介

在某些場景下,當生產者發送消息后,可能不需要讓消費者立即接收到,而是讓消息延遲一段時間后再發送給消費者。

2. 實現方式

2.1 TTL + 死信隊列

給消息設置過期時間后,若消息在這段時間內沒有被消費,就會將消息發送到死信隊列中,我們可以利用這一特性,將需要延遲發送的消息設置過期時間,然后再讓消費者從死信隊列中獲取消息,這樣就實現了消息的延遲發送。

隊列與交換機配置如下:

@Configuration
public class DLConfig {/*** 正常隊列、交換機* @return*/@Bean("norQueue")public Queue norQueue() {return QueueBuilder.durable(Constants.NOR_QUEUE).deadLetterExchange(Constants.DL_EXCHANGE) //綁定死信交換機.deadLetterRoutingKey(Constants.DL_ROUTINGKEY).build();}@Bean("norExchange")public DirectExchange norExchange() {return ExchangeBuilder.directExchange(Constants.NOR_EXCHANGE).build();}@Bean("norBind")public Binding norBind(@Qualifier("norExchange") DirectExchange directExchange,@Qualifier("norQueue") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with(Constants.NOR_ROUTINGKEY);}/*** 死信隊列、交換機*/@Bean("dlQueue")public Queue dlQueue() {return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("dlExchange")public DirectExchange dlExchange() {return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();}@Bean("dlBind")public Binding dlBind(@Qualifier("dlExchange") DirectExchange directExchange,@Qualifier("dlQueue") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with(Constants.DL_ROUTINGKEY);}
}

生產者代碼如下:

    @RequestMapping("/dl1")public String dl1() {String messageInfo = "dl... " + new Date();MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("10000"); //10s 后過期return message;}};rabbitTemplate.convertAndSend(Constants.NOR_EXCHANGE, Constants.NOR_ROUTINGKEY, messageInfo, messagePostProcessor);return "消息發送成功";}

消費者代碼如下:

@Component
@Slf4j
public class DLListener {/*** ttl + 死信隊列 -> 延時隊列* @param message*/@RabbitListener(queues = Constants.DL_QUEUE)public void listener(Message message) {String messageInfo = new String(message.getBody());log.info("接收到消息: {}, time: {}", messageInfo, new Date());}
}

由于消息發送到了死信隊列,于是我們只需要從死信隊列中獲取消息即可。

代碼運行結果如下:

從運行結果中可以看出,消息延遲了 10s 才被消費。

這種實現方式的問題:

但是,當我們連續發送兩條消息,第一條消息的過期時間為 15s,第二條消息的過期時間為 10s,代碼運行結果如下:

這里我們看到,雖然第二條消息先過期,但卻和第一條消息一起被消費,按照正常情況下第二條消息應該率先被消費,于是這種實現方式存在一定的問題。

2.2?使用插件?

2.2.1 安裝插件

雖然 RabbitMQ 沒有提供延遲隊列的使用方式,但是提供了延遲隊列的插件,我們可以安裝插件并使用。

插件安裝地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

需要下載 .ez 的插件。

需要根據本機的 RabbitMQ 版本選擇匹配的插件版本,不然無法使用。

插件下載完成后,需要將插件放到?/usr/lib/rabbitmq/plugins 目錄下,若沒有需要進行創建。

安裝完成后,使用下面這行命令啟動插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

啟動完成后,需要重啟 RabbitMQ 服務,這樣插件就能正常運行。

2.2.2 使用插件

插件安裝完成后,交換機的類型就會多出下面一種:

即延遲隊列,于是我們在聲明交換機是,就能夠聲明這個類型的交換機。

隊列與交換機配置如下:

@Configuration
public class DelayConfig {@Bean("delayQueue")public Queue delayQueue() {return QueueBuilder.durable(Constants.DELAY_QUEUE).build();}/*** 延遲交換機* @return*/@Bean("delayExchange")public DirectExchange delayExchange() {return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();}@Bean("delayBind")public Binding delayBind(@Qualifier("delayExchange") DirectExchange directExchange,@Qualifier("delayQueue") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with(Constants.DELAY_ROUTINGKEY);}
}

?在聲明交換機時,使用了 delayed 來聲明該隊列是延遲隊列。

生產者代碼如下:

    @RequestMapping("/delay")public String delay() {String messageInfo = "delay ...";rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, Constants.DELAY_ROUTINGKEY, messageInfo + "25000ms", message -> {message.getMessageProperties().setDelayLong(20000L); //過期時間,單位為 msreturn message;});rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, Constants.DELAY_ROUTINGKEY, messageInfo + "10000ms", message -> {message.getMessageProperties().setDelayLong(10000L); //過期時間,單位為 msreturn message;});return "消息發送成功";}

消費者代碼如下:

@Component
@Slf4j
public class DelayListener {@RabbitListener(queues = Constants.DELAY_QUEUE)public void listener(Message message) {log.info("接收到消息: {}, time: {}", new String(message.getBody()), new Date());}
}

運行結果如下:

從結果中可以看出,雖然第二條消息的過期時間是后入隊列的,但是卻會先被消費,這就解決了 TTL + 死信隊列實現方式的不足。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/88796.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/88796.shtml
英文地址,請注明出處:http://en.pswp.cn/web/88796.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

uniapp app安卓下載文件 圖片 doc xls 數據流文件 app安卓本地路徑下載保存

//下載圖片 downloadToLocal() {plus.android.requestPermissions([android.permission.WRITE_EXTERNAL_STORAGE],(success) > {uni.saveImageToPhotosAlbum({filePath: /static/x.png,//本地地址success: () > {this.$refs.uToast.show({message: "模版下載成功&am…

Context Engineering:從Prompt Engineering到上下文工程的演進

最近在做Deepresearch以及刷到一個不錯的文章:context-engineering-guide ,這篇文章揭示了提示工程以及上下文過程在智能體應用開源流程中,包括Deepresearch,MCP在內的一些概念,起到了非常重要的作用! Cont…

jenkins部署vue前端項目

文章目錄前言一、安裝nginx二、jenkins構建項目總結前言 前面已經使用jenkins部署了后端springboot項目,現在開始學習jenkins部署前端Vue項目。 一、安裝nginx 訪問nginx官網,https://nginx.org/en/download.html下載tar包 上傳到服務器目錄中 然后到…

設計總監年中復盤:用Adobe XD內容識別布局,告別“手動調距”

時至年中,這不僅是檢視上半年項目成果的節點,更是優化團隊工作流、為下半年挑戰儲備動能的關鍵時期。在海外設計界工作的十余年間,我發現,一個高效的設計團隊與一個疲于奔命的團隊之間,最大的差別往往就在于是否建立了…

Unity 在Rider中通過Lingma插件使用MCP

環境: Unity 2022.3.12f1 JetBrains Rider 2025.1.4 Lingma 2.5.14 Python 3.13.4 下載包 首先在unity package manager 加入unity-mcp包 https://github.com/justinpbarnett/unity-mcp.git 然后下載uv包(要先先下載python),網上很多…

pycharm+SSH 深度學習項目 遠程后臺運行命令

pycharmSSH 深度學習項目 遠程后臺運行命令碎碎念,都是實驗室里那說關機就關機,說重啟就重啟的臺式機逼得。。學吧記錄 運行:nohup /root/miniconda3/bin/python -u "run.py" > /root/log/nohup.log 2>&1 &實時查看日…

【Linux | 網絡】應用層(HTTP)

目錄一、認識URL二、urlencode和urldecode三、HTTP協議格式(使用Fiddler抓包)3.1 安裝并使用Fiddler抓包3.2 HTTP協議格式3.2.1 HTTP請求3.2.1.1 資源URL路徑3.2.1.2 請求方法(Method)3.2.1.3 Location頭字段(重定向相…

編程實踐:單例模式(懶漢模式+餓漢模式)

說明:本專欄文章有兩種解鎖方案 1:付費訂閱,暢享所有文章 2:免費獲取,點擊下方鏈接,關注,自動獲取免費鏈接 https://free-img.400040.xyz/4/2025/04/29/6810a50b7ac8b.jpg 主題:C++ 單例模式 什么是單例模式

破局電機制造四大痛點:MES與AI視覺的協同智造實踐

萬界星空科技電機行業MES系統解決方案是針對電機制造過程中多工序協同難、質量追溯復雜、設備管理要求高等痛點設計的數字化管理系統。一、電機行業的核心痛點1. 多工序協同困難 電機制造涉及繞線、裝配、測試等多道工序,工藝銜接復雜,傳統人工調度效率…

HTML 初體驗

HTML(超文本標記語言)全稱:HyperText Markup Language。超文本是什么?答:超文本就是網頁中的鏈接。標記是什么?答:標記也叫標簽,是帶尖括號的文本。需求1:將“我愛中國”…

網絡層TCP機制

1.確認應答機制由于發送信息的距離可能較遠,可能出現后發的信息先到的情況,怎么辦?TCP將每個字節的數據都進行了編號,即為序列號如何分辨一個數據包是普通數據還是應答數據呢2.超時重傳由于丟包是一個隨機的事件,因此在上述tcp傳輸的過程中,丟包就存在兩種情況但是在發送方的角…

【一起來學AI大模型】微調技術:LoRA(Low-Rank Adaptation) 的實戰應用

LoRA(Low-Rank Adaptation) 的實戰應用,使用 Hugging Face 的 peft (Parameter-Efficient Fine-Tuning) 庫對大型語言模型進行高效微調。LoRA 因其顯著降低資源消耗(顯存和計算)同時保持接近全量微調性能的特點&#x…

RedisJSON 內存占用剖析與調優

一、基礎內存模型指針包裝 所有 JSON 值(標量、對象、數組、字符串等)至少占用 8 字節,用于存儲一個帶類型標記的指針。標量與空容器 null、true、false、小整數(靜態緩存)、空字符串、空數組、空對象 均不分配額外內存…

【LeetCode 熱題 100】23. 合并 K 個升序鏈表——(解法一)逐一合并

Problem: 23. 合并 K 個升序鏈表 題目:給你一個鏈表數組,每個鏈表都已經按升序排列。 請你將所有鏈表合并到一個升序鏈表中,返回合并后的鏈表。 文章目錄整體思路完整代碼時空復雜度時間復雜度:O(K * N)空間復雜度:O(1…

垃圾收集器-Serial Old

第一章 引言1.1 JVM 中垃圾收集的簡要概述JVM(Java Virtual Machine)作為 Java 程序的運行時環境,負責將字節碼加載至內存并執行,同時也承擔著內存管理的重任。垃圾收集(Garbage Collection,簡稱 GC&#x…

Docker(02) Docker-Compose、Dockerfile鏡像構建、Portainer

Docker-Compose 1、Docker Desktop 在Windows上安裝Docker服務,可以使用Docker Desktop這個應用程序。 下載并安裝這樣的一個安裝包 安裝好后:執行命令 docker --version 從Docker Hub提取hello-world映像并運行一個容器: docker run h…

大數據時代UI前端的用戶體驗設計新思維:以數據為驅動的情感化設計

hello寶子們...我們是艾斯視覺擅長ui設計和前端數字孿生、大數據、三維建模、三維動畫10年經驗!希望我的分享能幫助到您!如需幫助可以評論關注私信我們一起探討!致敬感謝感恩!一、引言:從 “經驗設計” 到 “數據共情” 的體驗革命傳統 UI 設計常陷入 “設計師主觀經…

TypeScript 學習手冊

1.TypeScript 概念 TypeScript(簡稱 TS,靜態類型)是微軟公司開發的一種基于 JavaScript (簡稱 JS,動態類型)語言的編程語言。TypeScript 可以看成是 JavaScript 的超集(superset)&a…

掌握現代CSS:變量、變形函數與動態計算

CSS近年來發展迅速,引入了許多強大的功能,如變量、高級變形函數和動態計算能力。本文將深入探討如何在CSS中設置并使用變量,以及如何有效利用translate3d、translateY和translateX等變形方法。我們還將解析var()和calc()函數的關鍵作用。一、…

貝爾量子實驗設想漏洞

1 0 1 0 1 1 0 1 0 1 1 1 0 0 1 0 帶墨鏡如果先上下交換再左右交換,很可能不一樣的概率是2%,但是因為交換誕生了一個與之前序列相同的所以不一樣概率變成1%,我們在測的時候不能這么測啊,你得看序列完…