消息的TTL(Time To Live)就是消息的存活時間。
RabbitMQ可以對隊列和消息分別設置TTL。
對隊列設置存活時間,就是隊列沒有消費者連著的保留時間。
對每一個單獨的消息單獨的設置存活時間。超過了這個時間,我們認為這個消息就死了,稱之為死信。
如果隊列設置了TTL,消息也設置了,那么會取小的。所以一個消息如果被路由到不同的隊列中,這個消息死亡的時間有可能不一樣(不同的隊列設置)。這里單講單個消息的TTL,因為它才是實現延遲任務的關鍵。可以通過設置消息的expiration字段或者xmessage-ttl屬性來設置時間,兩者是一樣的效果。
死信路由?
一個消息在成為死信后,會進死信路由。
記住這里是路由而不是隊列,一個路由可以對應很多隊列。(什么是死信)
普通消息成為死信的條件:死信將被路由到死信交換機、再路由到普通隊列,普通隊列發送消息給消費者。
被拒收的消息:一個消息被Consumer拒收了,并且reject方法的參數里requeue是false。也就是說不會被再次放在隊列里,被其他消費者使用。(basic.reject/ basic.nack)requeue=false
TTL到期的消息:
消息的TTL到了,消息過期了。被隊列丟棄的消息:
隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上
死信交換機?
死信交換機Dead Letter Exchange其實就是一種普通的exchange,和創建其他exchange沒有兩樣。只是在某一個設置Dead Letter Exchange的隊列中有消息過期了,會自動觸發消息的轉發,發送到Dead Letter Exchange中去。
延時隊列?
先設置消息在TTL后變成死信,再設置隊列里死信統一被路由到一個指定的交換機,那個指定的交換機專門處理死信,達成延時隊列的效果。
手動ack&異常消息統一放在一個隊列處理建議的兩種方式
catch異常后,手動發送到指定隊列,然后使用channel給rabbitmq確認消息已消費
給Queue綁定死信隊列,使用nack(requque為false)確認消息消費失敗
?
?
/*** 創建隊列,交換機,延遲隊列,綁定關系 的configuration* 不會重復創建覆蓋* 1、第一次使用隊列【監聽】的時候才會創建* 2、Broker沒有隊列、交換機才會創建*/
@Configuration
public class MyRabbitMQConfig {// ? ?@RabbitHandler
// ? ?public void listen(Message message){
// ? ? ? ?System.out.println("收到消息:------>"+message);
// ? ?}/*** 延時隊列* @return*/@Beanpublic Queue orderDelayQueue(){HashMap<String, Object> arguments = new HashMap<>();/*** x-dead-letter-exchange :order-event-exchange 設置死信路由* x-dead-letter-routing-key : order.release.order 設置死信路由鍵* x-message-ttl :60000*/arguments.put("x-dead-letter-exchange","order-event-exchange");arguments.put("x-dead-letter-routing-key","order.release.order");arguments.put("x-message-ttl",30000);Queue queue = new Queue("order.delay.queue", true, false, false,arguments);return queue;}/*** 死信隊列* @return*/@Beanpublic Queue orderReleaseQueue(){Queue queue = new Queue("order.release.order.queue",true,false,false);return queue;}/*** 死信路由[普通路由]* @return*/@Beanpublic Exchange orderEventExchange(){/** ? String name,* ? boolean durable,* ? boolean autoDelete,* ? Map<String, Object> arguments* */TopicExchange topicExchange = new TopicExchange("order-event-exchange",true,false);return topicExchange;}/*** 交換機與延時隊列的綁定* @return*/@Beanpublic Binding orderCreateOrderBinding(){/** String destination, 目的地(隊列名或者交換機名字)* DestinationType destinationType, 目的地類型(Queue、Exhcange)* String exchange,* String routingKey,* Map<String, Object> arguments* */Binding binding = new Binding("order.delay.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.create.order",null);return binding;}/*** 死信路由與普通死信隊列的綁定* @return*/@Beanpublic Binding orderReleaseOrderBinding(){Binding binding = new Binding("order.release.order.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);return binding;}
}