基于Spring Boot的RabbitMQ延時隊列技術實現

文章目錄

    • 基于Spring Boot的RabbitMQ延時隊列技術實現
      • 延時隊列應用場景
      • 基本概念
      • 實現延時隊列
        • 添加依賴
        • 基礎配置
        • 配置類設計
        • 消息生產者
        • 消息消費者
      • 兩種TTL設置方式
    • 訂單超時關閉實例
      • 訂單服務
      • 消息處理
    • 延遲消息插件
      • 安裝插件
      • 配置延遲交換機

基于Spring Boot的RabbitMQ延時隊列技術實現

延時隊列應用場景

  • 訂單系統:30分鐘未支付訂單自動取消
1. 用戶下單 → 發送延時消息(30分鐘TTL)
2. 消息進入普通隊列等待
3. 30分鐘后消息過期 → 轉入死信隊列
4. 消費者檢查訂單狀態:- 未支付 → 執行關閉操作- 已支付 → 忽略
  • 定時通知:預約提醒服務
場景:會議開始前15分鐘提醒
1. 創建會議時發送延時消息
2. 消息存活直到會議開始前15分鐘
3. 觸發通知服務發送提醒
  • 異步重試:失敗任務延時重試機制
消息處理失敗時:
1. 首次失敗 → 延時5秒重試
2. 二次失敗 → 延時30秒重試
3. 三次失敗 → 進入死信隊列人工處理
  • 物流跟蹤:預計送達時間狀態更新

基本概念

延遲消息:發送者發送消息時指定一個時間,消費者不會立刻收到消息,而是在指定時間之后才收到消息。

延遲任務:設置在一定時間之后才執行的任務

當一個隊列中的消息滿足下列情況之一時,就會成為死信(dead letter):

  • 消息被拒絕且不重新入隊:消費者使用 basic.reject 或 basic.nack 聲明消費失敗,并且消息的 requeue 參數設置為 false
  • 消息過期:消息是一個過期消息(達到了隊列或消息本身設置的過期時間),超時無人消費
  • 隊列達到最大長度:要投遞的隊列消息堆積滿了,最早的消息可能成為死信

如果隊列通過 dead-letter-exchange 屬性指定了一個交換機,那么該隊列中的死信就會投遞到這個交換機中。這個交換機稱為死信交換機(Dead Letter Exchange,簡稱 DLX)。

在這里插入圖片描述


RabbitMQ 本身沒有直接的延時隊列功能,通常是通過死信隊列和**TTL(Time-To-Live)**來實現的。

[生產者] → [普通隊列(設置TTL)] → (消息過期)→ [死信隊列] → [消費者]

實現延時隊列

添加依賴
<!-- amqp 依賴 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Mybatis-Plus包 -->
<dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.1</version>
</dependency>
<!-- MySQL驅動包 -->
<dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><scope>runtime</scope>
</dependency>
<!-- lombok包 -->
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional>
</dependency>
基礎配置
server:port: 8080
spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://127.0.0.1:3306/smbms?useUnicode=true&characterEncoding=UTF-8&useSSL=falseusername: rootpassword: rootrabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest
mybatis-plus:type-aliases-package: com.hz.pojo #類型別名所在的包#控制臺打印sql語句configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImplmap-underscore-to-camel-case: false # 駝峰映射

死信隊列三要素

  1. DLX (Dead-Letter-Exchange):死信轉發交換機
  2. DLK (Dead-Letter-Routing-Key):死信路由鍵
  3. TTL (Time-To-Live):消息存活時間
