【MQ】Spring3 中 RabbitMQ 的使用與常見場景

一、初識 MQ

傳統的單體架構,分布式架構的同步調用里,無論是方法調用,還是 OpenFeign 難免會有以下問題:

  1. 擴展性差(高耦合,需要依賴對應的服務,同樣的事件,不斷有新需求,這個事件的業務代碼會越來越臃腫,這些子業務都寫在一起)
  2. 性能下降(等待響應,最終整個業務的響應時長就是每次遠程調用的執行時長之和)
  3. 級聯失敗(如果是一個事務,一個服務失敗就會導致全部回滾,若是分布式事務就更加麻煩了,但其實一些行為的失敗不應該導致整體回滾)
  4. 服務宕機(如果服務調用者未考慮服務提供者的性能,導致提供者因為過度請求而宕機)

但如果不是很要求同步調用,其實也可以用異步調用,如果是單體架構,你可能很快能想到一個解決方案,就是阻塞隊列實現消息通知:

在這里插入圖片描述

但是在分布式架構下,可能就需要一個中間件級別的阻塞隊列,這就是我們要學習的 Message Queue 消息隊列,簡稱 MQ,而現在流行的 MQ 還不少,在實現其基本的消息通知功能外,還有一些不錯的擴展

以 RabbitMQ 和 Kafka 為例:

RabbitMQKafka
公司/社區RabbitApache
開發語言ErlangScala & Java
協議支持AMQP,XMPP,SMTP,STOMP自定義協議
可用性
單機吞吐量一般非常高(Kafka 亮點)
消息延遲微秒級毫秒以內
消息可靠性一般

消息延遲指的是,消息到隊列,并在隊列中“就緒”的時間與預期時間的差距,其實就是數據在中間件中流動的耗時,預期時間可以是現在、幾毫秒后、幾秒后、幾天后…

據統計,目前國內消息隊列使用最多的還是 RabbitMQ,再加上其各方面都比較均衡,穩定性也好,因此我們課堂上選擇 RabbitMQ 來學習。

二、RabbitMQ 安裝

Docker 安裝 RabbitMQ:

mkdir /root/mq
cd /root/mqdocker rm mq-server -f
docker rmi rabbitmq:3.8-management -f
docker volume rm mq-plugins -fdocker pull rabbitmq:3.8-management# 插件數據卷最好還是直接掛載 volume,而不是掛載我們的目錄
docker run \
--name mq-server \
-e RABBITMQ_DEFAULT_USER=xxx \
-e RABBITMQ_DEFAULT_PASS=xxx \
--hostname mq1 \
-v mq-plugins:/plugins \
-p 15672:15672 \
-p 5672:5672 \
-d rabbitmq:3.8-management

三、RabbitMQ 基本知識

(1)架構

15672:RabbitMQ 提供的管理控制臺的端口

5672:RabbitMQ 的消息發送處理接口

用戶名密碼就是安裝時,啟動容器時指定的用戶名密碼

MQ 對應的就是這里的消息代理 Broker:

在這里插入圖片描述

RabbitMQ 詳細架構圖:

在這里插入圖片描述

其中包含幾個概念:

  • publisher:生產者,也就是發送消息的一方
  • consumer:消費者,也就是消費消息的一方
  • queue:隊列,存儲消息。生產者投遞的消息會暫存在消息隊列中,等待消費者處理
  • exchange:交換機,負責消息路由。生產者發送的消息由交換機決定投遞到哪個隊列。
  • virtual host:虛擬主機,起到數據隔離的作用。每個虛擬主機相互獨立,有各自的 exchange、queue

現在你可能只認識生產者、消費者、隊列,其他是什么呢?

其實你可以理解為 MQ 也是存儲東西的,存儲的就是消息,virtual host 就是數據庫,queue 就是表,消息就是一行數據,而 MQ 有特殊的機制,消息先通過 exchange 再決定前往哪個 queue

管理控制臺的使用就不多說了

(2)五大模式

這只是最常見的五種模式:

  1. 簡單模式

在這里插入圖片描述

  1. 工作模式

在這里插入圖片描述

  1. 發布訂閱模式

關聯交換機的隊列都能收到一份消息,廣播

在這里插入圖片描述

  1. 路由模式

關聯交換機時,提供 routing key(可以是多個,隊列之間可以重復),發布消息時提供一個 routing key,由此發送給指定的隊列

在這里插入圖片描述

值得注意的是,簡單模式和工作模式,其實也是有交換機的,任何隊列都會綁定一個默認交換機 "",類型是 direct,routing key 為隊列的名稱

  1. 主題模式

在這里插入圖片描述

路由模式的基礎上,隊列關聯交換機時 routing key 可以是帶通配符的

routing key 的單詞通過 . 分割, # 匹配 n 個單詞(n ≥ 0),* 只匹配一個單詞

例如 #.red:

  • 可以匹配的 routing key:p1.red、red、p2.p1.red

在發布消息時,要使用具體的 routing key,交換機發送給匹配的隊列

