一、持久性
前面學習消息確認機制時,是為了保證Broker到消費者直接的可靠傳輸的,但是如果是Broker出現問題(如停止服務),如何保證消息可靠性?對此,RabbitMQ提供了持久化功能:
持久化分為三種:1. 交換機持久化? ?2. 隊列持久化? ?3.消息持久化
1.1 交換機持久化
一、交換機持久化方法
在聲明交換機時,將durable置為true即可,如果不指定,默認為true
二、交換機持久化的作用
避免了當Broker重啟時,未重新執行交換機聲明代碼,而導致生產者消息無法路由
1.2 隊列持久化
一、隊列持久化方法
在聲明隊列時,使用durable方法聲明的隊列為持久化隊列,使用nonDurable聲明的隊列為非持久化隊列
對應管理界面:
二、隊列持久化的作用
在RabbitMQ服務器重啟時,未持久化的隊列將丟失,持久化隊列保留
1.3 消息持久化
?一、消息持久化方法
前面在發送消息時,都是直接指定一個字符串來發送消息,如:
![]()
我們先進入convertAndSend方法,觀察其源碼:
接下來再進入MessageProperties,觀察其源碼:
可以看到,不指定消息是否持久化,默認為持久化。
也就是說,如果要指定消息為非持久化,就選喲給convertAndSend傳入一個Message對象而不是僅傳一個消息字符串,接下來學習如何設置消息非持久化:
1> 創建一個Message對象
Message message = new Message("persistent test...".getBytes(),new MessageProperties());
2> 獲取MessageProperties對象,通過setDeliveryMode方法設置消息非持久化
/* *如果要設置為持久化,可以直接換一個String的消息,也可以將這里的 *MessageDeliveryMode.NON_PERSISTENT改為MessageDeliveryMode.PERSISTENT */ message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
3> 將Message對象作為參數傳遞給converAndSend方法
rabbitTemplate.convertAndSend(Constants.NO_PERSISTENT_EXCHANGE,"",message);
整體代碼:
二、消息持久化作用
RabbitMQ服務器重啟時,未持久化的消息將丟失(即使消息所在隊列未持久化隊列),持久化的消息將保留(前提是消息所在的隊列是持久化隊列)
但是,有了消息確認機制以及持久性就能保證消息傳輸的可靠性了嗎?顯然不是,因為消息確認機制保證的是Broker到消費者的可靠性?,持久性保證的是Broker內部的可靠性,還有生產者到Broker的可靠性沒有被保證,因此,RabbitMQ引入了publiser confirms(發送方確認)機制
二、發送方確認機制
前面已經學習了RabbitMQ核心機制之一——持久化,但是這就能保證消息傳輸的可靠性了嗎?顯然不是,如果發送方發送的消息沒有到達Broker,又談何持久化?因此,我們還需要了解RabbitMQ的發送方確認機制(通過事務也能解決,但是比較復雜,這里不談)
publisher confirms 機制又包含兩種模式:
1> confirm確認模式
2> return退回模式
2.1?confirm確認模式
一、觸發機制
Producer向Broker發送消息時,需要設置一個ConfirmCallback監聽,這樣消息無論是否到達exchange,這個監聽都會觸發,如果消息到達exchange,ACK為true,如果沒有到達exchange,ACK為false。
二、代碼演示
1> 添加RabbitMQ配置
publisher-confirm-type: correlated #配置publisher confirm機制
2> 代碼實現(隊列、交換機隨便聲明一個就行,類型隨意)
@RequestMapping("/confirm")public String confirm(){//設置回調方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("執行confirm方法");if(ack){//ack為true,消息到達exchangeSystem.out.printf("接收到消息,消息ID:%s \n",correlationData==null ? null : correlationData.getId());}else {//ack為false,消息為到達exchangeSystem.out.printf("未接收到消息,消息ID:%s , cause: %s \n",correlationData==null ? null : correlationData.getId(),cause);}}});CorrelationData correlationData = new CorrelationData("1");//發送消息rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","confirm test...",correlationData);return "消息發送成功";}
3>運行程序,測試接口
? ? 1.正確發送消息(交換機名、routingKey存在)
消息發送成功,接下來查看控制臺信息:
可以看到,交換機成功接收到消息
??2.錯誤發送消息(改為一個不存在的交換機名)
再次運行程序,訪問接口,發送消息:
消息發送成功,查看控制臺:
可以看到,消息并沒有到達指定交換機,原因是不存在這個交換機。
? 3.錯誤發送消息(改為一個不存在的routingKey)
運行程序,測試接口:
消息發送成功,查看控制臺:
可以看到,在routingKey不存在的情況下,消息還是到達了交換機,但是這個消息一定是無法路由到隊列的,因此就需要通過publsher confirm的 return退回模式 來解決了
4> 上述代碼編寫存在的問題
? 上面我們設置了ConfirmCallback監聽經過測試,似乎并沒有問題,但是仔細思考就會發現,我們在上面的代碼中是通過rabbitTemplate這個對象來設置的,那豈不是前面所有使用rabbitTemplate的接口都被設置了監聽?訪問其它接口也一樣會打印回調方法中的信息?
下面我們測試一下下面的方法:
運行程序,測試接口:
消息發送成功,查看控制臺:
可以看到,同樣會觸發監聽,為了避免這個問題,我們可以在config包中自己配置一個RabbitTemplate對象并注入進來:
@Configuration public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);return rabbitTemplate;}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//消息到達exchange時的回調方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("執行confirm方法");if(ack){//ack為true,表示消息到達交換機System.out.printf("接收到消息,消息ID:%s \n",correlationData==null ? null : correlationData.getId());}else{//ack為false,表示消息未到達交換機System.out.printf("未接收到消息,消息ID:%s , cause: %s \n",correlationData==null ? null : correlationData.getId(),cause);//業務邏輯,如重發等}}});//消息退回時的回調方法rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息退回: " + returned);}});return rabbitTemplate;} }
2.2 return退回模式
?一、觸發機制
當消息到達exchange后,需要路由到queue中,如果一條消息無法被任何queue消費(routingKey不存在或隊列不存在),可以把消息退回給producer,退回時可以設置一個回調方法ReturnCallback,對消息進行處理
二、代碼演示
//消息退回時的回調方法rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息退回: " + returned);}});
修啊routingKey為一個不存在的routingKey:
運行程序,測試接口:
查看控制臺:
可以看到,消息被退回
2.3 總結
publisher confirms 機制可以保證消息從生產者到Broker的可靠性,其中confirm模式工作在生產者到exchange之間,return模式工作在exchange到queue之間?