【RabbitMQ】高級特性:持久性·發送方確認·重試機制·TTL·死信隊列·延遲隊列·事務·消息分發

RabbitMQ的高級特性還包括我的上篇博客

【RabbitMQ】-----詳解RabbitMQ高級特性之消息確認機制-CSDN博客

目錄

RabbitMQ高級特性之持久性

持久性

交換機持久化

隊列持久化+消息持久化

RabbitMQ高級特性之發送方確認機制

發送方確認

添加配置

常量類

聲明隊列和交換機并綁定二者關系

confirm確認模式?

編寫生產消息代碼

生產消息1

解決方法

解決方法

return 模式

編寫生產消息代碼(路由正確)

編寫生產消息代碼(路由錯誤)

面試題

RabbitMQ高級特性之重試機制

配置類

常量類

聲明隊列和交換機并綁定二者關系

編寫生產消息代碼

編寫消費消息代碼1(自動確認)

編寫消費消息代碼2(自動確認)

編寫消費消息代碼3(手動確認)

RabbitMQ高級特性之TTL

TTL

添加配置

常量類

消息的TTL

聲明隊列和交換機并綁定二者關系

編寫生產消息代碼

生產消息

隊列的TTL

聲明隊列和交換機并綁定二者關系

編寫生產消息代碼

生產消息(消息無TTL)

生產消息(消息有TTL)

消息的TTL和隊列的TTL

RabbitMQ高級特性之死信隊列

死信隊列

添加配置

常量類

聲明隊列和交換機并綁定二者關系

死信--消息過期

給隊列設置TTL

編寫生產消息代碼

編寫消費消息代碼

觀察現象

死信--消息超過隊列最大長度

設置隊列的最大長度

編寫生產消息代碼

編寫消費消息代碼

死信--消息被拒絕

編寫生產消息代碼

面試題

RabbitMQ高級特性之延遲隊列

延遲隊列

應用場景

TTL+死信隊列 實現延遲隊列

添加配置

常量類

聲明隊列和交換機并綁定二者關系

編寫生產消息代碼

編寫消費消息代碼

修改生產消息代碼

延遲隊列插件

兩者的區別:

RabbitMQ高級特性之事務

RabbitMQ高級特性之消息分發

消息分發

添加配置

常量類

聲明配置和交換機并綁定二者關系

限流

編寫生產消息代碼

編寫消費消息代碼1

編寫消費消息代碼2

負載均衡

更改配置

編寫消費消息代碼


RabbitMQ高級特性之持久性

持久性

RabbitMQ的持久化分為三個部分:交換器的持久化、隊列的持久化和消息的持久化。

交換機持久化

交換器的持久化是通過在聲明交換機時是將durable參數置為true實現的.相當于將交換機的屬性在服務器內部保存,當MQ的服務器發?意外或關閉之后,重啟 RabbitMQ 時不需要重新去建?交換機, 交換機會?動建?,相當于?直存在.

如果交換器不設置持久化, 那么在 RabbitMQ 服務重啟之后, 相關的交換機元數據會丟失, 對?個?期使?的交換器來說,建議將其置為持久化的.

重啟RabbitMQ服務前:

重啟RabbitMQ服務后:

由此我們可以看到,重啟RabbitMQ服務之后,重啟服務之前聲明的持久化交換機依舊存在

交換機設置了非持久化之后,當重啟RabbitMQ服務之后,會發現之前聲明的非持久化交換機沒有了,因為消息依托于隊列,而隊列依托于交換機,所以當交換機非持久化之后,無論消息和隊列是否持久化,當重啟RabbitMQ之后,消息,隊列和交換機都不復存在了

隊列持久化+消息持久化

因為消息和隊列二者之間的關系比較強,所以下面將隊列的持久化和非持久化與消息的持久化和非持久化一起來講解。

隊列的持久化是通過在聲明隊列時將 durable 參數置為 true實現的.
如果隊列不設置持久化, 那么在RabbitMQ服務重啟之后,該隊列就會被刪掉, 此時數據也會丟失. (隊列沒有了, 消息也?處可存了)

隊列的持久化能保證該隊列本?的元數據不會因異常情況?丟失, 但是并不能保證內部所存儲的消息不會丟失. 要確保消息不會丟失, 需要將消息設置為持久化。

消息實現持久化, 需要把消息的投遞模式( MessageProperties 中的 deliveryMode )設置為2,也就是 MessageDeliveryMode.PERSISTENT。
設置了隊列和消息的持久化, 當 RabbitMQ 服務重啟之后, 消息依舊存在. 如果只設置隊列持久化, 重啟之后消息會丟失. 如果只設置消息的持久化, 重啟之后隊列消失, 繼?消息也丟失. 所以單單設置消息持久化?不設置隊列的持久化顯得毫?意義

隊列持久+消息持久

生產消息

重啟RabbitMQ服務后:

此時我們可以看到,當重啟RabbitMQ服務后,重啟RabbitMQ服務之前聲明的持久化隊列和持久化消息都還是存在的。?

隊列持久+消息非持久

重啟RabbitMQ服務后:

隊列非持久+消息持久

重啟RabbitMQ服務后:

隊列非持久+消息非持久

重啟RabbitMQ服務后:

所以只有當交換機+隊列+消息都是持久化的時候才具有持久性

RabbitMQ高級特性之發送方確認機制

發送方確認

在使? RabbitMQ的時候, 可以通過消息持久化來解決因為服務器的異常崩潰?導致的消息丟失, 但是還有?個問題, 當消息的?產者將消息發送出去之后, 消息到底有沒有正確地到達服務器呢? 如果在消息到達服務器之前已經丟失(?如RabbitMQ重啟, 那么RabbitMQ重啟期間?產者消息投遞失敗), 持久化操作也解決不了這個問題,因為消息根本沒有到達服務器,何談持久化?

RabbitMQ為我們提供了兩種解決?案:

a. 通過事務機制實現
b. 通過發送?確認(publisher confirm) 機制實現

事務機制?較消耗性能, 在實際?作中使?也不多, 下面主要介紹confirm機制來實現發送?的確認.

RabbitMQ為我們提供了兩個?式來控制消息的可靠性投遞:

1. confirm確認模式
2. return退回模式

添加配置
spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://study:study@47.98.109.138:5672/extensionpublisher-confirm-type: correlated   #消息發送確認
常量類
public class Constants {//發送方確認public static final String CONFIRM_QUEUE = "confirm.queue";public static final String CONFIRM_EXCHANGE = "confirm.exchange";
}
聲明隊列和交換機并綁定二者關系
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import rabbitextensionsdemo.constant.Constants;@Configuration
public class RabbitMQConfig {//發送方確認@Bean("confirmQueue")public Queue confirmQueue(){return QueueBuilder.durable(Constants.CONFIRM_QUEUE).build();}@Bean("confirmExchange")public DirectExchange confirmExchange(){return ExchangeBuilder.directExchange(Constants.CONFIRM_EXCHANGE).build();}@Bean("confirmBinding")public Binding confirmBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("confirm").noargs();}
}
confirm確認模式?

