Rabbitmq延遲消息

目錄

  • 一、延遲消息
    • 1.基于死信實現延遲消息
      • 1.1 消息的TTL(Time To Live)
      • 1.2 死信交換機 Dead Letter Exchanges
      • 1.3 代碼實現
    • 2.基于延遲插件實現延遲消息
      • 2.1 插件安裝
      • 2.2 代碼實現
    • 3.基于延遲插件封裝消息

一、延遲消息

延遲消息有兩種實現方案:
1,基于死信隊列
2,集成延遲插件

1.基于死信實現延遲消息

使用RabbitMQ來實現延遲消息必須先了解RabbitMQ的兩個概念:
消息的TTL(存活時間)和死信交換機Exchange,通過這兩者的組合來實現延遲隊列

1.1 消息的TTL(Time To Live)

消息的TTL就是消息的存活時間。RabbitMQ可以對隊列和消息分別設置TTL。對隊列設置就是隊列沒有消費者連著的保留時間,也可以對每一個單獨的消息做單獨的設置。超過了這個時間,我們認為這個消息就死了,稱之為死信。
如何設置TTL:
我們創建一個隊列queue.temp,在Arguments 中添加x-message-ttl 為5000 (單位是毫秒),那所在壓在這個隊列的消息在5秒后會消失。

1.2 死信交換機 Dead Letter Exchanges

一個消息在滿足如下條件下,會進死信路由,記住這里是路由而不是隊列,一個路由可以對應很多隊列。
(1) 一個消息被Consumer拒收了,并且reject方法的參數里requeue是false。也就是說不會被再次放在隊列里,被其他消費者使用。
(2)上面的消息的TTL到了,消息過期了。
(3)隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。
Dead Letter Exchange其實就是一種普通的exchange,和創建其他exchange沒有兩樣。只是在某一個設置Dead Letter Exchange的隊列中有消息過期了,會自動觸發消息的轉發,發送到Dead Letter Exchange中去。
在這里插入圖片描述
我們現在可以測試一下延遲隊列。
(1)創建死信隊列
(2)創建交換機
(3)建立交換器與隊列之間的綁定
(4)創建隊列

1.3 代碼實現

在service-mq 中添加配置類

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DeadLetterMqConfig {// 聲明一些變量public static final String exchange_dead = "exchange.dead";public static final String routing_dead_1 = "routing.dead.1";public static final String routing_dead_2 = "routing.dead.2";public static final String queue_dead_1 = "queue.dead.1";public static final String queue_dead_2 = "queue.dead.2";// 定義交換機@Beanpublic DirectExchange exchange(){return new DirectExchange(exchange_dead,true,false,null);}@Beanpublic Queue queue1(){// 設置如果隊列一 出現問題,則通過參數轉到exchange_dead,routing_dead_2 上!HashMap<String, Object> map = new HashMap<>();// 參數綁定 此處的key 固定值,不能隨意寫map.put("x-dead-letter-exchange",exchange_dead);map.put("x-dead-letter-routing-key",routing_dead_2);// 設置延遲時間map.put("x-message-ttl ", 10 * 1000);// 隊列名稱,是否持久化,是否獨享、排外的【true:只可以在本次連接中訪問】,是否自動刪除,隊列的其他屬性參數return new Queue(queue_dead_1,true,false,false,map);}@Beanpublic Binding binding(){// 將隊列一 通過routing_dead_1 key 綁定到exchange_dead 交換機上return BindingBuilder.bind(queue1()).to(exchange()).with(routing_dead_1);}// 這個隊列二就是一個普通隊列@Beanpublic Queue queue2(){return new Queue(queue_dead_2,true,false,false,null);}// 設置隊列二的綁定規則@Beanpublic Binding binding2(){// 將隊列二通過routing_dead_2 key 綁定到exchange_dead交換機上!return BindingBuilder.bind(queue2()).to(exchange()).with(routing_dead_2);}
}

配置發送消息