(3)數據隔離

  1. 隔離 virtual host

在這里插入圖片描述

  1. 隔離用戶(賦予訪問權限)

在這里插入圖片描述

四、RabbitMQ 基本使用 Spring AMQP

引入 RabbitMQ 相關的 SDK,可以通過創建連接 Connection、創建通道 Channel,用 Channel 進行操作,接受消息也差不多,不過多演示:

public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立連接ConnectionFactory factory = new ConnectionFactory();// 1.1.設置連接參數,分別是:主機名、端口號、vhost、用戶名、密碼factory.setHost("xx.xx.xx.xx");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("xxx");factory.setPassword("xxx");// 1.2.建立連接Connection connection = factory.newConnection();// 2.創建通道ChannelChannel channel = connection.createChannel();// 3.創建隊列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.發送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("發送消息成功:【" + message + "】");// 5.關閉通道和連接channel.close();connection.close();}
}

但比較麻煩,Spring AMQP 框架可以自動裝配 RabbitMQ 的操作對象 RabbitTemplate,這樣我們就可以更方便的操作 MQ,并充分發揮其特性

<!--AMQP依賴,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

默認包含 RabbitMQ 的實現,如果你想對接其他 AMQP 協議的 MQ,得自己實現其抽象封裝的接口

(1)發送消息

注意,下面是 Spring3 的寫法,所以會有點不一樣,可能看不懂,稍后解釋!

消息發送器封裝:

@Repository
@RequiredArgsConstructor
@Slf4j
public class RabbitMQSender {private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread");private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {rabbitTemplate.setTaskExecutor(EXECUTOR);}private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> {log.error("處理 ack 回執失敗, {}", ex.getMessage());return null;};private MessagePostProcessor delayMessagePostProcessor(long delay) {return message -> {// 小于 0 也是立即執行// setDelay 才是給 RabbitMQ 看的,setReceivedDelay 是給 publish-returns 看的message.getMessageProperties().setDelay((int) Math.max(delay, 0));return message;};};private CorrelationData newCorrelationData() {return new CorrelationData(UUIDUtil.uuid32());}/*** @param exchange 交換機* @param routingKey routing key* @param msg 消息* @param delay 延遲時間(如果是延遲交換機,delay 才有效)* @param maxRetries 最大重試機會* @param <T> 消息的對象類型*/private <T> void send(String exchange, String routingKey, T msg, long delay, int maxRetries){log.info("準備發送消息,exchange: {}, routingKey: {}, msg: {}, delay: {}s, maxRetries: {}",exchange, routingKey, msg, TimeUnit.MILLISECONDS.toSeconds(delay), maxRetries);CorrelationData correlationData = newCorrelationData();MessagePostProcessor delayMessagePostProcessor = delayMessagePostProcessor(delay);correlationData.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(new Consumer<>() {private int retryCount = 0; // 一次 send 從始至終都用的是一個 Consumer 對象,所以作用的都是同一個計數器@Overridepublic void accept(CorrelationData.Confirm confirm) {Optional.ofNullable(confirm).ifPresent(c -> {if(c.isAck()) {log.info("ACK {} 消息成功到達,{}", correlationData.getId(), c.getReason());} else {log.warn("NACK {} 消息未能到達,{}", correlationData.getId(), c.getReason());if(retryCount >= maxRetries) {log.error("次數到達上限 {}", maxRetries);return;}retryCount++;log.warn("開始第 {} 次重試", retryCount);CorrelationData cd = newCorrelationData();cd.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(this, EXECUTOR);rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, cd);}});}}, EXECUTOR);rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, correlationData);}public void sendMessage(String exchange, String routingKey, Object msg) {send(exchange, routingKey, msg, 0, 0);}public void sendDelayMessage(String exchange, String routingKey, Object msg, long delay){send(exchange, routingKey, msg, delay, 0);}public void sendWithConfirm(String exchange, String routingKey, Object msg, int maxReties) {send(exchange, routingKey, msg, 0, maxReties);}public void sendDelayMessageWithConfirm(String exchange, String routingKey, Object msg, long delay, int maxReties) {send(exchange, routingKey, msg, delay, maxReties);}}

(2)接受消息

監聽器:

  • RabbitTemplate 是可以主動獲取消息的,也可以不實時監聽,但是一般情況都是監聽,有消息就執行
  • 監聽的是 queue,若 queue 不存在,就會根據注解創建一遍
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "xxx"),exchange = @Exchange(name = "xxx", delayed = "true"),key = {"xxx"}
))
public void xxx(X x) {}

(3)聲明交換機與隊列

可以通過 @Bean 創建 Bean 對象的方式去聲明,可以自行搜索,我更喜歡監聽器注解的形式,而且 Bean 的方式,可能會因為配置不完全一樣,導致其他配置類的交換機隊列無法聲明(現象如此,底層為啥我不知道)

(4)消息轉換器

