SpringBoot使用RabbitMQ實現延遲隊列

SpringBoot使用RabbitMQ實現延遲隊列

    • 需求和目標
    • 名詞解釋
    • 實現方式
    • 引入依賴
    • 添加配置文件
    • 配置類
    • 死信隊列消費者
    • 即時隊列消費者
    • 延遲消息發送
    • 結果
    • 注意

需求和目標

商城系統,用戶下單后若15分鐘內仍未完成支付,則自動取消訂單,若已支付,不做其他特殊操作
系統還需要支持即時消息的功能,即發即收。

名詞解釋

①即時隊列:即發即收
②延遲隊列:發了消息,沒有接收方,只有消息過期后才被處理
③死信隊列:延遲隊列上的消息過期后,會被自動轉發到死信隊列中,從而最終達到延遲的目的

實現方式

本文采用RabbitMQ自身屬性:
TTL(Time To Live存活時間) + DLX(Dead-Letter-Exchange死信交換機)
實現延遲隊列,先將消息發到指定了TTL時長的隊列A中,隊列A沒有消費者,也就是說,隊列A中的消息肯定會過期,等消息過期后,就會加入到隊列B,也就是死信隊列,B隊列是有消費者在監聽的,一旦收到消息,就進行后續的邏輯處理,從而達到延遲效果。
這種實現方式只能為隊列設置消息延遲的時長,不能為每個消息指定延遲時長,粒度比較粗,請注意使用的業務場景!

引入依賴

<!--rabbitmq-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

添加配置文件

分別聲明了:即時、延遲、死信的相關信息
其中,延遲和死信是相互配合形成了延遲隊列

# rabbitMQ配置
mq:rabbit:host: 127.0.0.1:5672virtualHost: /username: testUserpassword: 123456normal-exchange: wms_exchange_normalnormal-queue: wms_queue_normalnormal-routing-key: wms_routing_key_normaldelay-exchange: wms_exchange_delaydelay-queue: wms_queue_delaydelay-routing-key: wms_routing_key_delaydlx-exchange: wms_exchange_dlxdlx-queue: wms_queue_dlxdlx-routing-key: wms_routing_key_dlx

配置類

package com.nwd.common.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {// 從配置文件中讀取參數@Value("${mq.rabbit.host}")String HOST;@Value("${mq.rabbit.username}")String USERNAME;@Value("${mq.rabbit.password}")String PASSWORD;@Value("${mq.rabbit.normal-exchange}")String NORMAL_EXCHANGE;@Value("${mq.rabbit.normal-queue}")String NORMAL_QUEUE;@Value("${mq.rabbit.normal-routing-key}")String NORMAL_ROUTING_KEY;@Value("${mq.rabbit.delay-exchange}")String DELAY_EXCHANGE;@Value("${mq.rabbit.delay-queue}")String DELAY_QUEUE;@Value("${mq.rabbit.delay-routing-key}")String DELAY_ROUTING_KEY;@Value("${mq.rabbit.dlx-exchange}")String DLX_EXCHANGE;@Value("${mq.rabbit.dlx-queue}")String DLX_QUEUE;@Value("${mq.rabbit.dlx-routing-key}")String DLX_ROUTING_KEY;//創建mq連接@Bean(name = "connectionFactory")public ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setUsername(USERNAME);connectionFactory.setPassword(PASSWORD);//connectionFactory.setVirtualHost(virtualHost);connectionFactory.setPublisherConfirms(true);//該方法配置多個host,在當前連接host down掉的時候會自動去重連后面的hostconnectionFactory.setAddresses(HOST);//connectionFactory.setPort(Integer.parseInt(port));return connectionFactory;}// 即時隊列===========================================@Beanpublic Queue normalQueue() {return new Queue(NORMAL_QUEUE);}@Beanpublic DirectExchange normalDirectExchange(){return new DirectExchange(NORMAL_EXCHANGE);}@Beanpublic Binding normalBinding(){return BindingBuilder.bind(normalQueue()).to(normalDirectExchange()).with(NORMAL_ROUTING_KEY);}// 即時隊列===========================================// 延遲隊列===========================================@Beanpublic Queue delayQueue(){Map<String,Object> map = new HashMap<>();//message在該隊列queue的存活時間最大為15分鐘map.put("x-message-ttl", 10000*6*15);//x-dead-letter-exchange參數是設置該隊列的死信交換器(DLX)map.put("x-dead-letter-exchange", DLX_EXCHANGE);//x-dead-letter-routing-key參數是給這個DLX指定路由鍵map.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);return new Queue(DELAY_QUEUE,true,false,false,map);}@Beanpublic DirectExchange delayDirectExchange(){return new DirectExchange(DELAY_EXCHANGE);}@Beanpublic Binding delayBinding(){return BindingBuilder.bind(delayQueue()).to(delayDirectExchange()).with(DELAY_ROUTING_KEY);}// 延遲隊列===========================================// 死信隊列===========================================@Beanpublic Queue dlxQueue() {return new Queue(DLX_QUEUE);}@Beanpublic DirectExchange dlxDirectExchange(){return new DirectExchange(DLX_EXCHANGE);}@Beanpublic Binding dlxBinding(){return BindingBuilder.bind(dlxQueue()).to(dlxDirectExchange()).with(DLX_ROUTING_KEY);}// 死信隊列===========================================
}

