目錄
一、延時插件實現
1、版本要求
2、為運行新容器時安裝
3、為已運行的容器安裝
4、驗證安裝
5、代碼編寫
1. 配置類
2. 生產者
3. 消費者
二、死信隊列實現
1、代碼編寫
1. 配置類
2. 生產者
3. 消費者
三、踩坑記錄
1、發送消息失敗
2、消息過期后未能轉發到死信隊列
3、消費者消費報錯
一、延時插件實現
1、版本要求
RabbitMQ 3.5.7以上
2、為運行新容器時安裝
# 1. 拉取帶管理界面的鏡像
docker pull rabbitmq:3.11-management
?
# 2. 啟動容器并啟用插件
docker run -d \--name rabbitmq \-p 5672:5672 \-p 15672:15672 \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=password \rabbitmq:3.11-management \bash -c "rabbitmq-plugins enable rabbitmq_delayed_message_exchange && rabbitmq-server"
3、為已運行的容器安裝
# 1. 進入正在運行的容器
docker exec -it rabbitmq /bin/bash
?
# 2. 在容器內執行插件安裝
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
?
# 3. 退出容器
exit
?
# 4. 重啟容器使插件生效
docker restart rabbitmq
4、驗證安裝
# 方法1:檢查插件列表
docker exec rabbitmq rabbitmq-plugins list | grep delayed
?
# 方法2:登錄管理界面
# 訪問 http://localhost:15672 (使用設置的賬號密碼登錄)
# 在 "Exchanges" 標簽頁創建交換機時,Type 下拉框會出現 "x-delayed-message" 選項
5、代碼編寫
1. 配置類
@Configuration
public class RabbitMqConfig {public static final String DELAYED_EXCHANGE = "delayed.exchange";public static final String DELAYED_QUEUE = "delayed.queue";public static final String DELAYED_ROUTING_KEY = "delayed_routing_key";@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct"); // 交換機類型return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message", // 固定類型true,false,args);}
?@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE, true);}
?@Beanpublic Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}}
2. 生產者
public void send(String exchange, String routing_key,Object data, Integer delayMillis) {// 消息后處理器:設置延時和持久化MessagePostProcessor processor = message -> {// 毫秒message.getMessageProperties().setDelay(delayMillis);// 持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;};
?rabbitTemplate.convertAndSend(exchange, routingKey, data, processor);
}
3. 消費者
@Component
@RabbitListener(queues = RabbitMqConfig.DELAYED_QUEUE)
public class DelayedListener {
?@RabbitHandlerpublic void listener(String data, Channel channel, Message message) {log.warn("消息消費成功,消息內容:{}", data);MessageProperties properties = message.getMessageProperties();long deliveryTag = properties.getDeliveryTag()channel.basicAck(deliveryTag, false);}
?
}
二、死信隊列實現
1、代碼編寫
1. 配置類
@Configuration
public class RabbitMqConfig {public static final String DELAYED_EXCHANGE = "delayed.exchange";public static final String DELAYED_QUEUE = "delayed.queue";public static final String DELAYED_ROUTING_KEY = "delayed_routing_key";
?public static final String NORMAL_EXCHANGE = "normal.exchange";public static final String NORMAL_QUEUE = "normal.queue";public static final String NORMAL_ROUTING_KEY = "normal_routing_key";// 死信隊列(延時隊列)@Beanpublic Queue delayedQueue() {return QueueBuilder.durable(DELAYED_QUEUE).build();}
?// 死信交換機@Beanpublic DirectExchange delayedExchange() {return new DirectExchange(DELAYED_EXCHANGE);}
?// 綁定死信隊列到死信交換機@Beanpublic Binding delayedBinding(Queue delayedQueue, DirectExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY);}
?// 普通隊列@Beanpublic Queue normalQueue() {return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DELAYED_EXCHANGE).deadLetterRoutingKey(DELAYED_ROUTING_KEY).build();}
?// 普通交換機@Beanpublic DirectExchange normalExchange() {return new DirectExchange(NORMAL_EXCHANGE);}
?// 綁定普通隊列到普通交換機@Beanpublic Binding normalBinding(Queue normalQueue, DirectExchange normalExchange) {return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY);}}
2. 生產者
public void send(String exchange, String routing_key, Object data, Integer delayMillis) {String uuid = IdUtil.simpleUUID();// 消息入庫略,uuid為主鍵MessageProperties properties = new MessageProperties();// 設置TTL,單位毫秒properties.setExpiration(String.valueOf(delayMillis));// 消息持久化(2 表示持久化)properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
?Message msg = rabbitTemplate.getMessageConverter().toMessage(data, properties);rabbitTemplate.send(exchange, routingKey, msg, new CorrelationData(uuid));
}
3. 消費者
@Component
@RabbitListener(queues = RabbitMqConfig.DELAYED_QUEUE)
public class DelayedListener {
?@RabbitHandlerpublic void listener(String data, Channel channel, Message message) {log.warn("消息消費成功,消息內容:{}", data);MessageProperties properties = message.getMessageProperties();long deliveryTag = properties.getDeliveryTag()channel.basicAck(deliveryTag, false);}
?
}
三、踩坑記錄
1、發送消息失敗
原因:RabbitTemplate
配置了消息抵達確認,消息ID沒有傳值。
RabbitTemplate rabbitTemplate = new RabbitTemplate();
// 消息抵達確認通知
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {String msgId = data.getId();if (ack) {log.info("消息抵達隊列成功:{}", data);} else {log.error("消息未能發送成功,消息ID:{}", data.getId(), cause);}
});
生產者實際發送消息未傳消息ID:
錯誤格式
rabbitTemplate.convertAndSend(exchange, routingKey, data);
正確格式
String uuid = IdUtil.simpleUUID();
rabbitTemplate.convertAndSend(exchange, routingKey, data, new CorrelationData(uuid));
2、消息過期后未能轉發到死信隊列
原因:正常消息未綁定死信隊列,消息過期自動刪除,而不會轉發到死信隊列中。
錯誤格式
@Bean
public Queue delayedNormalQueue() {return QueueBuilder.durable(NORMAL_QUEUE).build();
}
正確格式
@Bean
public Queue delayedNormalQueue() {return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DELAYED_EXCHANGE) // 指定死信交換機.deadLetterRoutingKey(DELAYED_ROUTING_KEY) // 指定死信路由鍵.build();
}
3、消費者消費報錯
原因:發送的消息由于自定義的 MessageProperties
,其中缺失了 contentType
參數,需要使用轉化器進行轉換,而不是直接發送消息。
錯誤格式
MessageProperties properties = new MessageProperties();
properties.setExpiration(String.valueOf(delayMillis));
?
Message msg = new Message(message.getBytes(), properties);
rabbitTemplate.convertAndSend(exchange, routingKey, msg, new CorrelationData(uuid));
正確格式
MessageProperties properties = new MessageProperties();
properties.setExpiration(String.valueOf(delayMillis));
?
Message msg = rabbitTemplate.getMessageConverter().toMessage(message, properties);
rabbitTemplate.send(exchange, routingKey, msg, new CorrelationData(uuid));