一種多策略下RabbitMQ的延時隊列實現

1.為什么會用到延時隊列?

場景: 最近在開發一款系統中遇到這樣一個場景,A系統開通套餐需要把套餐信息以郵件的形式發送給相關工作人員,經過人工審核通過后,在B系統里面開通,A系統會調B系統套餐列表接口查詢套餐是否開通成功,開通成功則從A系統去完成訂單,假如超過設定時間未開通成功,則關閉訂單并退費.
(這東西俗稱"套娃")
這時候用RabbitMQ的延時隊列就可以完美的解決這個問題

2.為什么會提到多策略?

場景: 假如A系統還有別的功能添加需要經過人工審核之后在B系統中添加成功之后,A系統才會顯示添加成功,但是又不想寫很多隊列啊消費者等代碼.就可以用到這種策略模式,換句話說 就是類似 if… else …能明白了吧.

3.進入今天主題

整體流程圖:
在這里插入圖片描述

生產者生產一條延時消息,根據需要延時時間的不同,利用routingkey將消息路由到延時隊列,隊列都設置了TTL屬性,并綁定到死信交換機中,消息過期后,根據routingkey又會被路由到死信隊列中,消費者只需要監聽死信隊列,拿到消息去具體的策略實現類進行后續業務處理即可。

有了這個圖寫代碼就簡單了.
mq配置類 聲明隊列,路由鍵,交換機之間的關系;以及生產者消費者 rabbitmq等Bean

RabbitMqConfig

注意 監聽我也寫在配置類里面SimpleMessageListenerContainer用的這個類去設置的隊列
simpleMessageListenerContainer.setQueueNames(DEAD_LETTER_QUEUE_NAME);

