一、消息確認機制
生產者發送的消息,可能有以下兩種情況:
1> 消息消費成功
2> 消息消費失敗
為了保證消息可靠的到達消費者(!!!注意:消息確認機制和前面的工作模式中的publisher confirms模式有很大區別,消息確認保證的是消息可靠的到達消費者,而publisher confirms保證的是消息可靠的到達RabbitMQServer),RabbitMQ引入了消息確認機制:
消費者在消費消息時,可以指定autoAck參數,對應著兩種確認方式:
(1)自動確認:消息只要到達消費者就會自動確認,不會考慮消費者是否正確消費了這些消息,直接從 內存/磁盤 中刪除消息;
(2)手動確認:消息到達消費者,不會自動確認,會等待消費者調用Basic.Ack命令,才會從內存/磁盤 移除這條消息。
DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}
};
channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);
如果將basicConsume中的true改為false,對于RabbitMQ服務器來說,消息分成兩個部分:
1>? 未發送給消費者的消息;
2>? 已經發送給消費者,但還沒有被確認的消息。
對應管理界面:
二、手動確認方法
RabbitMQ提供了兩種手動確認,分別是肯定確認和否定確認,對應著3個方法:
(1)肯定確認:Channel.basicAck(long deliveryTag, boolean multiple)
??? 其中,deliveryTag為消息的唯一表示,在每一個channel中都是唯一的,相當于TCP協議中的序號的作用,用來確認哪條消息已經收到,multiple為true表示是否批量確認,同樣與TCP中確認序號的作用類似,表示在這個deliveryTag前的消息都已收到,為false則表示一次只確認一條消息。
(2)否定確認:?Channel.basicReject(long deliveryTag, boolean requeue)
? ?如果消息到達消費者后,消費者未正確處理這條消息(如發生異常),就可以通過這個方法進行否定確認,其中,參數requeue表示這條消息是否需要重新入隊,這個方法一次只能確認一條消息。
(3)否定確認:Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
? ?與basicAck一樣,這個方法可以通過multiple來實現批量確認
三、使用Spring Boot演示消息確認機制
? 演示之前,需要先了解Spring-AMQP的確認機制,它和RabbitMQ JDK Client庫的確認機制有些許不同:
1. AcknowledgeMode.NONE
? 和RabbitMQ JDK Client庫的自動確認機制一樣,只要消息到達消費者,這條消息就會從隊列中移除。
2.?AcknowledgeMode.AUTO(和JDK Client庫的區別)
? 消息到達消費者且處理成功,才會自動確認,如果消息在處理過程中發生了異常,則不會自動確認。
3.?AcknowledgeMode.MANUAL
? 和JDK Client庫一樣,屬于手動確認機制,同樣分為肯定確認和否定確認。
接下來,進行準備工作,創建一個Spring Boot項目,添加RabbitMQ依賴:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
添加RabbitMQ相關配置:
spring:rabbitmq:addresses: amqp://study:study@110.41.17.130:5672/extensionlistener:simple:acknowledge-mode: none #表示當前確認機制未AcknowledgeMode.NONE
添加下圖的包,創建對應的類:
?
3.1? AcknowledgeMode.NONE
(1)編寫常量類(由于只是驗證消息確認機制,可以隨便使用一種工作模式)
public class Constants {public static final String ACK_QUEUE = "ack.queue";public static final String ACK_EXCHANGE = "ack.exchange"; }
(2)配置聲明隊列、交換機、交換機與隊列綁定關系
@Configuration public class RabbitMQConfig {//1.聲明隊列@Bean("ackQueue")public Queue ackQueue(){return QueueBuilder.durable(Constants.ACK_QUEUE).build();}//2.聲明交換機@Bean("ackExchange")public FanoutExchange ackExchange(){return ExchangeBuilder.fanoutExchange(Constants.ACK_EXCHANGE).build();}//3.聲明隊列與交換機的綁定關系@Beanpublic Binding ackBinding(@Qualifier("ackExchange") FanoutExchange fanoutExchange,@Qualifier("ackQueue") Queue queue){return BindingBuilder.bind(queue).to(fanoutExchange);} }
(3)編寫生產者代碼
@RestController @RequestMapping("/producer") public class ProducerController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack(){rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"","consumer ack mode test...");return "發送消息成功";} }
(4)編寫消費者代碼
@Component public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void ackListener(Message message, Channel channel) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息:%s,deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//業務邏輯處理System.out.println("業務邏輯處理"); // int num = 10/0;System.out.println("業務處理完成");} }
(5)測試正常消費消息的情況
(6)測試消息消費異常情況(將消費者代碼中得 int num? = 10/0? 注解打開)
(7)總結
? 經過確認,可以發現AcknowledgeMode.NONE機制,無論消費者是否正確消費消息,都會自動確認,不會保留異常消費的消息
3.2 AcknowledgeMode.AUTO
(1)?修改配置文件中的 acknowledge-mode: none 為 acknowledge-mode: auto
(2)演示消息正常消費的情況(將 int num = 10/0 注釋)
@RabbitListener(queues = Constants.ACK_QUEUE)public void ackListener(Message message, Channel channel) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息:%s,deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//業務邏輯處理System.out.println("業務邏輯處理"); // int num = 10/0;System.out.println("業務處理完成");}
運行程序,訪問 producer/ack 接口發送消息:
(3)演示消息處理異常情況(取消 int num = 10/0 的注釋)
@RabbitListener(queues = Constants.ACK_QUEUE)public void ackListener(Message message, Channel channel) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息:%s,deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//業務邏輯處理System.out.println("業務邏輯處理");int num = 10/0;System.out.println("業務處理完成");}
運行程序,訪問生產者端口:
消息沒有成功消費,如果此時我們修改代碼(將 int num = 10/0 注釋,使程序不再發生異常),再次運行程序,隊列中保存的消息就會被正常消費:
3.3 AcknowledgeMode.MANUAL
? (1) 修改配置文件中?acknowledge-mode: auto 為?acknowledge-mode: manul
(2)修改消費者代碼(由于是手動確認,需要在代碼中添加正常消費時的 “肯定確認” 和 消費異常時的 “否定確認”)
@RabbitListener(queues = Constants.ACK_QUEUE)public void ackListener(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try{System.out.printf("接收到消息:%s,deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//業務邏輯處理System.out.println("業務邏輯處理"); // int num = 10/0;System.out.println("業務處理完成");//消息正常消費,肯定確認channel.basicAck(deliveryTag,false);//單條確認}catch (Exception e){//消息消費異常,否定確認channel.basicNack(deliveryTag,false,true);//單條否定,異常后重新入隊}}
(3)消息正常消費時的情況(注釋 int num = 10/0)
接下來注釋掉手動肯定確認的這行代碼,看看會發生什么情況:
//消息正常消費,肯定確認//channel.basicAck(deliveryTag,false);//單條確認
再次運行程序并通過接口 producer/ack 發送消息:
可以看到,在AcknowledgeMode.MANUAL機制下,如果不手動確認,隊列不會移除已經被正常消費的消息:
(3)消息消費異常的情況下(取消 int num = 10/0 的注釋)
接下來注釋掉channel.basicNack,運行程序,訪問接口發送消息:
如果將參數requeue置為false,會怎么樣?
channel.basicNack(deliveryTag,false,false);//單條否定,重新發送消息
運行程序:
(4)總結
1> 無論是否正常處理消息都要進行手動確認;
2> 正常處理消息但未手動確認,管理界面中的隊列會有 一條/多條 Unacked 的消息(重新啟動程序后會重新消費);
3> 異常處理消息且未手動確認,也會有 一條/多條 Unacked 的消息(重啟程序同樣重新消費);
4> 如果異常處理,將requeue置為false,隊列不會保存 這條/多條 異常消費的消息
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?置為true,隊列會不斷重新發送 這條/多條 消息