Producer 在發送消息的時候, 對發送端設置?個ConfirmCallback的監聽, ?論消息是否到達
Exchange, 這個監聽都會被執?, 如果Exchange成功收到, ACK( Acknowledge character , 確認字符)為true, 如果沒收到消息, ACK就為false。

RabbitTemplate.ConfirmCallback 和 ConfirmListener 區別

在RabbitMQ中, ConfirmListener和ConfirmCallback都是?來處理消息確認的機制, 但它們屬于不同的客?端庫, 并且使?的場景和?式有所不同.

1. ConfirmListener 是 RabbitMQ Java Client 庫中的接?. 這個庫是 RabbitMQ 官?提供的?個直接與RabbitMQ服務器交互的客?端庫. ConfirmListener 接?提供了兩個?法: handleAck 和handleNack, ?于處理消息確認和否定確認的事件.

2. ConfirmCallback 是 Spring AMQP 框架中的?個接?. 專?為Spring環境設計. ?于簡化與
RabbitMQ交互的過程. 它只包含?個 confirm ?法,?于處理消息確認的回調.
在 Spring Boot 應?中, 通常會使? ConfirmCallback, 因為它與 Spring 框架的其他部分更加整合, 可以利? Spring 的配置和依賴注?功能. ?在使? RabbitMQ Java Client 庫時, 則可能會直接實現ConfirmListener 接?, 更直接的與RabbitMQ的Channel交互

編寫生產消息代碼
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import rabbitextensionsdemo.constant.Constants;import java.util.Date;@RequestMapping("/producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate confirmRabbitTemplate;@RequestMapping("/confirm")public String confirm() {confirmRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("執行了confirm方法");if (ack){System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);//相應的業務處理}}});CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm", "confirm test...", correlationData);return "消息發送成功";}
}
public interface ConfirmCallback {/*** 確認回調* @param correlationData: 發送消息時的附加信息 , 通常?于在確認回調中識別特定的消 息* @param ack: 交換機是否收到消息 , 收到為 true, 未收到為 false* @param cause: 當消息確認失敗時 , 這個字符串參數將提供失敗的原因 . 這個原因可以?于調 試和錯誤處理 .* 成功時 , cause 為 null*/void confirm ( @Nullable CorrelationData correlationData, boolean ack,@Nullable String cause);
}
生產消息1

第一次生產消息

第二次生產消息

此時我們看到,第一次生產消息時能夠正常生產消息,但是當我們第二次生產消息時卻拋異常了,異常信息為:java.lang.IllegalStateException: Only one ConfirmCallback is supported by each RabbitTemplate

解決方法

是為什么呢?從異常信息中我們可以看到,ConfirmCallback只能被設置一次,但是從我們的代碼中可以看到,我們每次生產消息時都會設置一次ConfirmCallback,顯然這就是問題所在。

下面我們把剛剛的ConfirmCallback提取出來,重新設置RabbitTemplate。

RabbitTemplateConfig

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//設置回調方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("執行了confirm方法");if (ack){System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);//相應的業務處理}}});return rabbitTemplate;}
}

ProducerController

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import rabbitextensionsdemo.constant.Constants;@RequestMapping("/producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate RabbitTemplate confirmRabbitTemplate;@RequestMapping("/pres")public String pres() {Message message = new Message("Presistent test...".getBytes(), new MessageProperties());//消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);System.out.println(message);rabbitTemplate.convertAndSend(Constants.PRES_EXCHANGE, "pres", message);return "消息發送成功";}@RequestMapping("/confirm")public String confirm() {CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm", "confirm test...", correlationData);return "消息發送成功";}
}

此時我們可以看到,我們解決了前面多次生產消息導致的ConfirmCallback被設置多次的問題,但是我們此時的代碼就真的沒有問題了嗎?

當我們生產其它消息時,發現我們并沒有給這個生產消息的方法設置ConfirmCallback啊,但是為什么在控制臺上看到執行了我們設置的ConfrimCallback,這是為什么呢?

是因為我們在前面設置了RabbitTemplate,而且使用了@Autowired注解注入了RabbitTemplate,雖然我們注入了兩個,一個是rabbitTemplate,一個是confirmRabbitTemplate,但是這兩個都是同一個RabbitTemplate。

解決方法

解決辦法:我們在RabbitTemplateConfig中設置兩個RabbitTemplate.

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);return rabbitTemplate;}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//設置回調方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("執行了confirm方法");if (ack){System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);//相應的業務處理}}});return rabbitTemplate;}
}

    與此同時,我們修改注入方式:

    此時,當再次使用/producer/pres來生產消息時,就沒問題了。

    下面我們修改一下生產消息時給消息設置的路由規則:

        @RequestMapping("/confirm")public String confirm() {CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm111", "confirm test...", correlationData);return "消息發送成功";}

    生產消息

    我們知道,上面生產消息時給消息設置的路由規則并不存在,按道理說,應該會打印“未收到消息”而非“收到消息”,原因是因為,上面的confirm確認模式是用來確定生產消息是否到達了交換機,而上面的路由規則是針對消息從交換機到隊列的,解決上面的路由問題使用到另一種確認模式。

    return 模式

    消息到達Exchange之后, 會根據路由規則匹配, 把消息放?Queue中. Exchange到Queue的過程, 如果?條消息?法被任何隊列消費(即沒有隊列與消息的路由鍵匹配或隊列不存在等), 可以選擇把消息退回給發送者. 消息退回給發送者時, 我們可以設置?個返回回調?法, 對消息進?處理。

    修改RabbitTemplateConfig,設置消息退回的回調方法

    import org.springframework.amqp.core.ReturnedMessage;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;@Configuration
    public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);return rabbitTemplate;}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//設置回調方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("執行了confirm方法");if (ack){System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);//相應的業務處理}}});//消息被退回時, 回調方法rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息退回:"+returned);}});return rabbitTemplate;}
    }

    使?RabbitTemplate的setMandatory?法設置消息的mandatory屬性為true(默認為false). 這個屬性的作?是告訴RabbitMQ, 如果?條消息?法被任何隊列消費, RabbitMQ應該將消息返回給發送者, 此時 ReturnCallback 就會被觸發。

    回調函數中有?個參數: ReturnedMessage, 包含以下屬性:

    public class ReturnedMessage {//返回的消息對象,包含了消息體和消息屬性private final Message message;//由 Broker 提供的回復碼 , 表?消息?法路由的原因 . 通常是?個數字代碼,每個數字代表不同 的含義 .private final int replyCode;//?個?本字符串 , 提供了?法路由消息的額外信息或錯誤描述 .private final String replyText;//消息被發送到的交換機名稱private final String exchange;//消息的路由鍵,即發送消息時指定的鍵private final String routingKey;
    }
    
    編寫生產消息代碼(路由正確)
        @RequestMapping("/returns")public String returns() {CorrelationData correlationData = new CorrelationData("5");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm", "returns test...", correlationData);return "消息發送成功";}

    編寫生產消息代碼(路由錯誤)
        @RequestMapping("/returns")public String returns() {CorrelationData correlationData = new CorrelationData("5");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm111", "returns test...", correlationData);return "消息發送成功";}

    此時我們可以看到,隊列中依舊是只有1條消息,而且代碼執行了消息退回,而且消息退回時打印了消息信息,顯然我們可以看到,消息的路由規則是錯誤的,不會入隊列。

    面試題

    如何保證RabbitMQ消息的可靠傳輸?

    從這個圖中, 可以看出, 消息可能丟失的場景以及解決?案:

    1. ?產者將消息發送到 RabbitMQ失敗
    ? ? ? ? a. 可能原因: ?絡問題等
    ? ? ? ? b. 解決辦法: [發送?確認-confirm確認模式]
    2. 消息在交換機中?法路由到指定隊列:
    ? ? ? ? a. 可能原因: 代碼或者配置層?錯誤, 導致消息路由失敗
    ? ? ? ? b. 解決辦法: [發送?確認-return模式]

    3. 消息隊列??數據丟失
    ? ? ? ? a. 可能原因: 消息到達RabbitMQ之后, RabbitMQ Server 宕機導致消息丟失.
    ? ? ? ? b. 解決辦法: [持久性]. 開啟 RabbitMQ持久化, 就是消息寫?之后會持久化到磁盤, 如果RabbitMQ 掛了, 恢復之后會?動讀取之前存儲的數據. (極端情況下, RabbitMQ還未持久化就掛了, 可能導致少量數據丟失, 這個概率極低, 也可以通過集群的?式提?可靠性)
    4. 消費者異常, 導致消息丟失
    ? ? ? ? a. 可能原因: 消息到達消費者, 還沒來得及消費, 消費者宕機. 消費者邏輯有問題.
    ? ? ? ? b. 解決辦法: [消息確認]. RabbitMQ 提供了 消費者應答機制 來使 RabbitMQ 能夠感知到消費者是否消費成功消息. 默認情況下消費者應答機制是?動應答的, 可以開啟?動確認, 當消費者確認消費成功后才會刪除消息, 從?避免消息丟失. 除此之外, 也可以配置重試機制, 當消息消費異常時, 通過消息重試確保消息的可靠性。

    RabbitMQ高級特性之重試機制

    在消息傳遞過程中, 可能會遇到各種問題, 如?絡故障, 服務不可?, 資源不?等, 這些問題可能導致消息處理失敗. 為了解決這些問題, RabbitMQ 提供了重試機制, 允許消息在處理失敗后重新發送.但如果是程序邏輯引起的錯誤, 那么多次重試也是沒有?的, 可以設置重試次數.

    添加配置

    配置類
    spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://study:study@47.98.109.138:5672/extensionlistener:simple:acknowledge-mode: auto  #消息接收確認retry:enabled: true # 開啟消費者失敗重試initial-interval: 5000ms # 初始失敗等待時長為5秒max-attempts: 5 # 最大重試次數
    
    常量類
    public class Constants {//重試機制public static final String RETRY_QUEUE = "retry.queue";public static final String RETRY_EXCHANGE = "retry.exchange";
    }
    聲明隊列和交換機并綁定二者關系
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import rabbitextensionsdemo.constant.Constants;@Configuration
    public class RabbitMQConfig {//重試機制@Bean("retryQueue")public Queue retryQueue(){return QueueBuilder.durable(Constants.RETRY_QUEUE).build();}@Bean("retryExchange")public DirectExchange retryExchange(){return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).build();}@Bean("retryBinding")public Binding retryBinding(@Qualifier("retryQueue") Queue queue, @Qualifier("retryExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("retry").noargs();}
    }
    編寫生產消息代碼
        @RequestMapping("/retry")public String retry() {System.out.println("retry...");rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE, "retry", "retry test...");return "消息發送成功";}
    編寫消費消息代碼1(自動確認)
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    import rabbitextensionsdemo.constant.Constants;import java.io.UnsupportedEncodingException;@Component
    public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void handlerMessage(Message message) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("["+ Constants.RETRY_QUEUE+"]接收到消息: %s, deliveryTag: %s \n", new String(message.getBody(), "UTF-8"), deliveryTag);int num = 3/0;System.out.println("業務處理完成");}
    }

    此時我們可以看到,消費方發生了異常,接著進行了5次重試,然后就拋異常了。

    編寫消費消息代碼2(自動確認)
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    import rabbitextensionsdemo.constant.Constants;@Component
    public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void handlerMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("["+Constants.RETRY_QUEUE+"]接收到消息: %s, deliveryTag: %s \n", new String(message.getBody(), "UTF-8"), deliveryTag);try {int num = 3/0;System.out.println("業務處理完成");}catch (Exception e){System.out.println("業務處理失敗");}}
    }

    消費消息

    此時我們發現,雖然配置了重試次數為5次,但是在處理異常后,并沒有進行重試。

    編寫消費消息代碼3(手動確認)
    spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://study:study@47.98.109.138:5672/extensionlistener:simple:acknowledge-mode: manual  #消息接收確認retry:enabled: true # 開啟消費者失敗重試initial-interval: 5000ms # 初始失敗等待時長為5秒max-attempts: 5 # 最大重試次數
    
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    import rabbitextensionsdemo.constant.Constants;@Component
    public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void handlerMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("["+Constants.RETRY_QUEUE+"]接收到消息: %s, deliveryTag: %s \n", new String(message.getBody(), "UTF-8"), deliveryTag);try {int num = 3/0;System.out.println("業務處理完成");channel.basicAck(deliveryTag, false);}catch (Exception e){System.out.println("業務處理失敗");channel.basicNack(deliveryTag, false, true);}}
    }

    因為拋出了異常,設置了手動確認,而且設置了拒絕確認時重新入隊,所以會不停地在控制臺打印,而且deleveryTag的值不斷增加,是因為消息不斷重新出隊和入隊。

    可以看到, ?動確認模式時, 重試次數的限制不會像在?動確認模式下那樣直接?效, 因為是否重試以及何時重試更多地取決于應?程序的邏輯和消費者的實現.

    • ?動確認模式下, RabbitMQ 會在消息被投遞給消費者后?動確認消息. 如果消費者處理消息時拋出異常, RabbitMQ 根據配置的重試參數?動將消息重新?隊, 從?實現重試. 重試次數和重試間隔等參數可以直接在RabbitMQ的配置中設定,并且RabbitMQ會負責執?這些重試策略.
    • ?動確認模式下, 消費者需要顯式地對消息進?確認. 如果消費者在處理消息時遇到異常, 可以選擇不確認消息使消息可以重新?隊. 重試的控制權在于應?程序本?, ?不是RabbitMQ的內部機制. 應?程序可以通過??的邏輯和利?RabbitMQ的?級特性來實現有效的重試策略。

    使?重試機制時需要注意:

    1. ?動確認模式下: 程序邏輯異常, 多次重試還是失敗, 消息就會被?動確認, 那么消息就丟失

    2. ?動確認模式下: 程序邏輯異常, 多次重試消息依然處理失敗, ?法被確認, 就?直是
    unacked的狀態, 導致消息積壓

    RabbitMQ高級特性之TTL

    TTL

    TTL(Time?to?Live, 過期時間), 即過期時間.?RabbitMQ可以對消息和隊列設置TTL.
    當消息到達存活時間之后, 還沒有被消費, 就會被?動清除。

    咱們在?上購物, 經常會遇到?個場景, 當下單超過24?時還未付款, 訂單會被?動取消
    還有類似的, 申請退款之后, 超過7天未被處理, 則?動退款。

    添加配置
    spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://study:study@47.98.109.138:5672/extension
    常量類
    public class Constants {//ttlpublic static final String TTL_QUEUE = "ttl.queue";public static final String TTL_QUEUE2 = "ttl2.queue";public static final String TTL_EXCHANGE = "ttl.exchange";
    }
    消息的TTL
    聲明隊列和交換機并綁定二者關系
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import rabbitextensionsdemo.constant.Constants;@Configuration
    public class RabbitMQConfig {    @Bean("ttlQueue")public Queue ttlQueue(){return QueueBuilder.durable(Constants.TTL_QUEUE).build();}@Bean("ttlExchange")public DirectExchange ttlExchange(){return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).build();}@Bean("ttlBinding")public Binding ttlBinding(@Qualifier("ttlQueue") Queue queue, @Qualifier("ttlExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();}
    }
    編寫生產消息代碼
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import rabbitextensionsdemo.constant.Constants;@RequestMapping("/producer")
    @RestController
    public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;@RequestMapping("/ttl")public String ttl() {System.out.println("ttl...");rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test 30s...", message -> {message.getMessageProperties().setExpiration("30000");  //單位: 毫秒, 過期時間為30sreturn message;});rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test 10s...", message -> {message.getMessageProperties().setExpiration("10000");  //單位: 毫秒, 過期時間為10sreturn message;});return "消息發送成功";}
    }
    生產消息

    我們可以看到,生產的兩條消息的確消失了,但是耗時30秒,這是為什么呢?

    原因是因為設置消息的TTL,哪怕消息過期了,也不會立即刪除,而是在將消息投遞給消費者之前進行判定

    隊列的TTL
    聲明隊列和交換機并綁定二者關系
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import rabbitextensionsdemo.constant.Constants;@Configuration
    public class RabbitMQConfig {@Bean("ttlQueue2")public Queue ttlQueue2(){return QueueBuilder.durable(Constants.TTL_QUEUE2).ttl(20000).build();  //設置隊列的ttl為20s}@Bean("ttlExchange")public DirectExchange ttlExchange(){return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).build();}@Bean("ttlBinding2")public Binding ttlBinding2(@Qualifier("ttlQueue2") Queue queue, @Qualifier("ttlExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();}
    }
    編寫生產消息代碼
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import rabbitextensionsdemo.constant.Constants;@RequestMapping("/producer")
    @RestController
    public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;@RequestMapping("/ttl")public String ttl() {System.out.println("ttl...");rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test 30s...", message -> {message.getMessageProperties().setExpiration("30000");  //單位: 毫秒, 過期時間為30sreturn message;});return "消息發送成功";}@RequestMapping("/ttl2")public String ttl2() {System.out.println("ttl2...");//發送普通消息rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test...");return "消息發送成功";}
    }
    生產消息(消息無TTL)

    我們可以看到,雖然沒有給消息設置TTL,但是給ttl2.queue隊列設置了20秒的TTL,20秒過后,ttl2.queue隊列中的消息消失了。

    生產消息(消息有TTL)

    此時我們可以看到,20秒之后,消息消失了,我們給隊列設置的TTL為20秒,給消息設置的TTL為30秒,最終消息的TTL 為 min(消息的TTL,隊列的TTL)

    消息的TTL和隊列的TTL

    設置隊列TTL屬性的?法, ?旦消息過期, 就會從隊列中刪除
    設置消息TTL的?法, 即使消息過期, 也不會?上從隊列中刪除, ?是在即將投遞到消費者之前進?判定的.


    為什么這兩種?法處理的?式不?樣?
    因為設置隊列過期時間, 隊列中已過期的消息肯定在隊列頭部, RabbitMQ只要定期從隊頭開始掃描是否有過期的消息即可.
    ?置消息TTL的?式, 每條消息的過期時間不同, 如果要刪除所有過期消息需要掃描整個隊列, 所以不如等到此消息即將被消費時再判定是否過期, 如果過期再進?刪除即可.

    RabbitMQ高級特性之死信隊列

    死信隊列

    死信(dead message) 簡單理解就是因為種種原因, ?法被消費的信息, 就是死信.

    有死信, ?然就有死信隊列. 當消息在?個隊列中變成死信之后,它能被重新被發送到另?個交換器
    中,這個交換器就是DLX( Dead Letter Exchange ), 綁定DLX的隊列, 就稱為死信隊列(Dead
    Letter Queue,簡稱DLQ).

    消息變成死信?般是由于以下?種情況:

    1. 消息被拒絕(?Basic.Reject/Basic.Nack ),并且設置 requeue 參數為 false.
    2. 消息過期.
    3. 隊列達到最??度.

    添加配置
    spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://study:study@47.98.109.138:5672/extension
    常量類
    public class Constants {//死信public static final String NORMAL_QUEUE = "normal.queue";public static final String NORMAL_EXCHANGE = "normal.exchange";public static final String DL_QUEUE = "dl.queue";public static final String DL_EXCHANGE= "dl.exchange";
    }
    聲明隊列和交換機并綁定二者關系
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import rabbitextensionsdemo.constant.Constants;@Configuration
    public class DLConfig {//正常的交換機和隊列@Bean("normalQueue")public Queue normalQueue(){return QueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dlx").build();}@Bean("normalExchange")public DirectExchange normalExchange(){return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();}//死信交換機和隊列@Bean("dlQueue")public Queue dlQueue(){return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("dlExchange")public DirectExchange dlExchange(){return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();}@Bean("dlBinding")public Binding dlBinding(@Qualifier("dlQueue") Queue queue, @Qualifier("dlExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();}
    }
    死信--消息過期
    給隊列設置TTL

    編寫生產消息代碼
        @RequestMapping("/dl")public String dl() {System.out.println("dl...");//發送普通消息rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dl test...");System.out.printf("%tc 消息發送成功 \n", new Date());return "消息發送成功";}
    編寫消費消息代碼
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    import rabbitextensionsdemo.constant.Constants;import java.util.Date;@Component
    public class DLListener {@RabbitListener(queues = Constants.DL_QUEUE)public void dlHandMessage(Message message, Channel channel) throws Exception {//消費者邏輯System.out.printf("[dl.queue] %tc 接收到消息: %s, deliveryTag: %d \n", new Date(), new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());}
    }
    觀察現象

    我們可以看到,消息在10秒后過期,從normal隊列進入到了死信隊列,消息進入到死信隊列后被消費。

    死信--消息超過隊列最大長度
    設置隊列的最大長度

    編寫生產消息代碼
        @RequestMapping("/dl")public String dl() {//測試隊列長度for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dl test..."+i);}return "消息發送成功";}
    編寫消費消息代碼
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    import rabbitextensionsdemo.constant.Constants;import java.util.Date;@Component
    public class DLListener {@RabbitListener(queues = Constants.DL_QUEUE)public void dlHandMessage(Message message, Channel channel) throws Exception {//消費者邏輯System.out.printf("[dl.queue] %tc 接收到消息: %s, deliveryTag: %d \n", new Date(), new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());}
    }

    此時我們可以看到,給隊列設置了最大長度為10,但是隊列接收到了20條消息,就會導致前10條消息變成死信。

    死信--消息被拒絕
    編寫生產消息代碼
        @RequestMapping("/dl")public String dl() {System.out.println("dl...");//發送普通消息rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dl test...");System.out.printf("%tc 消息發送成功 \n", new Date());return "消息發送成功";}

    編寫消費消息代碼

    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    import rabbitextensionsdemo.constant.Constants;@Component
    public class DLListener {@RabbitListener(queues = Constants.NORMAL_QUEUE)public void handMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消費者邏輯System.out.printf("[normal.queue]接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());//進行業務邏輯處理System.out.println("業務邏輯處理");int num = 3/0;System.out.println("業務處理完成");//肯定確認channel.basicAck(deliveryTag,false);} catch (Exception e) {//否定確認channel.basicNack(deliveryTag, false, false);  //requeue為false, 該消息成為死信}}
    }

    可以看到,normal隊列中的消息在被消費時因為發生了異常而執行到了拒絕消息的代碼,而且設置了消息不重新入隊,導致消息變成了死信,進而進入到了死信隊列。

    面試題

    1.死信隊列的概念

    死信(Dead Letter)是消息隊列中的?種特殊消息, 它指的是那些?法被正常消費或處理的消息. 在消息隊列系統中, 如RabbitMQ, 死信隊列?于存儲這些死信消息。

    2.死信的來源

    1) 消息過期: 消息在隊列中存活的時間超過了設定的TTL
    2) 消息被拒絕: 消費者在處理消息時, 可能因為消息內容錯誤, 處理邏輯異常等原因拒絕處理該消息. 如果拒絕時指定不重新?隊(requeue=false), 消息也會成為死信.
    3) 隊列滿了: 當隊列達到最??度, ?法再容納新的消息時, 新來的消息會被處理為死信.

    3.死信的應用場景?

    對于RabbitMQ來說, 死信隊列是?個?常有?的特性. 它可以處理異常情況下,消息不能夠被消費者正確消費?被置?死信隊列中的情況, 應?程序可以通過消費這個死信隊列中的內容來分析當時所遇到的異常情況, 進?可以改善和優化系統.

    ?如: ???付訂單之后, ?付系統會給訂單系統返回當前訂單的?付狀態
    為了保證?付信息不丟失, 需要使?到死信隊列機制. 當消息消費異常時, 將消息投?到死信隊列中, 由訂單系統的其他消費者來監聽這個隊列, 并對數據進?處理(?如發送?單等,進???確認).

    場景的應?場景還有:

    ? 消息重試:將死信消息重新發送到原隊列或另?個隊列進?重試處理.
    ? 消息丟棄:直接丟棄這些?法處理的消息,以避免它們占?系統資源.
    ? ?志收集:將死信消息作為?志收集起來,?于后續分析和問題定位.

    RabbitMQ高級特性之延遲隊列

    延遲隊列

    延遲隊列(Delayed Queue),即消息被發送以后, 并不想讓消費者?刻拿到消息, ?是等待特定時間后,消費者才能拿到這個消息進?消費

    應用場景

    延遲隊列的使?場景有很多, ?如:

    1. 智能家居: ??希望通過?機遠程遙控家?的智能設備在指定的時間進??作. 這時候就可以將??指令發送到延遲隊列, 當指令設定的時間到了再將指令推送到智能設備.
    2. ?常管理: 預定會議后,需要在會議開始前?五分鐘提醒參會?參加會議
    3. ??注冊成功后, 7天后發送短信, 提???活躍度等

    RabbitMQ本?沒有直接?持延遲隊列的的功能, 但是可以通過前?所介紹的TTL+死信隊列的?式組合模擬出延遲隊列的功能.

    假設?個應?中需要將每條消息都設置為10秒的延遲, ?產者通過 normal_exchange 這個交換器將發送的消息存儲在 normal_queue 這個隊列中. 消費者訂閱的并?是 normal_queue 這個隊列, ?
    是 dlx_queue 這個隊列. 當消息從 normal_queue 這個隊列中過期之后被存? dlx_queue 這個
    隊列中,消費者就恰巧消費到了延遲10秒的這條消息.

    TTL+死信隊列 實現延遲隊列
    添加配置
    spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://study:study@47.98.109.138:5672/extension
    常量類
    public class Constants {//死信public static final String NORMAL_QUEUE = "normal.queue";public static final String NORMAL_EXCHANGE = "normal.exchange";public static final String DL_QUEUE = "dl.queue";public static final String DL_EXCHANGE= "dl.exchange";
    }
    聲明隊列和交換機并綁定二者關系
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import rabbitextensionsdemo.constant.Constants;@Configuration
    public class DLConfig {//正常的交換機和隊列@Bean("normalQueue")public Queue normalQueue(){return QueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dlx").build();}@Bean("normalExchange")public DirectExchange normalExchange(){return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();}//死信交換機和隊列@Bean("dlQueue")public Queue dlQueue(){return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("dlExchange")public DirectExchange dlExchange(){return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();}@Bean("dlBinding")public Binding dlBinding(@Qualifier("dlQueue") Queue queue, @Qualifier("dlExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();}
    }
    編寫生產消息代碼
        @RequestMapping("/delay")public String delay() {System.out.println("delay...");rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "ttl test 10s...", message -> {message.getMessageProperties().setExpiration("10000");  //單位: 毫秒, 過期時間為10sreturn message;});rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "ttl test 30s...", message -> {message.getMessageProperties().setExpiration("30000");  //單位: 毫秒, 過期時間為30sreturn message;});System.out.printf("%tc 消息發送成功 \n", new Date());return "消息發送成功";}
    編寫消費消息代碼
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    import rabbitextensionsdemo.constant.Constants;import java.util.Date;@Component
    public class DLListener {@RabbitListener(queues = Constants.DL_QUEUE)public void dlHandMessage(Message message, Channel channel) throws Exception {//消費者邏輯System.out.printf("[dl.queue] %tc 接收到消息: %s, deliveryTag: %d \n", new Date(), new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());}
    }

    此時我們可以看到,基于TTL+死信隊列實現出來了 延遲隊列 的效果,但是這樣就沒問題了嗎?

    修改生產消息代碼
        @RequestMapping("/delay")public String delay() {System.out.println("delay...");rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "ttl test 30s...", message -> {message.getMessageProperties().setExpiration("30000");  //單位: 毫秒, 過期時間為30sreturn message;});rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "ttl test 10s...", message -> {message.getMessageProperties().setExpiration("10000");  //單位: 毫秒, 過期時間為10sreturn message;});System.out.printf("%tc 消息發送成功 \n", new Date());return "消息發送成功";}

    此時我們看到,設置TTL為10秒的消息居然在30秒后才進入死信隊列,這是為什么呢?

    是因為RabbitMQ檢查消息的TTL是在消息發送給消費方的時候進行檢測的,而什么時候發送給發送方又根據隊頭消息的TTL,所以這就是問題所在,也是TTL+死信隊列實現延遲隊列所存在的問題

    ?那這個問題在上一篇ttl的時候就說過了,這里依然是個問題,雖然設置隊列的ttl不會有這個問題,但是設置隊列ttl我們針對不同延遲時間就需要創建多個隊列,這是不太合理的,所以針對這個問題,我們有一個延遲隊列的插件可以使用

    延遲隊列插件

    延遲隊列插件,會給我們提供一個特殊的交換機,來完成我們的延遲功能

    這是我們插件的下載地址

    https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

    ?我們需要找到ez文件并下載,但是注意這里的版本要與你的RabbitMQ版本可以匹配,否則之后會出現問題

    那插件下完后,我們要找到對應目錄,下載插件

    上面兩個目錄,我們可以任選一個下載即可,如果沒有這個目錄,我們手動創建

    然后把下載的ez文件,copy到這個目錄中即可,然后我們可以使用命令 rabbitmq-plugins list 來查看插件列表,看看我們有沒有成功放進去,但是注意,即使我們成功放進去并成功顯示了,也可能會出錯,這就可能是你們下載的RabbitMQ版本與整個延遲插件的版本不匹配,重新下載其他版本即可

    然后我們啟動插件?rabbitmq-plugins enable rabbitmq_delayed_message_exchange

    之后重啟服務??service?rabbitmq-server?restart

    在沒有發生錯誤的情況下,我們就發現我們會多了一個默認的交換機

    此時我們代碼中就不需要聲明普通交換機了而是直接使用默認交換機即可

    我們生產者代碼是需要改一下的,我們需要調用一個方法來設置延遲時間

    @RequestMapping("/delay2")
    public String delay2() {//發送帶ttl的消息 rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, "delayed", 
    "delayed test 20s..."+new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setDelayLong(20000L); return messagePostProcessor;});rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, "delayed", 
    "delayed test 10s..."+new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setDelayLong(10000L); //設置延遲時間 return messagePostProcessor;});return "發送成功!";
    }

    此時我們就可以在10s正確接收一個消息,在20s正確接收另一個消息?

    兩者的區別:

    1.通過TTL+死信

    ?優點:比較靈活,不需要我們額外引入插件

    ?缺點:我們設置消息TTL的時候可能會出現順序的問題,而且我們需要多創建死信隊列和死信交換機,完成一些綁定,增加了系統的復雜性

    2.基于插件實現的延遲隊列

    ?優點:通過插件能夠簡化延遲消息的實現,并且避免了時序問題

    ?缺點:需要依賴插件,不同版本RabbitMQ需要不同版本插件,有運維工作

    RabbitMQ高級特性之事務

    事務

    ? ?RabbitMQ是基于AMQP協議實現的,該協議實現了事務機制,所以RabbitMQ也支持事務機制,他的事務允許開發者確保消息的發送和接收時原子性的,要么全部成功,要么全部失敗

    我們設置事務有三步,首先就是開啟事務

    因為我們是針對rabbitTemplate來作設置的,所以會影響此rabbitTemplate的所有消息,這里我們新開了一個,然后我們使用時要加一個注解

    ?最后一步我們需要加上事務管理

    這樣我們就成功開啟了事務

    這三步是必不可少的,缺少一步,都無法成功開啟事務

    RabbitMQ高級特性之消息分發

    消息分發

    RabbitMQ隊列擁有多個消費者時, 隊列會把收到的消息分派給不同的消費者. 每條消息只會發送給訂閱列表?的?個消費者. 這種?式?常適合擴展, 如果現在負載加重,那么只需要創建更多的消費者來消費處理消息即可.

    默認情況下, RabbitMQ是以輪詢的?法進?分發的, ?不管消費者是否已經消費并已經確認了消息. 這種?式是不太合理的, 試想?下, 如果某些消費者消費速度慢, ?某些消費者消費速度快, 就可能會導致某些消費者消息積壓, 某些消費者空閑, 進?應?整體的吞吐量下降.

    如何處理呢? 我們可以channel.basicQos(int prefetchCount) ?法, 來限制當前信道上的消費者所能保持的最?未確認消息的數量.
    ?如: 消費端調?了 channelbasicQos(5) , RabbitMQ會為該消費者計數, 發送?條消息計數+1, 消費?條消息計數-1, 當達到了設定的上限, RabbitMQ就不會再向它發送消息了,直到消費者確認了某條消息.類似TCP/IP中的"滑動窗?".

    添加配置
    spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://study:study@47.98.109.138:5672/extensionlistener:simple:acknowledge-mode: manual  #消息接收確認prefetch: 5
    
    常量類
    public class Constants {//限流public static final String QOS_QUEUE = "qos.queue";public static final String QOS_EXCHANGE = "qos.exchange";
    }
    聲明配置和交換機并綁定二者關系
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import rabbitextensionsdemo.constant.Constants;@Configuration
    public class QosConfig {@Bean("qosQueue")public Queue qosQueue(){return QueueBuilder.durable(Constants.QOS_QUEUE).build();}@Bean("qosExchange")public Exchange qosExchange(){return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).build();}@Bean("qosBinding")public Binding qosBinding(@Qualifier("qosQueue") Queue queue, @Qualifier("qosExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("qos").noargs();}
    }
    限流

    如下使?場景:
    訂單系統每秒最多處理5000請求, 正常情況下, 訂單系統可以正常滿?需求
    但是在秒殺時間點, 請求瞬間增多, 每秒1萬個請求, 如果這些請求全部通過MQ發送到訂單系統, ?疑會把訂單系統壓垮.

    RabbitMQ提供了限流機制, 可以控制消費端?次只拉取N個請求。
    通過設置prefetchCount參數, 同時也必須要設置消息應答?式為?動應答。
    prefetchCount: 控制消費者從隊列中預取(prefetch)消息的數量, 以此來實現流控制和負載均衡。

    編寫生產消息代碼
        @RequestMapping("/qos")public String qos() {System.out.println("qos test...");//發送普通消息for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, "qos", "qos test..."+i);}return "消息發送成功";}
    編寫消費消息代碼1
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    import rabbitextensionsdemo.constant.Constants;@Component
    public class QosListener {@RabbitListener(queues = Constants.QOS_QUEUE)public void handMessage(Message message, Channel channel) throws Exception {//消費者邏輯System.out.printf("111接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());}
    }

    可以看到,只消費了5條消息,且這5條消息未確定。

    編寫消費消息代碼2
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    import rabbitextensionsdemo.constant.Constants;@Component
    public class QosListener {@RabbitListener(queues = Constants.QOS_QUEUE)public void handMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();//消費者邏輯System.out.printf("111接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());//肯定確認channel.basicAck(deliveryTag,false);}
    }

    此時我們可以看到,消息一次性被全都消費掉了。

    負載均衡

    我們也可以?此配置,來實現"負載均衡"
    如下圖所?, 在有兩個消費者的情況下,?個消費者處理任務?常快, 另?個?常慢,就會造成?個消費者會?直很忙, ?另?個消費者很閑. 這是因為 RabbitMQ 只是在消息進?隊列時分派消息. 它不考慮消費者未確認消息的數量。

    更改配置
    spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://study:study@47.98.109.138:5672/extensionlistener:simple:acknowledge-mode: manual  #消息接收確認prefetch: 1
    
    編寫消費消息代碼
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    import rabbitextensionsdemo.constant.Constants;@Component
    public class QosListener {@RabbitListener(queues = Constants.QOS_QUEUE)public void handMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消費者邏輯System.out.printf("111接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());Thread.sleep(2000);//肯定確認channel.basicAck(deliveryTag,false);} catch (Exception e) {//否定確認channel.basicNack(deliveryTag, false, true);}}@RabbitListener(queues = Constants.QOS_QUEUE)public void handMessage2(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消費者邏輯System.out.printf("222接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());Thread.sleep(1000);//肯定確認channel.basicAck(deliveryTag,false);} catch (Exception e) {//否定確認channel.basicNack(deliveryTag, false, true);}}
    }

    我們可以看到看到,消費者2消費消息的速度是消費者1消費消息的速度的2倍。

    以上就是RabbitMQ的絕大多數的高級特性了,希望對你有幫助

    本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
    如若轉載,請注明出處:http://www.pswp.cn/pingmian/97701.shtml
    繁體地址,請注明出處:http://hk.pswp.cn/pingmian/97701.shtml
    英文地址,請注明出處:http://en.pswp.cn/pingmian/97701.shtml

    如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

    相關文章

    鴻蒙Next ArkWeb網頁多媒體開發實戰:從基礎到高級應用

    解鎖鴻蒙ArkWeb的強大多媒體能力&#xff0c;讓網頁視頻音頻體驗媲美原生應用在日常應用開發中&#xff0c;我們經常需要在應用中嵌入網頁并展示其中的多媒體內容。鴻蒙HarmonyOS Next的ArkWeb組件提供了強大的網頁渲染能力&#xff0c;尤其對網頁中的多媒體元素有出色的支持。…

    06. Linux進程概念 1

    Linux進程概念 馮諾依曼體系 馮諾依曼體系結構&#xff08;Von Neumann Architecture&#xff09;是現代計算機設計的奠基石&#xff0c;由數學家約翰馮諾依曼于1945年提出。這一架構徹底改變了早期計算機“硬件即程序”的設計方式&#xff0c;使得計算機可以靈活地運行不同程序…

    HTTP標頭全解析:保護你的Web應用!

    在網絡攻擊頻發的時代&#xff0c;你的Web應用是否像一座沒有城墻的城堡&#xff0c;任由XSS、點擊劫持和中間人攻擊入侵&#xff1f;HTTP標頭&#xff0c;這些看似不起眼的響應頭&#xff0c;其實是Web安全的隱形守護者。想象一個電商網站&#xff0c;用戶數據被竊取&#xff…

    rt-linux下__slab_alloc里的另外一處可能睡眠的邏輯

    一、背景 在之前的博客 tasklet上下文內存分配觸發might_alloc檢查及同步回收調用鏈 里&#xff0c;我們講了一處內存分配時會引起睡眠的調用鏈&#xff08;這個引起睡眠的這個調用鏈它是在普通linux里也是存在的&#xff09;。這篇博客里&#xff0c;我們講一處內存分配路徑下…

    基于STM32F103C8T6的智能環境監測系統:DHT11溫濕度檢測與OLED顯示實現

    引言 你是否曾想實時握身邊環境的溫濕度變化&#xff1f;無論是居家種植需要精準調控環境&#xff0c;還是實驗室存放敏感材料需監控條件&#xff0c;亦或是智能座艙場景下的環境感知&#xff0c;智能環境監測系統正成為連接物理世界與數字管理的重要橋梁。而在眾多嵌入式開發…

    動態規劃在子數組/子串問題

    目錄 一、最大子數組和&#xff08;LeetCode 53&#xff09; 二、環形子數組的最大和&#xff08;LeetCode 918&#xff09; 三、乘積最大子數組&#xff08;LeetCode 152&#xff09; 四、乘積為正數的最長子數組長度&#xff08;LeetCode 1567&#xff09; 五、等差數列…

    微信小程序開發筆記(01_小程序基礎與配置文件)

    ZZHow(ZZHow1024) 參考課程: 【尚硅谷微信小程序開發教程】 [https://www.bilibili.com/video/BV1LF4m1E7kB] 009_文件和目錄結構介紹新建頁面與調試基礎庫 一個完整的小程序項目分為兩個部分&#xff1a;主體文件、頁面文件 主體文件又稱全局文件&#xff0c;能夠作用于整…

    NLP Subword 之 BPE(Byte Pair Encoding) 算法原理

    本文將介紹以下內容&#xff1a; 1. BPE 算法核心原理2. BPE 算法流程3. BPE 算法源碼實現DemoBPE最早是一種數據壓縮算法&#xff0c;由Sennrich等人于2015年引入到NLP領域并很快得到推廣。該算法簡單有效&#xff0c;因而目前它是最流行的方法。GPT-2和RoBERTa使用的Subword算…

    CSS 偽類選擇器

    偽類選擇器&#xff08;pseudo-class selector&#xff09;是一種用于選擇HTML元素特定狀態或特征的關鍵字&#xff0c;它允許開發者基于文檔樹之外的信息&#xff08;如用戶交互、元素位置或狀態變化&#xff09;來選擇元素并應用樣式。偽類選擇器以冒號(:)開頭&#xff0c;附…

    Electron 新特性:2025 版本更新解讀

    引言&#xff1a;Electron 新特性在 2025 版本更新中的解讀核心價值與必要性 在 Electron 框架的持續演進中&#xff0c;新特性的引入是推動桌面開發創新的核心動力&#xff0c;特別是 2025 年的版本更新&#xff0c;更是 Electron 項目從成熟生態到前沿技術的躍進之鑰。它不僅…

    MyBatis從入門到面試:掌握持久層框架的精髓

    MyBatis從入門到面試&#xff1a;掌握持久層框架的精髓 前言 在Java企業級應用開發中&#xff0c;持久層框架的選擇至關重要。MyBatis作為一款優秀的半自動化ORM框架&#xff0c;以其靈活的SQL定制能力和良好的性能表現&#xff0c;成為了眾多開發者的首選。本文將帶你從MyBa…

    5.Three.js 學習(基礎+實踐)

    Three.js 是 “WebGL 的封裝庫”&#xff0c;幫你屏蔽了底層的著色器 / 緩沖區細節&#xff0c;專注于 “3D 場景搭建”&#xff0c;開發效率高&#xff0c;是通用 3D 開發的首選。他的核心是 “場景 - 相機 - 渲染器” 的聯動邏輯&#xff0c;先掌握基礎組件&#xff0c;再學進…

    消火栓設備工程量計算 -【圖形識別】秒計量

    消火栓設備工程量計算 -【圖形識別】秒計量 消防系統的消火栓設備水槍、水帶和消火栓組成&#xff0c;根據清單定額規則計算消火栓設備工程量。通過CAD快速看圖的圖形識別框選圖紙就能自動數出消火栓數量&#xff0c;省時又準確&#xff0c;是工程人做消防算量的好幫手。 一、…

    Docker 與 VSCode 遠程容器連接問題深度排查與解決指南

    Docker 與 VSCode 遠程容器連接問題深度排查與解決指南 引言 Visual Studio Code 的 Remote - Containers 擴展極大地提升了開發體驗&#xff0c;它將開發環境容器化&#xff0c;保證了環境的一致性&#xff0c;并允許開發者像在本地一樣在容器內進行編碼、調試和運行。然而&…

    愛圖表:鏑數科技推出的智能數據可視化平臺

    本文轉載自&#xff1a;https://www.hello123.com/aitubiao ** 一、? AI 圖表&#xff1a;智能數據可視化好幫手 愛圖表是鏑數科技旗下的一款智能數據可視化工具&#xff0c;它能讓復雜的數字和報表變得直觀又好懂。接入了先進的DeepSeek 系列 AI 模型&#xff0c;它不僅會做…

    ENVI系列教程(四)——圖像幾何校正

    目錄 1 概述 1.1 控制點選擇方式 1.2 幾何校正模型 1.3 控制點的預測與誤差計算 2 詳細操作步驟 2.1 掃描地形圖的幾何校正 2.1.1 第一步:打開并顯示圖像文件 2.1.2 第二步:啟動幾何校正模塊 2.2 Landsat5 影像幾何校正 2.2.1 第一步:打開并顯示圖像文件 2.2.2 第…

    STM32-FreeRTOS操作系統-消息隊列

    引言在嵌入式開發領域&#xff0c;STM32與FreeRTOS的結合應用極為廣泛。本文將探討如何在STM32上使用FreeRTOS實現消息隊列功能&#xff0c;助力高效任務通信與系統協作。消息隊列定義消息隊列是一種在 FreeRTOS 中用于任務間通信的機制。它允許任務將消息發送到隊列中&#xf…

    【開題答辯全過程】以 C語言程序設計課程網站為例,包含答辯的問題和答案

    個人簡介一名14年經驗的資深畢設內行人&#xff0c;語言擅長Java、php、微信小程序、Python、Golang、安卓Android等開發項目包括大數據、深度學習、網站、小程序、安卓、算法。平常會做一些項目定制化開發、代碼講解、答辯教學、文檔編寫、也懂一些降重方面的技巧。感謝大家的…

    手機上有哪些比較好用的待辦事項提醒工具

    在快節奏的現代工作中&#xff0c;我們每天都要面對大量的任務與事務。從項目截止日期、客戶會議&#xff0c;到日常的工作安排&#xff0c;瑣碎的事項容易讓人顧此失彼。 手機待辦事項工具早已突破傳統“記事本”的局限&#xff0c;成為移動辦公場景下的效率核心。它們通過任務…

    Mysql數據庫事務全解析:概念、操作與隔離級別

    MySQL系列 文章目錄MySQL系列一、什么是事務1.1事務的核心概念1.2、 事務的四大屬性&#xff08;ACID&#xff09;1.2.1 原子性&#xff08;Atomicity&#xff09;1.2.2 一致性&#xff08;Consistency&#xff09;1.2.3 隔離性&#xff08;Isolation&#xff09;1.2.4 持久性&…