package com.king.alice.rabbitmq.config;import com.king.alice.rabbitmq.delay.consumer.MessageConsumer;
import com.king.alice.rabbitmq.delay.consumer.Strategy;
import com.king.alice.rabbitmq.delay.provider.MessageProvider;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** @Author wlt* @Description rabbitmq配置類* @Date 2022/9/4* @Param* @return**/@Configuration
public class RabbitMqConfig {public static final String DELAY_EXCHANGE_NAME = "delay.alice.exchange";public static final String DELAY_QUEUE_NAME = "delay.alice.queue";public static final String DELAY_QUEUE_ROUTING_KEY = "delay.alice.queue.routing.key";public static final String DEAD_LETTER_EXCHANGE = "ttl.alice.exchange";public static final String DEAD_LETTER_QUEUE_ROUTING_KEY = "ttl.alice.queue.routing.key";public static final String DEAD_LETTER_QUEUE_NAME = "ttl.alice.queue";/*** 聲明延時Exchange* @return*/@Bean("delayExchange")public DirectExchange delayExchange(){return new DirectExchange(DELAY_EXCHANGE_NAME);}/*** 功能描述: <br>* <聲明死信Exchange>*/@Bean("deadLetterExchange")public DirectExchange deadLetterExchange(){return new DirectExchange(DEAD_LETTER_EXCHANGE);}/*** 聲明延時隊列 并綁定到對應的死信交換機* @return*/@Bean("delayQueue")public Queue delayQueue(){Map<String, Object> args = new HashMap<>(2);// x-dead-letter-exchange    這里聲明當前隊列綁定的死信交換機args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);// x-dead-letter-routing-key  這里聲明當前隊列的死信路由keyargs.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_ROUTING_KEY);// x-message-ttl  聲明隊列的TTL
//        args.put("x-message-ttl", 6000);return QueueBuilder.durable(DELAY_QUEUE_NAME).withArguments(args).build();}/*** 功能描述: <br>* <聲明死信隊列用于接收延時處理的消息>*/@Bean("deadLetterQueue")public Queue deadLetterQueue(){return new Queue(DEAD_LETTER_QUEUE_NAME);}/*** 功能描述: <br>* <聲明延時隊列綁定關系>* @Param:* @Return:* @Author: 大魔王* @Date: 2023/8/15 20:00*/@Beanpublic Binding delayBinding(@Qualifier("delayQueue") Queue queue,@Qualifier("delayExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY);}/*** 功能描述: <br>* <聲明死信隊列A綁定關系>* @Param:* @Return:* @Author: 大魔王* @Date: 2023/8/15 20:01*/@Beanpublic Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue,@Qualifier("deadLetterExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_ROUTING_KEY);}@Bean@ConditionalOnMissingBeanpublic MessageProvider messageProvider(@Qualifier("delayRabbitTemplate") RabbitTemplate template) {return new MessageProvider(template);}@Bean@ConditionalOnMissingBeanpublic MessageConsumer messageConsumer(ObjectProvider<List<Strategy>> provider) {return new MessageConsumer(provider);}@Bean@ConditionalOnMissingBeanpublic RabbitTemplate delayRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setMessageConverter(new Jackson2JsonMessageConverter());return template;}@BeanSimpleMessageListenerContainer simpleMessageListenerContainer(MessageConsumer messageConsumer, ConnectionFactory factory) {SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(factory);simpleMessageListenerContainer.setQueueNames(DEAD_LETTER_QUEUE_NAME);simpleMessageListenerContainer.setExposeListenerChannel(true);simpleMessageListenerContainer.setMessageListener(messageConsumer);return simpleMessageListenerContainer;}public static final String EXCHANGE_NAME = "alice_topic_exchange";public static final String QUEUE_NAME = "alice_queue";@Bean("aliceExchange")public Exchange aliceExchange() {return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}@Bean("aliceQueue")public Queue aliceQueue() {return QueueBuilder.durable(QUEUE_NAME).build();}@Beanpublic Binding bindQueueExchange(@Qualifier("aliceQueue") Queue queue, @Qualifier("aliceExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("alice.#").noargs();}}

生產者:

MessageProvider

package com.king.alice.rabbitmq.delay.provider;import cn.hutool.core.date.DateUtil;
import com.king.alice.common.json.JSON;
import com.king.alice.rabbitmq.config.RabbitMqConfig;
import com.king.alice.rabbitmq.delay.bean.DelayMessage;
import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Date;/*** @author 大魔王*/
@Slf4j
@Component
public class MessageProvider {@Autowiredprivate final RabbitTemplate rabbitTemplate;public MessageProvider(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}/*** send delay message*/public void sendMessage(DelayMessage delayMessage) {Assert.assertNotNull(delayMessage);log.info(" now date {},delay {} seconds to write to the message queue", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), delayMessage.getDelay());rabbitTemplate.convertAndSend(RabbitMqConfig.DELAY_EXCHANGE_NAME, RabbitMqConfig.DELAY_QUEUE_ROUTING_KEY, delayMessage,message -> {message.getMessageProperties().setExpiration(String.valueOf(delayMessage.getDelay() * 1000));return message;});}}

消費者:

package com.king.alice.rabbitmq.delay.consumer;import cn.hutool.core.util.ObjectUtil;
import com.king.alice.common.json.JSONObject;
import com.king.alice.rabbitmq.delay.bean.AliceMessage;
import com.king.alice.rabbitmq.delay.bean.DelayMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;/*** @author 大魔王*/
@Slf4j
public class MessageConsumer implements MessageListener {private final Map<Type, List<Strategy>> strategyMap = new ConcurrentHashMap<>();public MessageConsumer(ObjectProvider<List<Strategy>> stategyProvider) {List<Strategy> handleList = stategyProvider.getIfAvailable();Optional<? extends List<Strategy>> optionalStrategies = Optional.ofNullable(handleList);optionalStrategies.ifPresent(strategies -> strategies.stream().filter(strategy -> {Type genericInterface = strategy.getClass().getGenericInterfaces()[0];return genericInterface instanceof ParameterizedType;}).map(strategy -> ((ParameterizedType) strategy.getClass().getGenericInterfaces()[0]).getActualTypeArguments()[0]).collect(Collectors.toSet()).forEach(delayMessages -> {List<Strategy> collect = strategies.stream().filter(strategy -> {Type genericInterface = strategy.getClass().getGenericInterfaces()[0];if (genericInterface instanceof ParameterizedType) {Type actualTypeArgument = ((ParameterizedType) genericInterface).getActualTypeArguments()[0];return delayMessages.getTypeName().equals(actualTypeArgument.getTypeName());}return false;}).collect(Collectors.toList());strategyMap.put(delayMessages, collect);}));}@Overridepublic void onMessage(Message message) {MessageConverter messageConverter = new Jackson2JsonMessageConverter();DelayMessage delayMessage = (DelayMessage) messageConverter.fromMessage(message);List<Strategy> strategyList = strategyMap.get(delayMessage.getClass());if (!CollectionUtils.isEmpty(strategyList)) {strategyList.forEach(strategy -> strategy.handle(delayMessage));} else {log.info("Missing message processing class");}}}

策略相關Bean,接口以及實現類

DelayMessage

package com.king.alice.rabbitmq.delay.bean;/*** @author 大魔王*/
public interface DelayMessage{/*** 獲得延遲時間(單位秒)** @return 延遲時間單位秒*/int getDelay();}

AliceMessage

package com.king.alice.rabbitmq.delay.bean;import lombok.Getter;
import lombok.Setter;/*** @author 大魔王*/
@Getter
@Setter
public class AliceMessage implements DelayMessage {/***  用戶郵箱*/String email;/***  訂單類型*/String orderType;/***  執行次數*/Integer dealCount;/***  延時秒數*/int delay;@Overridepublic int getDelay() {return this.delay;}public void setDelay(int delay) {this.delay = delay;}
}

UserMessage

package com.king.alice.rabbitmq.delay.bean;import lombok.Getter;
import lombok.Setter;/*** @author 大魔王*/
@Getter
@Setter
public class UserMessage implements DelayMessage{/***  用戶*/String username;/***  token*/String token;/***  執行次數*/Integer dealCount;/***  延時秒數*/int delay;@Overridepublic int getDelay() {return this.delay;}public void setDelay(int delay) {this.delay = delay;}
}

Strategy

package com.king.alice.rabbitmq.delay.consumer;import com.king.alice.rabbitmq.delay.bean.DelayMessage;/*** @author 大魔王*/
public interface Strategy<T extends DelayMessage> {/*** 處理消息的方法** @param delayMessage 收到的消息*/void handle(T delayMessage);
}

AliceMessageHandler

package com.king.alice.rabbitmq;import com.king.alice.common.json.JSON;
import com.king.alice.common.json.JSONObject;
import com.king.alice.rabbitmq.delay.bean.AliceMessage;
import com.king.alice.rabbitmq.delay.consumer.Strategy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;/*** @author 大魔王*/
@Component
@Slf4j
public class AliceMessageHandler implements Strategy<AliceMessage> {@Overridepublic void handle(AliceMessage delayMessage) {log.info("AliceMessage響應體{}", JSONObject.parseObject(JSON.toJSONString(delayMessage)));}
}

UserMessageHandler

package com.king.alice.rabbitmq;import com.king.alice.common.json.JSON;
import com.king.alice.common.json.JSONObject;
import com.king.alice.rabbitmq.delay.bean.UserMessage;
import com.king.alice.rabbitmq.delay.consumer.Strategy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;/*** @author 大魔王*/
@Slf4j
@Component
public class UserMessageHandler implements Strategy<UserMessage> {@Overridepublic void handle(UserMessage delayMessage) {log.info("UserMessage響應體{}", JSONObject.parseObject(JSON.toJSONString(delayMessage)));}
}

接下來 我們寫個controller測試一下

SysAccountController

package com.king.alice.manage.sys.controller;import cn.hutool.core.date.DateUtil;
import com.king.alice.manage.sys.entity.SysAccount;
import com.king.alice.manage.sys.service.SysAccountService;
import com.king.alice.rabbitmq.delay.bean.AliceMessage;
import com.king.alice.rabbitmq.delay.bean.UserMessage;
import com.king.alice.rabbitmq.delay.provider.MessageProvider;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;import javax.annotation.Resource;
import java.util.Date;/*** 賬號表(SysAccount)表控制層** @author makejava* @since 2023-08-09 11:40:16*/
@RestController
@Slf4j
public class SysAccountController {/*** 服務對象*/@Resourceprivate SysAccountService sysAccountService;@Autowiredprivate MessageProvider messageProvider;@PostMapping("/send-alice-message")public String sendMsg(@RequestBody AliceMessage aliceMessage) {messageProvider.sendMessage(aliceMessage);log.info("當前時間:{},收到aliceMessage請求,msg:{}", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), aliceMessage);return "success";}@PostMapping("/send-user-message")public String sendMsg(@RequestBody UserMessage userMessage) {messageProvider.sendMessage(userMessage);log.info("當前時間:{},收到userMessage請求,msg:{}", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), userMessage);return "success";}
}

