上篇文章:
RabbitMQ—消息可靠性保證https://blog.csdn.net/sniper_fandc/article/details/149311576?fromshare=blogdetail&sharetype=blogdetail&sharerId=149311576&sharerefer=PC&sharesource=sniper_fandc&sharefrom=from_link
目錄
1 TTL
1.1 介紹
1.2 消息TTL
1.3 隊列TTL
2 死信隊列
2.1 介紹
2.2 案例演示
2.2.1 消息超過TTL
2.2.2 手動確認拒絕重新入隊
2.2.3 隊列滿了
2.3 總結
3 延遲隊列
3.1 介紹
3.2 實現
3.3 延遲隊列插件
3.3.1 RabbitMQ Delayed Message Plugin插件安裝
3.3.2 插件使用
3.3.3 案例演示
3.4 總結
1 TTL
1.1 介紹
????????TTL表示過期時間,RabbitMQ提供對消息設置TTL和對隊列設置TTL兩種方式:
????????如果對隊列設置TTL,就表示隊列中的消息都是該TTL過期。
????????如果同時設置消息和隊列的TTL,那么取兩者較小值為TTL的時間。
????????TTL的應用場景還是比較廣泛的,比如訂單系統和支付系統通過RabbitMQ通信,訂單具有超時時間,超時未支付就會取消訂單,此時訂單就可以設置TTL。
1.2 消息TTL
????????隊列、交換機名稱和聲明:
public class RabbitMQConnection {public static final String TTL_QUEUE = "ttl.queue";public static final String TTL_EXCHANGE = "ttl.exchange";}
@Configurationpublic class RabbitMQConfig {@Bean("ttlQueue")public Queue ttlQueue(){return QueueBuilder.durable(RabbitMQConnection.TTL_QUEUE).build();}@Bean("ttlExchange")public DirectExchange ttlExchange(){return ExchangeBuilder.directExchange(RabbitMQConnection.TTL_EXCHANGE).durable(true).build();}@Bean("ttlQueueBinding")public Binding ttlQueueBinding(@Qualifier("ttlExchange") DirectExchange directExchange, @Qualifier("ttlQueue") Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("ttl");}}
????????生產者:
@RestController@RequestMapping("/producer")public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;@RequestMapping("ttl")public String ttl(){MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("10000");return message;}};rabbitTemplate.convertAndSend(RabbitMQConnection.TTL_EXCHANGE, "ttl","Hello SpringBoot RabbitMQ",messagePostProcessor);return "發送成功";}}
????????設置消息TTL主要是在MessagePostProcessor類中為消息setExpiration()設置過期時間,單位為ms,接受類型為String。設置10s過期時間,觀察結果:
1.3 隊列TTL
????????隊列、交換機名稱和聲明:
public class RabbitMQConnection {public static final String TTL_QUEUE = "ttl.queue";public static final String TTL_QUEUE2 = "ttl.queue2";public static final String TTL_EXCHANGE = "ttl.exchange";}
@Configurationpublic class RabbitMQConfig {@Bean("ttlQueue")public Queue ttlQueue(){return QueueBuilder.durable(RabbitMQConnection.TTL_QUEUE).build();}@Bean("ttlQueue2")public Queue ttlQueue2(){return QueueBuilder.durable(RabbitMQConnection.TTL_QUEUE2).ttl(20000).build();}@Bean("ttlExchange")public DirectExchange ttlExchange(){return ExchangeBuilder.directExchange(RabbitMQConnection.TTL_EXCHANGE).durable(true).build();}@Bean("ttlQueueBinding")public Binding ttlQueueBinding(@Qualifier("ttlExchange") DirectExchange directExchange, @Qualifier("ttlQueue") Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("ttl");}@Bean("ttlQueueBinding2")public Binding ttlQueueBinding2(@Qualifier("ttlExchange") DirectExchange directExchange, @Qualifier("ttlQueue2") Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("ttl");}}
????????生產者:
@RestController@RequestMapping("/producer")public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;@RequestMapping("ttl")public String ttl(){rabbitTemplate.convertAndSend(RabbitMQConnection.TTL_EXCHANGE, "ttl","Hello SpringBoot RabbitMQ");return "發送成功";}}
????????設置隊列過期時間是在聲明隊列的時候,使用ttl()方法進行設置,單位ms。設置20s隊列過期時間,觀察結果:
????????注意:隊列TTL和消息TTL的區別。隊列TTL:一旦消息過期,會立即刪除消息。消息TTL:消息過期也不會立即從隊列中刪除,而是等待消息即將被投遞到消費者之間再判斷是否過期(是否刪除)。
????????為什么有這樣的區別?隊列TTL:因為隊列的先進先出特性,先進隊列的一定先到期,因此只需要定期從隊頭判斷是否過期,過期即連續刪除。消息TTL:由于每個消息的TTL都不一樣,因此如果要實時判斷消息是否過期就需要不斷掃描全隊列(開銷大),因此采用惰性刪除即消息即將被投遞到消費者之間再判斷是否過期。
????????上述區別會產生一個現象,在隊列TTL中,連續發送的消息(認為是在同一時刻)在超過過期時間后所有消息都會消失。而在消息TTL中,連續發送過期時間長和過期時間短的消息,即使過期時間短的消息已經過期,只要過期時間長的消息還未過期,過期時間短的消息仍然會在隊列中。
2 死信隊列
2.1 介紹
????????當消息因為某些原因,無法再被消費者消費,就變成死信(Dead Letter)。死信就會進入死信交換機(DLX,Dead Letter Exchange),由死信交換機路由到死信隊列(DLQ,Dead Letter Queue)。死信隊列的消息也可以被消費者消費:
????????消息變成死信有如下三種原因:
????????1.消息確認機制采用手動確認,并調用basicReject()或basicNack(),requeue==false。由于拒絕重新入隊,因此消息變為死信。
????????2.消息超過過期時間TTL,超過過期時間的消息無法再被消費,也就變成死信。
????????3.隊列滿了,隊列有長度限制,超出隊列長度的消息就會變為死信。
2.2 案例演示
2.2.1 消息超過TTL
????????隊列、交換機名稱和聲明:
public class RabbitMQConnection {public static final String NORMAL_QUEUE = "normal.queue";public static final String NORMAL_EXCHANGE = "normal.exchange";public static final String DL_QUEUE = "dl.queue";public static final String DL_EXCHANGE = "dl.exchange";}
@Configurationpublic class RabbitMQConfig {//正常隊列正常交換機和死信隊列死信交換機的聲明和綁定@Bean("normalQueue")public Queue normalQueue(){return QueueBuilder.durable(RabbitMQConnection.NORMAL_QUEUE).deadLetterExchange(RabbitMQConnection.DL_EXCHANGE)//綁定死信交換機.deadLetterRoutingKey("dl")//死信攜帶的routingKey.ttl(10000)//制造死信條件(消息過期).build();}@Bean("normalExchange")public DirectExchange normalExchange(){return ExchangeBuilder.directExchange(RabbitMQConnection.NORMAL_EXCHANGE).durable(true).build();}@Bean("normalQueueBinding")public Binding normalQueueBinding(@Qualifier("normalExchange") DirectExchange directExchange, @Qualifier("normalQueue") Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("normal");}@Bean("dlQueue")public Queue dlQueue(){return QueueBuilder.durable(RabbitMQConnection.DL_QUEUE).build();}@Bean("dlExchange")public DirectExchange dlExchange(){return ExchangeBuilder.directExchange(RabbitMQConnection.DL_EXCHANGE).durable(true).build();}@Bean("dlQueueBinding")public Binding dlQueueBinding(@Qualifier("dlExchange") DirectExchange directExchange, @Qualifier("dlQueue") Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("dl");}}
????????生產者:
@RestController@RequestMapping("/producer")public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;@RequestMapping("dl")public String dl(){rabbitTemplate.convertAndSend(RabbitMQConnection.NORMAL_EXCHANGE, "normal","Hello SpringBoot RabbitMQ");return "發送成功";}}
????????運行代碼可以發現,創建了兩個隊列normal.queue和dl.queue,normal.queue的TTL表示設置隊列過期時間,DLX表示該隊列綁定了死信交換機,DLK則是該隊列消息過期時變成死信發送給死信交換機攜帶了routingKey。
????????剛開始生產者給normal.queue發送消息,10s后隊列消息過期變為死信,就被發送到死信交換機,死信交換機將死信路由到dl.queue死信隊列。
2.2.2 手動確認拒絕重新入隊
????????添加消費者代碼(監聽正常隊列和死信隊列),構造拒絕重新入隊的死信條件(需要提前開啟manual級別):
@Componentpublic class DlListener {@RabbitListener(queues = RabbitMQConnection.NORMAL_QUEUE)public void queueListener1(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.printf("listener [" + RabbitMQConnection.NORMAL_QUEUE + "]收到消息:%s, deliveryTag:%d \n",new String(message.getBody(), "UTF-8"),deliveryTag);int num = 1 / 0;System.out.println("消息處理完成");channel.basicAck(deliveryTag, false);} catch (Exception e) {System.out.println("消息處理發生異常,拒絕重新入隊");channel.basicNack(deliveryTag, false, false);}}@RabbitListener(queues = RabbitMQConnection.DL_QUEUE)public void queueListener2(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("listener [" + RabbitMQConnection.DL_QUEUE + "]收到消息:%s, deliveryTag:%d \n",new String(message.getBody(), "UTF-8"),deliveryTag);System.out.println("listener [" + RabbitMQConnection.DL_QUEUE + "]消息處理完成");channel.basicAck(deliveryTag, false);}}
????????運行代碼可以發現,當正常隊列的消息被消費者1處理時發生異常,由于選擇拒絕重新入隊,因此消息變為死信進入死信隊列,然后就被消費者2監聽到并處理了消息。
2.2.3 隊列滿了
????????修改隊列聲明,添加最大長度屬性(需要先停止服務器并提前在管理界面刪除已經存在的隊列):
????@Bean("normalQueue")public Queue normalQueue(){return QueueBuilder.durable(RabbitMQConnection.NORMAL_QUEUE).deadLetterExchange(RabbitMQConnection.DL_EXCHANGE)//綁定死信交換機.deadLetterRoutingKey("dl")//死信攜帶的routingKey.ttl(10000)//制造死信條件(消息過期).maxLength(10).build();}
????????生產者連續發送20條消息:
????@RequestMapping("dl")public String dl() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(RabbitMQConnection.NORMAL_EXCHANGE, "normal", "Hello SpringBoot RabbitMQ");}return "發送成功";}
????????移除消費者,觀察管理界面的隊列消息變化情況:
????????可以發現normal.queue添加了Lim屬性表示隊列最大長度。由于隊長為10,因此發送20條消息后10條直接變為死信進入死信隊列。而10秒TTL后所有消息都過期,變為死信又進入死信隊列。
2.3 總結
????????死信和死信隊列:死信是指無法被消費者處理或消費的消息,死信隊列就是存儲死信的隊列。
????????死信產生原因:1.消息處理發生異常,手動確認返回Nack時選擇拒絕重新入隊的策略。2.消息TTL過期。3.隊列滿了。
????????死信隊列應用場景:比如訂單系統和支付系統,訂單消息存儲在消息隊列中,如果消息不能正確被支付系統處理,為了保證消息可靠性選擇手動確認并重新入隊,就會導致訂單系統反復處理失敗這條消息,其它消費無法得到處理(消息積壓)。此時就可以利用死信隊列,把處理失敗的消息不讓重新入隊,而是存入死信隊列,讓其它專門處理死信的消費者進行消費(支付失敗的訂單發送工單專門交給人工處理)。
????????還有就是訂單超時,那訂單系統的訂單的狀態就需要修改(待支付變為訂單超時),就可以把超時的訂單存到死信隊列,讓支付系統的其它消費者消費,支付系統通過給訂單系統發送消息來告知訂單狀態的改變。
3 延遲隊列
3.1 介紹
????????消息發送后,讓消費者等待一段時間再允許從隊列中獲取消息,這樣的隊列就是延遲隊列。
????????延遲隊列的作用類似于定時器,只是定時器往往工作在同一個服務場景,而延遲隊列可以工作在分布式場景。常見應用場景如下:
????????1.預約服務:在景區預約平臺或線上會議室預約后,往往都會把消息延遲到景區開門或會議開始前再通知提醒。
????????2.智能家居:用戶在下班前就通過手機或遠程遙控來控制家里的用電系統、空調、熱水器等在用戶快到家前就開始工作,這時控制指令就會延遲推送,待到達指定時間時再發送到家居的控制系統。
3.2 實現
????????RabbitMQ并沒有直接提供延遲隊列,延遲隊列的實現可以使用TTL+死信隊列來實現。
????????具體方案如下:
????????Queue是設置了TTL,需要延遲通知的消費者監聽死信隊列,當發送消息到Queue,假設延遲10s通知,那TTL就是10s。10s后消息變為死信,轉發到死信隊列,此時消費者就可以獲取到延遲10s的消息進行消費了。
????????具體代碼在死信隊列已經實現,不再演示。
3.3 延遲隊列插件
????????上述實現方式存在一個問題,如果設置的不是隊列的TTL,而是消息的TTL,各種消息TTL不一致,就會出現無法按特定時間延遲的結果。
????????比如消息1TTL為10s,消息2TTL為30s,消息2先發送,消息1后發送。由于消息2在隊頭,因此RabbitMQ就會反復掃描隊頭判斷是否過期,只有隊頭判斷過期后才會再判斷消息1,因此消息1本來要延遲10s,結果卻被迫延遲30s后再被消費。
????????要解決這個問題,RabbitMQ提供了一個插件專門用來提供延遲隊列功能。
3.3.1 RabbitMQ Delayed Message Plugin插件安裝
????????首先前往插件的github地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/
????????點擊右側的Releases:
????????找到和RabbitMQ匹配的版本下載.ez文件:
????????根據不同的操作系統尋找不同的安裝路徑,把下載的插件復制到該目錄里(如果沒有該目錄,手動創建):
????????注意:如果是docker,則使用如下命令:docker cp 宿主機文件 容器名稱或ID:容器目錄。
3.3.2 插件使用
????????查看插件:rabbitmq-plugins list
????????啟動插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
????????重啟服務:service rabbitmq-server restart
3.3.3 案例演示
????????隊列交換機名稱和聲明:
public class RabbitMQConnection {public static final String DELAY_QUEUE = "delay.queue";public static final String DELAY_EXCHANGE = "delay.exchange";}
@Configurationpublic class RabbitMQConfig {@Bean("delayQueue")public Queue delayQueue(){return QueueBuilder.durable(RabbitMQConnection.DELAY_QUEUE).build();}@Bean("delayExchange")public DirectExchange delayExchange(){return ExchangeBuilder.directExchange(RabbitMQConnection.DELAY_EXCHANGE).durable(true).delayed().build();}@Bean("delayQueueBinding")public Binding delayQueueBinding(@Qualifier("delayExchange") DirectExchange directExchange, @Qualifier("delayQueue") Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("delay");}}
????????聲明交換機時,注意使用delayed()來聲明這是延遲交換機。這也就意味著延遲隊列插件的延遲原理不是放到隊列中延遲一定時間在消費,而是消息在交換機上延遲一定時間再把消息路由到隊列。
????????生產者代碼:
@RestController@RequestMapping("/producer")public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;@RequestMapping("delay")public String delay() {MessagePostProcessor messagePostProcessor1 = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(30000);//延遲30sreturn message;}};MessagePostProcessor messagePostProcessor2 = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(10000);//延遲10sreturn message;}};rabbitTemplate.convertAndSend(RabbitMQConnection.DELAY_EXCHANGE, "delay", "Hello SpringBoot RabbitMQ",messagePostProcessor1);rabbitTemplate.convertAndSend(RabbitMQConnection.DELAY_EXCHANGE, "delay", "Hello SpringBoot RabbitMQ",messagePostProcessor2);System.out.println("消息發送成功,當前時間:"+new Date());return "發送成功";}}
????????消費者代碼:
@Componentpublic class DelayListener {@RabbitListener(queues = RabbitMQConnection.DELAY_QUEUE)public void queueListener2(Message message, Channel channel) throws IOException {System.out.printf("listener [" + RabbitMQConnection.DELAY_QUEUE + "]收到消息:%s, 時間:%s \n",new String(message.getBody(), "UTF-8"), new Date());}}
????????可以發現使用延遲隊列插件,即使先發送延時時間長的消息,消息處理也不會亂序。運行結果如下:
3.4 總結
????????延遲隊列概念:消息需要經過一定時間的延遲后在發送給消費者進行消費,存儲這樣的消息的隊列就是延遲隊列。
????????應用場景:比如訂單超時支付自動取消,訂單系統下單時設置延遲時間,并將訂單消息投遞到RabbitMQ中,消息超時則把訂單消息發送給消費者(訂單系統的訂單狀態處理模塊),訂單系統根據是否收到支付系統支付成功的消息或超時訂單來修改訂單狀態(成功支付或超時未支付)。
????????還有就是用戶發起退款服務,退款信息消息設置延遲24小時并發送給RabbitMQ,當24小時內商家未退款,則退款信息消息自動發送給退款系統的消費者服務,由它們判斷是否需要退款(避免重復退款)。
????????實現方法以及優缺點對比:
????????1.死信隊列+TTL,優點:靈活實現不需要插件;缺點:存在消息順序性問題,需要增加代碼邏輯來實現(需要額外的死信交換機和死信隊列),增加系統復雜性。
????????2.延遲插件,優點:可以直接使用延遲隊列,簡化延遲消息的實現邏輯,同時沒有消息順序性問題;缺點:需要特定的插件,并且插件還得注意版本問題。
下篇文章:
RabbitMQ—事務與消息分發https://blog.csdn.net/sniper_fandc/article/details/149312189?fromshare=blogdetail&sharetype=blogdetail&sharerId=149312189&sharerefer=PC&sharesource=sniper_fandc&sharefrom=from_link