RabbitMQ 核心功能
RabbitMQ 高級特性解析:RabbitMQ 消息可靠性保障 (上)-CSDN博客
RabbitMQ 高級特性:從 TTL 到消息分發的全面解析 (下)-CSDN博客
前言
最近再看?RabbitMQ,看了看自己之前寫的博客,誒,一言難盡,當時學的懵懵懂懂的。這里重新整理?RabbitMQ 的核心功能。
在分布式系統中,消息隊列是實現異步通信、解耦服務的關鍵組件。RabbitMQ 作為一款功能強大的消息隊列,其消息可靠性是確保系統穩定運行的重要因素。這里將深入探討 RabbitMQ 的消息確認機制、持久化策略、發送方確認機制以及重試機制!!
一、消息確認機制
1.1 消息確認機制概述
生產者發送消息到消費端后,可能出現消息處理成功或異常的情況。如果 RabbitMQ 在發送消息后就將其刪除,當消息處理異常時,就會造成消息丟失。為了確保消費端成功接收并正確處理消息,RabbitMQ 提供了消息確認機制(message acknowledgement)。
消費者在訂閱隊列時,可以指定
autoAck
參數,根據該參數設置,消息確認機制分為自動確認和手動確認兩種:
- 自動確認(autoAck=true):RabbitMQ 會自動把發送出去的消息置為確認,然后從內存(或者磁盤)中刪除,而不管消費者是否真正消費到了這些消息。這種模式適合對消息可靠性要求不高的場景。
- 手動確認(autoAck=false):RabbitMQ 會等待消費者顯式地調用
Basic.Ack
命令,回復確認信號后才從內存(或者磁盤)中移去消息。這種模式適合對消息可靠性要求比較高的場景。
以下是basicConsume
方法的定義:
/*** Start a non-nolocal, non-exclusive consumer, with* a server-generated consumerTag.* @param queue the name of the queue* @param autoAck true if the server should consider messages* acknowledged once delivered; false if the server should expect* explicit acknowledgements* @param callback an interface to the consumer object* @return the consumerTag generated by the server* @throws java.io.IOException if an error is encountered* @see com.rabbitmq.client.AMQP.Basic.Consume* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)*/
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
1.2 手動確認方法
消費者在收到消息之后,可以選擇確認、直接拒絕或者跳過,RabbitMQ 提供了三種不同的確認應答方式:
- 肯定確認:
Channel.basicAck(long deliveryTag, boolean multiple)
:RabbitMQ 已知道該消息并且成功處理消息,可以將其丟棄。
deliveryTag
:消息的唯一標識,是一個單調遞增的 64 位長整型值,每個通道(Channel)獨立維護,所以在每個通道上都是唯一的。當消費者確認(ack)一條消息時,必須使用對應的通道進行確認。multiple
:是否批量確認。值為true
則會一次性 ack 所有小于或等于指定deliveryTag
的消息;值為false
,則只確認當前指定deliveryTag
的消息。- 否定確認:
Channel.basicReject(long deliveryTag, boolean requeue)
:消費者客戶端可以調用channel.basicReject
方法來告訴 RabbitMQ 拒絕這個消息。
deliveryTag
:參考channel.basicAck
。requeue
:表示拒絕后,這條消息如何處理。如果requeue
參數設置為true
,則 RabbitMQ 會重新將這條消息存入隊列,以便可以發送給下一個訂閱的消費者;如果requeue
參數設置為false
,則 RabbitMQ 會把消息從隊列中移除,而不會把它發送給新的消費者。- 否定確認:
Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
:如果想要批量拒絕消息,可以使用Basic.Nack
命令。
- 參數介紹參考前面兩個方法。
multiple
參數設置為true
則表示拒絕deliveryTag
編號之前所有未被當前消費者確認的消息。
1.3 代碼示例
我們基于 Spring Boot 來演示消息的確認機制,Spring - AMQP 對消息確認機制提供了三種策略:
public enum AcknowledgeMode {NONE,MANUAL,AUTO;
}
1.3.1?AcknowledgeMode.NONE
- 配置確認機制:
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: none
- 發送消息:
public class Constant {public static final String ACK_EXCHANGE_NAME = "ack_exchange";public static final String ACK_QUEUE = "ack_queue";
}@Configuration
public class RabbitmqConfig {@Bean("ackExchange")public Exchange ackExchange(){return ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();}@Bean("ackQueue")public Queue ackQueue() {return QueueBuilder.durable(Constant.ACK_QUEUE).build();}@Bean("ackBinding")public Binding ackBinding(@Qualifier("ackExchange") Exchange exchange, @Qualifier("ackQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ack").noargs();}
}@RestController
@RequestMapping("/producer")
public class ProductController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack(){rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", "consumer ack test...");return "發送成功!";}
}
- 消費端邏輯:
@Component
public class AckQueueListener {@RabbitListener(queues = Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throws Exception {System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());//模擬處理失敗int num = 3/0;System.out.println("處理完成");}
}
運行結果:消息處理失敗,但消息已從 RabbitMQ 中移除,因為NONE
模式下消息一旦投遞就會被自動確認。
1.3.2?AcknowledgeMode.AUTO
- 配置確認機制:
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: auto
重新運行程序,當消費者出現異常時,RabbitMQ 會不斷重發消息,由于異常多次重試還是失敗,消息沒被確認,也無法 nack,就一直是unacked
狀態,導致消息積壓。
1.3.3?AcknowledgeMode.MANUAL
- 配置確認機制:
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: manual
- 消費端手動確認邏輯:
@Component
public class AckQueueListener {@RabbitListener(queues = Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1. 接收消息System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());//2. 處理業務邏輯System.out.println("處理業務邏輯");//手動設置一個異常, 來測試異常拒絕機制// int num = 3/0;//3. 手動簽收channel.basicAck(deliveryTag, true);} catch (Exception e) {//4. 異常了就拒絕簽收//第三個參數requeue, 是否重新發送, 如果為true, 則會重新發送, 若為false, 則直接丟棄channel.basicNack(deliveryTag, true, true);}}
}
運行結果:消息正常處理時會被簽收;異常時會不斷重試。
二、持久性
2.1 交換機持久化
交換器的持久化是通過在聲明交換機時將durable
參數置為true
實現的。這樣當 MQ 的服務器發生意外或關閉之后,重啟 RabbitMQ 時不需要重新去建立交換機,交換機會自動建立。如果交換器不設置持久化,那么在 RabbitMQ 服務重啟之后,相關的交換機元數據會丟失。
ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();
2.2 隊列持久化
隊列的持久化是通過在聲明隊列時將durable
參數置為true
實現的。如果隊列不設置持久化,那么在 RabbitMQ 服務重啟之后,該隊列就會被刪掉,數據也會丟失。但隊列持久化不能保證內部所存儲的消息不丟失,要確保消息不丟失,需要將消息設置為持久化。
QueueBuilder.durable(Constant.ACK_QUEUE).build();
創建非持久化隊列:
QueueBuilder.nonDurable(Constant.ACK_QUEUE).build();
2.3 消息持久化
消息實現持久化,需要把消息的投遞模式(MessageProperties
中的deliveryMode
)設置為 2,也就是MessageDeliveryMode.PERSISTENT
。
// 要發送的消息內容
String message = "This is a persistent message";
// 創建一個Message對象,設置為持久化
Message messageObject = new Message(message.getBytes(), new MessageProperties());
messageObject.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 使用RabbitTemplate發送消息
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", messageObject);
需要注意的是,將所有的消息都設置為持久化,會嚴重影響 RabbitMQ 的性能,因為寫入磁盤的速度比寫入內存的速度慢很多。在選擇是否要將消息持久化時,需要在可靠性和吞吐量之間做一個權衡。
即使將交換器、隊列、消息都設置了持久化,也不能百分之百保證數據不丟失。例如,消費者訂閱隊列時
autoAck
參數設置為true
,消費者接收到消息后還沒來得及處理就宕機,會導致數據丟失;持久化的消息存入 RabbitMQ 后,還需要一段時間才能存入磁盤,如果在這段時間內 RabbitMQ 服務節點發生異常,消息可能會丟失。可以通過引入 RabbitMQ 的仲裁隊列或在發送端引入事務機制、發送方確認機制來提高可靠性。(后續都會講到)
三、發送方確認
3.1 confirm 確認模式
Producer 在發送消息時,對發送端設置一個ConfirmCallback
的監聽,無論消息是否到達Exchange
,這個監聽都會被執行。如果Exchange
成功收到消息,ACK
為true
;如果沒收到消息,ACK
為false
。
配置 RabbitMQ:
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: manual #消息接收確認publisher-confirm-type: correlated #消息發送確認
設置確認回調邏輯并發送消息:
@Configuration
public class RabbitTemplateConfig {@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack){System.out.printf("消息接收成功, id:%s \n", correlationData.getId());}else {System.out.printf("消息接收失敗, id:%s, cause: %s", correlationData.getId(), cause);}}});return rabbitTemplate;}
}@RestController
@RequestMapping("/product")
public class ProductController {@Resource(name = "confirmRabbitTemplate")private RabbitTemplate confirmRabbitTemplate;@RequestMapping("/confirm")public String confirm() throws InterruptedException {CorrelationData correlationData1 = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME, "confirm", "confirm test...", correlationData1);return "確認成功";}
}
- 測試:
運行程序,調用接口http://127.0.0.1:8080/product/confirm
,觀察控制臺,消息確認成功。修改交換機名稱,重新運行,會觸發消息發送失敗的結果。
3.2 return 退回模式
消息到達Exchange
之后,會根據路由規則匹配,把消息放入Queue
中。如果一條消息無法被任何隊列消費,可以選擇把消息退還給發送者。
配置 RabbitMQ:
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: manual #消息接收確認publisher-confirm-type: correlated #消息發送確認
- 設置返回回調邏輯并發送消息:
@Configuration
public class RabbitTemplateConfig {@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(CachingConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.printf("消息被退回: %s", returned);}});return rabbitTemplate;}
}@RestController
@RequestMapping("/product")
public class ProductController {@Resource(name = "confirmRabbitTemplate")private RabbitTemplate confirmRabbitTemplate;@RequestMapping("/msgReturn")public String msgReturn(){CorrelationData correlationData = new CorrelationData("2");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm11", "message return test...", correlationData);return "消息發送成功";}
}
測試:
運行程序,調用接口http://127.0.0.1:8080/product/msgReturn
,觀察控制臺,消息被退回。
四、重試機制
在消息傳遞過程中,可能會遇到各種問題,如網絡故障、服務不可用、資源不足等,這些問題可能導致消息處理失敗。為了解決這些問題,RabbitMQ 提供了重試機制,允許消息在處理失敗后重新發送。但如果是程序邏輯引起的錯誤,那么多次重試也是沒有用的,可以設置重試次數。
4.1 重試配置
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: auto #消息接收確認retry:enabled: true # 開啟消費者失敗重試initial-interval: 5000ms # 初始失敗等待時長為5秒max-attempts: 5 # 最大重試次數(包括自身消費的一次)
4.2 配置交換機 & 隊列
//重試機制
public static final String RETRY_QUEUE = "retry_queue";
public static final String RETRY_EXCHANGE_NAME = "retry_exchange";//重試機制 發布訂閱模式
//1. 交換機
@Bean("retryExchange")
public Exchange retryExchange() {return ExchangeBuilder.fanoutExchange(Constant.RETRY_EXCHANGE_NAME).durable(true).build();
}
//2. 隊列
@Bean("retryQueue")
public Queue retryQueue() {return QueueBuilder.durable(Constant.RETRY_QUEUE).build();
五:如何保證 RabbitMQ 消息的可靠傳輸?
消息可能丟失的場景以及解決方案如下:
生產者將消息發送到 RabbitMQ 失敗:
- 可能原因是網絡問題等,
- 解決辦法是參考
發送方確認 - confirm確認模式
。消息在交換機中無法路由到指定隊列:
- 可能原因是代碼或者配置層面錯誤,導致消息路由失敗,
- 解決辦法是參考
發送方確認 - return模式
。消息隊列自身數據丟失:
- 可能原因是消息到達 RabbitMQ 之后,RabbitMQ Server 宕機導致消息丟失,
- 解決辦法是參考持久化,開啟 RabbitMQ 持久化,也可以通過集群的方式提高可靠性。
消費者異常,導致消息丟失:
- 可能原因是消息到達消費者,還沒來得及消費,消費者宕機或消費者邏輯有問題,
- 解決辦法是參考
消息確認
,開啟手動確認,配置重試機制。
以上就是四個RabbitMQ保證消息可靠性的四個機制,后續有更多核心機制的更新,感謝閱覽!!