目錄
- 一、延遲消息
- 1.基于死信實現延遲消息
- 1.1 消息的TTL(Time To Live)
- 1.2 死信交換機 Dead Letter Exchanges
- 1.3 代碼實現
- 2.基于延遲插件實現延遲消息
- 2.1 插件安裝
- 2.2 代碼實現
- 3.基于延遲插件封裝消息
一、延遲消息
延遲消息有兩種實現方案:
1,基于死信隊列
2,集成延遲插件
1.基于死信實現延遲消息
使用RabbitMQ來實現延遲消息必須先了解RabbitMQ的兩個概念:
消息的TTL(存活時間)和死信交換機Exchange,通過這兩者的組合來實現延遲隊列
1.1 消息的TTL(Time To Live)
消息的TTL就是消息的存活時間。RabbitMQ可以對隊列和消息分別設置TTL。對隊列設置就是隊列沒有消費者連著的保留時間,也可以對每一個單獨的消息做單獨的設置。超過了這個時間,我們認為這個消息就死了,稱之為死信。
如何設置TTL:
我們創建一個隊列queue.temp,在Arguments 中添加x-message-ttl 為5000 (單位是毫秒),那所在壓在這個隊列的消息在5秒后會消失。
1.2 死信交換機 Dead Letter Exchanges
一個消息在滿足如下條件下,會進死信路由,記住這里是路由而不是隊列,一個路由可以對應很多隊列。
(1) 一個消息被Consumer拒收了,并且reject方法的參數里requeue是false。也就是說不會被再次放在隊列里,被其他消費者使用。
(2)上面的消息的TTL到了,消息過期了。
(3)隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。
Dead Letter Exchange其實就是一種普通的exchange,和創建其他exchange沒有兩樣。只是在某一個設置Dead Letter Exchange的隊列中有消息過期了,會自動觸發消息的轉發,發送到Dead Letter Exchange中去。
我們現在可以測試一下延遲隊列。
(1)創建死信隊列
(2)創建交換機
(3)建立交換器與隊列之間的綁定
(4)創建隊列
1.3 代碼實現
在service-mq 中添加配置類
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DeadLetterMqConfig {// 聲明一些變量public static final String exchange_dead = "exchange.dead";public static final String routing_dead_1 = "routing.dead.1";public static final String routing_dead_2 = "routing.dead.2";public static final String queue_dead_1 = "queue.dead.1";public static final String queue_dead_2 = "queue.dead.2";// 定義交換機@Beanpublic DirectExchange exchange(){return new DirectExchange(exchange_dead,true,false,null);}@Beanpublic Queue queue1(){// 設置如果隊列一 出現問題,則通過參數轉到exchange_dead,routing_dead_2 上!HashMap<String, Object> map = new HashMap<>();// 參數綁定 此處的key 固定值,不能隨意寫map.put("x-dead-letter-exchange",exchange_dead);map.put("x-dead-letter-routing-key",routing_dead_2);// 設置延遲時間map.put("x-message-ttl ", 10 * 1000);// 隊列名稱,是否持久化,是否獨享、排外的【true:只可以在本次連接中訪問】,是否自動刪除,隊列的其他屬性參數return new Queue(queue_dead_1,true,false,false,map);}@Beanpublic Binding binding(){// 將隊列一 通過routing_dead_1 key 綁定到exchange_dead 交換機上return BindingBuilder.bind(queue1()).to(exchange()).with(routing_dead_1);}// 這個隊列二就是一個普通隊列@Beanpublic Queue queue2(){return new Queue(queue_dead_2,true,false,false,null);}// 設置隊列二的綁定規則@Beanpublic Binding binding2(){// 將隊列二通過routing_dead_2 key 綁定到exchange_dead交換機上!return BindingBuilder.bind(queue2()).to(exchange()).with(routing_dead_2);}
}
配置發送消息
@RestController
@RequestMapping("/mq")
@Slf4j
public class MqController {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate RabbitService rabbitService;@GetMapping("sendDeadLettle")public Result sendDeadLettle() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");this.rabbitTemplate.convertAndSend(DeadLetterMqConfig.exchange_dead, DeadLetterMqConfig.routing_dead_1, "ok");System.out.println(sdf.format(new Date()) + " Delay sent.");return Result.ok();}
}
消息接收方
@Component
public class DeadLetterReceiver {@RabbitListener(queues = DeadLetterMqConfig.queue_dead_2)public void getMessage(String msg, Message message, Channel channel) throws IOException {//時間格式化SimpleDateFormat simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("消息接收的時間:\t"+simpleDateFormat.format(new Date()));System.out.println("消息的內容"+msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}
}
2.基于延遲插件實現延遲消息
2.1 插件安裝
Rabbitmq實現了一個插件x-delay-message來實現延時隊列
- 首先我們將剛下載下來的rabbitmq_delayed_message_exchange-3.9.0.ez文件上傳到RabbitMQ所在服務器,下載地址:https://www.rabbitmq.com/community-plugins.html
- 切換到插件所在目錄,執行 docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins 命令,將剛插件拷貝到容器內plugins目錄下
- 執行 docker exec -it rabbitmq /bin/bash 命令進入到容器內部,并 cd plugins 進入plugins目錄
- 執行 ls -l|grep delay 命令查看插件是否copy成功
- 在容器內plugins目錄下,執行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 命令啟用插件
- exit命令退出RabbitMQ容器內部,然后執行 docker restart rabbitmq 命令重啟RabbitMQ容器
2.2 代碼實現
配置隊列
@Configuration
public class DelayedMqConfig {public static final String exchange_delay = "exchange.delay";public static final String routing_delay = "routing.delay";public static final String queue_delay_1 = "queue.delay.1";@Beanpublic Queue delayQeue1() {// 第一個參數是創建的queue的名字,第二個參數是是否支持持久化return new Queue(queue_delay_1, true);}@Beanpublic CustomExchange delayExchange() {Map<String, Object> args = new HashMap<String, Object>();args.put("x-delayed-type", "direct");return new CustomExchange(exchange_delay, "x-delayed-message", true, false, args);}@Beanpublic Binding delayBbinding1() {return BindingBuilder.bind(delayQeue1()).to(delayExchange()).with(routing_delay).noargs();}
}
發送消息
@GetMapping("sendelay")
public Result sendDelay() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");this.rabbitTemplate.convertAndSend(DelayedMqConfig.exchange_delay, DelayedMqConfig.routing_delay, sdf.format(new Date()), new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(10 * 1000);System.out.println(sdf.format(new Date()) + " Delay sent.");return message;}});return Result.ok();
}
接收消息
@Component
public class DelayReceiver {@RabbitListener(queues = DelayedMqConfig.queue_delay_1)public void get(String msg) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("Receive queue_delay_1: " + sdf.format(new Date()) + " Delay rece." + msg);}}
3.基于延遲插件封裝消息
/*** 封裝發送延遲消息方法* @param exchange* @param routingKey* @param msg* @param delayTime* @return*/
public Boolean sendDelayMsg(String exchange,String routingKey, Object msg, int delayTime){// 將發送的消息 賦值到 自定義的實體類GmallCorrelationData gmallCorrelationData = new GmallCorrelationData();// 聲明一個correlationId的變量String correlationId = UUID.randomUUID().toString().replaceAll("-","");gmallCorrelationData.setId(correlationId);gmallCorrelationData.setExchange(exchange);gmallCorrelationData.setRoutingKey(routingKey);gmallCorrelationData.setMessage(msg);gmallCorrelationData.setDelayTime(delayTime);gmallCorrelationData.setDelay(true);// 將數據存到緩存this.redisTemplate.opsForValue().set(correlationId,JSON.toJSONString(gmallCorrelationData),10,TimeUnit.MINUTES);// 發送消息this.rabbitTemplate.convertAndSend(exchange,routingKey,msg,message -> {// 設置延遲時間message.getMessageProperties().setDelay(delayTime*1000);return message;},gmallCorrelationData);// 默認返回return true;
}
修改retrySendMsg方法 – 添加判斷是否屬于延遲消息
// 判斷是否屬于延遲消息
if (gmallCorrelationData.isDelay()){// 屬于延遲消息this.rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(),gmallCorrelationData.getRoutingKey(),gmallCorrelationData.getMessage(),message -> {// 設置延遲時間message.getMessageProperties().setDelay(gmallCorrelationData.getDelayTime()*1000);return message;},gmallCorrelationData);
}else {// 調用發送消息方法 表示發送普通消息 發送消息的時候,不能調用 new RabbitService().sendMsg() 這個方法this.rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(),gmallCorrelationData.getRoutingKey(),gmallCorrelationData.getMessage(),gmallCorrelationData);
}
利用封裝好的工具類 測試發送延遲消息
// 基于延遲插件的延遲消息
@GetMapping("sendDelay")
public Result sendDelay(){// 聲明一個時間對象SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("發送時間:"+simpleDateFormat.format(new Date()));this.rabbitService.sendDelayMsg(DelayedMqConfig.exchange_delay,DelayedMqConfig.routing_delay,"iuok",3);return Result.ok();
}
重試了4次,所以我們需要保證冪等性
結果會 回發送三次,也被消費三次!
如何保證消息冪等性?
1.使用數據方式
2.使用redis setnx 命令解決 — 推薦
@SneakyThrows
@RabbitListener(queues = DelayedMqConfig.queue_delay_1)
public void getMsg2(String msg,Message message,Channel channel){// 使用setnx 命令來解決 msgKey = delay:iuokString msgKey = "delay:"+msg;Boolean result = this.redisTemplate.opsForValue().setIfAbsent(msgKey, "0", 10, TimeUnit.MINUTES);// result = true : 說明執行成功,redis 里面沒有這個key ,第一次創建, 第一次消費。// result = false : 說明執行失敗,redis 里面有這個key// 不能: 那么就表示這個消息只能被消費一次! 那么第一次消費成功或失敗,我們確定不了! --- 只能被消費一次!// if (result){// SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// System.out.println("接收時間:"+simpleDateFormat.format(new Date()));// System.out.println("接收的消息:"+msg);// // 手動確認消息// channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);// } else {// // 不能消費!// }// 能: 保證消息被消費成功 第二次消費,可以進來,但是要判斷上一個消費者,是否將消息消費了。如果消費了,則直接返回,如果沒有消費成功,我消費。// 在設置key 的時候給了一個默認值 0 ,如果消費成功,則將key的值 改為1if (!result){// 獲取緩存key對應的數據String status = (String) this.redisTemplate.opsForValue().get(msgKey);if ("1".equals(status)){// 手動確認channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);return;} else {// 說明第一個消費者沒有消費成功,所以消費并確認SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("接收時間:"+simpleDateFormat.format(new Date()));System.out.println("接收的消息:"+msg);// 修改redis 中的數據this.redisTemplate.opsForValue().set(msgKey,"1");channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);return;}}SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("接收時間:"+simpleDateFormat.format(new Date()));System.out.println("接收的消息:"+msg);// 修改redis 中的數據this.redisTemplate.opsForValue().set(msgKey,"1");// 手動確認消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}