1. 簡介
RabbitMQ 的消息發送流程:
- producer 將消息發送給 broker,consumer 從 broker 中獲取消息并消費
那么在這里就涉及到了兩種消息發送,即 producer 與 broker 之間和 consumer 與 broker 之間。
“消息確認” 討論的是 consumer 與 broker 之間的消息發送。
2. 為什么會有這個特性
當 broker 給 consumer 發送消息時,可能會出現下面兩種情況:
- 消息未成功到達 consumer;
- 消息成功到達 consumer,但是 consumer 沒有成功消費這條消息,如:在處理消息時發生異常等情況。
這時,就需要有一種解決方案,保證 broker 與 consumer 之間消息傳輸的可靠性,于是就有了消息確認這一特性。
3. 使用 RabbitMQ Java 時如何進行消息確認(不是重點)
public class ConsumerDemo1 {public static void main(String[] args) throws IOException, TimeoutException {//建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USERNAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//創建信道Channel channel = connection.createChannel();//聲明交換機channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);//聲明隊列//如果隊列不存在,就創建channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);//消費消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);}
}
這是一段路由模式的代碼,在這段代碼中,有下面一條語句:
channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);
在這個方法中,有三個參數:
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
- queue:consumer 通過哪個隊列獲取 broker 發送的消息;
- autoAck:是否自動確認;
- callback:consumer 消費消息的邏輯。
其中,autoAck 就是消息確認的體現:
- autoAck 為 true:RabbitMQ 會將發送給 consumer 的消息視為已被成功接收和消費(consumer 可能并沒有成功接收到或成功消費,但是 RabbitMQ 不管了),就會被將這條消息刪除;
- autoAck 為 false:當 RabbitMQ 發送消息后,并不會馬上就將消息刪除,而是會等 consumer 調用 Basic.Ack,收到 ack 后,才會將消息刪除。
將 autoAck 設置為 false 后,若 broker 長時間沒有收到 consumer 發送的 ack 且 consumer 已經斷開連接,就會將這條消息重新入隊列,繼續發送給 consumer 進行消費,此時,隊列中的消息就分為了兩種:
- 還未被發送的消息;
- 已經發送了的消息,但是沒有收到 ack 而重新入隊列等待被消費。
4. 在 spring 中使用 RabbitMQ 時如何進行消息確認
4.1 basicAck
在 spring 下的 Channel 類中提供了下面幾種方法:
void basicAck(long deliveryTag, boolean multiple) throws IOException;
在這個方法中,有三個參數:
- deliveryTag:是 broker 給 consumer 發送消息的唯一標識,在一個 channel 中 deliveryTag 是唯一的;
- mulitple: 是否批量確認
使用這個方法后,就會告知 broker 這條消息已經成功被消費,可以將其刪除。
4.2 basicNack
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
?在這個方法中,多了一個參數:
- requeue:是否重新入隊列。
使用這個方法,就相當于給 broker 發送 nack,即這條消息沒有被正確消費。
若 requeue 為 true,就會將這條消息重新入隊列,繼續給 consumer 消費;
若 requeue 為 false,broker 就會這條消息刪除。
4.3 basicReject
void basicReject(long deliveryTag, boolean requeue) throws IOException;
這個方法與 basicNack 大致相同,此處省略。
4.4 配置
在 spring 中,提供了三種配置用于消息確認:
- none:當消息發送給 consumer,不管 consumer 是否成功消費了消息,broker 都會當作這條消息被成功消費了,然后刪除這條消息;
- auto:在 consumer 處理消息時沒有拋出異常時,就會確認消息,反之就不會確認,并且將消息重新放入隊列中,進行下一次的消費;
- manual:手動確認,我們需要在代碼中指定這條消息是消費成功還是消費失敗,分別使用 basicAck 和 basicNack。
spring:rabbitmq:listener:simple:acknowledge-mode: none
5. 代碼測試
@RequestMapping("/producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack() {String messageInfo = "consumer ack mode test...";rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, Constants.ACK_ROUTINGKEY, messageInfo);return "消息發送成功";}
}
這段代碼代表的是一個 producer,下面接收到的消息都是通過這段代碼發送的。
5.1 none
① 無異常時的消費者代碼:
@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);}
代碼運行結果如下:
我們可以通過訪問 RabbitMQ 客戶端來觀察這條消息是否成功被消費:
?
可以看到,Messages 這一列中,Ready 和 Unacked 都為 0,表示消息被成功消費。?
?② 有異常時的消費者代碼:
@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();int num = 1 / 0;log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);}
代碼運行結果如下:
由于我們使用了除零操作,于是拋出了異常,我們可以通過訪問?RabbitMQ 來觀察這條消息是否被刪除:
和上面一樣,在 broker 中這條消息已經被刪除,這與 none 配置性質一致。
5.2 auto?
① 無異常時的消費者代碼:
@Component
@Slf4j
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);}}
代碼運行結果如下:
在 RabbitMQ 客戶端中顯示,這條消息已經被成功消費:
?
② 有異常時的消費者代碼:
@Component
@Slf4j
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();int num = 1 / 0;log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);}}
?代碼運行結果如下:
在運行結果中,一直會有報錯產生,并且都是兩個兩個為一組,并且在報錯信息中可以看到,producer 發送的消息一直在被消費,這是因為存在異常,就會導致這條消息一直在隊列中,通過觀察 RabbitMQ 客戶端可以看出,這條消息依然保存在隊列中:
5.3??manual
① 無異常的消費者代碼如下:
@Component
@Slf4j
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, true);}}
}
在這段代碼中,我們使用了 basicAck 和 basicNack 來進行消息確認,當消息處理成功后,就會執行 basicAck,告訴 broker 這條消息已經被成功消費,可以將其刪除;當消息執行發生異常后,就會執行 basicNack,并且根據 requeue 參數決定如何處理這條消息。
代碼運行結果如下:
RabbitMQ 客戶端顯示這條消息被成功消費:
?
?② 有異常的消費者代碼如下:
當 requeue 為 true:
@Component
@Slf4j
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);int n = 1 / 0;channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, true);}}
}
在此處的 basicNack,將 requeue 設置為了 true,當消息處理失敗后,就會將消息重新入隊列,重新被消費:
我們可以看到,這條消息一直在被消費,并且 delivertTag 在遞增。
并且從?RabbitMQ 客戶端中可以看到,這條消息依然存在,等待被成功消費:
?當 requeue 為 false:
當處理消息發生異常后,就會將消息從隊列中刪除。
代碼運行結果如下:
雖然異常依然存在,但是消息卻沒有重復發送,并且 RabbitMQ 中也將這條消息刪除:
?
?