【RabbitMQ】 RabbitMQ高級特性(一)

文章目錄

  • 一、消息確認
    • 1.1、消息確認機制
    • 1.2、手動確認方法
      • 1.2.1、AcknowledgeMode.NONE
      • 1.2.2、AcknowledgeMode.AUTO
      • 1.3.3、AcknowledgeMode.MANUAL
  • 二、持久性
    • 2.1、 交換機持久化
    • 2.2、隊列持久化
    • 2.3、消息持久化
  • 三、發送方確認
    • 3.1、confirm確認模式
    • 3.2、return退回模式
    • 3.3、常見面試題
  • 結語


在這里插入圖片描述

一、消息確認

1.1、消息確認機制

生產者發送消息之后, 到達消費端之后, 可能會有以下情況:
a. 消息處理成功
b. 消息處理異常

在這里插入圖片描述
RabbitMQ向消費者發送消息之后, 就會把這條消息刪掉, 那么第兩種情況, 就會造成消息丟失. 那么如何確保消費端已經成功接收了, 并正確處理了呢?
為了保證消息從隊列可靠地到達消費者, RabbitMQ提供了消息確認機制(message acknowledgement)。 消費者在訂閱隊列時,可以指定 autoAck 參數, 根據這個參數設置, 消息確認機制分為以下兩種:

? 自動確認: 當autoAck 等于true時, RabbitMQ 會?動把發送出去的消息置為確認, 然后從內存(或者磁盤)中刪除, 而不管消費者是否真正地消費到了這些消息. 自動確認模式適合對于消息可靠性要求不高的場景.
? 手動確認: 當autoAck等于false時,RabbitMQ會等待消費者顯式地調用Basic.Ack命令, 回復確認信號后才從內存(或者磁盤) 中移去消息. 這種模式適合對消息可靠性要求比較高的場景

代碼示例:

  DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE_NAME1,true,consumer);

當autoAck參數置為false, 對于RabbitMQ服務端額而言, 隊列中的消息分成了兩個部分: ?是等待投遞給消費者的消息. ?是已經投遞給消費者, 但是還沒有收到消費者確認信號的消息. 如果RabbitMQ一直沒有收到消費者的確認信號, 并且消費此消息的消費者已經斷開連接, 則RabbitMQ會安排該消息重新進入隊列,等待投遞給下?個消費者,當然也有可能還是原來的那個消費者

在這里插入圖片描述

從RabbitMQ的Web管理平臺上, 也可以看到當前隊列中Ready狀態和Unacked狀態的消息數:

在這里插入圖片描述

Ready: 等待投遞給消費者的消息數
Unacked: 已經投遞給消費者, 但是未收到消費者確認信號的消息數

1.2、手動確認方法

消費者在收到消息之后, 可以選擇確認, 也可以選擇直接拒絕或者跳過, RabbitMQ也提供了不同的確認應答的方式, 消費者客戶端可以調?與其對應的channel的相關?法, 共有以下三種:
1. 肯定確認: Channel.basicAck(long deliveryTag, boolean multiple)

RabbitMQ 已知道該消息并且成功的處理消息. 可以將其丟棄了.

參數說明:

  1. deliveryTag: 消息的唯?標識,它是?個單調遞增的64 位的?整型值. deliveryTag 是每個通道(Channel)獨?維護的, 所以在每個通道上都是唯?的. 當消費者確認(ack)?條消息時, 必須使?對應 的通道上進?確認.
    2 ) multiple: 是否批量確認. 在某些情況下, 為了減少?絡流量, 可以對?系列連續的 deliveryTag 進行批量確認. 值為 true 則會?次性 ack所有?于或等于指定 deliveryTag 的消息. 值為false, 則只確認當 前指定deliveryTag的消息

若deliverTag = 8時,如果此時multiple 為true ,那么此時8以前的消息都會被確認
反之,如果multiple為false ,那么此時只確認消息8。

deliveryTag 是RabbitMQ中消息確認機制的?個重要組成部分, 它確保了消息傳遞的可靠性和順序性。

2. 否定確認: Channel.basicReject(long deliveryTag, boolean requeue)

