RabbitMQ的高級特性還包括我的上篇博客
【RabbitMQ】-----詳解RabbitMQ高級特性之消息確認機制-CSDN博客
目錄
RabbitMQ高級特性之持久性
持久性
交換機持久化
隊列持久化+消息持久化
RabbitMQ高級特性之發送方確認機制
發送方確認
添加配置
常量類
聲明隊列和交換機并綁定二者關系
confirm確認模式?
編寫生產消息代碼
生產消息1
解決方法
解決方法
return 模式
編寫生產消息代碼(路由正確)
編寫生產消息代碼(路由錯誤)
面試題
RabbitMQ高級特性之重試機制
配置類
常量類
聲明隊列和交換機并綁定二者關系
編寫生產消息代碼
編寫消費消息代碼1(自動確認)
編寫消費消息代碼2(自動確認)
編寫消費消息代碼3(手動確認)
RabbitMQ高級特性之TTL
TTL
添加配置
常量類
消息的TTL
聲明隊列和交換機并綁定二者關系
編寫生產消息代碼
生產消息
隊列的TTL
聲明隊列和交換機并綁定二者關系
編寫生產消息代碼
生產消息(消息無TTL)
生產消息(消息有TTL)
消息的TTL和隊列的TTL
RabbitMQ高級特性之死信隊列
死信隊列
添加配置
常量類
聲明隊列和交換機并綁定二者關系
死信--消息過期
給隊列設置TTL
編寫生產消息代碼
編寫消費消息代碼
觀察現象
死信--消息超過隊列最大長度
設置隊列的最大長度
編寫生產消息代碼
編寫消費消息代碼
死信--消息被拒絕
編寫生產消息代碼
面試題
RabbitMQ高級特性之延遲隊列
延遲隊列
應用場景
TTL+死信隊列 實現延遲隊列
添加配置
常量類
聲明隊列和交換機并綁定二者關系
編寫生產消息代碼
編寫消費消息代碼
修改生產消息代碼
延遲隊列插件
兩者的區別:
RabbitMQ高級特性之事務
RabbitMQ高級特性之消息分發
消息分發
添加配置
常量類
聲明配置和交換機并綁定二者關系
限流
編寫生產消息代碼
編寫消費消息代碼1
編寫消費消息代碼2
負載均衡
更改配置
編寫消費消息代碼
RabbitMQ高級特性之持久性
持久性
RabbitMQ的持久化分為三個部分:交換器的持久化、隊列的持久化和消息的持久化。
交換機持久化
交換器的持久化是通過在聲明交換機時是將durable參數置為true實現的.相當于將交換機的屬性在服務器內部保存,當MQ的服務器發?意外或關閉之后,重啟 RabbitMQ 時不需要重新去建?交換機, 交換機會?動建?,相當于?直存在.
如果交換器不設置持久化, 那么在 RabbitMQ 服務重啟之后, 相關的交換機元數據會丟失, 對?個?期使?的交換器來說,建議將其置為持久化的.
重啟RabbitMQ服務前:
重啟RabbitMQ服務后:
由此我們可以看到,重啟RabbitMQ服務之后,重啟服務之前聲明的持久化交換機依舊存在
交換機設置了非持久化之后,當重啟RabbitMQ服務之后,會發現之前聲明的非持久化交換機沒有了,因為消息依托于隊列,而隊列依托于交換機,所以當交換機非持久化之后,無論消息和隊列是否持久化,當重啟RabbitMQ之后,消息,隊列和交換機都不復存在了。
隊列持久化+消息持久化
因為消息和隊列二者之間的關系比較強,所以下面將隊列的持久化和非持久化與消息的持久化和非持久化一起來講解。
隊列的持久化是通過在聲明隊列時將 durable 參數置為 true實現的.
如果隊列不設置持久化, 那么在RabbitMQ服務重啟之后,該隊列就會被刪掉, 此時數據也會丟失. (隊列沒有了, 消息也?處可存了)
隊列的持久化能保證該隊列本?的元數據不會因異常情況?丟失, 但是并不能保證內部所存儲的消息不會丟失. 要確保消息不會丟失, 需要將消息設置為持久化。
消息實現持久化, 需要把消息的投遞模式( MessageProperties 中的 deliveryMode )設置為2,也就是 MessageDeliveryMode.PERSISTENT。
設置了隊列和消息的持久化, 當 RabbitMQ 服務重啟之后, 消息依舊存在. 如果只設置隊列持久化, 重啟之后消息會丟失. 如果只設置消息的持久化, 重啟之后隊列消失, 繼?消息也丟失. 所以單單設置消息持久化?不設置隊列的持久化顯得毫?意義
隊列持久+消息持久
生產消息
重啟RabbitMQ服務后:
此時我們可以看到,當重啟RabbitMQ服務后,重啟RabbitMQ服務之前聲明的持久化隊列和持久化消息都還是存在的。?
隊列持久+消息非持久
重啟RabbitMQ服務后:
隊列非持久+消息持久
重啟RabbitMQ服務后:
隊列非持久+消息非持久
重啟RabbitMQ服務后:
所以只有當交換機+隊列+消息都是持久化的時候才具有持久性
RabbitMQ高級特性之發送方確認機制
發送方確認
在使? RabbitMQ的時候, 可以通過消息持久化來解決因為服務器的異常崩潰?導致的消息丟失, 但是還有?個問題, 當消息的?產者將消息發送出去之后, 消息到底有沒有正確地到達服務器呢? 如果在消息到達服務器之前已經丟失(?如RabbitMQ重啟, 那么RabbitMQ重啟期間?產者消息投遞失敗), 持久化操作也解決不了這個問題,因為消息根本沒有到達服務器,何談持久化?
RabbitMQ為我們提供了兩種解決?案:
a. 通過事務機制實現
b. 通過發送?確認(publisher confirm) 機制實現
事務機制?較消耗性能, 在實際?作中使?也不多, 下面主要介紹confirm機制來實現發送?的確認.
RabbitMQ為我們提供了兩個?式來控制消息的可靠性投遞:
1. confirm確認模式
2. return退回模式
添加配置
spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://study:study@47.98.109.138:5672/extensionpublisher-confirm-type: correlated #消息發送確認
常量類
public class Constants {//發送方確認public static final String CONFIRM_QUEUE = "confirm.queue";public static final String CONFIRM_EXCHANGE = "confirm.exchange";
}
聲明隊列和交換機并綁定二者關系
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import rabbitextensionsdemo.constant.Constants;@Configuration
public class RabbitMQConfig {//發送方確認@Bean("confirmQueue")public Queue confirmQueue(){return QueueBuilder.durable(Constants.CONFIRM_QUEUE).build();}@Bean("confirmExchange")public DirectExchange confirmExchange(){return ExchangeBuilder.directExchange(Constants.CONFIRM_EXCHANGE).build();}@Bean("confirmBinding")public Binding confirmBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("confirm").noargs();}
}
confirm確認模式?
Producer 在發送消息的時候, 對發送端設置?個ConfirmCallback的監聽, ?論消息是否到達
Exchange, 這個監聽都會被執?, 如果Exchange成功收到, ACK( Acknowledge character , 確認字符)為true, 如果沒收到消息, ACK就為false。
RabbitTemplate.ConfirmCallback 和 ConfirmListener 區別
在RabbitMQ中, ConfirmListener和ConfirmCallback都是?來處理消息確認的機制, 但它們屬于不同的客?端庫, 并且使?的場景和?式有所不同.
1. ConfirmListener 是 RabbitMQ Java Client 庫中的接?. 這個庫是 RabbitMQ 官?提供的?個直接與RabbitMQ服務器交互的客?端庫. ConfirmListener 接?提供了兩個?法: handleAck 和handleNack, ?于處理消息確認和否定確認的事件.
2. ConfirmCallback 是 Spring AMQP 框架中的?個接?. 專?為Spring環境設計. ?于簡化與
RabbitMQ交互的過程. 它只包含?個 confirm ?法,?于處理消息確認的回調.
在 Spring Boot 應?中, 通常會使? ConfirmCallback, 因為它與 Spring 框架的其他部分更加整合, 可以利? Spring 的配置和依賴注?功能. ?在使? RabbitMQ Java Client 庫時, 則可能會直接實現ConfirmListener 接?, 更直接的與RabbitMQ的Channel交互
編寫生產消息代碼
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import rabbitextensionsdemo.constant.Constants;import java.util.Date;@RequestMapping("/producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate confirmRabbitTemplate;@RequestMapping("/confirm")public String confirm() {confirmRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("執行了confirm方法");if (ack){System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);//相應的業務處理}}});CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm", "confirm test...", correlationData);return "消息發送成功";}
}
public interface ConfirmCallback {/*** 確認回調* @param correlationData: 發送消息時的附加信息 , 通常?于在確認回調中識別特定的消 息* @param ack: 交換機是否收到消息 , 收到為 true, 未收到為 false* @param cause: 當消息確認失敗時 , 這個字符串參數將提供失敗的原因 . 這個原因可以?于調 試和錯誤處理 .* 成功時 , cause 為 null*/void confirm ( @Nullable CorrelationData correlationData, boolean ack,@Nullable String cause); }
生產消息1
第一次生產消息
第二次生產消息
此時我們看到,第一次生產消息時能夠正常生產消息,但是當我們第二次生產消息時卻拋異常了,異常信息為:java.lang.IllegalStateException: Only one ConfirmCallback is supported by each RabbitTemplate
解決方法
是為什么呢?從異常信息中我們可以看到,ConfirmCallback只能被設置一次,但是從我們的代碼中可以看到,我們每次生產消息時都會設置一次ConfirmCallback,顯然這就是問題所在。
下面我們把剛剛的ConfirmCallback提取出來,重新設置RabbitTemplate。
RabbitTemplateConfig
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//設置回調方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("執行了confirm方法");if (ack){System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);//相應的業務處理}}});return rabbitTemplate;}
}
ProducerController
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import rabbitextensionsdemo.constant.Constants;@RequestMapping("/producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate RabbitTemplate confirmRabbitTemplate;@RequestMapping("/pres")public String pres() {Message message = new Message("Presistent test...".getBytes(), new MessageProperties());//消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);System.out.println(message);rabbitTemplate.convertAndSend(Constants.PRES_EXCHANGE, "pres", message);return "消息發送成功";}@RequestMapping("/confirm")public String confirm() {CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm", "confirm test...", correlationData);return "消息發送成功";}
}
此時我們可以看到,我們解決了前面多次生產消息導致的ConfirmCallback被設置多次的問題,但是我們此時的代碼就真的沒有問題了嗎?
當我們生產其它消息時,發現我們并沒有給這個生產消息的方法設置ConfirmCallback啊,但是為什么在控制臺上看到執行了我們設置的ConfrimCallback,這是為什么呢?
是因為我們在前面設置了RabbitTemplate,而且使用了@Autowired注解注入了RabbitTemplate,雖然我們注入了兩個,一個是rabbitTemplate,一個是confirmRabbitTemplate,但是這兩個都是同一個RabbitTemplate。
解決方法
解決辦法:我們在RabbitTemplateConfig中設置兩個RabbitTemplate.
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);return rabbitTemplate;}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//設置回調方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("執行了confirm方法");if (ack){System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);//相應的業務處理}}});return rabbitTemplate;}
}
與此同時,我們修改注入方式:
此時,當再次使用/producer/pres來生產消息時,就沒問題了。
下面我們修改一下生產消息時給消息設置的路由規則:
@RequestMapping("/confirm")public String confirm() {CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm111", "confirm test...", correlationData);return "消息發送成功";}
生產消息
我們知道,上面生產消息時給消息設置的路由規則并不存在,按道理說,應該會打印“未收到消息”而非“收到消息”,原因是因為,上面的confirm確認模式是用來確定生產消息是否到達了交換機,而上面的路由規則是針對消息從交換機到隊列的,解決上面的路由問題使用到另一種確認模式。
return 模式
消息到達Exchange之后, 會根據路由規則匹配, 把消息放?Queue中. Exchange到Queue的過程, 如果?條消息?法被任何隊列消費(即沒有隊列與消息的路由鍵匹配或隊列不存在等), 可以選擇把消息退回給發送者. 消息退回給發送者時, 我們可以設置?個返回回調?法, 對消息進?處理。
修改RabbitTemplateConfig,設置消息退回的回調方法
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);return rabbitTemplate;}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//設置回調方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("執行了confirm方法");if (ack){System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);//相應的業務處理}}});//消息被退回時, 回調方法rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息退回:"+returned);}});return rabbitTemplate;}
}
使?RabbitTemplate的setMandatory?法設置消息的mandatory屬性為true(默認為false). 這個屬性的作?是告訴RabbitMQ, 如果?條消息?法被任何隊列消費, RabbitMQ應該將消息返回給發送者, 此時 ReturnCallback 就會被觸發。
回調函數中有?個參數: ReturnedMessage, 包含以下屬性:
public class ReturnedMessage {//返回的消息對象,包含了消息體和消息屬性private final Message message;//由 Broker 提供的回復碼 , 表?消息?法路由的原因 . 通常是?個數字代碼,每個數字代表不同 的含義 .private final int replyCode;//?個?本字符串 , 提供了?法路由消息的額外信息或錯誤描述 .private final String replyText;//消息被發送到的交換機名稱private final String exchange;//消息的路由鍵,即發送消息時指定的鍵private final String routingKey;
}
編寫生產消息代碼(路由正確)
@RequestMapping("/returns")public String returns() {CorrelationData correlationData = new CorrelationData("5");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm", "returns test...", correlationData);return "消息發送成功";}
編寫生產消息代碼(路由錯誤)
@RequestMapping("/returns")public String returns() {CorrelationData correlationData = new CorrelationData("5");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm111", "returns test...", correlationData);return "消息發送成功";}
此時我們可以看到,隊列中依舊是只有1條消息,而且代碼執行了消息退回,而且消息退回時打印了消息信息,顯然我們可以看到,消息的路由規則是錯誤的,不會入隊列。
面試題
如何保證RabbitMQ消息的可靠傳輸?
從這個圖中, 可以看出, 消息可能丟失的場景以及解決?案:
1. ?產者將消息發送到 RabbitMQ失敗
? ? ? ? a. 可能原因: ?絡問題等
? ? ? ? b. 解決辦法: [發送?確認-confirm確認模式]
2. 消息在交換機中?法路由到指定隊列:
? ? ? ? a. 可能原因: 代碼或者配置層?錯誤, 導致消息路由失敗
? ? ? ? b. 解決辦法: [發送?確認-return模式]3. 消息隊列??數據丟失
? ? ? ? a. 可能原因: 消息到達RabbitMQ之后, RabbitMQ Server 宕機導致消息丟失.
? ? ? ? b. 解決辦法: [持久性]. 開啟 RabbitMQ持久化, 就是消息寫?之后會持久化到磁盤, 如果RabbitMQ 掛了, 恢復之后會?動讀取之前存儲的數據. (極端情況下, RabbitMQ還未持久化就掛了, 可能導致少量數據丟失, 這個概率極低, 也可以通過集群的?式提?可靠性)
4. 消費者異常, 導致消息丟失
? ? ? ? a. 可能原因: 消息到達消費者, 還沒來得及消費, 消費者宕機. 消費者邏輯有問題.
? ? ? ? b. 解決辦法: [消息確認]. RabbitMQ 提供了 消費者應答機制 來使 RabbitMQ 能夠感知到消費者是否消費成功消息. 默認情況下消費者應答機制是?動應答的, 可以開啟?動確認, 當消費者確認消費成功后才會刪除消息, 從?避免消息丟失. 除此之外, 也可以配置重試機制, 當消息消費異常時, 通過消息重試確保消息的可靠性。
RabbitMQ高級特性之重試機制
在消息傳遞過程中, 可能會遇到各種問題, 如?絡故障, 服務不可?, 資源不?等, 這些問題可能導致消息處理失敗. 為了解決這些問題, RabbitMQ 提供了重試機制, 允許消息在處理失敗后重新發送.但如果是程序邏輯引起的錯誤, 那么多次重試也是沒有?的, 可以設置重試次數.
添加配置
配置類
spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://study:study@47.98.109.138:5672/extensionlistener:simple:acknowledge-mode: auto #消息接收確認retry:enabled: true # 開啟消費者失敗重試initial-interval: 5000ms # 初始失敗等待時長為5秒max-attempts: 5 # 最大重試次數
常量類
public class Constants {//重試機制public static final String RETRY_QUEUE = "retry.queue";public static final String RETRY_EXCHANGE = "retry.exchange";
}
聲明隊列和交換機并綁定二者關系
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import rabbitextensionsdemo.constant.Constants;@Configuration
public class RabbitMQConfig {//重試機制@Bean("retryQueue")public Queue retryQueue(){return QueueBuilder.durable(Constants.RETRY_QUEUE).build();}@Bean("retryExchange")public DirectExchange retryExchange(){return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).build();}@Bean("retryBinding")public Binding retryBinding(@Qualifier("retryQueue") Queue queue, @Qualifier("retryExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("retry").noargs();}
}
編寫生產消息代碼
@RequestMapping("/retry")public String retry() {System.out.println("retry...");rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE, "retry", "retry test...");return "消息發送成功";}
編寫消費消息代碼1(自動確認)
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import rabbitextensionsdemo.constant.Constants;import java.io.UnsupportedEncodingException;@Component
public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void handlerMessage(Message message) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("["+ Constants.RETRY_QUEUE+"]接收到消息: %s, deliveryTag: %s \n", new String(message.getBody(), "UTF-8"), deliveryTag);int num = 3/0;System.out.println("業務處理完成");}
}
此時我們可以看到,消費方發生了異常,接著進行了5次重試,然后就拋異常了。
編寫消費消息代碼2(自動確認)
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import rabbitextensionsdemo.constant.Constants;@Component
public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void handlerMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("["+Constants.RETRY_QUEUE+"]接收到消息: %s, deliveryTag: %s \n", new String(message.getBody(), "UTF-8"), deliveryTag);try {int num = 3/0;System.out.println("業務處理完成");}catch (Exception e){System.out.println("業務處理失敗");}}
}
消費消息
此時我們發現,雖然配置了重試次數為5次,但是在處理異常后,并沒有進行重試。
編寫消費消息代碼3(手動確認)
spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://study:study@47.98.109.138:5672/extensionlistener:simple:acknowledge-mode: manual #消息接收確認retry:enabled: true # 開啟消費者失敗重試initial-interval: 5000ms # 初始失敗等待時長為5秒max-attempts: 5 # 最大重試次數
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import rabbitextensionsdemo.constant.Constants;@Component
public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void handlerMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("["+Constants.RETRY_QUEUE+"]接收到消息: %s, deliveryTag: %s \n", new String(message.getBody(), "UTF-8"), deliveryTag);try {int num = 3/0;System.out.println("業務處理完成");channel.basicAck(deliveryTag, false);}catch (Exception e){System.out.println("業務處理失敗");channel.basicNack(deliveryTag, false, true);}}
}
因為拋出了異常,設置了手動確認,而且設置了拒絕確認時重新入隊,所以會不停地在控制臺打印,而且deleveryTag的值不斷增加,是因為消息不斷重新出隊和入隊。
可以看到, ?動確認模式時, 重試次數的限制不會像在?動確認模式下那樣直接?效, 因為是否重試以及何時重試更多地取決于應?程序的邏輯和消費者的實現.
- ?動確認模式下, RabbitMQ 會在消息被投遞給消費者后?動確認消息. 如果消費者處理消息時拋出異常, RabbitMQ 根據配置的重試參數?動將消息重新?隊, 從?實現重試. 重試次數和重試間隔等參數可以直接在RabbitMQ的配置中設定,并且RabbitMQ會負責執?這些重試策略.
- ?動確認模式下, 消費者需要顯式地對消息進?確認. 如果消費者在處理消息時遇到異常, 可以選擇不確認消息使消息可以重新?隊. 重試的控制權在于應?程序本?, ?不是RabbitMQ的內部機制. 應?程序可以通過??的邏輯和利?RabbitMQ的?級特性來實現有效的重試策略。
使?重試機制時需要注意:
1. ?動確認模式下: 程序邏輯異常, 多次重試還是失敗, 消息就會被?動確認, 那么消息就丟失
了
2. ?動確認模式下: 程序邏輯異常, 多次重試消息依然處理失敗, ?法被確認, 就?直是
unacked的狀態, 導致消息積壓
RabbitMQ高級特性之TTL
TTL
TTL(Time?to?Live, 過期時間), 即過期時間.?RabbitMQ可以對消息和隊列設置TTL.
當消息到達存活時間之后, 還沒有被消費, 就會被?動清除。
咱們在?上購物, 經常會遇到?個場景, 當下單超過24?時還未付款, 訂單會被?動取消
還有類似的, 申請退款之后, 超過7天未被處理, 則?動退款。
添加配置
spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://study:study@47.98.109.138:5672/extension
常量類
public class Constants {//ttlpublic static final String TTL_QUEUE = "ttl.queue";public static final String TTL_QUEUE2 = "ttl2.queue";public static final String TTL_EXCHANGE = "ttl.exchange";
}
消息的TTL
聲明隊列和交換機并綁定二者關系
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import rabbitextensionsdemo.constant.Constants;@Configuration
public class RabbitMQConfig { @Bean("ttlQueue")public Queue ttlQueue(){return QueueBuilder.durable(Constants.TTL_QUEUE).build();}@Bean("ttlExchange")public DirectExchange ttlExchange(){return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).build();}@Bean("ttlBinding")public Binding ttlBinding(@Qualifier("ttlQueue") Queue queue, @Qualifier("ttlExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();}
}
編寫生產消息代碼
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import rabbitextensionsdemo.constant.Constants;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;@RequestMapping("/ttl")public String ttl() {System.out.println("ttl...");rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test 30s...", message -> {message.getMessageProperties().setExpiration("30000"); //單位: 毫秒, 過期時間為30sreturn message;});rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test 10s...", message -> {message.getMessageProperties().setExpiration("10000"); //單位: 毫秒, 過期時間為10sreturn message;});return "消息發送成功";}
}
生產消息
我們可以看到,生產的兩條消息的確消失了,但是耗時30秒,這是為什么呢?
原因是因為設置消息的TTL,哪怕消息過期了,也不會立即刪除,而是在將消息投遞給消費者之前進行判定
隊列的TTL
聲明隊列和交換機并綁定二者關系
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import rabbitextensionsdemo.constant.Constants;@Configuration
public class RabbitMQConfig {@Bean("ttlQueue2")public Queue ttlQueue2(){return QueueBuilder.durable(Constants.TTL_QUEUE2).ttl(20000).build(); //設置隊列的ttl為20s}@Bean("ttlExchange")public DirectExchange ttlExchange(){return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).build();}@Bean("ttlBinding2")public Binding ttlBinding2(@Qualifier("ttlQueue2") Queue queue, @Qualifier("ttlExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();}
}
編寫生產消息代碼
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import rabbitextensionsdemo.constant.Constants;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;@RequestMapping("/ttl")public String ttl() {System.out.println("ttl...");rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test 30s...", message -> {message.getMessageProperties().setExpiration("30000"); //單位: 毫秒, 過期時間為30sreturn message;});return "消息發送成功";}@RequestMapping("/ttl2")public String ttl2() {System.out.println("ttl2...");//發送普通消息rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test...");return "消息發送成功";}
}
生產消息(消息無TTL)
我們可以看到,雖然沒有給消息設置TTL,但是給ttl2.queue隊列設置了20秒的TTL,20秒過后,ttl2.queue隊列中的消息消失了。
生產消息(消息有TTL)
此時我們可以看到,20秒之后,消息消失了,我們給隊列設置的TTL為20秒,給消息設置的TTL為30秒,最終消息的TTL 為 min(消息的TTL,隊列的TTL)
消息的TTL和隊列的TTL
設置隊列TTL屬性的?法, ?旦消息過期, 就會從隊列中刪除
設置消息TTL的?法, 即使消息過期, 也不會?上從隊列中刪除, ?是在即將投遞到消費者之前進?判定的.
為什么這兩種?法處理的?式不?樣?
因為設置隊列過期時間, 隊列中已過期的消息肯定在隊列頭部, RabbitMQ只要定期從隊頭開始掃描是否有過期的消息即可.
?設置消息TTL的?式, 每條消息的過期時間不同, 如果要刪除所有過期消息需要掃描整個隊列, 所以不如等到此消息即將被消費時再判定是否過期, 如果過期再進?刪除即可.
RabbitMQ高級特性之死信隊列
死信隊列
死信(dead message) 簡單理解就是因為種種原因, ?法被消費的信息, 就是死信.
有死信, ?然就有死信隊列. 當消息在?個隊列中變成死信之后,它能被重新被發送到另?個交換器
中,這個交換器就是DLX( Dead Letter Exchange ), 綁定DLX的隊列, 就稱為死信隊列(Dead
Letter Queue,簡稱DLQ).
消息變成死信?般是由于以下?種情況:
1. 消息被拒絕(?Basic.Reject/Basic.Nack ),并且設置 requeue 參數為 false.
2. 消息過期.
3. 隊列達到最??度.
添加配置
spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://study:study@47.98.109.138:5672/extension
常量類
public class Constants {//死信public static final String NORMAL_QUEUE = "normal.queue";public static final String NORMAL_EXCHANGE = "normal.exchange";public static final String DL_QUEUE = "dl.queue";public static final String DL_EXCHANGE= "dl.exchange";
}
聲明隊列和交換機并綁定二者關系
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import rabbitextensionsdemo.constant.Constants;@Configuration
public class DLConfig {//正常的交換機和隊列@Bean("normalQueue")public Queue normalQueue(){return QueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dlx").build();}@Bean("normalExchange")public DirectExchange normalExchange(){return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();}//死信交換機和隊列@Bean("dlQueue")public Queue dlQueue(){return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("dlExchange")public DirectExchange dlExchange(){return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();}@Bean("dlBinding")public Binding dlBinding(@Qualifier("dlQueue") Queue queue, @Qualifier("dlExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();}
}
死信--消息過期
給隊列設置TTL
編寫生產消息代碼
@RequestMapping("/dl")public String dl() {System.out.println("dl...");//發送普通消息rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dl test...");System.out.printf("%tc 消息發送成功 \n", new Date());return "消息發送成功";}
編寫消費消息代碼
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import rabbitextensionsdemo.constant.Constants;import java.util.Date;@Component
public class DLListener {@RabbitListener(queues = Constants.DL_QUEUE)public void dlHandMessage(Message message, Channel channel) throws Exception {//消費者邏輯System.out.printf("[dl.queue] %tc 接收到消息: %s, deliveryTag: %d \n", new Date(), new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());}
}
觀察現象
我們可以看到,消息在10秒后過期,從normal隊列進入到了死信隊列,消息進入到死信隊列后被消費。
死信--消息超過隊列最大長度
設置隊列的最大長度
編寫生產消息代碼
@RequestMapping("/dl")public String dl() {//測試隊列長度for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dl test..."+i);}return "消息發送成功";}
編寫消費消息代碼
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import rabbitextensionsdemo.constant.Constants;import java.util.Date;@Component
public class DLListener {@RabbitListener(queues = Constants.DL_QUEUE)public void dlHandMessage(Message message, Channel channel) throws Exception {//消費者邏輯System.out.printf("[dl.queue] %tc 接收到消息: %s, deliveryTag: %d \n", new Date(), new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());}
}
此時我們可以看到,給隊列設置了最大長度為10,但是隊列接收到了20條消息,就會導致前10條消息變成死信。
死信--消息被拒絕
編寫生產消息代碼
@RequestMapping("/dl")public String dl() {System.out.println("dl...");//發送普通消息rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dl test...");System.out.printf("%tc 消息發送成功 \n", new Date());return "消息發送成功";}
編寫消費消息代碼
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import rabbitextensionsdemo.constant.Constants;@Component
public class DLListener {@RabbitListener(queues = Constants.NORMAL_QUEUE)public void handMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消費者邏輯System.out.printf("[normal.queue]接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());//進行業務邏輯處理System.out.println("業務邏輯處理");int num = 3/0;System.out.println("業務處理完成");//肯定確認channel.basicAck(deliveryTag,false);} catch (Exception e) {//否定確認channel.basicNack(deliveryTag, false, false); //requeue為false, 該消息成為死信}}
}
可以看到,normal隊列中的消息在被消費時因為發生了異常而執行到了拒絕消息的代碼,而且設置了消息不重新入隊,導致消息變成了死信,進而進入到了死信隊列。
面試題
1.死信隊列的概念
死信(Dead Letter)是消息隊列中的?種特殊消息, 它指的是那些?法被正常消費或處理的消息. 在消息隊列系統中, 如RabbitMQ, 死信隊列?于存儲這些死信消息。
2.死信的來源
1) 消息過期: 消息在隊列中存活的時間超過了設定的TTL
2) 消息被拒絕: 消費者在處理消息時, 可能因為消息內容錯誤, 處理邏輯異常等原因拒絕處理該消息. 如果拒絕時指定不重新?隊(requeue=false), 消息也會成為死信.
3) 隊列滿了: 當隊列達到最??度, ?法再容納新的消息時, 新來的消息會被處理為死信.
3.死信的應用場景?
對于RabbitMQ來說, 死信隊列是?個?常有?的特性. 它可以處理異常情況下,消息不能夠被消費者正確消費?被置?死信隊列中的情況, 應?程序可以通過消費這個死信隊列中的內容來分析當時所遇到的異常情況, 進?可以改善和優化系統.
?如: ???付訂單之后, ?付系統會給訂單系統返回當前訂單的?付狀態
為了保證?付信息不丟失, 需要使?到死信隊列機制. 當消息消費異常時, 將消息投?到死信隊列中, 由訂單系統的其他消費者來監聽這個隊列, 并對數據進?處理(?如發送?單等,進???確認).
場景的應?場景還有:
? 消息重試:將死信消息重新發送到原隊列或另?個隊列進?重試處理.
? 消息丟棄:直接丟棄這些?法處理的消息,以避免它們占?系統資源.
? ?志收集:將死信消息作為?志收集起來,?于后續分析和問題定位.
RabbitMQ高級特性之延遲隊列
延遲隊列
延遲隊列(Delayed Queue),即消息被發送以后, 并不想讓消費者?刻拿到消息, ?是等待特定時間后,消費者才能拿到這個消息進?消費
應用場景
延遲隊列的使?場景有很多, ?如:
1. 智能家居: ??希望通過?機遠程遙控家?的智能設備在指定的時間進??作. 這時候就可以將??指令發送到延遲隊列, 當指令設定的時間到了再將指令推送到智能設備.
2. ?常管理: 預定會議后,需要在會議開始前?五分鐘提醒參會?參加會議
3. ??注冊成功后, 7天后發送短信, 提???活躍度等
RabbitMQ本?沒有直接?持延遲隊列的的功能, 但是可以通過前?所介紹的TTL+死信隊列的?式組合模擬出延遲隊列的功能.
假設?個應?中需要將每條消息都設置為10秒的延遲, ?產者通過 normal_exchange 這個交換器將發送的消息存儲在 normal_queue 這個隊列中. 消費者訂閱的并?是 normal_queue 這個隊列, ?
是 dlx_queue 這個隊列. 當消息從 normal_queue 這個隊列中過期之后被存? dlx_queue 這個
隊列中,消費者就恰巧消費到了延遲10秒的這條消息.
TTL+死信隊列 實現延遲隊列
添加配置
spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://study:study@47.98.109.138:5672/extension
常量類
public class Constants {//死信public static final String NORMAL_QUEUE = "normal.queue";public static final String NORMAL_EXCHANGE = "normal.exchange";public static final String DL_QUEUE = "dl.queue";public static final String DL_EXCHANGE= "dl.exchange";
}
聲明隊列和交換機并綁定二者關系
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import rabbitextensionsdemo.constant.Constants;@Configuration
public class DLConfig {//正常的交換機和隊列@Bean("normalQueue")public Queue normalQueue(){return QueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dlx").build();}@Bean("normalExchange")public DirectExchange normalExchange(){return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();}//死信交換機和隊列@Bean("dlQueue")public Queue dlQueue(){return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("dlExchange")public DirectExchange dlExchange(){return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();}@Bean("dlBinding")public Binding dlBinding(@Qualifier("dlQueue") Queue queue, @Qualifier("dlExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();}
}
編寫生產消息代碼
@RequestMapping("/delay")public String delay() {System.out.println("delay...");rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "ttl test 10s...", message -> {message.getMessageProperties().setExpiration("10000"); //單位: 毫秒, 過期時間為10sreturn message;});rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "ttl test 30s...", message -> {message.getMessageProperties().setExpiration("30000"); //單位: 毫秒, 過期時間為30sreturn message;});System.out.printf("%tc 消息發送成功 \n", new Date());return "消息發送成功";}
編寫消費消息代碼
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import rabbitextensionsdemo.constant.Constants;import java.util.Date;@Component
public class DLListener {@RabbitListener(queues = Constants.DL_QUEUE)public void dlHandMessage(Message message, Channel channel) throws Exception {//消費者邏輯System.out.printf("[dl.queue] %tc 接收到消息: %s, deliveryTag: %d \n", new Date(), new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());}
}
此時我們可以看到,基于TTL+死信隊列實現出來了 延遲隊列 的效果,但是這樣就沒問題了嗎?
修改生產消息代碼
@RequestMapping("/delay")public String delay() {System.out.println("delay...");rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "ttl test 30s...", message -> {message.getMessageProperties().setExpiration("30000"); //單位: 毫秒, 過期時間為30sreturn message;});rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "ttl test 10s...", message -> {message.getMessageProperties().setExpiration("10000"); //單位: 毫秒, 過期時間為10sreturn message;});System.out.printf("%tc 消息發送成功 \n", new Date());return "消息發送成功";}
此時我們看到,設置TTL為10秒的消息居然在30秒后才進入死信隊列,這是為什么呢?
是因為RabbitMQ檢查消息的TTL是在消息發送給消費方的時候進行檢測的,而什么時候發送給發送方又根據隊頭消息的TTL,所以這就是問題所在,也是TTL+死信隊列實現延遲隊列所存在的問題
?那這個問題在上一篇ttl的時候就說過了,這里依然是個問題,雖然設置隊列的ttl不會有這個問題,但是設置隊列ttl我們針對不同延遲時間就需要創建多個隊列,這是不太合理的,所以針對這個問題,我們有一個延遲隊列的插件可以使用
延遲隊列插件
延遲隊列插件,會給我們提供一個特殊的交換機,來完成我們的延遲功能
這是我們插件的下載地址
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
?我們需要找到ez文件并下載,但是注意這里的版本要與你的RabbitMQ版本可以匹配,否則之后會出現問題
那插件下完后,我們要找到對應目錄,下載插件
上面兩個目錄,我們可以任選一個下載即可,如果沒有這個目錄,我們手動創建
然后把下載的ez文件,copy到這個目錄中即可,然后我們可以使用命令 rabbitmq-plugins list 來查看插件列表,看看我們有沒有成功放進去,但是注意,即使我們成功放進去并成功顯示了,也可能會出錯,這就可能是你們下載的RabbitMQ版本與整個延遲插件的版本不匹配,重新下載其他版本即可
然后我們啟動插件?rabbitmq-plugins enable rabbitmq_delayed_message_exchange
之后重啟服務??service?rabbitmq-server?restart
在沒有發生錯誤的情況下,我們就發現我們會多了一個默認的交換機
此時我們代碼中就不需要聲明普通交換機了而是直接使用默認交換機即可
我們生產者代碼是需要改一下的,我們需要調用一個方法來設置延遲時間
@RequestMapping("/delay2")
public String delay2() {//發送帶ttl的消息 rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, "delayed",
"delayed test 20s..."+new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setDelayLong(20000L); return messagePostProcessor;});rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, "delayed",
"delayed test 10s..."+new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setDelayLong(10000L); //設置延遲時間 return messagePostProcessor;});return "發送成功!";
}
此時我們就可以在10s正確接收一個消息,在20s正確接收另一個消息?
兩者的區別:
1.通過TTL+死信
?優點:比較靈活,不需要我們額外引入插件
?缺點:我們設置消息TTL的時候可能會出現順序的問題,而且我們需要多創建死信隊列和死信交換機,完成一些綁定,增加了系統的復雜性
2.基于插件實現的延遲隊列
?優點:通過插件能夠簡化延遲消息的實現,并且避免了時序問題
?缺點:需要依賴插件,不同版本RabbitMQ需要不同版本插件,有運維工作
RabbitMQ高級特性之事務
事務
? ?RabbitMQ是基于AMQP協議實現的,該協議實現了事務機制,所以RabbitMQ也支持事務機制,他的事務允許開發者確保消息的發送和接收時原子性的,要么全部成功,要么全部失敗
我們設置事務有三步,首先就是開啟事務
因為我們是針對rabbitTemplate來作設置的,所以會影響此rabbitTemplate的所有消息,這里我們新開了一個,然后我們使用時要加一個注解
?最后一步我們需要加上事務管理
這樣我們就成功開啟了事務
這三步是必不可少的,缺少一步,都無法成功開啟事務
RabbitMQ高級特性之消息分發
消息分發
RabbitMQ隊列擁有多個消費者時, 隊列會把收到的消息分派給不同的消費者. 每條消息只會發送給訂閱列表?的?個消費者. 這種?式?常適合擴展, 如果現在負載加重,那么只需要創建更多的消費者來消費處理消息即可.
默認情況下, RabbitMQ是以輪詢的?法進?分發的, ?不管消費者是否已經消費并已經確認了消息. 這種?式是不太合理的, 試想?下, 如果某些消費者消費速度慢, ?某些消費者消費速度快, 就可能會導致某些消費者消息積壓, 某些消費者空閑, 進?應?整體的吞吐量下降.
如何處理呢? 我們可以channel.basicQos(int prefetchCount) ?法, 來限制當前信道上的消費者所能保持的最?未確認消息的數量.
?如: 消費端調?了 channelbasicQos(5) , RabbitMQ會為該消費者計數, 發送?條消息計數+1, 消費?條消息計數-1, 當達到了設定的上限, RabbitMQ就不會再向它發送消息了,直到消費者確認了某條消息.類似TCP/IP中的"滑動窗?".
添加配置
spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://study:study@47.98.109.138:5672/extensionlistener:simple:acknowledge-mode: manual #消息接收確認prefetch: 5
常量類
public class Constants {//限流public static final String QOS_QUEUE = "qos.queue";public static final String QOS_EXCHANGE = "qos.exchange";
}
聲明配置和交換機并綁定二者關系
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import rabbitextensionsdemo.constant.Constants;@Configuration
public class QosConfig {@Bean("qosQueue")public Queue qosQueue(){return QueueBuilder.durable(Constants.QOS_QUEUE).build();}@Bean("qosExchange")public Exchange qosExchange(){return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).build();}@Bean("qosBinding")public Binding qosBinding(@Qualifier("qosQueue") Queue queue, @Qualifier("qosExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("qos").noargs();}
}
限流
如下使?場景:
訂單系統每秒最多處理5000請求, 正常情況下, 訂單系統可以正常滿?需求
但是在秒殺時間點, 請求瞬間增多, 每秒1萬個請求, 如果這些請求全部通過MQ發送到訂單系統, ?疑會把訂單系統壓垮.
RabbitMQ提供了限流機制, 可以控制消費端?次只拉取N個請求。
通過設置prefetchCount參數, 同時也必須要設置消息應答?式為?動應答。
prefetchCount: 控制消費者從隊列中預取(prefetch)消息的數量, 以此來實現流控制和負載均衡。
編寫生產消息代碼
@RequestMapping("/qos")public String qos() {System.out.println("qos test...");//發送普通消息for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, "qos", "qos test..."+i);}return "消息發送成功";}
編寫消費消息代碼1
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import rabbitextensionsdemo.constant.Constants;@Component
public class QosListener {@RabbitListener(queues = Constants.QOS_QUEUE)public void handMessage(Message message, Channel channel) throws Exception {//消費者邏輯System.out.printf("111接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());}
}
可以看到,只消費了5條消息,且這5條消息未確定。
編寫消費消息代碼2
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import rabbitextensionsdemo.constant.Constants;@Component
public class QosListener {@RabbitListener(queues = Constants.QOS_QUEUE)public void handMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();//消費者邏輯System.out.printf("111接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());//肯定確認channel.basicAck(deliveryTag,false);}
}
此時我們可以看到,消息一次性被全都消費掉了。
負載均衡
我們也可以?此配置,來實現"負載均衡"
如下圖所?, 在有兩個消費者的情況下,?個消費者處理任務?常快, 另?個?常慢,就會造成?個消費者會?直很忙, ?另?個消費者很閑. 這是因為 RabbitMQ 只是在消息進?隊列時分派消息. 它不考慮消費者未確認消息的數量。
更改配置
spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://study:study@47.98.109.138:5672/extensionlistener:simple:acknowledge-mode: manual #消息接收確認prefetch: 1
編寫消費消息代碼
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import rabbitextensionsdemo.constant.Constants;@Component
public class QosListener {@RabbitListener(queues = Constants.QOS_QUEUE)public void handMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消費者邏輯System.out.printf("111接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());Thread.sleep(2000);//肯定確認channel.basicAck(deliveryTag,false);} catch (Exception e) {//否定確認channel.basicNack(deliveryTag, false, true);}}@RabbitListener(queues = Constants.QOS_QUEUE)public void handMessage2(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消費者邏輯System.out.printf("222接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());Thread.sleep(1000);//肯定確認channel.basicAck(deliveryTag,false);} catch (Exception e) {//否定確認channel.basicNack(deliveryTag, false, true);}}
}
我們可以看到看到,消費者2消費消息的速度是消費者1消費消息的速度的2倍。
以上就是RabbitMQ的絕大多數的高級特性了,希望對你有幫助