RabbitMQ高級特性1
- 一.消息確認
- 1.消息確認機制
- 2.手動確認代碼
- 肯定確認
- 否定確認1
- 否定確認2
- Spring中的代碼
- 二.持久性
- 1.交換機持久化
- 2.隊列的持久化
- 3.消息的持久化
- 非持久化代碼實現
- 三方面都持久化,數據也會丟失
- 三.發送方確認
- 1.Confirm確認模式
- 2.return返回模式
- 四.總結
- RabbitMQ保證消息可靠傳輸
一.消息確認
1.消息確認機制
-
自動確認:當autoAck等于true時, RabbitMQ 會自動把發送出去的消息置為確認,然后從內存(或者磁盤)中刪除,而不管消費者是否真正地消費到了這些消息。自動確認模式適合對于消息可靠性要求不高的場景。
-
手動確認:當autoAck等于false時,RabbitMQ會等待消費者顯式地調用Basic.Ack命令,回復確認信號后才從內存(或者磁盤)中移去消息。這種模式適合對消息可靠性要求比較高的場景.
2.手動確認代碼
肯定確認
Channel.basicAck(long deliveryTag, boolean multiple)
RabbitMQ已知道該消息并且成功的處理消息,可以將其丟棄了。
參數說明
1)deliveryTag:消息的唯?標識,它是?個單調遞增的64位的長整型值。 deliveryTag 是每個通道(Channel)獨立維護的,所以在每個通道上都是唯?的。當消費者確認(ack)?條消息時,必須使用對應的通道上進行確認。
2)multiple:是否批量確認。在某些情況下,為了減少網絡流量,可以對?系列連續的 deliveryTag 進行批量確認。值為true則會?次性把ack所有小于或等于指定deliveryTag的消息。值為false,則只確認當前指定deliveryTag的消息。
否定確認1
Channel.basicReject(long deliveryTag, boolean requeue)
RabbitMQ在2.0.0版本開始引?了 Basic.Reject 這個命令,消費者客戶端可以調用
channel.basicReject方法來告訴RabbitMQ拒絕這個消息。
參數說明
1)deliveryTag:參考channel.basicAck。
2)requeue:表示拒絕后,這條消息如何處理。如果requeue參數設置為true,則RabbitMQ會重新將這條消息存入隊列,以便可以發送給下?個訂閱的消費者。如果requeue參數設置為false,則RabbitMQ會把消息從隊列中移除,而不會把它發送給新的消費者。
否定確認2
Channel.basicNack(long deliveryTag, boolean multiple,boolean requeue)
Basic.Reject命令?次只能拒絕?條消息,如果想要批量拒絕消息,則可以使用Basic.Nack這個命令。消費者客戶端可以調用channel.basicNack方法來實現。
參數說明
前面的參數參考上述參數說明。
multiple的參數設置為true則接受deliveryTag編號之前所有未被當前消費者確認的消息,也就是批量處理未被確認的消息。
Spring中的代碼
- AcknowledgeMode.NONE
這種模式下,消息?旦投遞給消費者,不管消費者是否成功處理了消息,RabbitMQ就會自動確認消息,從RabbitMQ隊列中移除消息,如果消費者處理消息失敗,消息可能會丟失。
ym配置
spring:application:name: rabbitmqdemorabbitmq:addresses: amqp://賬號:密碼@IP:端口號/虛擬機listener:simple:acknowledge-mode: none
- AcknowledgeMode.AUTO(默認)
這種模式下,消費者在消息處理成功時會自動確認消息,但如果處理過程中拋出了異常,則不會確認消息,但是會一直嘗試重發消息。
將yml配置中的 acknowledge-mode改成auto。
上述兩種模式代碼相同
Configuration
package com.example.rabbitmqdemo.config;import com.example.rabbitmqdemo.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfiguration {//消息確認//隊列@Bean("ackQueue")public Queue ackQueue() {return QueueBuilder.durable(Constants.ACK_QUEUE).build();}//虛擬機@Bean("directExchange")public DirectExchange directExchange() {return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build();}//隊列和虛擬機綁定@Bean("ackBinding")public Binding ackBinding(@Qualifier("ackQueue") Queue queue, @Qualifier("directExchange") DirectExchange directExchange) {return BindingBuilder.bind(queue).to(directExchange).with("ack");}
}
** Constants**
package com.example.rabbitmqdemo.constant;
public class Constants {public static final String ACK_QUEUE = "ack.queue";public static final String ACK_EXCHANGE = "ack.exchange";
}
Controller
package com.example.rabbitmqdemo.controller;import com.example.rabbitmqdemo.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/producer")
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack() {rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack","ack is ok");return "ack is ok!";}
}
Listener
@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws UnsupportedEncodingException {//消費者邏輯System.out.printf("接收到消息: %s , deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//不做具體實現的消費者業務邏輯}}
- AcknowledgeMode.MANUAL
手動確認模式下,消費者必須在成功處理消息后顯式調用 basicAck 方法來確認消息。如果消息未被確認,RabbitMQ會認為消息尚未被成功處理,并且會在消費者可用時重新投遞該消息,這種模式提高了消息處理的可靠性,因為即使消費者處理消息后失敗,消息也不會丟失,而是可以被重新處理。
將yml配置中的 acknowledge-mode改成manual。
Listener
package com.example.rabbitmqdemo.listener;import com.example.rabbitmqdemo.constant.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.io.UnsupportedEncodingException;/*** Created with IntelliJ IDEA.* Description:* User: hp* Date: 2025-04-03* Time: 9:26*/
@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws IOException {//消費者邏輯System.out.printf("接收到消息: %s , deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//不做具體實現的消費者業務邏輯try {//int sum = 3 / 0;//確認消息(肯定)channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch (Exception e) {//否定確認//最后一個參數為true,則發生異常重新入隊,false,為不再入隊channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);}}}
二.持久性
1.交換機持久化
交換器的持久化是通過在聲明交換機時是將durable參數置為true實現的。
相當于將交換機的屬性在服務器內部保存,當MQ的服務器發生意外或關閉之后,重啟 RabbitMQ 時不需要重新去建?交換機,交換機會自動建立,相當于?直存在。
如果交換器不設置持久化,那么在 RabbitMQ 服務重啟之后,相關的交換機元數據會丟失,對?個長期使用的交換器來說,建議將其置為持久化的。
設置交換機的持久化
2.隊列的持久化
隊列的持久化是通過在聲明隊列時將 durable 參數置為true實現的。
如果隊列不設置持久化,那么在RabbitMQ服務重啟之后,該隊列就會被刪掉,此時數據也會丟失。(隊列沒有了,消息也無處可存了)
隊列的持久化能保證該隊列本?的元數據不會因異常情況而丟失,但是并不能保證內部所存儲的消息不會丟失。要確保消息不會丟失,需要將消息設置為持久化。
咱們前面用的創建隊列的方式都是持久化的。
隊列持久化
隊列非持久化
3.消息的持久化
消息實現持久化,需要把消息的投遞模式( MessageProperties 中的 deliveryMode )設置為2,也就是MessageDeliveryMode.PERSISTENT
設置了隊列和消息的持久化,當 RabbitMQ 服務重啟之后,消息依舊存在。如果只設置隊列持久化,重啟之后消息會丟失。
如果只設置消息的持久化,重啟之后隊列消失,繼而消息也丟失。所以單單設置消息持久化而不設置隊列的持久化顯得毫無意義
非持久化代碼實現
交換機、隊列和綁定
//非持久化隊列@Bean("presQueue")public Queue presQueue() {return QueueBuilder.nonDurable(Constants.PRES_QUEUE).build();}//非持久化交換機@Bean("presExchagne")public DirectExchange presExchange() {return ExchangeBuilder.directExchange(Constants.PRES_EXCHANGE).durable(false).build();}@Bean("presBinding")public Binding presBinding(@Qualifier("presQueue") Queue queue,@Qualifier("presExchagne") Exchange exchange) {//如果參數傳遞的是Exchange類型而不是DirectExchang類型就需要使用noargs作為收尾return BindingBuilder.bind(queue).to(exchange).with("pres").noargs();}
Producer
@RequestMapping("/pres")public String pres() {Message message = new Message("Presistent test...".getBytes(),new MessageProperties());message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);rabbitTemplate.convertAndSend(Constants.PRES_EXCHANGE,"pres",message);return "pres is ok!";}
RabbitMQ服務器的虛擬機和隊列
三方面都持久化,數據也會丟失
-
從消費者來說,如果在訂閱消費隊列時將autoAck參數設置為true,那么當消費者接收到相關消息之后,還沒來得及處理就宕機了,這樣也算數據居丟失。這種情況很好解決,將autoAck參數設置為false,并進行手動確認。
-
在持久化的消息正確存入RabbitMQ之后,還需要有?段時間(雖然很短,但是不可忽視)才能存?磁盤中。RabbitMQ并不會為每條消息都進行同步存盤(調用內核的fsync方法)的處理,可能僅僅保存到操作系統緩存之中而不是物理磁盤之中。如果在這段時間內RabbitMQ服務節點發生了宕機、重啟等異常情況,消息保存還沒來得及落盤,那么這些消息將會丟失。
三.發送方確認
1.Confirm確認模式
Producer在發送消息的時候,對發送端設置?個ConfirmCallback的監聽,無論消息是否到達Exchange,這個監聽都會被執行,如果Exchange成功收到,ACK( Acknowledge character ,確認字符)為true,如果沒收到消息,ACK就為false。
yml配置
spring:application:name: rabbitmqdemorabbitmq:addresses: amqp://賬號:Miami@IP:端口號/虛擬機listener:simple:#acknowledge-mode: none#acknowledge-mode: autoacknowledge-mode: manualpublisher-confirm-type: correlated #消息發送確認
Configuration
package com.example.rabbitmqdemo.config;import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);return rabbitTemplate;}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//設置回調函數rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("執行了confirm方法");if (ack) {System.out.printf("接收到消息, 消息ID: %s \n",correlationData == null ? null : correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n",correlationData == null ? null : correlationData.getId(),cause);//相應的業務處理}}});return rabbitTemplate;}
}
Producer
@RequestMapping("/confirm")public String confirm() {CorrelationData correlationData = new CorrelationData("1");rabbitTemplateConfig.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","confirm test...",correlationData);return "confirm is ok!";}
2.return返回模式
Configuration
@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//設置回調函數rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("執行了confirm方法");if (ack) {System.out.printf("接收到消息, 消息ID: %s \n",correlationData == null ? null : correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n",correlationData == null ? null : correlationData.getId(),cause);//相應的業務處理}}});//return模式rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("消息退回: " + returnedMessage);}});return rabbitTemplate;}
四.總結
RabbitMQ保證消息可靠傳輸
Producer -> Broker:發送方確認
- Producer -> Exchange :Confirm模式(網絡問題)
- Exchange -> Queue : return模式(代碼或者配置層錯誤,導致消息路由失敗)
- 隊列移除:死信等
Broker:持久化(RabbitMQ服務器宕機導致消息丟失)
- 交換機持久化
- 隊列持久化
- 消息持久化
Broker -> Consumer 消息確認方式(消費者未來得及消費信息,就宕機了)
- 自動確認
- 手動確認