RabbitMQ在2.0.0版本開始引?了 Basic.Reject 這個命令, 消費者客?端可以調用channel.basicReject方法來告訴RabbitMQ拒絕這個消息.
參數說明:

  1. deliveryTag: 參考channel.basicAck
  2. requeue: 表?拒絕后, 這條消息如何處理. 如果requeue 參數設置為true, 則RabbitMQ會重新將這條 消息存?隊列,以便可以發送給下?個訂閱的消費者.如果requeue參數設置為false, 則RabbitMQ會把 消息從隊列中移除, ?不會把它發送給新的消費者

3. 否定確認: Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
Basic.Reject命令?次只能拒絕?條消息,如果想要批量拒絕消息,則可以使?Basic.Nack這個命令. 消費者客戶端可以調用channel.basicNack方法來實現.
參數介紹參考上面兩個方法.
multiple參數設置為true則表示拒絕deliveryTag編號之前所有未被當前消費者確認的消息.

代碼示例:
Spring-AMQP 對消息確認機制提供了三種策略

public enum AcknowledgeMode {NONE,MANUAL,AUTO; 
}
  1. AcknowledgeMode.NONE
    ? 這種模式下, 消息?旦投遞給消費者, 不管消費者是否成功處理了消息, RabbitMQ 就會自動確認 消息, 從RabbitMQ隊列中移除消息. 如果消費者處理消息失敗, 消息可能會丟失.
  2. AcknowledgeMode.AUTO(默認)
    ? 這種模式下, 消費者在消息處理成功時會自動確認消息, 但如果處理過程中拋出了異常, 則不會確認消息.
  3. AcknowledgeMode.MANUAL
    ? ?動確認模式下, 消費者必須在成功處理消息后顯式調? basicAck ?法來確認消息. 如果消息未被確認, RabbitMQ 會認為消息尚未被成功處理, 并且會在消費者可用時重新投遞該消息, 這 種模式提?了消息處理的可靠性, 因為即使消費者處理消息后失敗, 消息也不會丟失, 而是可以被重新處理.

主要流程:

  1. 配置確認機制(自動確認/手動機制)
  2. 生產者發送消息
  3. 消費端邏輯
  4. 測試

1.2.1、AcknowledgeMode.NONE

  1. 配置確認機制
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: none
  1. 發送消息:
    隊列,交換機配置
public class Constant {public static final String ACK_EXCHANGE_NAME = "ack_exchange";public static final String ACK_QUEUE = "ack_queue"; 
}/*以下為消費端?動應答代碼?例配置*/@Bean("ackExchange")public Exchange ackExchange() {returnExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();}//2. 隊列@Bean("ackQueue")public Queue ackQueue() {return QueueBuilder.durable(Constant.ACK_QUEUE).build();}//3. 隊列和交換機綁定 Binding@Bean("ackBinding")public Binding ackBinding(@Qualifier("ackExchange") Exchange exchange,@Qualifier("ackQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ack").noargs();}

通過接口發送消息:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController@RequestMapping("/producer")public class ProductController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack() {rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack","consumer ack test...");return "發送成功!";}}
  1. 寫消費端邏輯
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Componentpublic class AckQueueListener {//指定監聽隊列的名稱@RabbitListener(queues = Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throwsException {System.out.printf("接收到消息: %s, deliveryTag: %d%n", newString(message.getBody(), "UTF-8"),message.getMessageProperties().getDeliveryTag());//模擬處理失敗//int num = 3 / 0;System.out.println("處理完成");}

這個代碼運行的結果是正常的, 運行后消息會被簽收: Ready為0, unacked為0

  1. 運行程序
    調用接口, 發送消息 可以看到隊列中有?條消息, unacked的為0(需要先把消費者注掉)

在這里插入圖片描述

開啟消費者, 控制臺輸出:

接收到消息: consumer ack test..., deliveryTag: 1
2 2024-04-29T17:03:57.797+08:00 WARN 16952 --- [ntContainer#0-1] 
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message 
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: 
Listener method 'public void 
com.bite.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception
//....

管理界面:
在這里插入圖片描述

1.2.2、AcknowledgeMode.AUTO

1、配置確認機制

spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: auto
  1. 重新運行程序
    調用接口, 發送消息 可以看到隊列中有?條消息, unacked的為0(需要先把消費者注掉)

在這里插入圖片描述
開啟消費者, 控制臺不斷輸出錯誤信息:

接收到消息: consumer ack test..., deliveryTag: 1
2024-04-29T17:07:06.114+08:00 WARN 16488 --- [ntContainer#0-1] 
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message 
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: 
Listener method 'public void 
com.bite.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception接收到消息: consumer ack test..., deliveryTag: 2
2024-04-29T17:07:07.161+08:00 WARN 16488 --- [ntContainer#0-1] 
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message 
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: 
Listener method 'public void 
com.bite.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception接收到消息: consumer ack test..., deliveryTag: 3
2024-04-29T17:07:08.208+08:00 WARN 16488 --- [ntContainer#0-1] 
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message 
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: 
Listener method 'public void 
com.bite.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception

在這里插入圖片描述
從日志上可以看出, 當消費者出現異常時, RabbitMQ會不斷的重發. 由于異常,多次重試還是失敗,消息沒被確認,也無法nack,就?直是unacked狀態,導致消息積壓

1.3.3、AcknowledgeMode.MANUAL

  1. 配置確認機制
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: manual
  1. 消費端?動確認邏輯
import com.bite.rabbitmq.constant.Constant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Componentpublic class AckQueueListener {//指定監聽隊列的名稱@RabbitListener(queues = Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throwsException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1. 接收消息 System.out.printf("接收到消息: %s, deliveryTag: %d%n", newString(message.getBody(), "UTF-8"),message.getMessageProperties().getDeliveryTag());//2. 處理業務邏輯System.out.println("處理業務邏輯");//?動設置?個異常, 來測試異常拒絕機制// int num = 3/0;//3. ?動簽收channel.basicAck(deliveryTag, true);} catch (Exception e) {//4. 異常了就拒絕簽收//第三個參數requeue, 是否重新發送, 如果為true, 則會重新發送,,若為false, 則直接丟棄channel.basicNack(deliveryTag, true, true);}}}

這個代碼運行的結果是正常的, 運行后消息會被簽收: Ready為0, unacked為0

控制臺輸出:

 接收到消息: consumer ack test..., deliveryTag: 1
處理業務邏輯.....

管理界面:

在這里插入圖片描述

  1. 異常時拒絕簽收
  @Componentpublic class AckQueueListener {//指定監聽隊列的名稱@RabbitListener(queues = Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throwsException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1. 接收消息 System.out.printf("接收到消息: %s, deliveryTag: %d%n", newString(message.getBody(), "UTF-8"),message.getMessageProperties().getDeliveryTag());//2. 處理業務邏輯System.out.println("處理業務邏輯");//?動設置?個異常, 來測試異常拒絕機制int num = 3/0;//3. ?動簽收channel.basicAck(deliveryTag, true);} catch (Exception e) {//4. 異常了就拒絕簽收//第三個參數requeue, 是否重新發送, 如果為true, 則會重新發送,,若為false, 則直接丟棄channel.basicNack(deliveryTag, true, true);}}}

運行結果: 消費異常時不斷重試, deliveryTag 從1遞增控制臺日志:

接收到消息: consumer ack test..., deliveryTag: 1 
處理業務邏輯 接收到消息: consumer ack test..., deliveryTag: 2 
處理業務邏輯 接收到消息: consumer ack test..., deliveryTag: 3 
處理業務邏輯 接收到消息: consumer ack test..., deliveryTag: 4 
處理業務邏輯 接收到消息: consumer ack test..., deliveryTag: 5 
處理業務邏輯 接收到消息: consumer ack test..., deliveryTag: 6
處理業務邏輯.....

管理界?上unacked也變成了1

在這里插入圖片描述

Unacked的狀態變化很快, 為方便觀察, 消費消息前增加?下休眠時間Thread.sleep(10000);

二、持久性

在前?講了消費端處理消息時, 消息如何不丟失, 但是如何保證當RabbitMQ服務停掉以后, 生產者發送的消息不丟失呢. 默認情況下, RabbitMQ 退出或者由于某種原因崩潰時, 會忽視隊列和消息, 除非告知他不要這么做.

RabbitMQ的持久化分為三個部分:交換器的持久化、隊列的持久化和消息的持久化.

2.1、 交換機持久化

交換器的持久化是通過在聲明交換機時是將durable參數置為true實現的.相當于將交換機的屬性在服務器內部保存,當MQ的服務器發生意外或關閉之后,重啟 RabbitMQ 時不需要重新去建立交換機, 交換 機會自動建立,相當于?直存在. 如果交換器不設置持久化, 那么在 RabbitMQ 服務重啟之后, 相關的交換機元數據會丟失, 對?個?期 使用的交換器來說,建議將其置為持久化的.

ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();

2.2、隊列持久化

隊列的持久化是通過在聲明隊列時將 durable 參數置為 true實現的. 如果隊列不設置持久化, 那么在RabbitMQ服務重啟之后,該隊列就會被刪掉, 此時數據也會丟失. (隊列沒 有了, 消息也無處可存了)。 隊列的持久化能保證該隊列本身的元數據不會因異常情況而丟失, 但是并不能保證內部所存儲的消息不會丟失. 要確保消息不會丟失, 需要將消息設置為持久化. 咱們前面用的創建隊列的方式都是持久化的

 QueueBuilder.durable(Constant.ACK_QUEUE).build();

點進去看源碼會發現,該?法默認durable 是true

public static QueueBuilder durable(String name) {return (new QueueBuilder(name)).setDurable();
}
private QueueBuilder setDurable() {this.durable = true;return this; 
}

通過下?代碼,可以創建非持久化的隊列

QueueBuilder.nonDurable(Constant.ACK_QUEUE).build();

2.3、消息持久化

消息實現持久化, 需要把消息的投遞模式( MessageProperties 中的 deliveryMode )設置為2,也就是MessageDeliveryMode.PERSISTENT

NON_PERSISTENT,//?持久化
PERSISTENT;//持久化

設置了隊列和消息的持久化, 當 RabbitMQ 服務重啟之后, 消息依舊存在. 如果只設置隊列持久化, 重啟之后消息會丟失. 如果只設置消息的持久化, 重啟之后隊列消失, 繼而消息也丟失. 所以單單設置消息 持久化而不設置隊列的持久化顯得毫無意義

//?持久化信息
channel.basicPublish(“”,QUEUE_NAME,null,msg.getBytes());
//持久化信息
channel.basicPublish(“”,QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());

MessageProperties.PERSISTENT_TEXT_PLAIN 實際就是封裝了這個屬性

  public static final BasicProperties PERSISTENT_TEXT_PLAIN =new BasicProperties("text/plain",null,null,2, //deliveryMode0, null, null, null,null, null, null, null,null, null);

如果使?RabbitTemplate 發送持久化消息, 代碼如下:

// 要發送的消息內容
String message = "This is a persistent message";
// 創建?個Message對象,設置為持久化
Message messageObject = new Message(message.getBytes(), newMessageProperties());
messageObject.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 使?RabbitTemplate發送消息
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack",  messageObject);

注意:RabbitMQ默認情況下會將消息視為持久化的,除?隊列被聲明為非持久化,或者消息在發送時被標記為非持久化

將所有的消息都設置為持久化, 會嚴重影響RabbitMQ的性能(隨機). 寫入磁盤的速度比寫入內 存的速度慢得不只一點點. 對于可靠性不是那么高的消息可以不采用持久化處理以提高整體的吞吐量. 在選擇是否要將消息持久化時, 需要在可靠性和吐吞量之間做?個權衡

將交換器、隊列、消息都設置了持久化之后就能百分之百保證數據不丟失了嗎? 答案是否定的

  1. 從消費者來說, 如果在訂閱消費隊列時將autoAck參數設置為true, 那么當消費者接收到相關消息之 后, 還沒來得及處理就宕機了, 這樣也算數據居丟失. 這種情況很好解決, 將autoAck參數設置為false, 并進行手動確認。
    2 . 在持久化的消息正確存?RabbitMQ之后,還需要有?段時間(雖然很短,但是不可忽視)才能存入磁盤 中.RabbitMQ并不會為每條消息都進行同步存盤(調用內核的fsync方法)的處理, 可能僅僅保存到操 作系統緩存之中而不是物理磁盤之中. 如果在這段時間內RabbitMQ服務節點發生了宕機、重啟等異 常情況, 消息保存還沒來得及落盤, 那么這些消息將會丟失

這個問題怎么解決呢?

可以在發送端引入事務機制或者發送?確認機制來保證消息已經正確地發送并存儲至RabbitMQ中,即"發送方確認"

三、發送方確認

在使用 RabbitMQ的時候, 可以通過消息持久化來解決因為服務器的異常崩潰?導致的消息丟失, 但是還 有?個問題, 當消息的?產者將消息發送出去之后, 消息到底有沒有正確地到達服務器呢? 如果在消息到 達服務器之前已經丟失(比如RabbitMQ重啟, 那么RabbitMQ重啟期間生產者消息投遞失敗), 持久化操作也解決不了這個問題,因為消息根本沒有到達服務器,何談持久化?
RabbitMQ為我們提供了兩種解決?案:
a. 通過事務機制實現
b. 通過發送方確認(publisher confirm) 機制實現

事務機制?較消耗性能, 在實際工作中使用也不多, 咱們主要介紹confirm機制來實現發送方的確認.RabbitMQ為我們提供了兩個方式來控制消息的可靠性投遞

  1. confirm確認模式
  2. return退回模式

3.1、confirm確認模式

Producer 在發送消息的時候, 對發送端設置?個ConfirmCallback的監聽, 無論消息是否到達Exchange, 這個監聽都會被執行, 如果Exchange成功收到, ACK( Acknowledge character , 確認字符)為true, 如果沒收到消息, ACK就為false.
步驟如下:

  1. 配置RabbitMQ
  2. 設置確認回調邏輯并發送消息

接下來看實現步驟

  1. 配置RabbitMQ
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/numslistener:simple:acknowledge-mode: manual #消息接收確認publisher-confirm-type: correlated #消息發送確認
  1. 設置確認回調邏輯并發送消息
    ?論消息確認成功還是失敗, 都會調?ConfirmCallback的confirm方法. 如果消息成功發送到Broker,ack為true.如果消息發送失敗, ack為false, 并且cause提供失敗的原因
  @Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(ConnectionFactoryconnectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack,String cause) {System.out.printf("");if (ack) {System.out.printf("消息接收成功, id:%s \n",correlationData.getId());} else {System.out.printf("消息接收失敗, id:%s, cause: %s",correlationData.getId(), cause);}}});return rabbitTemplate;}@Resource(name = "confirmRabbitTemplate")private RabbitTemplate confirmRabbitTemplate;@RequestMapping("/confirm")public String confirm() throws InterruptedExceptionCorrelationData correlationData1 = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME,"confirm","confirm test...",correlationData1);return"確認成功";
}

?法說明:

public interface ConfirmCallback {/*** 確認回調* @param correlationData: 發送消息時的附加信息, 通常?于在確認回調中識別特定的消 息* @param ack: 交換機是否收到消息, 收到為true, 未收到為false* @param cause: 當消息確認失敗時,這個字符串參數將提供失敗的原因.這個原因可以?于調 試和錯誤處理.* 成功時, cause為null */void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);
}

RabbitTemplate.ConfirmCallback 和 ConfirmListener 區別
在RabbitMQ中, ConfirmListener和ConfirmCallback都是用來處理消息確認的機制, 但它們屬于不同的客戶端庫, 并且使用的場景和方式有所不同.
1 . ConfirmListener 是 RabbitMQ Java Client 庫中的接口. 這個庫是 RabbitMQ 官方提供的一個直接與RabbitMQ服務器交互的客戶端庫. ConfirmListener 接口提供了兩個方法: handleAck 和handleNack, 用于處理消息確認和否定確認的事件.
2 . ConfirmCallback 是 Spring AMQP 框架中的?個接口. 專門為Spring環境設計. 用于簡化與RabbitMQ交互的過程. 它只包含?個 confirm方法,?于處理消息確認的回調.

在 Spring Boot 應用中, 通常會使用 ConfirmCallback, 因為它與 Spring 框架的其他部分更加整合, 可 以利用 Spring 的配置和依賴注入功能. 而在使用 RabbitMQ Java Client 庫時, 則可能會直接實現ConfirmListener 接口, 更直接的RabbitMQChannel交互

3.2、return退回模式

消息到達Exchange之后, 會根據路由規則匹配, 把消息放?Queue中. Exchange到Queue的過程, 如果一條消息無法被任何隊列消費(即沒有隊列與消息的路由鍵匹配或隊列不存在等), 可以選擇把消息退回 給發送者. 消息退回給發送者時, 我們可以設置?個返回回調方法, 對消息進行處理.

步驟如下:

  1. 配置RabbitMQ
  2. 設置返回回調邏輯并發送消息

1 . 配置RabbitMQ

spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/numslistener:simple:acknowledge-mode: manual #消息接收確認publisher-confirm-type: correlated #消息發送確認
  1. 設置返回回調邏輯并發送消息

這里是引用消息無法被路由到任何隊列, 它將返回給發送者,這時setReturnCallback設置的回調將被觸發

 @Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(CachingConnectionFactoryconnectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.printf("消息被退回: %s", returned);}});return rabbitTemplate;}@RequestMapping("/msgReturn")public String msgReturn() {CorrelationData correlationData = new CorrelationData("2");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm11", "message return test...", correlationData);return "消息發送成功";}

使?RabbitTemplate的setMandatory方法設置消息的mandatory屬性為true(默認為false). 這個屬性的作用是告訴RabbitMQ, 如果?條消息無法被任何隊列消費, RabbitMQ應該將消息返回給發送者, 此 時 ReturnCallback 就會被觸發.

回調函數中有?個參數: ReturnedMessage, 包含以下屬性:

  public class ReturnedMessage {//返回的消息對象,包含了消息體和消息屬性private final Message message;//由Broker提供的回復碼, 表?消息?法路由的原因. 通常是?個數字代碼,每個數字代表不同 的含義. private final int replyCode;//?個?本字符串, 提供了?法路由消息的額外信息或錯誤描述.private final String replyText;//消息被發送到的交換機名稱private final String exchange;//消息的路由鍵,即發送消息時指定的鍵private final String routingKey;}

3.3、常見面試題

如何保證RabbitMQ消息的可靠傳輸?
先放?張RabbitMQ消息傳遞圖:

在這里插入圖片描述

從這個圖中, 可以看出, 消息可能丟失的場景以及解決?案:

  1. ?產者將消息發送到 RabbitMQ失敗
    a. 可能原因: 網絡問題等
    b. 解決辦法: 參考本章節[發送方確認-confirm確認模式]
  2. 消息在交換機中無法路由到指定隊列:
    a. 可能原因: 代碼或者配置層面錯誤, 導致消息路由失敗
    b. 解決辦法: 參考本章節[發送方確認-return模式]
  3. 消息隊列自身數據丟失
    a. 可能原因: 消息到達RabbitMQ之后, RabbitMQ Server 宕機導致消息丟失.
    b. 解決辦法: 參考本章節[持久性]. 開啟 RabbitMQ持久化, 就是消息寫入之后會持久化到磁盤, 如果RabbitMQ 掛了, 恢復之后會自動讀取之前存儲的數據. (極端情況下, RabbitMQ還未持久化就掛了, 可能導致少量數據丟失, 這個概率極低, 也可以通過集群的方式提高可靠性)
  4. 消費者異常, 導致消息丟失
    a. 可能原因: 消息到達消費者, 還沒來得及消費, 消費者宕機. 消費者邏輯有問題.
    b. 解決辦法: RabbitMQ 提供了 消費者應答機制來使 RabbitMQ 能夠感知 到消費者是否消費成功消息. 默認情況下消費者應答機制是自動應答的, 可以開啟手動確認, 當消費者確認消費成功后才會刪除消息, 從而避免消息丟失. 除此之外, 也可以配置重試機制(下篇文章會為大家介紹), 當消息消費異常時, 通過消息重試確保消息的可靠性

結語

本篇文章主要介紹了RAbbitMQ中的部分高級特性,主要從消息確認,持久化,發送方確認三個方面展開
以上就是本文全部內容,感謝各位能夠看到最后,如有問題,歡迎各位大佬在評論區指正,希望大家可以有所收獲!創作不易,希望大家多多支持!

最后,大家再見!祝好!我們下期見!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/905018.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/905018.shtml
英文地址,請注明出處:http://en.pswp.cn/news/905018.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

探索Hello Robot開源移動操作機器人Stretch 3的技術亮點與市場定位

Hello Robot 推出的 Stretch 3 機器人憑借其前沿技術和多功能性在眾多產品中占據優勢。Stretch 3 機器人采用開源設計,為開發者提供了靈活的定制空間,能夠滿足各種不同的需求。其配備的靈活手腕組件和 Intel Realsense D405 攝像頭,顯著增強了…

expo多網絡請求設定。

在使用 npx expo start 啟動 Expo 開發服務器時,你可以通過設置網絡模式來控制你的應用如何連接到開發服務器。Expo 提供了幾種網絡模式供你選擇: LAN (Default): 這是默認模式。在這種模式下,你的應用會通過本地局域網 (LAN) 連接到你的開發…

Nginx 安全防護與HTTPS部署

目錄 一、核心安全配置 1、隱藏版本號 2、限制危險請求方法 3、請求限制(CC攻擊防御) (1)使用Nginx的limit_req模塊限制請求速率 (2)壓力測試驗證 4、防盜鏈 (1)修改 Window…

windows 環境下 python環境安裝與配置

運行環境安裝 第一步安裝包下載 python開發工具安裝包下載官網: https://www.python.org/ 根據自己的實際需求選擇。 這里記錄了各個版本的區別和差異。根據區別和差異選擇適合自己的版本。 Windows Installer和Windows embeddable package是兩種不同的軟件包類…

TB6600HG是一款PWM(脈寬調制)斬波型單芯片雙極性正弦波微步進電機驅動集成電路。

該驅動器支持電機的正向和反向旋轉控制,并具有多種激勵模式,包括2相、1-2相、W1-2相、2W1-2相和4W1-2相。 使用這款驅動器,只需時鐘信號即可驅動2相雙極性步進電機,且振動小、效率高。 主要特點: 單芯片雙極性正弦波…

【JS逆向基礎】爬蟲核心模塊:request模塊與包的概念

前言:這篇文章主要介紹JS逆向爬蟲中最常用的request模塊,然后引出一系列的模塊的概念,當然Python中其他比較常用的還有很多模塊,正是這些模塊也可以稱之為庫的東西構成了Python強大的生態,使其幾乎可以實現任何功能。下…

極狐Gitlab 里程碑功能介紹

極狐GitLab 是 GitLab 在中國的發行版,關于中文參考文檔和資料有: 極狐GitLab 中文文檔極狐GitLab 中文論壇極狐GitLab 官網 里程碑 (BASIC ALL) 極狐GitLab 中的里程碑是一種跟蹤議題和合并請求的方法,這些請求是為了在特定時間段內實現更…

【日擼 Java 三百行】Day 10(綜合任務 1)

目錄 Day 10:綜合任務 1 一、題目分析 1. 數據結構 2. 相關函數基本知識 二、模塊介紹 1. 初始化與成績矩陣的構建 2. 創建總成績數組 3. 尋找成績極值 三、代碼與測試 小結 拓展:關于求極值的相關算法 Day 10:綜合任務 1 Task&…

c++:庫(Library)

目錄 什么是庫? C中庫的兩種形態:靜態庫 和 動態庫 靜態鏈接 vs 動態鏈接(鏈接 ≠ 庫) 🔒 靜態鏈接(Static Linking) 🔗 動態鏈接(Dynamic Linking) C標…

Java線程池深度解析:從使用到原理全面掌握

在高并發場景下,線程管理是提升系統性能的關鍵。本文將深入探討Java線程池的核心機制,帶你從基礎使用到底層實現全面掌握這一重要技術。 一、線程池存在的意義 1.1 線程的隱形成本 盡管線程相比進程更輕量,但當QPS達到萬級時: 頻…

PostgreSQL 的 pg_advisory_lock_shared 函數

PostgreSQL 的 pg_advisory_lock_shared 函數詳解 pg_advisory_lock_shared 是 PostgreSQL 提供的共享咨詢鎖函數,允許多個會話同時獲取相同鍵值的共享鎖,但排斥排他鎖。 共享咨詢鎖 vs 排他咨詢鎖 鎖類型共享鎖 (pg_advisory_lock_shared)排他鎖 (pg…

Halcon之計算抓取螺母的位姿

文章目錄 1,項目說明。2,注意事項3,關聯的主要算子3.1, gen_parallels_xld 3.2 ,convert_pose_type 4,程序流程。5,代碼6,Demo鏈接。 1,項目說明。 Robot標定使用的模式…

互聯網大廠Java求職面試:AI集成場景下的技術挑戰與架構設計

標題:互聯網大廠Java求職面試:AI集成場景下的技術挑戰與架構設計 第一幕:向量數據庫選型與性能調優 技術總監(嚴肅臉): 鄭薪苦,我們最近在做一個基于大語言模型的企業級AI應用,需要…

ABB電機控制和保護單元與Profibus DP主站轉Modbus TCP網關快速通訊案例

ABB電機控制和保護單元與Profibus DP主站轉Modbus TCP網關快速通訊案例 在現代工業自動化系統中,設備之間的互聯互通至關重要。Profibus DP和Modbus TCP是兩種常見的通信協議,分別應用于不同的場景。為了實現這兩種協議的相互轉換,Profibus …

智慧農業、智慧養殖平臺—監控攝像頭管理監控設計—仙盟創夢IDE

智慧養殖 監控攝像頭是核心管理工具,主要通過以下方式提升養殖效率與管理水平: 環境實時監測:對養殖區域進行全天候可視化監控,及時捕捉溫度、濕度、通風等環境要素變化,確保動物生存環境穩定 。例如在規模化豬場&…

YOLO 從入門到精通學習指南

一、引言 在計算機視覺領域,目標檢測是一項至關重要的任務,其應用場景廣泛,涵蓋安防監控、自動駕駛、智能交通等眾多領域。YOLO(You Only Look Once)作為目標檢測領域的經典算法系列,以其高效、快速的特點受到了廣泛的關注和應用。本學習指南將帶領你從 YOLO 的基礎概念…

Java 24新特性深度解析:從優化技巧到高手進階指南

一、Java 24核心新特性詳解 Java 24作為長期支持版本(LTS),帶來了許多令人振奮的新特性,下面我們將深入探討其中最值得關注的改進。 1. 字符串模板(String Templates)正式發布 字符串模板結束了Java字符串拼接的混亂時代&#…

《類和對象(中)》

引言: 上次我們主要學習了類的相關知識,今天我們就來學習類和對象(中),今天也會用到之前學習過的東西,可以說是前面知識的結合,較前面會難一點(打個預防針)。 一:類的默認成員函數…

為什么 AI 理解不了邏輯問題?

人類擅長“如果 A 則 B”,AI 擅長“這個像那個”。邏輯推理?對它來說是一場災難性的認知挑戰。 前言 在實際使用 AI(尤其是大型語言模型,比如 GPT、Claude、Gemini 等)時,我們常發現一個詭異的現象:它們文采斐然,甚至能講出笑話,但一旦問點小學奧數或邏輯問題,就集體…

C# 使用SunnyUI控件 (VS 2019)

前言:建議下載源碼,源碼中包含了各種控件的用法案例。 下載 幫助文檔: 文檔預覽 - Gitee.comGitee: SunnyUI: SunnyUI.NET 是基于.NET Framework 4.0、.NET8、.NET9 框架的 C# WinForm UI、開源控件庫、工具類庫、擴展類庫、多頁面開發框架。GitHub: h…