文章目錄
- 整合Springboot
- 概述
- 消費者
- 生產者
- 消息可靠性投遞
- 故障原因
- 解決方案
- 生產者端消息確認機制(故障情況1)
- 故障情況2解決方案
- 故障情況3解決方案
- 消費端限流
- 概念
- 消息超時
- 概念
- 隊列層面:配置隊列過期
- 消息本身:配置消息過期
- 死信隊列
- 概念
- 創建死信交換機和死信隊列
- 創建正常隊列,綁定死信隊列
- 代碼
- 延遲隊列
- 方案1:借助消息超時時間+死信隊列
- 方案2:給RabbitMQ安裝插件
- 檢查是否安裝
- 測試
整合Springboot
概述
- 搭建環境
- 基礎設定:交換機名稱、隊列名稱、綁定關系
- 發送消息:使用RabbitTemplate
- 接收消息:使用@RabbitListener注解
消費者
pom
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
</parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
</dependencies>
yml
spring:rabbitmq:host: 192.168.217.134port: 5672username: guestpassword: 123456virtual-host: /
logging:level:com.atguigu.mq.listener.MyMessageListener: info
Listener
@Component
@Slf4j
public class MyMessageListener {public static final String EXCHANGE_DIRECT = "exchange.direct.order";public static final String ROUTING_KEY = "order";public static final String QUEUE_NAME = "queue.order";// 寫法一:監聽 + 在 RabbitMQ 服務器上創建交換機、隊列@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"),exchange = @Exchange(value = EXCHANGE_DIRECT),key = {ROUTING_KEY}))
// 寫法二:監聽
// @RabbitListener(queues = {QUEUE_NAME})public void processMessage(String dataString, Message message, Channel channel) {log.info("消費端接收到了消息:" + dataString);}
}
生產者
pom
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
</parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency>
</dependencies>
yml
spring:rabbitmq:host: 192.168.217.134port: 5672username: guestpassword: 123456virtual-host: /
RabbitTemplate
@SpringBootTest
public class RabbitMQTest {public static final String EXCHANGE_DIRECT = "exchange.direct.order";public static final String ROUTING_KEY = "order";@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test01SendMessage() {rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, ROUTING_KEY, "Hello Rabbit!SpringBoot!");}}
消息可靠性投遞
故障原因
- 消息沒有發送到消息隊列上
后果:消費者拿不到消息,業務功能缺失,數據錯誤 - 消息成功存入消息隊列,但是消息隊列服務器宕機了
原本保存在內存中的消息也丟失了
即使服務器重新啟動,消息也找不回來了
后果:消費者拿不到消息,業務功能缺失,數據錯誤 - 消息成功存入消息隊列,但是消費端出現問題,例如:宕機、拋異常等等
后果:業務功能缺失,數據錯誤
解決方案
- 故障情況1:消息沒有發送到消息隊列
- 解決思路A:在生產者端進行確認,具體操作中我們會分別針對交換機和隊列來確認, 如果沒有成功發送到消息隊列服務器上,那就可以嘗試重新發送
- 解決思路B:為目標交換機指定備份交換機,當目標交換機投遞失敗時,把消息投遞至備份交換機
- 故障情況2:消息隊列服務器宕機導致內存中消息丟失
- 解決思路:消息持久化到硬盤上,哪怕服務器重啟也不會導致消息丟失
- 故障情況3:消費端宕機或拋異常導致消息沒有成功被消費
- 消費端消費消息成功,給服務器返回ACK信息,然后消息隊列刪除該消息
- 消費端消費消息失敗,給服務器端返回NACK信息,同時把消息恢復為待消費的狀態,這樣就可以再次取回消息,重試一次(當然,這就需要消費端接口支持冪等性)
關聯交換機和備份交換機
生產者端消息確認機制(故障情況1)
故障原因1 解決方案:消息沒有發送到消息隊列上
pom
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
</parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
</dependencies>
YAML
publisher-confirm-type,publisher-returns兩個必須要增加的配置,如果沒有則功能不生效
# producerspring:rabbitmq:host: 192.168.217.134port: 5672username: guestpassword: 123456virtual-host: /publisher-confirm-type: CORRELATED # 交換機的確認publisher-returns: true # 隊列的確認
logging:level:com.atguigu.mq.config.MQProducerAckConfig: info
創建配置類
// 用于出現推送失敗的情況下查看返回值
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;@Configuration
@Slf4j
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void initRabbitTemplate() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {// 消息發送到交換機成功或失敗時調用這個方法log.info("confirm() 回調函數打印 CorrelationData:" + correlationData);log.info("confirm() 回調函數打印 ack:" + ack);log.info("confirm() 回調函數打印 cause:" + cause);}@Overridepublic void returnedMessage(ReturnedMessage returned) {// 發送到隊列失敗時才調用這個方法log.info("returnedMessage() 回調函數 消息主體: " + new String(returned.getMessage().getBody()));log.info("returnedMessage() 回調函數 應答碼: " + returned.getReplyCode());log.info("returnedMessage() 回調函數 描述:" + returned.getReplyText());log.info("returnedMessage() 回調函數 消息使用的交換器 exchange : " + returned.getExchange());log.info("returnedMessage() 回調函數 消息使用的路由鍵 routing : " + returned.getRoutingKey());}
}
API說明
①ConfirmCallback接口
這是RabbitTemplate內部的一個接口,源代碼如下:
/*** A callback for publisher confirmations.**/@FunctionalInterfacepublic interface ConfirmCallback {/*** Confirmation callback.* @param correlationData correlation data for the callback.* @param ack true for ack, false for nack* @param cause An optional cause, for nack, when available, otherwise null.*/void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);}
生產者端發送消息之后,回調confirm()方法
- ack參數值為true:表示消息成功發送到了交換機
- ack參數值為false:表示消息沒有發送到交換機
②ReturnCallback接口
同樣也RabbitTemplate內部的一個接口,源代碼如下:
/*** A callback for returned messages.** @since 2.3*/@FunctionalInterfacepublic interface ReturnsCallback {/*** Returned message callback.* @param returned the returned message and metadata.*/void returnedMessage(ReturnedMessage returned);}
注意:接口中的returnedMessage()方法僅在消息沒有發送到隊列時調用
ReturnedMessage類中主要屬性含義如下:
屬性名 | 類型 | 含義 |
---|---|---|
message | org.springframework.amqp.core.Message | 消息以及消息相關數據 |
replyCode | int | 應答碼,類似于HTTP響應狀態碼 |
replyText | String | 應答碼說明 |
exchange | String | 交換機名稱 |
routingKey | String | 路由鍵名稱 |
故障情況2解決方案
指定隊列名稱默認自動持久化,還可設置是否自動刪除隊列
故障情況3解決方案
# consumerspring:rabbitmq:host: 192.168.217.134port: 5672username: guestpassword: 123456virtual-host: /listener:simple:acknowledge-mode: manual # 把消息確認模式改為手動確認prefetch: 1 # 每次從隊列中取回消息的數量
deliveryTag:交付標簽機制,每一個消息進入隊列時,broker都會生成一個唯一標識
消息復制到各個隊列,但deliveryTag各不相同
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;@Component
@Slf4j
public class MyMessageListener {public static final String QUEUE_NAME = "queue.order";public static final String QUEUE_NORMAL = "queue.normal.video";public static final String QUEUE_DEAD_LETTER = "queue.dead.letter.video";public static final String QUEUE_DELAY = "queue.test.delay";public static final String QUEUE_PRIORITY = "queue.test.priority";@RabbitListener(queues = {QUEUE_NAME})public void processMessage(String dataString, Message message, Channel channel) throws IOException {// 獲取當前消息的 deliveryTaglong deliveryTag = message.getMessageProperties().getDeliveryTag();try {// 核心操作log.info("消費端 消息內容:" + dataString);System.out.println(10 / 0);// 核心操作成功:返回 ACK 信息channel.basicAck(deliveryTag, false);} catch (Exception e) {// 獲取當前消息是否是重復投遞的// redelivered 為 true:說明當前消息已經重復投遞過一次了// redelivered 為 false:說明當前消息是第一次投遞Boolean redelivered = message.getMessageProperties().getRedelivered();// 核心操作失敗:返回 NACK 信息// requeue 參數:控制消息是否重新放回隊列// 取值為 true:重新放回隊列,broker 會重新投遞這個消息// 取值為 false:不重新放回隊列,broker 會丟棄這個消息if (redelivered) {// 如果當前消息已經是重復投遞的,說明此前已經重試過一次啦,所以 requeue 設置為 false,表示不重新放回隊列channel.basicNack(deliveryTag, false, false);} else {// 如果當前消息是第一次投遞,說明當前代碼是第一次拋異常,尚未重試,所以 requeue 設置為 true,表示重新放回隊列在投遞一次// 第二個參數:boolean multiple表示是否一次消費多條消息,false表示只確認該序列號對應的消息,true則表示確認該序列號對應的消息以及比該序列號小的所有消息,比如我先發送2條消息,他們的序列號分別為2,3,并且他們都沒有被確認,還留在隊列中,那么如果當前消息序列號為4,那么當multiple為true,則序列號為2、3的消息也會被一同確認。channel.basicNack(deliveryTag, false, true);}// reject 表示拒絕// 辨析:basicNack() 和 basicReject() 方法區別// basicNack()能控制是否批量操作// basicReject()不能控制是否批量操作// channel.basicReject(deliveryTag, true);}}
}
消費端限流
概念
一個參數:prefetch
# consumerspring:rabbitmq:host: 192.168.217.134port: 5672username: guestpassword: 123456virtual-host: /listener:simple:acknowledge-mode: manual # 把消息確認模式改為手動確認prefetch: 1 # 每次從隊列中取回消息的數量
消息超時
概念
- 給消息設定一個過期時間,超過這個時間沒有被取走的消息就會被刪除
- 隊列層面:在隊列層面設定消息的過期時間,并不是隊列的過期時間。意思是這
個隊列中的消息全部使用同一個過期時間。 - 消息本身:給具體的某個消息設定過期時間
- 隊列層面:在隊列層面設定消息的過期時間,并不是隊列的過期時間。意思是這
- 如果兩個層面都做了設置,那么哪個時間短,哪個生效
隊列層面:配置隊列過期
5000毫秒過期
消息本身:配置消息過期
@SpringBootTest
public class RabbitMQTest {public static final String EXCHANGE_TIMEOUT = "exchange.test.timeout";public static final String ROUTING_KEY_TIMEOUT = "routing.key.test.timeout";@Testpublic void test04SendMessage() {// 創建消息后置處理器對象MessagePostProcessor postProcessor = message -> {// 設置消息的過期時間,單位是毫秒message.getMessageProperties().setExpiration("7000");return message;};rabbitTemplate.convertAndSend(EXCHANGE_TIMEOUT, ROUTING_KEY_TIMEOUT, "Test timeout", postProcessor);}
}
死信隊列
概念
概念:當一個消息無法被消費,它就變成了死信。
- 死信產生的原因大致有下面三種:
- 拒絕:消費者拒接消息,basicNack()/basicReject(),并且不把消息重新放入原目標隊列,requeue=false
- 溢出:隊列中消息數量到達限制。比如隊列最大只能存儲10條消息,且現在已經存儲 了10條,此時如果再發送一條消息進來,根據先進先出原則,隊列中最早的消息會變 成死信
- 超時:消息到達超時時間未被消費
- 死信的處理方式大致有下面三種:
- 丟棄:對不重要的消息直接丟棄,不做處理
- 入庫:把死信寫入數據庫,日后處理
- 監聽:消息變成死信后進入死信隊列,我們專門設置消費端監聽死信隊列,做后續處理(通常采用)
創建死信交換機和死信隊列
和創建普通隊列一樣
創建正常隊列,綁定死信隊列
綁定隊列到交換機
代碼
@Test
public void testSendMessageButReject() { rabbitTemplate .convertAndSend( EXCHANGE_NORMAL, ROUTING_KEY_NORMAL, "測試死信情況1:消息被拒絕");
}
①監聽正常隊列
@RabbitListener(queues = {QUEUE_NORMAL})
public void processMessageNormal(Message message, Channel channel) throws IOException {// 監聽正常隊列,但是拒絕消息log.info("★[normal]消息接收到,但我拒絕。");channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
②監聽死信隊列
@RabbitListener(queues = {QUEUE_DEAD_LETTER})
public void processMessageDead(String dataString, Message message, Channel channel) throws IOException { // 監聽死信隊列 log.info("★[dead letter]dataString = " + dataString);log.info("★[dead letter]我是死信監聽方法,我接收到了死信消息");channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
測試超出隊列長度進入死信隊列
@Test
public void testSendMultiMessage() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(EXCHANGE_NORMAL,ROUTING_KEY_NORMAL,"測試死信情況2:消息數量超過隊列的最大容量" + i);}
}
執行循環20次的代碼兩次
正常隊列:最大長度為10
死信隊列:沒有設置最大長度,所以推送失敗的消息都進入死信隊列
過一段時間正常隊列中消息超時,進入死信隊列
延遲隊列
- 方案1:借助消息超時時間+死信隊列(就是剛剛我們測試的例子)
- 方案2:給RabbitMQ安裝插件
方案1:借助消息超時時間+死信隊列
方案2:給RabbitMQ安裝插件
插件安裝
https://www.rabbitmq.com/community-plugins.html
docker inspect rabbitmqwget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
mv rabbitmq_delayed_message_exchange-3.13.0.ez /var/lib/docker/volumes/rabbitmq-plugin/_data
# 登錄進入容器內部
docker exec -it rabbitmq /bin/bash# rabbitmq-plugins命令所在目錄已經配置到$PATH環境變量中了,可以直接調用
rabbitmq-plugins enable rabbitmq_delayed_message_exchange# 退出Docker容器
exit# 重啟Docker容器
docker restart rabbitmq
檢查是否安裝
創建新交換機時可以在type中看到x-delayed-message選項
關于x-delayed-type參數的理解:
原本指定交換機類型的地方使用了x-delayed-message這個值,那么這個交換機除了支持延遲消息之外,到底是direct、fanout、topic這些類型中的哪一個呢?
這里就額外使用x-delayed-type來指定交換機本身的類型
測試
生產者
@Test
public void test05SendMessageDelay() {// 創建消息后置處理器對象MessagePostProcessor postProcessor = message -> {// 設置消息過期時間(以毫秒為單位)// x-delay 參數必須基于 x-delayed-message-exchange 插件才能生效message.getMessageProperties().setHeader("x-delay", "10000");return message;};// 發送消息rabbitTemplate.convertAndSend(EXCHANGE_DELAY,ROUTING_KEY_DELAY,"Test delay message by plugin " + new SimpleDateFormat("HH:mm:ss").format(new Date()),postProcessor);
}
消費者
//已創建隊列
@Component
@Slf4j
public class MyDelayMessageListener {public static final String QUEUE_DELAY = "queue.delay.video";@RabbitListener(queues = {QUEUE_DELAY})public void process(String dataString, Message message, Channel channel) throws IOException { log.info("[生產者]" + dataString);log.info("[消費者]" + new SimpleDateFormat("hh:mm:ss").format(new Date()));channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}
//未創建情況
@Component
@Slf4j
public class MyDelayMessageListener { public static final String EXCHANGE_DELAY = "exchange.delay.video";public static final String ROUTING_KEY_DELAY = "routing.key.delay.video";public static final String QUEUE_DELAY = "queue.delay.video";@RabbitListener(bindings = @QueueBinding( value = @Queue(value = QUEUE_DELAY, durable = "true", autoDelete = "false"), exchange = @Exchange( value = EXCHANGE_DELAY, durable = "true", autoDelete = "false", type = "x-delayed-message", arguments = @Argument(name = "x-delayed-type", value = "direct")), key = {ROUTING_KEY_DELAY} )) public void process(String dataString, Message message, Channel channel) throws IOException { log.info("[生產者]" + dataString); log.info("[消費者]" + new SimpleDateFormat("hh:mm:ss").format(new Date())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }