【RabbitMQ】延遲隊列

1.概述

延遲隊列其實就是隊列里的消息是希望在指定時間到了以后或之前取出和處理,簡單來說,延時隊列就是用來存放需要在指定時間被處理的元素的隊列。

延時隊列的使用場景:

1.訂單在十分鐘之內未支付則自動取消

2.新創建的店鋪,如果在十天內都沒有上傳過商品,則自動發送消息提醒。

3.用戶注冊成功后,如果三天內沒有登陸則進行短信提醒。

4.用戶發起退款,如果三天內沒有得到處理則通知相關運營人員。

5.預定會議后,需要在預定的時間點前十分鐘通知各個與會人員參加會議

2.代碼演示?

代碼是用springboot整合的。

先導入依賴

<dependencies><!--RabbitMQ 依賴--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--RabbitMQ 測試依賴--><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency>
</dependencies>

配置文件

spring.rabbitmq.host=192.168.10.137
spring.rabbitmq.port=5672
spring.rabbitmq.username=rabbit
spring.rabbitmq.password=rabbit
spring.rabbitmq.virtual-host=/

啟動器

@SpringBootApplicationpublic class App {public static void main(String[] args) {SpringApplication.run(App.class,args);}}

?需求如下:創建兩個隊列 QA 和 QB,兩者隊列 TTL 分別設置為 10S 和 40S,然后在創建一個交 換機 X 和死信交 換機 Y,它們的類型都是 direct,創建一個死信隊列 QD,它們的綁定關系如下:

定義配置類,描述上圖的隊列,交換機以及隊列和交換機之間的關系

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;// 使用@Configuration注解表明這是一個配置類,Spring容器會掃描該類來獲取Bean的定義信息
@Configuration
public class DelayedQueueConfig {// 定義直連類型(Direct)的交換機,名稱為xExchange// @Bean注解用于將方法返回的對象注冊為Spring容器中的一個Bean,"xExchange"是該Bean的名稱@Bean("xExchange")public DirectExchange xExchange() {// 創建并返回一個名為"X"的DirectExchange實例,DirectExchange類型的交換機根據路由鍵直接轉發消息return new DirectExchange("X");}// 聲明另一個直連類型的交換機,名稱為yExchange@Bean("yExchange")public DirectExchange yExchange() {// 創建并返回一個名為"Y"的DirectExchange實例return new DirectExchange("Y");}// 聲明隊列queueA@Bean("queueA")public Queue queueA() {// 創建一個HashMap用于存儲隊列的屬性參數Map<String, Object> args = new HashMap<>();// 設置死信交換機(當消息在隊列中過期或被否定確認等情況時,消息會被轉發到這個交換機)為yExchangeargs.put("x-dead-letter-exchange", "Y");// 設置死信路由鍵,當消息進入死信交換機后,根據這個路由鍵來路由到對應的死信隊列args.put("x-dead-letter-routing-key", "YD");// 設置消息在隊列中的存活時間(即延時時間)為10000毫秒,也就是10秒,這使得queueA成為一個延時隊列args.put("x-message-ttl", 10000);// 創建一個持久化的隊列(QueueBuilder.durable方法),名稱為"QA",并帶上前面設置的屬性參數return QueueBuilder.durable("QA").withArguments(args).build();}// 綁定交換機xExchange和隊列queueA// @Qualifier注解用于根據指定的Bean名稱來注入對應的Bean實例,這里分別指定了要注入的隊列和交換機實例@Beanpublic Binding queueQABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange) {// 使用BindingBuilder將隊列queueA綁定到交換機xExchange上,綁定的路由鍵為"XA"return BindingBuilder.bind(queueA).to(xExchange).with("XA");}// 聲明隊列queueB,設置其延時時間為40秒@Bean("queueB")public Queue queueB() {Map<String, Object> args = new HashMap<>();// 同樣設置死信交換機為yExchangeargs.put("x-dead-letter-exchange", "Y");// 設置死信路由鍵為"YD"args.put("x-dead-letter-routing-key", "YD");// 設置消息在隊列中的存活時間為40000毫秒,即40秒,使queueB成為延時隊列args.put("x-message-ttl", 40000);// 創建一個持久化的隊列,名稱為"QB",并帶上相關屬性參數return QueueBuilder.durable("QB").withArguments(args).build();}// 綁定交換機xExchange和隊列queueB@Beanpublic Binding queueQBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange) {// 將隊列queueB綁定到交換機xExchange上,綁定的路由鍵為"XB"return BindingBuilder.bind(queueB).to(xExchange).with("XB");}// 聲明死信隊列queueD@Bean("queueD")public Queue queueD() {// 創建一個名稱為"QD"的隊列,用于接收從延時隊列中過期轉移過來的消息return new Queue("QD");}// 聲明死信交換機yExchange和死信隊列queueD的綁定關系@Beanpublic Binding deadQueueBindingQD(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange) {// 使用BindingBuilder將死信隊列queueD綁定到死信交換機yExchange上,綁定的路由鍵為"YD"return BindingBuilder.bind(queueD).to(yExchange).with("YD");}
}

?生產者

// @RestController注解表明該類是一個RESTful風格的控制器,用于處理HTTP請求并返回JSON等格式的數據
@RestController 
// @RequestMapping("ttl")注解用于映射請求路徑,所有以"/ttl"開頭的請求會被該控制器處理
@RequestMapping("ttl") 
public class SendMessageController { // @Resource注解用于自動裝配RabbitTemplate實例,RabbitTemplate是Spring AMQP提供的用于操作RabbitMQ的工具類@Resource RabbitTemplate rabbitTemplate; // 該方法用于處理"/sendMsg/{message}"路徑的請求,是消息發送的邏輯所在// @RequestMapping("sendMsg/{message}")注解將該方法映射到指定的請求路徑,其中{message}是一個路徑變量@RequestMapping("sendMsg/{message}") public void sendMessage(@PathVariable("message") String message){ // 使用rabbitTemplate的convertAndSend方法向RabbitMQ發送消息// 第一個參數"X"指定交換機名稱,對應前面配置的xExchange// 第二個參數"XA"指定路由鍵,用于將消息路由到綁定了該路由鍵的隊列(這里是queueA)// 消息內容是拼接后的字符串,表明消息來自ttl為10秒鐘的延時隊列,并帶上傳入的參數messagerabbitTemplate.convertAndSend("X", "XA", "消息來自ttl為10秒鐘的延時隊列" + message); // 同理,這條消息發送到交換機"X",通過路由鍵"XB"路由到queueB// 消息內容表明來自ttl為40秒鐘的延時隊列rabbitTemplate.convertAndSend("X", "XB", "消息來自ttl為40秒鐘的延時隊列" + message); }
}

消費者

// @Component注解將該類標記為一個Spring組件,使其能被Spring容器掃描并管理
@Component 
// @Slf4j注解是Lombok提供的,用于自動生成日志對象log,方便在類中記錄日志
@Slf4j 
public class MessageConsumerListener { // @RabbitListener(queues = "QD")注解表明該方法是一個RabbitMQ消息監聽器,監聽名為"QD"的隊列// 當隊列"QD"(即前面配置的死信隊列)中有消息時,該方法會被觸發執行@RabbitListener(queues = "QD") public void getMessage(Message message, Channel channel) throws Exception{ // 獲取消息體內容,將消息的字節數組轉換為字符串String msg = new String(message.getBody()); // 使用日志對象log記錄信息,輸出當前時間以及從死信隊列收到的消息內容log.info("當前時間是:{},收到死信隊列的消息{}",new Date().toString(),msg); }
}

我們上面構建的延時隊列太局限性了,因為我們直接寫死了延時隊列的時間,但我們實際的應用中很多情況都是根據客戶端動態設置時間,比如騰訊會議我們要預定多久的會。

所以下面這個案例新增了一個隊列QC,他不設置TTL,而是根據傳送的數據來動態設定。

我們在配置類中加上QC和交換機x交換機y之間的綁定關系