調接口

第一個策略:
在這里插入圖片描述
控制臺打印
在這里插入圖片描述
第二個策略:
在這里插入圖片描述
延時12秒成功接收到消息
在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/39665.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/39665.shtml
英文地址,請注明出處:http://en.pswp.cn/news/39665.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

韋東山-電子量產工具項目:顯示單元

所有代碼都已通過測試跑通&#xff0c;其中代碼結構如下&#xff1a; 一、include文件夾 1.1 disp_manager.h #ifndef _DISP_MANAGER_H //防止頭文件重復包含,只要右邊的出現過&#xff0c;就不會再往下編譯 #define _DISP_MANAGER_H //區域結構體 typedef struct DispBuff …

[element-ui] el-table表格合并 span-method

用rowIndex, columnIndex 找到要合并的開始單元格 return {rowspan: 1,colspan: 1 } 表示表格不變 return {rowspan: 2,colspan: 1 } 表示表格向下合并一個單元格 return {rowspan: 1,colspan: 2 } 表示表格向右合并一個單元格 return {rowspan: 0,colspan: 0 } 表示刪除此單元…

leetcode810. 黑板異或游戲(博弈論 - java)

黑板異或游戲 lc 810 - 黑板異或游戲題目描述博弈論 動態規劃 lc 810 - 黑板異或游戲 難度 - 困難 原題鏈接 - 黑板異或游戲 題目描述 黑板上寫著一個非負整數數組 nums[i] 。 Alice 和 Bob 輪流從黑板上擦掉一個數字&#xff0c;Alice 先手。如果擦除一個數字后&#xff0c;剩…