死信隊列消費者

package com.nwd.module.mq;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 死信隊列消息處理* 此隊列消費到的,是經過延遲之后的消息* @author niuwenda* @since 2024-06-03  09:50*/
@Slf4j
@Component
@RabbitListener(queues = "${mq.rabbit.dlx-queue}")
public class DlxMsgConsumer {@RabbitHandler(isDefault = true)public void process(String msg, Message message, Channel channel) {try {// 處理消息的業務邏輯log.info("RabbitMq:死信隊列接收到消息,{}",msg);// 此處應判斷訂單是否已完成支付,若未完成,后續繼續編寫取消訂單邏輯// .....} catch (Exception e) {// 發生異常時,打印日志并拒絕消息(不重新放入隊列)System.out.println("Error processing message: " + e.getMessage());/*try {channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception ex) {// 處理拒絕消息的異常}*/}}
}

即時隊列消費者

保證系統有即發即收的功能,此處代碼與訂單需求無關

package com.nwd.module.mq;import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** mq消息接收處理器* @author niuwenda* @since 2024-06-03  09:50*/
@Slf4j
@Component
@RabbitListener(queues = "${mq.rabbit.normal-queue}")
public class MqMsgConsumer {@RabbitHandler(isDefault = true)public void process(String msg, Message message, Channel channel) {try {// 處理消息的業務邏輯log.info("RabbitMq1:接收到消息,{}",msg);JSONObject msgObj = JSONObject.parseObject(msg);// 手動確認消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 發生異常時,打印日志并拒絕消息(不重新放入隊列)System.out.println("Error processing message: " + e.getMessage());/*try {channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception ex) {// 處理拒絕消息的異常}*/}}
}

延遲消息發送

可以寫在controller中,測試時,用接口調用來發送消息

@Resource
private RabbitTemplate rabbitTemplate;@Value("${mq.rabbit.delay-exchange}")
private String exchange;rabbitTemplate.convertAndSend(exchange, routingKey, param);
log.info("RabbitMq發送消息成功:{}", param);

結果

可看到,消息延遲了10秒收到

2024-06-03 16:09:23.640  INFO  RabbitMqUtil : RabbitMq發送消息成功:helloMQ
2024-06-03 16:09:33.655  INFO DlxMsgConsumer : RabbitMq:死信隊列接收到消息,helloMQ

注意

延遲消息插件內部會維護一個本地數據庫表,同時使用Elang Timers功能實現計時。如果消息的延遲時間設置較長,可能會導致堆積的延遲消息非常多,會帶來較大的CPU開銷,同時延遲消息的時間會存在誤差。

因此,不建議設置延遲時間過長的延遲消息,如果時間過長,建議使用任務調度。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/22057.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/22057.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/22057.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

重組蛋白的定量定性方法,你了解嗎?

重組蛋白的定量和定性分析是蛋白質工程和生物技術中至關重要的步驟&#xff0c;用于確保蛋白質的表達、純度和功能性符合預期。以下是小編整理的一些常用的方法以及實驗介紹&#xff0c;希望這些方法幫助研究人員詳細了解重組蛋白的特性。 主要的定性方法 1 WB&#xff08;Wes…

AIGC 011-SAM第一個圖像分割大模型-分割一切!

AIGC 011-SAM第一個圖像分割大模型-分割一切&#xff01; 文章目錄 0 論文工作1論文方法2 效果 0 論文工作 這篇論文介紹了 Segment Anything (SA) 項目&#xff0c;這是一個全新的圖像分割任務、模型和數據集。SA 項目是一個具有里程碑意義的工作&#xff0c;它為圖像分割領域…

基于springboot的多媒體素材庫源碼數據庫

基于springboot的多媒體素材庫源碼數據庫 近年來&#xff0c;信息化管理行業的不斷興起&#xff0c;使得人們的日常生活越來越離不開計算機和互聯網技術。首先&#xff0c;根據收集到的用戶需求分析&#xff0c;對設計系統有一個初步的認識與了解&#xff0c;確定多媒體素材庫…

迎七一黨史知識競賽答題怎么做

迎七一黨史知識競賽答題&#xff0c;不僅是對于黨史知識的檢驗&#xff0c;更是對于參賽者學習態度和綜合能力的考量。在參與這類競賽時&#xff0c;我們需要做好充分的準備&#xff0c;掌握一定的答題技巧&#xff0c;才能取得好的成績。 首先&#xff0c;我們要深入了解競賽…

FFmpeg播放器的相關概念【1】

播放器框架 相關術語 ?容器&#xff0f;文件&#xff08;Conainer/File&#xff09;&#xff1a;即特定格式的多媒體文件&#xff0c;比如mp4、flv、mkv等。 ? 媒體流&#xff08;Stream&#xff09;&#xff1a;表示時間軸上的一段連續數據&#xff0c;如一段聲音數據、一段…

UFS Explorer Professional Recovery: 如何從啟用了 mSATA 緩存的 Drobo 設備中恢復數據

天津鴻萌科貿發展有限公司是 UFS Explorer Professional Recovery 數據恢復軟件的授權代理商。 UFS Explorer Professional Recovery 數據恢復軟件提供綜合性的解決方案&#xff0c;用于解決復雜的數據恢復案例&#xff0c;包括那些采用特殊存儲技術的案例&#xff0c;或介質受…

上海亞商投顧:創業板指震蕩收漲 超70家ST股跌停

上海亞商投顧前言&#xff1a;無懼大盤漲跌&#xff0c;解密龍虎榜資金&#xff0c;跟蹤一線游資和機構資金動向&#xff0c;識別短期熱點和強勢個股。 一.市場情緒 滬指昨日震蕩震蕩&#xff0c;創業板指走勢稍強&#xff0c;盤中一度漲超1%&#xff0c;黃白二線分化嚴重。算…

vue ts 導入 @/assets/ 紅色顯示的問題解決

vue ts 導入 /assets/ 紅色顯示的問題解決 一、問題描述 在使用的時候這樣導入會出現如上的錯誤。 在使用的時候&#xff0c;導入的類型也沒有對應的代碼提示&#xff0c;說明導入有問題。 二、解決 在 tsconfig.json 中添加如下內容&#xff1a; {"compilerOptions&…

AI大模型探索之路-實戰篇15: Agent智能數據分析平臺之整合封裝Tools和Memory功能代碼

系列篇章&#x1f4a5; AI大模型探索之路-實戰篇4&#xff1a;深入DB-GPT數據應用開發框架調研 AI大模型探索之路-實戰篇5&#xff1a;探索Open Interpreter開放代碼解釋器調研 AI大模型探索之路-實戰篇6&#xff1a;掌握Function Calling的詳細流程 AI大模型探索之路-實戰篇7…

模式識別判斷題

貝葉斯估計的方法類似于貝葉斯決策&#xff0c;也需要定義損失函數。&#xff08;正確&#xff09; 解釋&#xff1a;貝葉斯估計是一種基于貝葉斯定理的參數估計方法&#xff0c;它在估計參數時考慮了參數的先驗分布。與貝葉斯決策類似&#xff0c;貝葉斯估計也需要定義損失函數…

46.ThreadPoolExcutor接口

線程池狀態 ThreadPoolExcutor使用int高3位來表示線程池狀態&#xff0c;低29位表示線程數量 狀態高三位接收新任務處理阻塞隊列任務說明RUNNING111YYSHUTDOWN000NY不會接收新任務&#xff0c;但會處理阻塞隊列剩余任務&#xff0c;比較溫和&#xff0c;已經提交的任務都會執…

15.1 測試-重要性與testing包

1. 測試的重要性 1.1 單元測試 單元測試是針對一小部分代碼進行獨立地測試。 單元測試的對象通常是單個函數或方法&#xff0c;而要測試的是它在接受給定的輸入后&#xff0c;能否產生符合預期的輸出。 單元測試的作用主要表現在以下兩個方面&#xff1a; 驗證程序的最小…

C++ STL-迭代器函數對象適配器

目錄 一.迭代器 二. 函數對象 三. 適配器 一.迭代器 是一種通用的指針類型&#xff0c;可以用來遍歷 STL 容器中的元素。 具有以下作用和意義&#xff1a; 提供一種通用的方式來訪問容器中的元素。允許對不同類型的容器進行統一的操作。增強了代碼的靈活性和可擴展性。 一…

The Best Toolkit 最好用的工具集

The Best Toolkit 工欲善其事&#xff0c;必先利其器&#xff0c;整理過往工作與生活中遇到的最好的工具軟件 PDF合并等 PDF24 Tools PDF查看器 SumatraPDF 可以使用黑色來查看&#xff0c;相對不傷眼睛&#xff0c;也有電子書相關的閱讀器 Kindle pdf裁邊工具 briss 軟件卸載…

【C++題解】1085 - 尋找雷劈數

問題&#xff1a;1085 - 尋找雷劈數 類型&#xff1a;for循環 題目描述&#xff1a; 把整數 3025 從中剪開分為 30 和 25 兩個數&#xff0c;此時再將這兩數之和平方&#xff0c;計算結果又等于原數。 (3025)(3025)55553025 &#xff0c;這樣的數叫“雷劈數”。 求所有符合這…

Photoshop版本選擇及系統要求

1、ps2018cc/2020cc版本 適合新手&#xff0c;增加了很多智能化操作&#xff0c;非常方便好上手。 2020&#xff1a; 2、ps2015版本 cc2015版本不論是功能還是硬件上&#xff0c;都是不二選擇&#xff0c;適合于配置較低的電腦&#xff0c;該有的基本功能它都有。 3、2021/2…

std::numeric_limits::max和宏定義重復報錯問題

問題描述 今天在編譯Beckhoff ADS開源組件的時候發現編譯報錯&#xff0c;報錯代碼如下 long AdsDevice::ReadReqEx2(uint32_t group, uint32_t offset, size_t length, void* buffer, uint32_t* bytesRead) const {if (length > std::numeric_limits<uint32_t>::ma…

Algorand 的復興之路:改變游戲規則,打造 RWA 第一公鏈

TLDR 發布 AlgoKit 2.0&#xff0c;支持 Python 原生語言&#xff0c;打造開發者友好的開發環境&#xff0c;Algorand 的開發者社區規模迅速擴大。 升級共識激勵機制&#xff0c;用 ALGO 獎勵共識節點參與共識的執行&#xff0c;增加 ALGO 的應用場景&#xff0c;同時進一步確…

GB28181的主動、被動的含義

GB28181有點象視頻會議&#xff1a; 終端通過SIP登錄服務器。管理員點擊某個終端&#xff0c;進行視頻。 就是這個主動、被動沒有聽說。于是問了同事&#xff0c;他說&#xff1a; 主動被動是從服務器角度來看的。所謂被動&#xff0c;就是服務器開一個端口&#xff0c;被動接…

鋼結構乙級資質延期,企業如何降低經營風險

當企業面臨鋼結構乙級資質延期時&#xff0c;為了降低經營風險&#xff0c;可以采取以下措施&#xff1a; 1. 提前規劃與準備 資質延續規劃&#xff1a;在資質到期前&#xff0c;提前規劃資質延續的相關工作&#xff0c;包括準備所需材料、明確流程和時間節點等。 項目評估&…