@RestController
@RequestMapping("/mq")
@Slf4j
public class MqController {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate RabbitService rabbitService;@GetMapping("sendDeadLettle")public Result sendDeadLettle() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");this.rabbitTemplate.convertAndSend(DeadLetterMqConfig.exchange_dead, DeadLetterMqConfig.routing_dead_1, "ok");System.out.println(sdf.format(new Date()) + " Delay sent.");return Result.ok();}
}

消息接收方

@Component
public class DeadLetterReceiver {@RabbitListener(queues = DeadLetterMqConfig.queue_dead_2)public void getMessage(String msg, Message message, Channel channel) throws IOException {//時間格式化SimpleDateFormat simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd  HH:mm:ss");System.out.println("消息接收的時間:\t"+simpleDateFormat.format(new Date()));System.out.println("消息的內容"+msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}
}

在這里插入圖片描述

2.基于延遲插件實現延遲消息

2.1 插件安裝

Rabbitmq實現了一個插件x-delay-message來實現延時隊列

  1. 首先我們將剛下載下來的rabbitmq_delayed_message_exchange-3.9.0.ez文件上傳到RabbitMQ所在服務器,下載地址:https://www.rabbitmq.com/community-plugins.html
  2. 切換到插件所在目錄,執行 docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins 命令,將剛插件拷貝到容器內plugins目錄下
  3. 執行 docker exec -it rabbitmq /bin/bash 命令進入到容器內部,并 cd plugins 進入plugins目錄
  4. 執行 ls -l|grep delay 命令查看插件是否copy成功
  5. 在容器內plugins目錄下,執行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 命令啟用插件
  6. exit命令退出RabbitMQ容器內部,然后執行 docker restart rabbitmq 命令重啟RabbitMQ容器

2.2 代碼實現

配置隊列

@Configuration
public class DelayedMqConfig {public static final String exchange_delay = "exchange.delay";public static final String routing_delay = "routing.delay";public static final String queue_delay_1 = "queue.delay.1";@Beanpublic Queue delayQeue1() {// 第一個參數是創建的queue的名字,第二個參數是是否支持持久化return new Queue(queue_delay_1, true);}@Beanpublic CustomExchange delayExchange() {Map<String, Object> args = new HashMap<String, Object>();args.put("x-delayed-type", "direct");return new CustomExchange(exchange_delay, "x-delayed-message", true, false, args);}@Beanpublic Binding delayBbinding1() {return BindingBuilder.bind(delayQeue1()).to(delayExchange()).with(routing_delay).noargs();}
}

發送消息

@GetMapping("sendelay")
public Result sendDelay() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");this.rabbitTemplate.convertAndSend(DelayedMqConfig.exchange_delay, DelayedMqConfig.routing_delay, sdf.format(new Date()), new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(10 * 1000);System.out.println(sdf.format(new Date()) + " Delay sent.");return message;}});return Result.ok();
}

接收消息

@Component
public class DelayReceiver {@RabbitListener(queues = DelayedMqConfig.queue_delay_1)public void get(String msg) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("Receive queue_delay_1: " + sdf.format(new Date()) + " Delay rece." + msg);}}

3.基于延遲插件封裝消息

/*** 封裝發送延遲消息方法* @param exchange* @param routingKey* @param msg* @param delayTime* @return*/
public Boolean sendDelayMsg(String exchange,String routingKey, Object msg, int delayTime){//  將發送的消息 賦值到 自定義的實體類GmallCorrelationData gmallCorrelationData = new GmallCorrelationData();//  聲明一個correlationId的變量String correlationId = UUID.randomUUID().toString().replaceAll("-","");gmallCorrelationData.setId(correlationId);gmallCorrelationData.setExchange(exchange);gmallCorrelationData.setRoutingKey(routingKey);gmallCorrelationData.setMessage(msg);gmallCorrelationData.setDelayTime(delayTime);gmallCorrelationData.setDelay(true);//  將數據存到緩存this.redisTemplate.opsForValue().set(correlationId,JSON.toJSONString(gmallCorrelationData),10,TimeUnit.MINUTES);//  發送消息this.rabbitTemplate.convertAndSend(exchange,routingKey,msg,message -> {//  設置延遲時間message.getMessageProperties().setDelay(delayTime*1000);return message;},gmallCorrelationData);//  默認返回return true;
}

修改retrySendMsg方法 – 添加判斷是否屬于延遲消息

//  判斷是否屬于延遲消息
if (gmallCorrelationData.isDelay()){//  屬于延遲消息this.rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(),gmallCorrelationData.getRoutingKey(),gmallCorrelationData.getMessage(),message -> {//  設置延遲時間message.getMessageProperties().setDelay(gmallCorrelationData.getDelayTime()*1000);return message;},gmallCorrelationData);
}else {//  調用發送消息方法 表示發送普通消息  發送消息的時候,不能調用 new RabbitService().sendMsg() 這個方法this.rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(),gmallCorrelationData.getRoutingKey(),gmallCorrelationData.getMessage(),gmallCorrelationData);
}

利用封裝好的工具類 測試發送延遲消息

//  基于延遲插件的延遲消息
@GetMapping("sendDelay")
public Result sendDelay(){//  聲明一個時間對象SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("發送時間:"+simpleDateFormat.format(new Date()));this.rabbitService.sendDelayMsg(DelayedMqConfig.exchange_delay,DelayedMqConfig.routing_delay,"iuok",3);return Result.ok();
}

重試了4次,所以我們需要保證冪等性
在這里插入圖片描述
結果會 回發送三次,也被消費三次!
如何保證消息冪等性?
1.使用數據方式
2.使用redis setnx 命令解決 — 推薦

@SneakyThrows
@RabbitListener(queues = DelayedMqConfig.queue_delay_1)
public void getMsg2(String msg,Message message,Channel channel){//  使用setnx 命令來解決 msgKey = delay:iuokString msgKey = "delay:"+msg;Boolean result = this.redisTemplate.opsForValue().setIfAbsent(msgKey, "0", 10, TimeUnit.MINUTES);//  result = true : 說明執行成功,redis 里面沒有這個key ,第一次創建, 第一次消費。//  result = false : 說明執行失敗,redis 里面有這個key//  不能: 那么就表示這個消息只能被消費一次!  那么第一次消費成功或失敗,我們確定不了!  --- 只能被消費一次!//        if (result){//            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//            System.out.println("接收時間:"+simpleDateFormat.format(new Date()));//            System.out.println("接收的消息:"+msg);//            //  手動確認消息//            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//        } else {//          //    不能消費!//        }//  能: 保證消息被消費成功    第二次消費,可以進來,但是要判斷上一個消費者,是否將消息消費了。如果消費了,則直接返回,如果沒有消費成功,我消費。//  在設置key 的時候給了一個默認值 0 ,如果消費成功,則將key的值 改為1if (!result){//  獲取緩存key對應的數據String status = (String) this.redisTemplate.opsForValue().get(msgKey);if ("1".equals(status)){//  手動確認channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);return;} else {//  說明第一個消費者沒有消費成功,所以消費并確認SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("接收時間:"+simpleDateFormat.format(new Date()));System.out.println("接收的消息:"+msg);//  修改redis 中的數據this.redisTemplate.opsForValue().set(msgKey,"1");channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);return;}}SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("接收時間:"+simpleDateFormat.format(new Date()));System.out.println("接收的消息:"+msg);//  修改redis 中的數據this.redisTemplate.opsForValue().set(msgKey,"1");//  手動確認消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/37918.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/37918.shtml
英文地址,請注明出處:http://en.pswp.cn/news/37918.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

2016年,進了百度

昨在深圳出差&#xff0c;與微信里的朋友吃了個便飯&#xff0c;他是今年四月份加的我微信&#xff08;gaoyang677&#xff09;&#xff0c;他的經歷很有意思&#xff0c;經他許可&#xff0c;分享給大家。 2012年時候&#xff0c;他大學畢業來到深圳&#xff0c;進了廠子&…

vue3 setup+Taro3 調用原生小程序自定義年月日時分多列選擇器,NutUI改造

vue3 setupTaro3 調用原生小程序自定義年月日時分多列選擇器&#xff0c;NutUI改造 NutUI 有日期時間選擇器&#xff0c;但是滑動效果太差&#xff0c;卡頓明顯。換成 原生小程序 很順暢 上代碼&#xff1a; <template><view><pickermode"multiSelector&…

2023牛客暑期多校訓練營9-J Puzzle: Star Battle

2023牛客暑期多校訓練營9-J Puzzle: Star Battle https://ac.nowcoder.com/acm/contest/57363/J 文章目錄 2023牛客暑期多校訓練營9-J Puzzle: Star Battle題意解題思路代碼 題意 解題思路 出題人都說是詐騙題&#xff08;&#xff0c;可以發現滿足每行每列恰好有 n n n個星…

python數據結構和算法

python數據結構和算法 參考 python圖解算法 選擇/快速排序 哈希表 廣度優先搜索算法 迪杰斯特拉算法 貪婪算法 動態規劃 K-鄰近算法 計算機科學是解決問題的研究。計算機科學使用抽象作為表示過程和數據的工具。抽象的數據類型允許程序員通過隱藏數據的細節來管理問題領域的…

【解決】Kafka Exception thrown when sending a message with key=‘null‘ 異常

問題原因&#xff1a; 如下圖&#xff0c;kafka 中配置的是監聽域名的方式&#xff0c;但程序里使用的是 ip:port 的連接方式。 解決辦法&#xff1a; kafka 中配置的是域名的方式&#xff0c;程序里也相應配置成 域名:port 的方式&#xff08;注意&#xff1a;本地h…

機器學習筆記之優化算法(十三)關于二次上界引理

機器學習筆記之優化算法——關于二次上界引理 引言回顧&#xff1a;利普希茲連續梯度下降法介紹 二次上界引理&#xff1a;介紹與作用二次上界與最優步長之間的關系二次上界引理證明過程 引言 本節將介紹二次上界的具體作用以及它的證明過程。 回顧&#xff1a; 利普希茲連續…

uniapp 微信小程序 訂閱消息

第一步&#xff0c;需要先去小程序官方挑選一下訂閱模板拿到模板id 訂閱按鈕在頭部導航上&#xff0c;所以 <u-navbar :bgColor"bgColor"><view class"u-nav-slot" slot"left" click"goSubscribe"><image :src"g…

阿里社招一面記錄

一輪電話面試&#xff0c;一個半小時&#xff0c;昨天晚上面試的&#xff0c;今早面試官打電話約了二面&#xff08;為啥是一面面試官:&#xff09; 自我介紹 工作經歷&#xff0c;項目經歷項目挑兩個介紹一下 這里介紹了一個偏技術的基于Mysql搭建的olap系統&#xff0c;數據…

綜述:計算機視覺中的圖像分割

一、說明 這篇文章是關于圖像分割的探索&#xff0c;這是解決計算機視覺問題&#xff08;如對象檢測、對象識別、圖像編輯、醫學圖像分析、自動駕駛汽車等&#xff09;的重要步驟之一。讓我們從介紹開始。 二、圖像分割介紹 圖像分割是計算機視覺中的一項基本任務&#xff0c;涉…

【Maven】SpringBoot項目使用maven-assembly-plugin插件多環境打包

SpringBoot項目使用maven-assembly-plugin插件多環境打包 1.創建SpringBoot項目并在pom.xml文件中添加maven-assembly-plugin配置 <!-- 多環境配置 --><profiles><!-- 開發環境 --><profile><id>dev</id><properties><prof…

新一代分布式融合存儲,數據場景All In One

1、摘要 2023年5月11日&#xff0c;浪潮信息全國巡展廣州站正式啟航。會上&#xff0c;重磅發布新一代分布式融合存儲AS13000G7&#xff0c;其采用極致融合架構設計理念&#xff0c;實現同一套存儲滿足四種非結構化數據的“All In One”高效融合&#xff0c;數據存力提升300%&a…

基于WebSocket的在線文字聊天室

與Ajax不同&#xff0c;WebSocket可以使服務端主動向客戶發送響應&#xff0c;本案例就是基于WebSocket的一個在線聊天室&#xff0c;不過功能比較簡單&#xff0c;只能滿足文字交流。演示如下。 案例學習于b站up主&#xff0c;鏈接 。這位up主講的非常清楚&#xff0c;值得去學…

item_get_sales-獲取TB商品銷量詳情

一、接口參數說明&#xff1a; item_get_sales-獲取商品銷量詳情&#xff0c;點擊更多API調試&#xff0c;請移步注冊API賬號點擊獲取測試key和secret 公共參數 請求地址: https://api-gw.onebound.cn/taobao/item_get_sales 名稱類型必須描述keyString是調用key&#xff08…

Idea 快捷鍵整理

Idea快捷鍵和自動代碼補全匯總 idea快捷鍵匯總 Ctrl 快捷鍵說明Ctrl F在當前文件進行文本查找 &#xff08;必備&#xff09;Ctrl R在當前文件進行文本替換 &#xff08;必備&#xff09;Ctrl Z撤銷 &#xff08;必備&#xff09;Ctrl Y刪除光標所在行 或 刪除選中的行 &am…

設計HTML5圖像和多媒體

在網頁中的文本信息直觀、明了&#xff0c;而多媒體信息更富內涵和視覺沖擊力。恰當使用不同類型的多媒體可以展示個性&#xff0c;突出重點&#xff0c;吸引用戶。在HTML5之前&#xff0c;需要借助插件為網頁添加多媒體&#xff0c;如Adobe Flash Player、蘋果的QuickTime等。…

【C++精華鋪】6.C++類和對象(下)類與對象的知識補充及編譯器優化

目錄 1. 再談構造 1.1 成員變量的初始化&#xff08;初始化列表&#xff09; 1.2 初始化列表的行為 1.3 explicit關鍵字 2. 類中的static成員 2.1 靜態成員變量 2.2 靜態成員函數 3. 友元 3.1 友元函數 3.1 友元類 4. 內部類 5. 匿名對象 6. 對象拷貝時候的編譯器優化…

GitHub 打不開解決方案

GitHub 這幾年國內普通用戶越來越難以訪問&#xff0c;github 作為全球最大的開源平臺&#xff0c;里面有用的內容很多&#xff0c;不管是對專業用戶還是普通用戶&#xff0c;無法訪問都是很嚴重的問題。 1.GitHub 加速鏡像 kgithub 是一個公益加速項目&#xff0c;僅需在 gi…

【LeetCode每日一題】——41.缺失的第一個正數

文章目錄 一【題目類別】二【題目難度】三【題目編號】四【題目描述】五【題目示例】六【題目提示】七【解題思路】八【時間頻度】九【代碼實現】十【提交結果】 一【題目類別】 哈希表 二【題目難度】 困難 三【題目編號】 41.缺失的第一個正數 四【題目描述】 給你一個…

Compute shader SV 理解圖

本圖轉子&#xff1a;【Computeshader】個人總結_蔣偉博的博客-CSDN博客

【Rust】Rust學習 第十二章一個 I/O 項目:構建一個命令行程序

本章既是一個目前所學的很多技能的概括&#xff0c;也是一個更多標準庫功能的探索。我們將構建一個與文件和命令行輸入/輸出交互的命令行工具來練習現在一些你已經掌握的 Rust 技能。 Rust 的運行速度、安全性、單二進制文件輸出和跨平臺支持使其成為創建命令行程序的絕佳選擇…