談談網絡協議的定義、組成和重要性

個人主頁&#xff1a;insist--個人主頁?????? 本文專欄&#xff1a;網絡基礎——帶你走進網絡世界 本專欄會持續更新網絡基礎知識&#xff0c;希望大家多多支持&#xff0c;讓我們一起探索這個神奇而廣闊的網絡世界。 目錄 一、網絡協議的定義 二、網絡協議的組成 1、…

出于網絡安全考慮,印度啟用本土操作系統”瑪雅“取代Windows

據《印度教徒報》報道&#xff0c;印度將放棄微軟系統&#xff0c;選擇新的操作系統和端點檢測與保護系統。 備受期待的 "瑪雅操作系統 "將很快用于印度國防部的數字領域&#xff0c;而新的端點檢測和保護系統 "Chakravyuh "也將一起面世。 不過&#xf…

C++--類型轉換

1.什么是類型轉換 在傳統C語言中&#xff0c;由強制類型轉換和隱式類型轉換&#xff0c;隱式類型轉換&#xff0c;編譯器在在編譯階段自動處理&#xff0c;能轉換則轉換&#xff0c;強制類型轉換由用戶自己轉換。 缺陷&#xff1a; 轉換的可視性比較差&#xff0c;所有的轉換形…

Go語言中關鍵字type的多重應用場景詳解

當談及Go語言中的關鍵字type時&#xff0c;我們通常會想到用于定義結構體和接口的常見用法。然而&#xff0c;"type"關鍵字實際上有許多其他用法&#xff0c;本文將對其中幾種常見用法進行簡要總結記錄。 定義結構體和方法 在Go中&#xff0c;我們可以使用type來定…

運維監控學習筆記5

Linux的內存是虛擬內存&#xff0c;是物理內存和交換分區swap。 內存&#xff1a; 頁&#xff1a;4K&#xff0c; 硬盤&#xff1a;塊。 尋址&#xff1a; 空間&#xff1a;內存的合并。大頁內存。 free命令&#xff1a; [rootvm1 ~]# free -htotal used fre…

javap獲取Kotlin方法JNI方法簽名

獲取Kotlin方法簽名和JAVA不一樣的地方就是需要使用Kotlin 命令行編譯器生成.class文件&#xff1a; 編寫一個Kotlin類&#xff0c;添加JNI方法&#xff1a; class TestLib {external fun init(callBack: CallBack)interface CallBack{fun onData(count:Int,data:String)} }在…

cesium學習記錄08-鼠標繪制多邊形

上一篇學習了實體的一些基礎知識&#xff0c;這一篇來學習鼠標繪制實體多邊形的實現 一、方法一&#xff1a; 1&#xff0c;結果顯示 貼地&#xff1a; 不貼地&#xff1a; 2&#xff0c;方法全部代碼&#xff1a; 主方法&#xff1a; /*** 繪制多邊形* param {Object} op…

華為OD機試 - 公共子串計算(Java 2023 B卷 100分)

目錄 專欄導讀一、題目描述二、輸入描述三、輸出描述四、解題思路五、Java算法源碼六、效果展示 華為OD機試 2023B卷題庫瘋狂收錄中&#xff0c;刷題點這里 專欄導讀 本專欄收錄于《華為OD機試&#xff08;JAVA&#xff09;真題&#xff08;A卷B卷&#xff09;》。 刷的越多&…

VictoriaMetrics部署及vmalert集成釘釘告警

1、部署VictoriaMetrics cd /usr/local wget https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v1.65.0/victoria-metrics-amd64-v1.65.0.tar.gz mkdir victoria-metrics && tar -xvzf victoria-metrics-amd64-v1.65.0.tar.gz && \ mv …

論AI GPT跨境貿易架構及其應用

摘要 2023年初,我司啟動了智慧化跨境貿易供應鏈一體化平臺的建設工作。我在該項目中擔任系統架構設計師的職務,主要負責設計平臺系統架構和安全體系架構。該平臺以移動信息化發展為契機,采用”平臺+AI”的模式解決現有應用的集中移動化需求。平臺整體的邏輯復雜,對系統的高…

react之Hooks的介紹、useState與useEffect副作用的使用

react之Hooks的介紹、useState與useEffect副作用的使用 一、Hooks的基本介紹二、useState的使用2.1 簡單使用2.2 數組結構簡化2.3 狀態的讀取和修改2.3 組件的更新過程 三、useEffect的使用3.1 副作用介紹3.2 基本使用3.3 依賴3.4 不要對依賴項撒謊3.5 依賴項可以是空數組3.6 清…

ZZULIOJ 1193: 單科成績排序(結構體專題),Java

ZZULIOJ 1193: 單科成績排序&#xff08;結構體專題&#xff09;&#xff0c;Java 題目描述 有一學生成績表&#xff0c;包括學號、姓名、3門課程成績。請按要求排序輸出&#xff1a;若輸入1&#xff0c;則按第1門課成績降序輸出成績表&#xff0c;若輸入為i&#xff08;1<…

清風數學建模——擬合算法

擬合算法 文章目錄 擬合算法概念 確定擬合曲線最小二乘法的幾何解釋求解最小二乘法matlab求解最小二乘法如何評價擬合的好壞計算擬合優度的代碼 概念 在前面的篇幅中提到可以使用插值算法&#xff0c;通過給定的樣本點推算出一定的曲線從而推算出一些想要的值。但存在一些問題…

解決內網GitLab 社區版 15.11.13項目拉取失敗

問題描述 GitLab 社區版 發布不久&#xff0c;搭建在內網拉取項目報錯&#xff0c;可能提示 unable to access https://github.comxxxxxxxxxxx: Failed to connect to xxxxxxxxxxxxxGit clone error - Invalid argument error:14077438:SSL routines:SSL23_GET_S 15.11.13ht…

QT網絡編程之TCP

QT網絡編程之TCP TCP 編程需要用到倆個類: QTcpServer 和 QTcpSocket。 #------------------------------------------------- # # Project created by QtCreator 2023-08-

mysql截取最后一個字符之前的數據

1、mysql截取最后一個字符之前的數據 select --截取斜杠之前的數據REVERSE(SUBSTR(REVERSE(SPNH-dfg-2012) ; --截取斜杠后的數據 INSTR(REVERSE(SPNH-fg-2012),-)1))2、mysql獲取最后一個字符后的數據 select SUBSTRING_INDEX(SPNH-dfg-2012,-,-1) 3、mysql更新某個字段…

SpringBoot 該如何預防 XSS 攻擊

XSS 漏洞到底是什么&#xff0c;說實話我講不太清楚。但是可以通過遇到的現象了解一下。在前端Form表單的輸入框中&#xff0c;用戶沒有正常輸入&#xff0c;而是輸入了一段代碼&#xff1a;</input><img src1 onerroralert1> 這個正常保存沒有問題。問題出在了列表…