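Approach 1:
Local message table + scheduled task
Local message table: stores the business data, exchange, queue, routing key, and retry count.
Scheduled task: periodically scans the local message table and redelivers messages to the business queue.
How it works: when the consumer of the business queue fails, it stores the business data, exchange, queue, routing key, and the recalculated retry count in the local message table. The scheduled task then scans the table, selects the rows that qualify (retries remaining and retry time reached), and delivers them a second time. The consumer processes the redelivered message as usual; if it fails again, the message is written back to the table.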
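The selection step of the scheduled task can be sketched without a database or a broker. The following is only a minimal illustration under stated assumptions: the class `LocalMessageTableSketch`, the row type `FailedMessage`, and all field names are hypothetical, and an in-memory list stands in for the local message table.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory stand-in for the local message table of approach 1 (all names are hypothetical). */
class LocalMessageTableSketch {

    /** One row of the local message table. */
    static class FailedMessage {
        final String body;
        final String exchange;
        final String routingKey;
        final int retryCount;
        final int maxRetries;
        final long nextRetryAtMillis;

        FailedMessage(String body, String exchange, String routingKey,
                      int retryCount, int maxRetries, long nextRetryAtMillis) {
            this.body = body;
            this.exchange = exchange;
            this.routingKey = routingKey;
            this.retryCount = retryCount;
            this.maxRetries = maxRetries;
            this.nextRetryAtMillis = nextRetryAtMillis;
        }
    }

    /** Selects the rows that still have retries left and whose retry time has arrived. */
    static List<FailedMessage> selectDue(List<FailedMessage> table, long nowMillis) {
        List<FailedMessage> due = new ArrayList<>();
        for (FailedMessage row : table) {
            boolean retriesLeft = row.retryCount < row.maxRetries;
            boolean timeReached = row.nextRetryAtMillis <= nowMillis;
            if (retriesLeft && timeReached) {
                due.add(row);
            }
        }
        return due;
    }

    public static void main(String[] args) {
        List<FailedMessage> table = new ArrayList<>();
        table.add(new FailedMessage("order-1", "ex", "rk", 1, 3, 1_000L)); // due
        table.add(new FailedMessage("order-2", "ex", "rk", 3, 3, 1_000L)); // retries exhausted
        table.add(new FailedMessage("order-3", "ex", "rk", 0, 3, 9_000L)); // not yet due
        for (FailedMessage row : selectDue(table, 2_000L)) {
            // A real scheduled task would publish row.body to row.exchange / row.routingKey here.
            System.out.println("redeliver " + row.body);
        }
    }
}
```

In a real service the list would be a database query and the loop body would publish through `RabbitTemplate`; the sketch only shows the two filter conditions named above.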
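Approach 2:
Implement a delay queue with the rabbitmq_delayed_message_exchange plugin
How it works: when the consumer of the business queue fails, it sends a message to the delay queue. The message carries the business data, exchange, queue, current retry count, maximum retry count, and so on. When the delay queue's consumer receives the message, it redelivers it to the business queue. If the business queue fails again on the redelivered message, it checks the maximum retry count to decide whether to retry once more.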
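The retry decision described above can be sketched as plain Java, independent of Spring. This is a sketch under assumptions, not the author's implementation: `DelayedRetrySketch` and the `x-retry-count` header are hypothetical names, while `x-delay` is the header the delayed-message exchange reads.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Sketch of the retry decision for approach 2; only "x-delay" is a real plugin header. */
class DelayedRetrySketch {

    static final String RETRY_COUNT_HEADER = "x-retry-count"; // hypothetical header name

    /**
     * Builds the headers for the next delayed redelivery, or returns empty
     * when the retry budget is used up.
     */
    static Optional<Map<String, Object>> nextRetryHeaders(Map<String, Object> headers,
                                                          int maxRetries, int delayMillis) {
        Object raw = headers.get(RETRY_COUNT_HEADER);
        int retriesSoFar = raw instanceof Integer ? (Integer) raw : 0;
        if (retriesSoFar >= maxRetries) {
            return Optional.empty();
        }
        Map<String, Object> next = new HashMap<>(headers);
        next.put(RETRY_COUNT_HEADER, retriesSoFar + 1);
        next.put("x-delay", delayMillis); // delay in milliseconds, read by the delayed exchange
        return Optional.of(next);
    }

    public static void main(String[] args) {
        Optional<Map<String, Object>> attempt = nextRetryHeaders(new HashMap<>(), 2, 5000);
        while (attempt.isPresent()) {
            System.out.println("redeliver with headers " + attempt.get());
            attempt = nextRetryHeaders(attempt.get(), 2, 5000);
        }
        System.out.println("retries exhausted");
    }
}
```

Keeping the counter in a header rather than in the body leaves the business payload untouched between attempts.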
Implementation
- pom.xml
<dependencies>
    <dependency>
        <groupId>run.siyuan</groupId>
        <artifactId>siyuan-common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>
- application.yml
server:
  port: 8080
spring:
  rabbitmq:
    addresses: 127.0.0.1
    port: 5672
    username: siyuan
    password: siyuan123456
    virtual-host: /
- PluginDelayRabbitConfig.java
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Declares the exchange, queue, and binding used with the delayed-message plugin.
 *
 * @author wzq
 * @date 2022/5/17 10:50 AM
 */
@Configuration
public class PluginDelayRabbitConfig {

    @Bean("pluginDelayExchange")
    public CustomExchange pluginDelayExchange() {
        Map<String, Object> argMap = new HashMap<>();
        // Required: the routing behaviour of the exchange; may be direct, topic, or fanout.
        argMap.put("x-delayed-type", "direct");
        // The second argument (the exchange type) must be "x-delayed-message".
        return new CustomExchange("PLUGIN_DELAY_EXCHANGE", "x-delayed-message", false, false, argMap);
    }

    @Bean("pluginDelayQueue")
    public Queue pluginDelayQueue() {
        return new Queue("PLUGIN_DELAY_QUEUE");
    }

    @Bean
    public Binding pluginDelayBinding(@Qualifier("pluginDelayQueue") Queue queue,
                                      @Qualifier("pluginDelayExchange") CustomExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange).with("delay").noargs();
    }
}
- RabbitmqConsumer.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Consumer for the plugin-based delay queue.
 *
 * @author wzq
 * @date 2022/5/17 10:52 AM
 */
@Slf4j
@Component
public class RabbitmqConsumer {

    // Listens on the delay queue.
    @RabbitListener(queues = "PLUGIN_DELAY_QUEUE")
    public void fanoutConsumer(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        log.info("[plugin delay queue][{}] received message: {}", sdf.format(new Date()), msg);
    }
}
- RabbitMqController.java
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Test endpoint that publishes a delayed message.
 *
 * @author wzq
 * @date 2022/5/17 10:54 AM
 */
@RestController
public class RabbitMqController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping(value = "/plugin/send")
    public String pluginMsgSend(@RequestParam Integer time) {
        JSONObject json = new JSONObject();
        json.set("name", "plugin delayed message");
        json.set("time", System.currentTimeMillis());
        json.set("delayTime", time);

        MessageProperties messageProperties = new MessageProperties();
        // Mark the body as text so the listener's String parameter is decoded as a string.
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        // Delay in milliseconds: the exchange holds the message for `time` seconds before routing it.
        messageProperties.setHeader("x-delay", 1000 * time);
        Message message = new Message(JSONUtil.toJsonStr(json).getBytes(StandardCharsets.UTF_8), messageProperties);
        // The exchange name and routing key must match those declared in PluginDelayRabbitConfig.
        rabbitTemplate.convertAndSend("PLUGIN_DELAY_EXCHANGE", "delay", message);

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Message sent [" + sdf.format(new Date()) + "] delay (seconds): " + time);
        return "succ";
    }
}
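Approach 3:
Implement a delay queue with TTL messages + a DLX (dead-letter exchange) queue
How it works: when the consumer of the business queue fails, it sends a TTL message carrying the business data, exchange, queue, current retry count, maximum retry count, and so on. Once the TTL message expires it enters the dead-letter queue. A listener on the dead-letter queue receives it, checks whether the retry limit has been reached, and redelivers it to the business queue. If the business queue fails again on the redelivered message, it checks the maximum retry count to decide whether to retry once more. Messages that exceed the maximum retry count are stored in the database for manual intervention.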
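To see how many redeliveries this loop produces, the counting rule used by the `MessageRetryVo` class in the listings below (`currentRetryCount <= maxTryCount`, with the counter incremented only when a delay message is sent) can be traced in isolation. This is a sketch under the assumption that every consumption attempt fails; `RetryCounterTrace` is a hypothetical helper, not part of the project.

```java
/** Traces the retry counter of approach 3, assuming every consumption attempt fails. */
class RetryCounterTrace {

    /** Returns how many times the message is redelivered to the business queue. */
    static int redeliveries(int maxTryCount) {
        int currentRetryCount = 0;
        int redeliveries = 0;
        while (true) {
            // Business consumer failed: check, then increment before sending the TTL message.
            if (currentRetryCount > maxTryCount) {
                return redeliveries;
            }
            currentRetryCount++;
            // TTL expired, dead-letter consumer: check only, then redeliver.
            if (currentRetryCount > maxTryCount) {
                return redeliveries;
            }
            redeliveries++;
        }
    }

    public static void main(String[] args) {
        System.out.println("redeliveries with maxTryCount=3: " + redeliveries(3));
    }
}
```

Under this rule the last failure still goes through one TTL cycle before the dead-letter consumer notices that the limit is exceeded and hands the message to the failure callback.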
Implementation
- pom.xml
<dependencies>
    <dependency>
        <groupId>run.siyuan</groupId>
        <artifactId>siyuan-common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>
- application.yml
server:
  port: 8080
spring:
  rabbitmq:
    addresses: 127.0.0.1
    port: 5672
    username: siyuan
    password: siyuan123456
    virtual-host: /
- Constants.java
public interface Constants {

    // ------------------------------ delay -------------------------------------
    // Delay exchange
    String DELAY_EXCHANGE = "delay.exchange";
    // Queue bound to the delay exchange
    String DELAY_EXCHANGE_QUEUE = "delay.exchange.queue";
    // Routing key of the delay exchange
    String DELAY_EXCHANGE_ROUTE_KEY = "delay.exchange.route.key";

    // ------------------------------ dead.letter.fanout -------------------------------------
    // Dead-letter exchange
    String DELAY_LETTER_EXCHANGE = "dead.letter.exchange";
    // Queue bound to the dead-letter exchange
    String DELAY_LETTER_EXCHANGE_QUEUE = "dead.letter.exchange.queue";
    // Routing key of the dead-letter exchange
    String DELAY_LETTER_EXCHANGE_ROUTE_KEY = "dead.letter.exchange.route.key";

    // ------------------------------ business queue -------------------------------------
    String SERVICE_QUEUE = "service.queue";
}
- RetryRabbitConfig.java
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import run.siyuan.common.rabbitmq.Constants;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RetryRabbitConfig {

    // ------------------- Delay queue -------------------

    /**
     * Delay exchange.
     */
    @Bean
    public DirectExchange ttlDelayExchangeRetry() {
        return new DirectExchange(Constants.DELAY_EXCHANGE);
    }

    /**
     * Queue bound to the delay exchange.
     */
    @Bean
    public Queue ttlDelayExchangeQueueRetry() {
        Map<String, Object> map = new HashMap<String, Object>();
        // Queue-level TTL (would expire every message after 5 minutes); left disabled
        // because each message carries its own expiration.
        // map.put("x-message-ttl", 1000 * 60 * 5);
        // Expired messages are routed to the dead-letter exchange.
        map.put("x-dead-letter-exchange", Constants.DELAY_LETTER_EXCHANGE);
        return new Queue(Constants.DELAY_EXCHANGE_QUEUE, false, false, false, map);
    }

    /**
     * Binds the delay queue to the (direct) delay exchange.
     */
    @Bean
    public Binding bindTtlExchangeAndQueueRetry() {
        return BindingBuilder.bind(ttlDelayExchangeQueueRetry())
                .to(ttlDelayExchangeRetry())
                .with(Constants.DELAY_EXCHANGE_ROUTE_KEY);
    }

    // ------------------- Dead-letter queue -------------------

    /**
     * Fanout dead-letter exchange.
     */
    @Bean
    public FanoutExchange deadLetterExchange() {
        return new FanoutExchange(Constants.DELAY_LETTER_EXCHANGE);
    }

    /**
     * Dead-letter queue.
     */
    @Bean
    public Queue deadLetterQueue() {
        return new Queue(Constants.DELAY_LETTER_EXCHANGE_QUEUE);
    }

    /**
     * Regular business queue.
     */
    @Bean
    public Queue serviceQueue() {
        return new Queue(Constants.SERVICE_QUEUE);
    }

    /**
     * Binds the dead-letter queue to the dead-letter exchange.
     */
    @Bean
    public Binding deadLetterBind() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
    }
}
- MessageRetryVo.java
package run.siyuan.rabbitmq.retry.message.model;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;

import java.io.Serializable;
import java.util.Date;

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class MessageRetryVo implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Original message body. */
    private String bodyMsg;

    /** Exchange. */
    private String exchangeName;

    /** Routing key. */
    private String routingKey;

    /** Queue. */
    private String queueName;

    /** Maximum number of retries. */
    private Integer maxTryCount = 3;

    /** Current retry count. */
    private Integer currentRetryCount = 0;

    /** Failure information. */
    private String errorMsg;

    /** Creation time. */
    private Date createTime;

    /** Message type: 0 = delay message, 1 = retry message. */
    private Integer type;

    @Override
    public String toString() {
        return "MessageRetryVo{" +
                "bodyMsg='" + bodyMsg + '\'' +
                ", exchangeName='" + exchangeName + '\'' +
                ", routingKey='" + routingKey + '\'' +
                ", queueName='" + queueName + '\'' +
                ", maxTryCount=" + maxTryCount +
                ", currentRetryCount=" + currentRetryCount +
                ", errorMsg='" + errorMsg + '\'' +
                ", createTime=" + createTime +
                '}';
    }

    /**
     * Checks whether the retry count is still within the maximum.
     * For a delay message (type 0) the counter is also incremented.
     *
     * @return true if the message may be sent again
     */
    public boolean checkRetryCount(Integer type) {
        if (this.currentRetryCount <= this.maxTryCount) {
            if (type.equals(0)) {
                retryCountCalculate();
            }
            return true;
        }
        return false;
    }

    /**
     * Increments the retry count.
     */
    private void retryCountCalculate() {
        this.currentRetryCount = this.currentRetryCount + 1;
    }
}
- ServiceConsumer.java
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import run.siyuan.common.rabbitmq.Constants;
import run.siyuan.rabbitmq.retry.message.service.CommonMessageDelayService;

import java.io.IOException;

/**
 * Regular consumer of the business queue.
 */
@Slf4j
@Component
public class ServiceConsumer extends CommonMessageDelayService {

    @RabbitListener(queues = Constants.SERVICE_QUEUE, ackMode = "MANUAL", concurrency = "1")
    public void consumer(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
        try {
            byte[] body = message.getBody();
            String msg = new String(body);
            log.info("[business queue][{}] received message: {}", System.currentTimeMillis(), msg);
            JSONObject json = JSONUtil.parseObj(msg);
            // Simulated business failure: a negative id is rejected.
            if (json.getInt("id") < 0) {
                throw new Exception("id is less than 0");
            }
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.info("Consumption failed: {}", e.getMessage());
            // Reject without requeueing, then schedule a retry through the delay queue.
            channel.basicNack(deliveryTag, false, false);
            sendDelayMessage(message, e);
        }
    }
}
- DeadLetterConsumer.java
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import run.siyuan.common.rabbitmq.Constants;
import run.siyuan.rabbitmq.retry.message.service.CommonMessageRetryService;

import java.io.IOException;

/**
 * Consumer of the dead-letter queue.
 *
 * @author wzq
 * @date 2022/5/13 3:05 PM
 */
@Slf4j
@Component
public class DeadLetterConsumer extends CommonMessageRetryService {

    @RabbitListener(queues = Constants.DELAY_LETTER_EXCHANGE_QUEUE, ackMode = "MANUAL", concurrency = "1")
    public void consumer(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
        try {
            // The original log call had no {} placeholder, so the body was never printed.
            log.info("[dead-letter queue][{}] received message: {}", System.currentTimeMillis(), new String(message.getBody()));
            retryMessage(message);
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            channel.basicNack(deliveryTag, false, false);
        }
    }
}
- CommonMessageDelayService.java
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import run.siyuan.common.rabbitmq.Constants;
import run.siyuan.rabbitmq.retry.message.model.MessageRetryVo;

/**
 * RabbitMQ compensation mechanism: sends the delay (TTL) message.
 *
 * @author wzq
 * @date 2022/5/13 3:05 PM
 */
@Slf4j
public abstract class CommonMessageDelayService extends AbstractCommonMessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends a delay message for a failed business message.
     *
     * @param message the message that failed
     * @param e       the exception raised while consuming it
     */
    protected void sendDelayMessage(Message message, Exception e) {
        try {
            // Wrap the message in a retry entity.
            MessageRetryVo delayMessageVo = buildMessageRetryInfo(message);
            log.info("Delay message: {}", delayMessageVo);
            // Get the full stack trace.
            StackTraceElement[] stackTraceElements = e.getStackTrace();
            // Default to the first stack frame.
            String exceptionClassTotalName = stackTraceElements[0].toString();
            // Prefer the first frame from the application's own package
            // (the filter below is "com.central"; adjust it to your base package).
            for (StackTraceElement stackTraceElement : stackTraceElements) {
                if (stackTraceElement.toString().contains("com.central")) {
                    exceptionClassTotalName = stackTraceElement.toString();
                    break;
                }
            }
            log.info("Exception info: {}", exceptionClassTotalName);
            delayMessageVo.setErrorMsg(exceptionClassTotalName);
            delayMessageVo.setType(0);
            prepareAction(delayMessageVo);
        } catch (Exception exception) {
            log.warn("Failed to handle message, error:", exception);
        }
    }

    /**
     * Publishes the retry entity to the delay exchange as a TTL message.
     *
     * @param retryVo the retry entity
     */
    @Override
    protected void sendMessage(MessageRetryVo retryVo) {
        // The retry entity travels in a header; the dead-letter consumer restores the original body from it.
        MessageProperties messageProperties = new MessageProperties();
        // Fixed TTL of 10 seconds; no per-message custom value is used.
        messageProperties.setExpiration(String.valueOf(1000 * 10 * 1));
        messageProperties.setHeader("message_retry_info", JSONUtil.toJsonStr(retryVo));
        Message ttlMessage = new Message(JSONUtil.toJsonStr(retryVo).getBytes(), messageProperties);
        rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, Constants.DELAY_EXCHANGE_ROUTE_KEY, ttlMessage);
        log.info("Delay message sent, time: {}", System.currentTimeMillis());
    }
}
- CommonMessageRetryService.java
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import run.siyuan.rabbitmq.retry.message.model.MessageRetryVo;

/**
 * RabbitMQ compensation mechanism: resends the business message.
 *
 * @author wzq
 * @date 2022/5/13 3:05 PM
 */
@Slf4j
public abstract class CommonMessageRetryService extends AbstractCommonMessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Retries a message that arrived on the dead-letter queue.
     *
     * @param message the dead-lettered message
     */
    public void retryMessage(Message message) {
        try {
            // Restore the retry entity from the message.
            MessageRetryVo retryMessageVo = buildMessageRetryInfo(message);
            log.info("Retry message: {}", retryMessageVo);
            retryMessageVo.setType(1);
            prepareAction(retryMessageVo);
        } catch (Exception exception) {
            log.warn("Failed to handle message, error:", exception);
        }
    }

    /**
     * Redelivers the original business message to its original exchange and routing key.
     *
     * @param retryVo the retry entity
     */
    @Override
    protected void sendMessage(MessageRetryVo retryVo) {
        // The retry entity goes into a header; the original message body is left unchanged.
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("message_retry_info", JSONUtil.toJsonStr(retryVo));
        Message message = new Message(retryVo.getBodyMsg().getBytes(), messageProperties);
        rabbitTemplate.convertAndSend(retryVo.getExchangeName(), retryVo.getRoutingKey(), message);
        log.info("Business message sent, time: {}", System.currentTimeMillis());
    }
}
- AbstractCommonMessageService.java
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import run.siyuan.rabbitmq.retry.message.model.MessageRetryVo;

import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.Map;
import java.util.Objects;

/**
 * RabbitMQ compensation mechanism: abstract base class.
 *
 * @author wzq
 * @date 2022/5/13 3:05 PM
 */
@Slf4j
public abstract class AbstractCommonMessageService {

    /**
     * Sends the message.
     *
     * @param retryVo the retry entity
     */
    protected abstract void sendMessage(MessageRetryVo retryVo);

    /**
     * Builds the retry entity for a message.
     *
     * @param message the received message
     * @return the retry entity
     */
    protected MessageRetryVo buildMessageRetryInfo(Message message) {
        // If the headers already contain a retry entity, return it directly.
        Map<String, Object> messageHeaders = message.getMessageProperties().getHeaders();
        if (messageHeaders.containsKey("message_retry_info")) {
            Object retryMsg = messageHeaders.get("message_retry_info");
            if (Objects.nonNull(retryMsg)) {
                return JSONUtil.toBean(JSONUtil.parseObj(retryMsg), MessageRetryVo.class);
            }
        }
        // Otherwise wrap the business message in a new retry entity.
        MessageRetryVo messageVo = new MessageRetryVo();
        messageVo.setBodyMsg(new String(message.getBody(), StandardCharsets.UTF_8));
        messageVo.setExchangeName(message.getMessageProperties().getReceivedExchange());
        messageVo.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
        messageVo.setQueueName(message.getMessageProperties().getConsumerQueue());
        messageVo.setCreateTime(new Date());
        return messageVo;
    }

    /**
     * Sends the message if retries remain; otherwise invokes the failure callback.
     *
     * @param messageVo the retry entity
     */
    protected void prepareAction(MessageRetryVo messageVo) {
        if (messageVo.checkRetryCount(messageVo.getType())) {
            this.sendMessage(messageVo);
        } else {
            if (log.isWarnEnabled()) {
                log.warn("Maximum retry count reached, business data: " + messageVo.toString());
            }
            doFailCallBack(messageVo);
        }
    }

    /**
     * Callback invoked when all retries have failed.
     *
     * @param messageVo the retry entity
     */
    protected void doFailCallBack(MessageRetryVo messageVo) {
        try {
            saveRetryMessageInfo(messageVo);
        } catch (Exception e) {
            log.warn("Failure callback threw an exception, cause: {}", e.getMessage());
        }
    }

    /**
     * Stores the failed message. This version only logs it; persistence is left to the reader.
     *
     * @param messageVo the retry entity
     */
    protected void saveRetryMessageInfo(MessageRetryVo messageVo) {
        try {
            log.info("Retry count: {} message_retry_info: {}", messageVo.getCurrentRetryCount(), messageVo);
        } catch (Exception e) {
            log.error("Failed to store the failed message in MongoDB, message data: " + messageVo.toString(), e);
        }
    }
}
- RetryController.java
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import run.siyuan.common.rabbitmq.Constants;

@Slf4j
@RestController
@RequestMapping("/retry")
public class RetryController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping(value = "/service/message")
    public String consumerFailQueue(@RequestParam(required = false, defaultValue = "1") Integer id) {
        JSONObject json = new JSONObject();
        json.set("id", id);
        json.set("name", "message name");
        json.set("time", System.currentTimeMillis());
        String msg = StrUtil.format("Message send time: {} message data: {}", System.currentTimeMillis(), json);
        log.info(msg);
        // Publish to the default exchange with the queue name as routing key.
        // The JSON is sent as a string so that the consumer can parse the body with
        // JSONUtil.parseObj under the default message converter.
        rabbitTemplate.convertAndSend(Constants.SERVICE_QUEUE, json.toString());
        log.info("Message send completed, time: {}", System.currentTimeMillis());
        return "success";
    }
}
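PS: Approach 3 can cause messages to block one another. For example, suppose the first delay message is sent with a 10-minute expiration and the second with a 5-minute expiration. The second message should expire before the first, but because the first has not yet expired and so has not left the queue, the second can only leave after the first one does. This is how messages end up blocked. Use this approach only where the business can tolerate that behaviour.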
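The blocking effect can be reproduced with a few lines of arithmetic. The sketch below is a simplified model, not broker code: it assumes that all messages are published at time 0, that nothing consumes from the delay queue, and that a message is dead-lettered only once it has reached the head of the queue and its own TTL has elapsed. `TtlHeadOfLineSketch` is a hypothetical name.

```java
/** Simplified model of per-message TTL in a FIFO queue (head-of-line blocking). */
class TtlHeadOfLineSketch {

    /**
     * Given per-message TTLs in publish order (all published at time 0),
     * returns the time at which each message is dead-lettered.
     */
    static long[] deadLetterTimes(long[] ttlMillis) {
        long[] times = new long[ttlMillis.length];
        long headFreeAt = 0; // time at which the previous message left the queue
        for (int i = 0; i < ttlMillis.length; i++) {
            // A message leaves only after it reaches the head AND its own TTL has elapsed.
            times[i] = Math.max(headFreeAt, ttlMillis[i]);
            headFreeAt = times[i];
        }
        return times;
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        long fiveMinutes = 5 * 60 * 1000L;
        long[] times = deadLetterTimes(new long[]{tenMinutes, fiveMinutes});
        System.out.println("first message leaves after " + times[0] + " ms");
        System.out.println("second message leaves after " + times[1] + " ms");
    }
}
```

In this model the 5-minute message is held until the 10-minute message ahead of it has left, which matches the scenario described above. A fixed TTL for every message, as in `CommonMessageDelayService`, avoids the reordering because messages then expire in publish order.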