消息是一個字符串,但為了滿足更多需求,需要將一個對象序列化成一個字符串,但默認的序列化實現貌似用的是 java 對象的序列化,這種方式可能得同一個程序的 java 類才能反序列化成功,所以我們應該選擇分布式的序列化方式,比如 json

@Configuration
@RequiredArgsConstructor
@Slf4j
public class MessageConverterConfig {@Beanpublic MessageConverter messageConverter(){// 1. 定義消息轉換器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(JsonUtil.OBJECT_MAPPER);// 2. 配置自動創建消息 id,用于識別不同消息jackson2JsonMessageConverter.setCreateMessageIds(Boolean.TRUE);return jackson2JsonMessageConverter;}
}

這里的 JsonUtil.OBJECT_MAPPER,就是框架的或者自己實現的 ObjectMapper

(5)配置文件

spring:rabbitmq:host: ${xxx.mq.host} # rabbitMQ 的 ip 地址port: ${xxx.mq.port} # 端口username: ${xxx.mq.username}password: ${xxx.mq.password}virtual-host: ${xxx.mq.virtual-host}publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true # 若是 false 則直接丟棄了,并不會發送者回執listener:simple:prefetch: 1 # 預取為一個(消費完才能拿下一個)concurrency: 2 # 消費者最少 2 個線程max-concurrency: 10 # 消費者最多 10 個線程auto-startup: true # 為 false 監聽者不會實時創建和監聽,為 true 監聽的過程中,若 queue 不存在,會再根據注解進行創建,創建后只監聽 queue,declare = "false" 才是不自動聲明default-requeue-rejected: false # 拒絕后不 requeue(成為死信,若沒有綁定死信交換機,就真的丟了)acknowledge-mode: auto # 消費者執行成功 ack、異常 nack(manual 為手動、none 代表無論如何都是 ack)retry: # 這個屬于 spring amqp 的 retry 機制enabled: false # 不開啟失敗重試
#          initial-interval: 1000
#          multiplier: 2
#          max-attempts: 3
#          stateless: true # true 代表沒有狀態,若有消費者包含事務,這里改為 false

五、常見問題

在這里插入圖片描述

(1)RabbitMQ 如何保證消息可靠性

保證消息可靠性、不丟失。主要從三個層面考慮

如果報錯可以先記錄到日志中,再去修復數據(保底)

1、生產者確認機制

生產者確認機制,確保生產者的消息能到達隊列

  1. publisher-confirm,針對的是消息從發送者到交換機的可靠性,成功則進行下一步,失敗返回 NACK
private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread");private final RabbitTemplate rabbitTemplate;@PostConstruct
public void init() {rabbitTemplate.setTaskExecutor(EXECUTOR);
}private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> {log.error("處理 ack 回執失敗, {}", ex.getMessage());return null;
};private MessagePostProcessor delayMessagePostProcessor(long delay) {return message -> {// 小于 0 也是立即執行// setDelay 才是給 RabbitMQ 看的,setReceivedDelay 是給 publish-returns 看的message.getMessageProperties().setDelay((int) Math.max(delay, 0));return message;};
};private CorrelationData newCorrelationData() {return new CorrelationData(UUIDUtil.uuid32());
}/*** @param exchange 交換機* @param routingKey routing key* @param msg 消息* @param delay 延遲時間(如果是延遲交換機,delay 才有效)* @param maxRetries 最大重試機會* @param <T> 消息的對象類型*/
private <T> void send(String exchange, String routingKey, T msg, long delay, int maxRetries){log.info("準備發送消息,exchange: {}, routingKey: {}, msg: {}, delay: {}s, maxRetries: {}",exchange, routingKey, msg, TimeUnit.MILLISECONDS.toSeconds(delay), maxRetries);CorrelationData correlationData = newCorrelationData();MessagePostProcessor delayMessagePostProcessor = delayMessagePostProcessor(delay);correlationData.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(new Consumer<>() {private int retryCount = 0; // 一次 send 從始至終都用的是一個 Consumer 對象,所以作用的都是同一個計數器@Overridepublic void accept(CorrelationData.Confirm confirm) {Optional.ofNullable(confirm).ifPresent(c -> {if(c.isAck()) {log.info("ACK {} 消息成功到達,{}", correlationData.getId(), c.getReason());} else {log.warn("NACK {} 消息未能到達,{}", correlationData.getId(), c.getReason());if(retryCount >= maxRetries) {log.error("次數到達上限 {}", maxRetries);return;}retryCount++;log.warn("開始第 {} 次重試", retryCount);CorrelationData cd = newCorrelationData();cd.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(this, EXECUTOR);rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, cd);}});}}, EXECUTOR);rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, correlationData);
}

Spring3 的 RabbitMQ Confirm,需要配置為 correlated,發送消息時提供 CorrelationData,也就是與消息關聯的數據,包括發送者確認時的回調方法

在這里插入圖片描述

要想提供 Confirm 的回調辦法,需要配置 correlationData.getFuture() 返回的 CompletableFuture 對象(新的 JUC 工具類,可以查一查如何使用)

配置后,在未來根據回調函數進行處理(當然也可以直接設置在 RabbitTemplate 對象的 ConfirmCallBack)

還可以自己實現消息的發送者重試:

在這里插入圖片描述

  1. publisher-returns,針對的是消息從交換機到隊列的可靠性,成功則返回 ACK,失敗觸發 returns 的回調方法
@Component
@RequiredArgsConstructor
@Slf4j
public class PublisherReturnsCallBack implements RabbitTemplate.ReturnsCallback {// 不存在 routing key 對應的隊列,那在我看來轉發到零個是合理的現象,但在這里也認為是路由失敗(MQ 認為消息一定至少要進入一個隊列,之后才能被處理,這就是可靠性)(反正就是回執了,你愛咋處理是你自己的事情)@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {// 可能一些版本的 mq 會因為是延時交換機,導致發送者回執,只要沒有 NACK 這種情況其實并不是不可靠(其實我也不知道有沒有版本會忽略)// 但是其實不忽略也不錯,畢竟者本來就是特殊情況,一般交換機是不存儲的,但是這個臨時存儲消息// 但這樣也就代表了,延時后消息路由失敗是沒法再次處理的(因為我們交給延時交換機后就不管了,可靠性有 mq 自己保持)MessageProperties messageProperties = returnedMessage.getMessage().getMessageProperties();// 這里的 message 并不是原本的 message,是額外的封裝,x-delay 在 publish-returns 里面封裝到 receiveDelay 里了Integer delay = messageProperties.getReceivedDelay();// 如果不是延時交換機,卻設置了 delay 大于 0,是不會延時的,所以是其他原因導致的(以防萬一把消息記錄到日志里)if(Objects.nonNull(delay) && delay.compareTo(0) > 0) {log.info("交換機 {}, 路由鍵 {} 消息 {} 延遲 {} s", returnedMessage.getExchange(), returnedMessage.getRoutingKey(), messageProperties, TimeUnit.MILLISECONDS.toSeconds(delay));return;}log.warn("publisher-returns 發送者回執(應答碼{},應答內容{})(消息 {} 成功到達交換機 {},但路由失敗,路由鍵為 {})",returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getMessage(),returnedMessage.getExchange(), returnedMessage.getRoutingKey());}
}

RabbitMQSender:

private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread");private final RabbitTemplate rabbitTemplate;private final PublisherReturnsCallBack publisherReturnsCallBack;@PostConstruct
public void init() {rabbitTemplate.setTaskExecutor(EXECUTOR);// 設置統一的 publisher-returns(confirm 也可以設置統一的,但最好還是在發送時設置在 future 里)// rabbitTemplate 的 publisher-returns 同一時間只能存在一個// 因為 publisher confirm 后,其實 exchange 有沒有轉發成功,publisher 沒必要每次發送都關注這個 exchange 的內部職責,更多的是“系統與 MQ 去約定”rabbitTemplate.setReturnsCallback(publisherReturnsCallBack);
}

同理你也可以按照自己的想法進行重試…

在測試練習階段里,這個過程是異步回調的,如果是單元測試,發送完消息進程就結束了,可能就沒回調,程序就結束了,自然就看不到回調時的日志

如果既沒有 ACK 也沒有 NACK,也沒有發布者回執,那就相當于這個消息銷聲匿跡了,沒有任何的回應,那么就會拋出異常,我們可以處理這個異常,比如打印日志、重發之類的…

private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> {log.error("處理 ack 回執失敗, {}", ex.getMessage());return null;
};

2、持久化

消息隊列的數據持久化,確保消息未消費前在隊列中不會丟失,其中的交換機、隊列、和消息都要做持久化

默認都是持久化的

在這里插入圖片描述

在這里插入圖片描述

在這里插入圖片描述

3、消費者確認

隊列的消息出隊列,并不會立即刪除,而是等待消費者返回 ACK 或者 NACK

消費者要什么時候發送 ACK 呢?

  • 1)RabbitMQ投遞消息給消費者
  • 2)消費者獲取消息后,返回ACK給RabbitMQ
  • 3)RabbitMQ刪除消息
  • 4)消費者宕機,消息尚未處理

如果出現這種場景,就是不可靠的,所以應該是消息處理后,再發送 ACK

Spring AMQP 有三種消費者確認模式:

  1. manual,手段 ack,自己用 rabbitTemplate 去發送 ACK/NACK(這個比較麻煩,不用 RabbitListener 接受消息才必須用這個)
  2. auto,配合 RabbitListener 注解,代碼若出現異常,NACK,成功則 ACK
  3. none,獲得消息后直接 ACK,無論是否執行成功

出現 NACK 后要如何處理(此過程還在我們的服務器):

  1. 拒絕(默認)
  2. 重新入隊列
  3. 返回 ACK,消費者重新發布消息指定的交換機
@Configuration
@RequiredArgsConstructor
@Slf4j
public class MessageRecovererConfig {@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RejectAndDontRequeueRecoverer(); // nack、直接 reject 和不 requeue,成為死信(默認)
//        return new ImmediateRequeueMessageRecoverer(); // nack、requeue
//        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); // ack、發送給指定的交換機,confirm 機制需要設置到 rabbitTemplate 里}}

Spring 提供的 retry 機制,在消費者出現異常時利用本地重試,而不是無限制的requeue到mq隊列。

spring:rabbitmq:listener:simple:acknowledge-mode: auto # 消費者執行成功 ack、異常 nack(manual 為手動、none 代表無論如何都是 ack)retry: # 這個屬于 spring amqp 的 retry 機制enabled: false # 不開啟失敗重試initial-interval: 1000 # 第一次重試時間間隔multiplier: 3 # 每次重試間隔的倍數max-attempts: 4 # 最大接受次數stateless: true # true 代表沒有狀態,若有消費者包含事務,這里改為 false

解釋:第一次失敗,一秒后重試、第二次失敗,三秒后重試,第三次失敗,九秒后重試,第四次失敗就沒機會了(SpringAMQP會拋出異常AmqpRejectAndDontRequeueException)

失敗之后根據對應的處理策略進行處理

(2)死信交換機

消息過期、消息執行失敗并且不重試也不重新入隊列,堆積過多等情況,消息會成為死信,若隊列綁定死信交換機,則轉發給死信交換機,若沒有則直接丟棄

隊列1 -> 死信交換機 -> 隊列2,這個過程是消息隊列內部保證的可靠性,消息也沒有包含原發送者的信息,甚至連接已經斷開了,所以沒有 publisher-confirm 也沒有 publisher-returns

在這里插入圖片描述

這個機制和 republish 有點像,但是有本質的區別,republish 是消費者重發,而這里是隊列將死信轉發給死信交換機

死信的情況:

  1. nack && requeue == false
  2. 超時未消費
  3. 隊列滿了,由于隊列的特性,隊列頭會先成為死信

(3)延遲功能如何實現

剛才提到死信的誕生可能是超時未消費,那么其實這個點也可以簡單的實現一個延遲隊列:

隊列為一個不被監聽的專門用來延遲消息發送的緩沖帶,其死信交換機才是目標交換機

message.getMessageProperties().setExpiration("1000");

設置的是過期時間,其本意并不是延遲,是可以實現延遲~

在這里插入圖片描述

另外,隊列本身也能設置 ttl 過期時間,但并不是隊列的過期時間(顯然不合理,截止后無論啥都丟了,冤不冤啊,至少我想不到這種場景),而是隊列中的消息存活的最大時間,消息的過期時間和這個取一個最小值才是真實的過期時間

值得注意的是,雖然能實現延時消息的功能,但是

  1. 實現復雜
  2. 延遲可能不準確,因為隊列的特性,如果隊列頭未出隊列,哪怕其后者出現死信,也只能乖乖等前面的先出去之后才能前往死信交換機(例如消息的 ttl 分別為 9s、3s、1s,最終三個消息會被同時轉發,因為“最長壽的”排在了前面)

這種方式的順序優先級大于時間優先級

而 RabbitMQ 也提供了一個插件,叫 DelayExchange 延時交換機,專門用來實現延時功能

Scheduling Messages with RabbitMQ | RabbitMQ

  • 請自行上網下載

延時交換機的聲明:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"
))
public void listenDelayMessage(String msg){log.info("接收到delay.queue的延遲消息:{}", msg);
}

延時消息的發送:

private MessagePostProcessor delayMessagePostProcessor(long delay) {return message -> {// 小于 0 也是立即執行message.getMessageProperties().setDelay((int) Math.max(delay, 0));return message;};
};

這里設置的是 Delay,不是過期時間,哪怕超過了時間也不叫做死信

期間一直存在延時交換機的硬存里,延遲消息插件內部會維護一個本地數據庫表,同時使用 Elang Timers 功能實現計時。如果消息的延遲時間設置較長,可能會導致堆積的延遲消息非常多,會帶來較大的CPU開銷,同時延遲消息的時間會存在誤差。

(4)消息堆積如何解決

死信的成因還可能是堆疊過多

我在實際的開發中,沒遇到過這種情況,不過,如果發生了堆積的問題,解決方案也所有很多的

  1. 提高消費者的消費能力 ,可以使用多線程消費任務
  2. 增加更多消費者,提高消費速度,使用工作隊列模式, 設置多個消費者消費消費同一個隊列中的消息
  3. 擴大隊列容積,提高堆積上限

但是,RabbitMQ 隊列占的是內存,間接性的落盤,提高上限最終的結果很有可能就是反復落庫,特別不穩定,且并沒有解決消息堆積過多的問題

我們可以使用 RabbitMQ 惰性隊列,惰性隊列的好處主要是

  1. 接收到消息后直接存入磁盤而非內存,雖然慢,但沒有間歇性的 page-out,性能比較穩定
  2. 消費者要消費消息時才會從磁盤中讀取并加載到內存,正常消費后就刪除了
  3. 基于磁盤存儲,消息上限高,支持數百萬條的消息存儲

聲明方式:

而要設置一個隊列為惰性隊列,只需要在聲明隊列時,指定x-queue-mode屬性為lazy即可。可以通過命令行將一個運行中的隊列修改為惰性隊列:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  

命令解讀:

  • rabbitmqctl :RabbitMQ的命令行工具
  • set_policy :添加一個策略
  • Lazy :策略名稱,可以自定義
  • "^lazy-queue$" :用正則表達式匹配隊列的名字
  • '{"queue-mode":"lazy"}' :設置隊列模式為lazy模式
  • --apply-to queues :策略的作用對象,是所有的隊列
  • x-queue-mode 參數的值為 lazy
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = "xxx"),value = @Queue(name = "xxx", arguments = @Argument(name = "x-queue-mode", value = "lazy")),key = "xxx"
))

交換機、隊列擴展屬性叫參數,消息的拓展屬性叫頭部,擴展屬性一般都以 x- 開頭(extra)

消息堆積問題的解決方案?

  • 隊列上綁定多個消費者,提高消費速度
  • 使用惰性隊列,可以再mq中保存更多消息

惰性隊列的優點有哪些?

  • 基于磁盤存儲,消息上限高
  • 沒有間歇性的 page-out,性能比較穩定

惰性隊列的缺點有哪些?

  • 基于磁盤存儲,消息時效性會降低
  • 性能受限于磁盤的IO

(5)高可用如何保證

RabbitMQ 在服務大規模項目時,一般情況下不會像數據庫那樣存儲的瓶頸,用惰性隊列已經是很頂天了的,其特性和用途不會有太極端的存儲壓力

更多的是在并發情況下,處理消息的能力有瓶頸,可能出現節點宕機的情況,而避免單節點宕機,數據丟失、無法提供服務等問題需要解決,也就是需要保證高可用性

Erlang 是一種面向并發的語言,天然支持集群模式,RabbitMQ 的集群有兩種模式:

  1. 普通集群:是一種分布式集群,將隊列分散到集群的各個節點,從而提高整個集群的并發能力
  2. 鏡像集群:是一種主從集群,在普通集群的基礎上,添加了主從備份的功能,提高集群的數據可用性

鏡像集群雖然支持主從,但主從同步并不是強一致的,某些情況下可能有數據丟失的風險(雖然重啟能解決,但那不是強一致,而是最終一致),因此 RabbitMQ 3.8 以后,推出了新的功能:仲裁隊列來代替鏡像集群,底層采用 Raft 協議確保主從的數據一致性

1、普通集群

各個節點之間,實時同步 MQ 元數據(一些靜態的共享的數據):

  1. 交換機的信息
  2. 隊列的信息

但不包括隊列中的消息(動態的數據不同步)

監聽隊列的時候,如果監聽的節點不存在該隊列(只是知道元數據),當前節點會訪問隊列所在的節點,該節點返回數據到當前節點并返回給監聽者

隊列所在節點宕機,隊列中的消息就會“丟失”(是在重啟之前,這個消息就消失無法被處理的意思)

在這里插入圖片描述

如何部署,上網搜搜就行

2、鏡像集群

各個節點之間,實時同步 MQ 元數據(一些靜態的共享的數據):

  1. 交換機的信息
  2. 隊列的信息

本質是主從模式,創建隊列的節點為主節點,其他節點為鏡像節點,隊列中的消息會從主節點備份到鏡像節點中

注意

  • 像 Redis 那樣的主從集群,同步都是全部同步來著
  • 但 RabbitMQ 集群的主從模式比較特別,他的粒度是隊列,而不是全部

也就是說,一個隊列的主節點,可能是另一個隊列的鏡像節點,所以分析某個場景的時候,要確認是哪個隊列,單獨進行觀察分析討論

  • 不同隊列之間只有交互,不會相互影響數據同步

針對某一個隊列,所有寫操作都在主節點完成,然后同步給鏡像節點,讀操作任何一個都 ok

主節點宕機,鏡像節成為新的主節點

在這里插入圖片描述

鏡像集群有三種模式:

  1. exactly 準確模式,指定副本數 count = 主節點數 1 + 鏡像節點數,集群會盡可能的維護這個數值,如果鏡像節點出現故障,就在另一個節點上創建鏡像,比較建議這種模式,可以設置為 N/2 + 1
  2. all 全部模式,count = N,主節點外全部都是鏡像節點
  3. nodes 模式,指定鏡像節點名稱列表,隨機一個作為主節點,如果列表里的節點都不存在或不可用,則創建隊列時的節點作為主節點,之后訪問集群,列表中的節點若存在才會創建鏡像節點

沒有鏡像節點其實就相當于普通模式了

如何配置上網搜搜就行,比較麻煩,需要設置策略,以及匹配的隊列(不同隊列分開來討論,可以設置不同的策略)

3、仲裁隊列

RabbitMQ 3.8 以后,推出了新的功能仲裁隊列來

  1. 代替鏡像集群,都是主從模式,支持主從數據同步,默認是 exactly count = 5
  2. 約定大于配置,使用非常簡單沒有復雜的配置,隊列的類型選擇 Quorum 即可
  3. 底層采用 Raft 協議確保主從的數據強一致性

Spring Boot 配置:

在這里插入圖片描述

仲裁隊列聲明:

@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = "xxx"),value = @Queue(name = "xxx", arguments = @Argument(name = "x-queue-type", value = "quorum")),key = "xxx"
))

隊列不聲明默認就是普通集群,這里聲明的仲裁隊列也只是針對一個隊列

(6)消息重復消費問題

在保證MQ消息不重復的情況下,MQ 的一條消息被消費者消費了多次

消費者消費消息成功后,在給MQ發送消息確認的時候出現了網絡異常或者是服務宕機,MQ 遲遲沒有接收到 ACK 也沒有 NACK,此時 MQ 不會將發送的消息刪除,按兵不動,消費者重新監聽或者有其他消費者的時候,交由它消費,而這條消息如果在之前就消費過了的話,則會導致重復消費

解決方案:

  1. 消息消費的業務本身具有冪等性,再次處理相同消息時不會產生副作用,一些時候可能需要用到分布式鎖去維護冪等性
    • 比如一個訂單的狀態設置為結束,那重復消費的結果一致
  2. 記錄消息的唯一標識,如果消費過了的,則不再消費
    • 消費成功將 id 緩存起來,消費時查詢緩存里是否有這條消息
    • 設置允許的緩存時間時,你不必想得太極端,一般很快就有消費者繼續監聽拿到消息,哪怕真有那個情況,這里帶來的損失大概率可以忽略不記了,一切要結合實際情況!

有時候兩種方案沒有嚴格的界定

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/895150.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/895150.shtml
英文地址,請注明出處:http://en.pswp.cn/news/895150.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

EasyExcel 導出合并層級單元格

EasyExcel 導出合并層級單元格 一、案例 案例一 1.相同訂單號單元格進行合并 合并結果 案例二 1.相同訂單號的單元格進行合并2.相同訂單號的總數和總金額進行合并 合并結果 案例三 1.相同訂單號的單元格進行合并2.相同訂單號的商品分類進行合并3.相同訂單號的總數和總金額…

cs106x-lecture3(Autumn 2017)

打卡cs106x(Autumn 2017)-lecture3 1、streamErrors Suppose an input file named streamErrors-data.txt contains the following text: Donald Knuth M 76 Stanford U. The code below attempts to read the data from the file, but each section has a bug. Correct th…

C++模板編程——typelist的實現

文章最后給出了匯總的代碼&#xff0c;可直接運行 1. typelist是什么 typelist是一種用來操作類型的容器。和我們所熟知的vector、list、deque類似&#xff0c;只不過typelist存儲的不是變量&#xff0c;而是類型。 typelist簡單來說就是一個類型容器&#xff0c;能夠提供一…

springboot 事務管理

在Spring Boot中&#xff0c;事務管理是通過Spring框架的事務管理模塊來實現的。Spring提供了聲明式事務管理和編程式事務管理兩種方式。通常&#xff0c;我們使用聲明式事務管理&#xff0c;因為它更簡潔且易于維護。 1. 聲明式事務管理 聲明式事務管理是通過注解來實現的。…

windows通過網絡向Ubuntu發送文件/目錄

由于最近要使用樹莓派進行一些代碼練習&#xff0c;但是好多東西都在windows里或虛擬機上&#xff0c;就想將文件傳輸到樹莓派上&#xff0c;但試了發現u盤不能簡單傳送&#xff0c;就在網絡上找到了通過windows 的scp命令傳送 前提是樹莓派先開啟ssh服務&#xff0c;且Window…

字節跳動后端一面

&#x1f4cd;1. Gzip壓縮技術詳解 Gzip是一種流行的無損數據壓縮格式&#xff0c;它使用DEFLATE算法來減少文件大小&#xff0c;廣泛應用于網絡傳輸和文件存儲中以提高效率。 &#x1f680; 使用場景&#xff1a; ? 網站優化&#xff1a;通過壓縮HTML、CSS、JavaScript文件來…

Leetcode 3448. Count Substrings Divisible By Last Digit

Leetcode 3448. Count Substrings Divisible By Last Digit 1. 解題思路2. 代碼實現 題目鏈接&#xff1a;3448. Count Substrings Divisible By Last Digit 1. 解題思路 這一題的話我們走的是一個累積數組的思路。 首先&#xff0c;我們使用一個cache數組記錄下任意段數字…

三維模擬-機械臂自翻車

機械仿真 前言效果圖后續 前言 最近在研究Unity機械仿真&#xff0c;用Unity實現其運動學仿真展示的功能&#xff0c;發現一個好用的插件“MGS-Machinery-master”&#xff0c;完美的解決了Unity關節定義缺少液壓缸伸縮關節功能&#xff0c;內置了多個場景&#xff0c;講真的&…

USB子系統學習(四)用戶態下使用libusb讀取鼠標數據

文章目錄 1、聲明2、HID協議2.1、描述符2.2、鼠標數據格式 3、應用程序4、編譯應用程序5、測試6、其它 1、聲明 本文是在學習韋東山《驅動大全》USB子系統時&#xff0c;為梳理知識點和自己回看而記錄&#xff0c;全部內容高度復制粘貼。 韋老師的《驅動大全》&#xff1a;商…

2月9日QT

優化登錄框&#xff1a; 當用戶點擊取消按鈕&#xff0c;彈出問題對話框&#xff0c;詢問是否要確定退出登錄&#xff0c;并提供兩個按鈕&#xff0c;yes|No&#xff0c;如果用戶點擊的Yes&#xff0c;則關閉對話框&#xff0c;如果用戶點擊的No&#xff0c;則繼續登錄 當用戶…

安卓路由與aop 以及 Router-api

安卓路由&#xff08;Android Router&#xff09;和AOP&#xff08;面向切面編程&#xff09;是兩個在Android開發中常用的概念。下面我將詳細講解這兩個概念及其在Android開發中的應用。 一、安卓路由 安卓路由主要用于在應用程序中管理不同組件之間的導航和通信。它可以簡化…

大模型賦能網絡安全整體應用流程概述

一、四個階段概述 安全大模型的應用大致可以分為四個階段: 階段一主要基于開源基礎模型訓練安全垂直領域的模型; 階段二主要基于階段一訓練出來的安全大模型開展推理優化、蒸餾等工序,從而打造出不同安全場景的專家模型,比如數據安全領域、安全運營領域、調用郵件識別領…

nexus部署及配置https訪問

1. 使用docker-compose部署nexus docker-compose-nexus.yml version: "3" services:nexus:container_name: my-nexusimage: sonatype/nexus3:3.67.1hostname: my-nexusnetwork_mode: hostports:- 8081:8081deploy:resources:limits:cpus: 4memory: 8192Mreservations…

史上最快 Python版本 Python 3.13 安裝教程

Python3.13安裝和配置 一、Python的下載 1. 網盤下載地址 (下載速度比較快&#xff0c;推薦&#xff09; Python3.13.0下載&#xff1a;Python3.13.0下載地址&#xff08;windows&#xff09;3.13.0下載地址&#xff08;windows&#xff09; 點擊下面的下載鏈接&#xff0c…

Docker從入門到精通- 容器化技術全解析

第一章&#xff1a;Docker 入門 一、什么是 Docker&#xff1f; Docker 就像一個超級厲害的 “打包神器”。它能幫咱們把應用程序和它運行所需要的東東都整整齊齊地打包到一起&#xff0c;形成一個獨立的小盒子&#xff0c;這個小盒子在 Docker 里叫容器。以前呢&#xff0c;…

ProcessingP5js數據可視化

折線圖繪制程序設計說明 可以讀取表格數據&#xff0c;并轉換成折線圖&#xff0c;條形圖和餅狀圖&#xff0c;并設計了銜接動畫效果 1. 功能概述 本程序使用 Processing 讀取 CSV 文件數據&#xff0c;并繪制帶有坐標軸和數據點的折線圖。橫坐標&#xff08;X 軸&#xff09…

使用云計算,企業的數據監管合規問題如何解決?

使用云計算&#xff0c;企業的數據監管合規問題如何解決&#xff1f; 在當今這個信息化、數字化的時代&#xff0c;數據無疑成為了企業最寶貴的資產之一。隨著云計算的普及&#xff0c;企業將大量數據存儲在云端&#xff0c;不僅提升了效率&#xff0c;也帶來了更多靈活性。然…

AWS Fargate

AWS Fargate 是一個由 Amazon Web Services (AWS) 提供的無服務器容器計算引擎。它使開發者能夠運行容器化應用程序&#xff0c;而無需管理底層的服務器或虛擬機。簡而言之&#xff0c;AWS Fargate 讓你只需關注應用的容器本身&#xff0c;而不需要管理運行容器的基礎設施&…

vue3+vite+eslint|prettier+elementplus+國際化+axios封裝+pinia

文章目錄 vue3 vite 創建項目如果創建項目選了 eslint prettier從零教你使用 eslint prettier第一步&#xff0c;下載eslint第二步&#xff0c;創建eslint配置文件&#xff0c;并下載好其他插件第三步&#xff1a;安裝 prettier安裝后配置 eslint (2025/2/7 補充) 第四步&am…

vLLM V1 重磅升級:核心架構全面革新

本文主要是 翻譯簡化個人評讀&#xff0c;原文請參考&#xff1a;vLLM V1: A Major Upgrade to vLLM’s Core Architecture vLLM V1 開發背景 2025年1月27日&#xff0c;vLLM 開發團隊推出 vLLM V1 alpha 版本&#xff0c;這是對框架核心架構的里程碑式升級。基于過去一年半的…