配置類設計
@Configuration
public class RabbitMQConfig {// 業務交換機public static final String BUSINESS_EXCHANGE = "business.exchange";// 業務隊列public static final String BUSINESS_QUEUE = "business.queue";// 死信交換機public static final String DLX_EXCHANGE = "dlx.exchange";// 死信隊列public static final String DLX_QUEUE = "dlx.queue";// 業務隊列路由鍵private static final String BUSINESS_ROUTING_KEY = "business.key";// 死信路由鍵private static final String DLX_ROUTING_KEY = "dlx.key";// 聲明業務交換機(直連型)@Beanpublic DirectExchange businessExchange() {return new DirectExchange(BUSINESS_EXCHANGE);}// 聲明死信交換機@Beanpublic DirectExchange dlxExchange() {return new DirectExchange(DLX_EXCHANGE);}// 聲明業務隊列(綁定死信屬性)@Beanpublic Queue businessQueue() {Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", DLX_EXCHANGE); // 設置死信交換機args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY); // 設置死信路由鍵args.put("x-message-ttl", 10000); // 隊列統一TTL(單位:毫秒)return new Queue(BUSINESS_QUEUE, true, false, false, args);}// 聲明死信隊列@Beanpublic Queue dlxQueue() {return new Queue(DLX_QUEUE);}// 綁定業務隊列到交換機@Beanpublic Binding businessBinding() {return BindingBuilder.bind(businessQueue()).to(businessExchange()).with(BUSINESS_ROUTING_KEY);}// 綁定死信隊列到交換機@Beanpublic Binding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);}
}
消息生產者
@Service
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 發送延時消息* @param message 消息內容* @param ttl 單位:秒*/public void sendDelayMessage(String message, int ttl) {// 消息屬性設置MessagePostProcessor processor = message -> {message.getMessageProperties().setExpiration(String.valueOf(ttl * 1000)); // 消息級別TTLreturn message;};rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE,RabbitMQConfig.BUSINESS_ROUTING_KEY,message,processor);}
}
消息消費者
@Component
public class MessageConsumer {@Autowiredprivate BillService billService;@RabbitListener(queues = RabbitMQConfig.DLX_QUEUE)public void processDelayMessage(String billCode) {System.out.println("收到延時消息:" + billCode);billService.closeBill(billCode);System.out.println("超時未支付,訂單已關閉--------------");}
}

兩種TTL設置方式

隊列級別TTL

args.put("x-message-ttl", 10000);

隊列中所有消息統一過期時間;消息實際存活時間 = 隊列TTL;性能更優(RabbitMQ統一處理)

消息級別TTL

message.getMessageProperties().setExpiration("5000");

每個消息可以設置不同TTL;實際存活時間取最小值(隊列TTL vs 消息TTL);需要逐個處理消息,性能開銷較大

訂單超時關閉實例

在這里插入圖片描述

訂單服務

@Service
public class BillService {@Autowiredprivate MessageProducer messageProducer;@Resourceprivate BillMapper billMapper;public void createBill(Bill bill) {// 保存訂單到數據庫bill.setIsPayment(1); // 設置初始狀態 1:未支付 2:已支付 3:已關閉billMapper.insert(bill);// 發送延時消息(10s)messageProducer.sendDelayMessage(bill.getBillCode(), 10);}public void closeBill(String billCode) {Bill bill = billMapper.selectOne(new QueryWrapper<Bill>().eq("billCode", billCode));if (bill != null && bill.getIsPayment() == 1) {bill.setIsPayment(3);billMapper.updateById(bill);}}
}

消息處理

@RestController
@RequestMapping("/bill")
public class BillController {@Autowiredprivate BillService billService;@GetMapping("/send")public String send(){// 創建測試訂單Bill bill = new Bill();bill.setBillCode("BILL2025_999");bill.setProductName("可口可樂");// 創建賬單并發送延時消息billService.createBill(bill);return "訂單創建成功,10秒后未支付將自動關閉。訂單號:" + bill.getBillCode();}
}

流程:

  1. 訪問 localhost:8080/bill/send 創建測試訂單

    在這里插入圖片描述

  2. 訂單初始狀態為待支付(1)

    在這里插入圖片描述

  3. 消息經過10秒延遲進入死信隊列

    在這里插入圖片描述

  4. 消費者處理消息時檢查訂單狀態

  5. 若仍為未支付狀態,更新為已關閉(3)