@Configuration
public class DelayedQueueConfig {@Bean("queueC")public Queue queueC() {Map<String, Object> args = new HashMap<>();//設置綁定死信交換機的屬性args.put("x-dead-letter-exchange", "Y");args.put("x-dead-letter-routing-key", "YD");return QueueBuilder.durable("QC").withArguments(args).build();}//綁定隊列QC和交換機X之間的關系@Beanpublic Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueC).to(xExchange).with("XC");}
}

?定義生產者

// @RestController注解表明該類是一個RESTful風格的控制器,用于處理HTTP請求并返回JSON等格式的數據
@RestController 
// @RequestMapping("ttl")注解用于映射請求路徑,所有以"/ttl"開頭的請求會被該控制器處理
@RequestMapping("ttl") 
public class SendMessageController { // @Resource注解用于自動裝配RabbitTemplate實例,RabbitTemplate是Spring AMQP提供的用于操作RabbitMQ的工具類@Resource RabbitTemplate rabbitTemplate; // 該方法用于處理"/sendttlMessage/{message}/{ttl}"路徑的請求,是消息發送的邏輯所在// @RequestMapping("sendttlMessage/{message}/{ttl}")注解將該方法映射到指定的請求路徑,其中{message}和{ttl}是路徑變量@RequestMapping("sendttlMessage/{message}/{ttl}") public void sendTtlMessage(@PathVariable("message") String message, @PathVariable("ttl") String ttl) { // 使用rabbitTemplate的convertAndSend方法向RabbitMQ發送消息// 第一個參數"X"指定交換機名稱// 第二個參數"XC"指定路由鍵,用于將消息路由到綁定了該路由鍵的隊列// 第三個參數message是消息內容// 第四個參數是一個Lambda表達式,用于在發送消息前設置消息的過期時間(通過setExpiration方法)// 設置的過期時間由路徑變量ttl傳入,單位為毫秒rabbitTemplate.convertAndSend("X", "XC", message, msg -> { msg.getMessageProperties().setExpiration(ttl); return msg; }); }
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/76393.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/76393.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/76393.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Linux筆記之Ubuntu系統設置自動登錄tty1界面

Ubuntu22.04系統 編輯getty配置文件 vim /etc/systemd/system/gettytty1.service.d/override.conf如果該目錄或者文件不存在&#xff0c;進行創建。 在override.conf文件中進行編輯&#xff1a; [Service] ExecStart ExecStart-/sbin/agetty --autologin yourusername --no…

C++程序詩篇的靈動賦形:多態

文章目錄 1.什么是多態&#xff1f;2.多態的語法實現2.1 虛函數2.2 多態的構成2.3 虛函數的重寫2.3.1 協變2.3.2 析構函數的重寫 2.4 override 和 final 3.抽象類4.多態原理4.1 虛函數表4.2 多態原理實現4.3 動態綁定與靜態綁定 5.繼承和多態常見的面試問題希望讀者們多多三連支…

算法訓練之動態規劃(三)

???~~~~~~歡迎光臨知星小度博客空間~~~~~~??? ???零星地變得優秀~也能拼湊出星河~??? ???我們一起努力成為更好的自己~??? ???如果這一篇博客對你有幫助~別忘了點贊分享哦~??? ???如果有什么問題可以評論區留言或者私信我哦~??? ?????? 個…

$_GET變量

$_GET 是一個超級全局變量&#xff0c;在 PHP 中用于收集通過 URL 查詢字符串傳遞的參數。它是一個關聯數組&#xff0c;包含了所有通過 HTTP GET 方法發送到當前腳本的變量。 預定義的 $_GET 變量用于收集來自 method"get" 的表單中的值。 從帶有 GET 方法的表單發…

jQuery多庫共存

在現代Web開發中&#xff0c;項目往往需要集成多種JavaScript庫或框架來滿足不同的功能需求。然而&#xff0c;當多個庫同時使用時&#xff0c;可能會出現命名沖突、功能覆蓋等問題。幸運的是&#xff0c;jQuery提供了一些機制來確保其可以與其他庫和諧共存。本文將探討如何實現…

MySQL 中的聚簇索引和非聚簇索引有什么區別?

MySQL 中的聚簇索引和非聚簇索引有什么區別&#xff1f; 1. 從不同存儲引擎去考慮 在MySIAM存儲引擎中&#xff0c;索引和數據是分開存儲的&#xff0c;包括主鍵索引在內的所有索引都是“非聚簇”的&#xff0c;每個索引的葉子節點存儲的是數據記錄的物理地址&#xff08;指針…

Java從入門到“放棄”(精通)之旅——啟航①

&#x1f31f;Java從入門到“放棄 ”精通之旅&#x1f680; 今天我將要帶大家一起探索神奇的Java世界&#xff01;希望能幫助到同樣初學Java的你~ (??????)?? &#x1f525; Java是什么&#xff1f;為什么這么火&#xff1f; Java不僅僅是一門編程語言&#xff0c;更…

三相電為什么沒零線也能通電

要理解三相電為什么沒零線也能通電&#xff0c;就要從發電的原理說起 1、弧形磁鐵中加入電樞&#xff0c;旋轉切割磁感線會產生電流 隨著電樞旋轉的角度變化&#xff0c;電樞垂直切割磁感線 電樞垂直切割磁感線&#xff0c;此時會產生最大電壓 當轉到與磁感線平行時&#xf…

文件上傳做題記錄

1&#xff0c;[SWPUCTF 2021 新生賽]easyupload2.0 直接上傳php 再試一下phtml 用蟻劍連發現連不上 那就只要命令執行了 2&#xff0c;[SWPUCTF 2021 新生賽]easyupload1.0 當然&#xff0c;直接上傳一個php是不行的 phtml也不行&#xff0c;看下是不是前端驗證&#xff0c;…

【Pandas】pandas DataFrame head

Pandas2.2 DataFrame Indexing, iteration 方法描述DataFrame.head([n])用于返回 DataFrame 的前幾行 pandas.DataFrame.head pandas.DataFrame.head 是一個方法&#xff0c;用于返回 DataFrame 的前幾行。這個方法非常有用&#xff0c;特別是在需要快速查看 DataFrame 的前…

日語學習-日語知識點小記-構建基礎-JLPT-N4階段(1):承上啟下,繼續上路

日語學習-日語知識點小記-構建基礎-JLPT-N4階段(1):承上啟下,繼續上路 1、前言(1)情況說明(2)工程師的信仰2、知識點(1)普通形(ふつうけい)と思います(2)辭書形ことができます(3)Vたことがあります。(4)Vた とき & Vる とき3、單詞(1)日語單詞(2…

碼率自適應(ABR)相關論文閱讀簡報

標題&#xff1a;Quality Enhanced Multimedia Content Delivery for Mobile Cloud with Deep Reinforcement Learning 作者&#xff1a;Muhammad Saleem , Yasir Saleem, H. M. Shahzad Asif, and M. Saleem Mian 單位: 巴基斯坦拉合爾54890工程技術大學計算機科學與工程系 …

匯編語言:指令詳解

零、前置知識 1、數據類型修飾符 名稱解釋byte一個字節&#xff0c;8bitword單字&#xff0c;占2個字節&#xff0c;16bitdword雙字&#xff0c;占4個字節&#xff0c;32bitqword四字&#xff0c;占8個字節&#xff0c;64bit 2、關鍵詞解釋 ptr&#xff1a;它代表 pointer&a…

藍橋杯c ++筆記(含算法 貪心+動態規劃+dp+進制轉化+便利等)

藍橋杯 #include <iostream> #include <vector> #include <algorithm> #include <string> using namespace std; //常使用的頭文件動態規劃 小藍在黑板上連續寫下從 11 到 20232023 之間所有的整數&#xff0c;得到了一個數字序列&#xff1a; S12345…

【C++算法】54.鏈表_合并 K 個升序鏈表

文章目錄 題目鏈接&#xff1a;題目描述&#xff1a;解法C 算法代碼&#xff1a; 題目鏈接&#xff1a; 23. 合并 K 個升序鏈表 題目描述&#xff1a; 解法 解法一&#xff1a;暴力解法 每個鏈表的平均長度為n&#xff0c;有k個鏈表&#xff0c;時間復雜度O(nk^2) 合并兩個有序…

Java中的注解技術講解

Java中的注解&#xff08;Annotation&#xff09;是一種在代碼中嵌入元數據的機制&#xff0c;不直接參與業務邏輯&#xff0c;而是為編譯器、開發工具以及運行時提供額外的信息和指導。下面我們將由淺入深地講解Java注解的概念、實現原理、各種應用場景&#xff0c;并通過代碼…

京東與喜茶關系破裂:切斷所有合作 禁止進入辦公場所

快科技4月10日消息&#xff0c;據報道&#xff0c;京東集團近日被曝出內部下發全員禁令&#xff0c;全面封殺喜茶產品進入辦公區域。 據知情人士透露&#xff0c;京東人力行政部門發布的通知明確規定&#xff1a;全國各職場禁止與喜茶品牌開展任何形式的合作&#xff1b;員工不…

+++++背到厭倦。持續更新

Spring IoC 的工作流程: 讀取 BeanDefinition: Spring 容器啟動時&#xff0c;會讀取 Bean 的配置信息 (例如 XML 配置文件、注解或 Java 代碼)&#xff0c;并將這些配置信息轉換為 BeanDefinition 對象。創建 Bean 實例: 根據 BeanDefinition 中的信息&#xff0c;Spring 容器…

如何在Git歷史中抹掉中文信息并翻譯成英文

如何在Git歷史中抹掉中文信息并翻譯成英文 在軟件開發和版本控制領域&#xff0c;維護一個清晰、一致的代碼歷史記錄是至關重要的。然而&#xff0c;有時我們可能會遇到需要修改歷史提交的情況&#xff0c;比如刪除敏感信息或修正錯誤。本文將詳細探討如何在Git歷史中抹掉中文…

21 天 Python 計劃:MySQL中DML與權限管理

文章目錄 前言一、介紹二、MySQL數據操作&#xff1a;DML2.1 插入數據&#xff08;INSERT&#xff09;2.1.1 插入完整數據&#xff08;順序插入&#xff09;2.1.2 指定字段插入數據2.1.3 插入多條記錄2.1.4 插入查詢結果 2.2 更新數據&#xff08;UPDATE&#xff09;2.3 刪除數…