    在這里插入圖片描述

延遲消息插件

RabbitMQ 提供了官方插件 rabbitmq_delayed_message_exchange,它允許你發送延遲消息而無需設置消息的 TTL 和死信隊列。這個插件提供了一個新的交換機類型 x-delayed-message,可以用來實現消息的延遲投遞。

安裝插件

可以從 RabbitMQ 的插件頁面下載,或者直接使用以下命令進行安裝(假設 RabbitMQ 安裝在默認位置):

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

安裝完成后,重啟 RabbitMQ 服務。

配置延遲交換機

@Bean
public CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange("delayed.exchange", "x-delayed-message", true, false, args);
}// 發送消息時設置延遲頭
rabbitTemplate.convertAndSend("delayed.exchange", "routing.key", message, msg -> {msg.getMessageProperties().setHeader("x-delay", 5000);return msg;
});

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/896069.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/896069.shtml
英文地址,請注明出處:http://en.pswp.cn/news/896069.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

畢業項目推薦:基于yolov8/yolov5/yolo11的番茄成熟度檢測識別系統(python+卷積神經網絡)

文章目錄 概要一、整體資源介紹技術要點功能展示&#xff1a;功能1 支持單張圖片識別功能2 支持遍歷文件夾識別功能3 支持識別視頻文件功能4 支持攝像頭識別功能5 支持結果文件導出&#xff08;xls格式&#xff09;功能6 支持切換檢測到的目標查看 二、數據集三、算法介紹1. YO…

【智能客服】ChatGPT大模型話術優化落地方案

本文原創作者:姚瑞南 AI-agent 大模型運營專家,先后任職于美團、獵聘等中大廠AI訓練專家和智能運營專家崗;多年人工智能行業智能產品運營及大模型落地經驗,擁有AI外呼方向國家專利與PMP項目管理證書。(轉載需經授權) 目錄 一、項目背景 1.1 行業背景 1.2 業務現…

STM32的HAL庫開發---單通道ADC采集(DMA讀取)實驗

一、實驗簡介 正常單通道ADC采集順序是先開啟ADC采集&#xff0c;然后等待ADC轉換完成&#xff0c;也就是判斷EOC位置1&#xff0c;然后再讀取數據寄存器的值。 如果配置了DMA功能&#xff0c;在EOC位被硬件置1后&#xff0c;自動產生DMA請求&#xff0c;然后DMA進行數據搬運…

編譯原理基礎(1)

1.什么是ASCII碼&#xff1f; ASCII碼即美國信息交換標準代碼&#xff0c;是基于拉丁字母的電腦編碼系統&#xff0c;用于顯示現代英語和部分西歐語言。其7位編碼范圍0-127&#xff0c;8位擴展到0-255。字符集含控制字符&#xff08;0-31、127&#xff0c;用于控制設備或表示通…

基于 Highcharts 實現 Vue 中的答題統計柱狀圖組件

在現代 Web 開發中&#xff0c;數據可視化是一個重要的組成部分&#xff0c;而 Highcharts 是一個廣泛使用的 JavaScript 圖表庫&#xff0c;可以幫助開發者在 Web 頁面上輕松地繪制豐富的圖表。在本文中&#xff0c;我們將基于 Highcharts 創建一個用于答題統計的柱狀圖&#…

SQLAlchemyError: A transaction is already begun on this Session.

資料 sqlalchemy 事務 - 簡書 在 SQLAlchemy 中&#xff0c;事務是通過會話來管理的。當你開始一個事務&#xff08;例如使用 async with db.begin()&#xff09;&#xff0c;它會開啟一個新的事務&#xff0c;并在事務塊結束時自動提交或回滾。如果在同一個會話中&#xff0c…

Java Web開發實戰與項目——Spring Boot與Redis實現緩存管理

緩存技術在現代Web開發中至關重要&#xff0c;尤其是在高并發的環境中&#xff0c;緩存能夠有效減少數據庫訪問壓力、提高系統性能。Redis作為最流行的內存數據存儲系統之一&#xff0c;常用于緩存管理。本節將講解如何在Spring Boot項目中集成Redis&#xff0c;實現緩存管理&a…

C語言學習【1】C語言關于寄存器的封裝

目錄 1.封裝寄存的C語言的語法volatile&#xff1a;unsigned int:*pGpiobOdrvolatile unsigned int * 2.進一步C語言的封裝 在嵌入式中&#xff0c;底層一定是操作寄存器&#xff0c;我有一個理念&#xff0c;凡事一定要想清楚&#xff0c;把任何知識點融入自己的理解之中&…

#滲透測試#批量漏洞挖掘#暢捷通T+遠程命令執行漏洞

免責聲明 本教程僅為合法的教學目的而準備,嚴禁用于任何形式的違法犯罪活動及其他商業行為,在使用本教程前,您應確保該行為符合當地的法律法規,繼續閱讀即表示您需自行承擔所有操作的后果,如有異議,請立即停止本文章讀。 目錄 一、漏洞概況 二、攻擊特征 三、應急處置…

ollama 學習筆記

1. 參考博客&#xff1a;1. Ollama完整教程&#xff1a;本地LLM管理、WebUI對話、Python/Java客戶端API應用&#xff1a;https://blog.csdn.net/python122_/article/details/1409457202. https://gitee.com/ai-big-model/ollama/tree/main --》REST APIollama 離線安裝包 ollam…

ARM Linux平臺下 OpenCV Camera 實驗

一、硬件原理 1. OV2640 1.1 基本功能 OV2640 是一款低功耗、高性能的圖像傳感器&#xff0c;支持以下功能&#xff1a; 最高分辨率&#xff1a;200 萬像素&#xff08;1600x1200&#xff09;。 輸出格式&#xff1a;JPEG、YUV、RGB。 內置圖像處理功能&#xff1a;自動曝…

vue2.x中父組件通過props向子組件傳遞數據詳細解讀

1. 父組件向子組件傳遞數據的步驟 在子組件中定義 props&#xff1a; 子組件通過 props 選項聲明它期望接收的數據。props 可以是數組形式&#xff08;簡單聲明&#xff09;或對象形式&#xff08;支持類型檢查和默認值&#xff09;。 在父組件中使用子組件時綁定 props&#x…

【Gin】2:快速上手Gin框架(模版、cookie、session)

本文目錄 一、模版渲染二、自定義模版函數三、cookie四、Session五、cookie、session區別六、會話攻擊 一、模版渲染 在 Gin 框架中&#xff0c;模板主要用于動態生成 HTML 頁面&#xff0c;結合 Go 語言的模板引擎功能&#xff0c;實現數據與視圖的分離。 模板渲染是一種動態…

【AI繪畫】大衛? 霍克尼風格——自然的魔法(一丹一世界)

大衛? 霍克尼&#xff0c;很喜歡這個老頭&#xff0c;“藝術是一場戰斗”。老先生零九年有了iphone&#xff0c;開始用iphone畫畫&#xff0c;一零年開始用ipad畫畫&#xff0c;用指頭劃拉&#xff0c;據說五分鐘就能畫一幅&#xff0c;每天早上隨手畫幾幅送給身邊的朋友。很c…

解碼 NLP:從萌芽到蓬勃的技術蛻變之旅

內容概況&#xff1a; 主要講述NLP專欄的內容和NLP的發展及其在現代生活中的廣泛應用。專欄強調實踐為主、理論為輔的學習方法&#xff0c;并通過多個生活場景展示了NLP技術的實際應用&#xff0c;如對話機器人、搜索引擎、翻譯軟件、電商推薦和智能客服等。 這邊我就不多做自我…

解決DeepSeek服務器繁忙問題的實用指南

目錄 簡述 1. 關于服務器繁忙 1.1 服務器負載與資源限制 1.2 會話管理與連接機制 1.3 客戶端配置與網絡問題 2. 關于DeepSeek服務的備用選項 2.1 納米AI搜索 2.2 硅基流動 2.3 秘塔AI搜索 2.4 字節跳動火山引擎 2.5 百度云千帆 2.6 英偉達NIM 2.7 Groq 2.8 Firew…

前端(AJAX)學習筆記(CLASS 2):圖書管理案例以及圖片上傳

* BootStrap彈框 功能&#xff1a;不離開當前頁面&#xff0c;顯示單獨內容&#xff0c;供用戶操作 步驟&#xff1a; 1、引入bootstrap.css和bootstrap.js 2、準備彈框標簽&#xff0c;確認結構 3、通過自定義屬性&#xff0c;控制彈框的顯示和隱藏 其中的bootstrap.css…

數據結構:雙鏈表list

list 是 C 標準庫中的雙向鏈表容器。 list初始化示例&#xff1a; #include <list>int n 7;std::list<int> lst; // 初始化一個空的雙向鏈表 lststd::list<int> lst(n); // 初始化一個大小為 n 的鏈表 lst&#xff0c;鏈表中的值默認都為 0std::list<i…

AI Agent Service Toolkit:一站式大模型智能體開發套件

項目簡介 該工具包基于LangGraph、FastAPI和Streamlit構建,提供了構建和運行大模型Agent的最小原子能力,包含LangGraph代理、FastAPI服務、用于與服務交互的客戶端以及一個使用客戶端提供聊天界面的Streamlit應用。用戶可以利用該工具包提供的模板快速搭建基于LangGraph框架…

論文概覽 |《Urban Analytics and City Science》2023.10 Vol.50 Issue.8

本次給大家整理的是《Environment and Planning B: Urban Analytics and City Science》雜志2023年10月第50卷第8期的論文的題目和摘要&#xff0c;一共包括21篇SCI論文&#xff01; 論文1 Advances in geospatial approaches to transport networks and sustainable mobility …