一 : 在使用 RabbitMQ 作為消息隊列時,保證消息被成功發送和消費是一個非常重要的問題。以下是一些關鍵點和最佳實踐,以確保消息的可靠傳輸和處理。*
配置方式:
保證消息被成功發送
確認模式(Confirm Mode):生產者可以啟用確認模式,確保消息成功到達交換機。
使用 channel.confirmSelect() 啟用確認模式。
使用 channel.waitForConfirms() 或 channel.addConfirmListener() 來處理確認消息。
事務模式(Transaction Mode):生產者可以使用事務模式,確保消息成功到達隊列。
使用 channel.txSelect() 開啟事務,channel.txCommit() 提交事務,channel.txRollback() 回滾事務。
處理發送失敗:實現重試機制,可以在發送失敗時重試。
使用死信交換機(Dead Letter Exchange, DLX)來存儲處理失敗的消息。
保證消息被成功消費
手動確認(Manual Acknowledgment):消費者應該使用手動確認模式,確保消息被成功處理后再確認。
使用 channel.basicConsume(queue, false, consumer) 開啟手動確認模式。
在消息處理成功后,調用 channel.basicAck(deliveryTag, false) 確認消息。
處理消費失敗:實現消費失敗的重試機制。
使用死信交換機(DLX)來存儲處理失敗的消息。
冪等性:確保消費者處理消息的冪等性,避免重復消費導致的問題。
二:通過記錄消息到數據庫中,采用定時任務輪詢方式:
1 這是一個 Spring 組件,用于構建和發布返利消息事件。
//topic 字段從配置文件中獲取,表示消息隊列的 topic。
//buildEventMessage 方法用于構建 EventMessage 對象,包含隨機生成的 ID、當前時間戳和數據。
//topic 方法返回消息隊列的 topic。
//RebateMessage 是一個內部類,定義了返利消息的結構@Component
public class SendRebateMessageEvent extends BaseEvent<SendRebateMessageEvent.RebateMessage> {@Value("${spring.rabbitmq.topic.send_rebate}")private String topic;@Overridepublic EventMessage<RebateMessage> buildEventMessage(RebateMessage data) {return EventMessage.<SendRebateMessageEvent.RebateMessage>builder().id(RandomStringUtils.randomNumeric(11)).timestamp(new Date()).data(data).build();}@Overridepublic String topic() {return topic;}@Data@Builder@AllArgsConstructor@NoArgsConstructorpublic static class RebateMessage {private String userId;private String rebateDesc;private String rebateType;private String rebateConfig;private String bizId;}
}@Data
public abstract class BaseEvent<T> {public abstract EventMessage<T> buildEventMessage(T data);public abstract String topic();@Data@Builder@AllArgsConstructor@NoArgsConstructorpublic static class EventMessage<T> {private String id;private Date timestamp;private T data;}}
2 生產者示例
//topic 字段從配置文件中獲取,表示消息隊列的 topic。
//listener 方法使用 @RabbitListener 注解監聽指定隊列的消息。
//消息到達后,解析消息內容,根據 rebateType 字段的不同,調用相應的服務方法處理消息。
//異常處理機制確保了消息處理的健壯性。@Component
public class RebateMessageCustomer {@Value("${spring.rabbitmq.topic.send_rebate}")private String topic;@Resourceprivate IRaffleActivityAccountQuotaService raffleActivityAccountQuotaService;@Resourceprivate ICreditAdjustService creditAdjustService;@RabbitListener(queuesToDeclare = @Queue(value = "${spring.rabbitmq.topic.send_rebate}"))public void listener(String message) {try {log.info("監聽用戶行為返利消息 topic: {} message: {}", topic, message);BaseEvent.EventMessage<SendRebateMessageEvent.RebateMessage> eventMessage = JSON.parseObject(message, new TypeReference<BaseEvent.EventMessage<SendRebateMessageEvent.RebateMessage>>() {}.getType());SendRebateMessageEvent.RebateMessage rebateMessage = eventMessage.getData();switch (rebateMessage.getRebateType()) {case "sku":SkuRechargeEntity skuRechargeEntity = new SkuRechargeEntity();skuRechargeEntity.setUserId(rebateMessage.getUserId());skuRechargeEntity.setSku(Long.valueOf(rebateMessage.getRebateConfig()));skuRechargeEntity.setOutBusinessNo(rebateMessage.getBizId());skuRechargeEntity.setOrderTradeType(OrderTradeTypeVO.rebate_no_pay_trade);raffleActivityAccountQuotaService.createOrder(skuRechargeEntity);break;case "integral":TradeEntity tradeEntity = new TradeEntity();tradeEntity.setUserId(rebateMessage.getUserId());tradeEntity.setTradeName(TradeNameVO.REBATE);tradeEntity.setTradeType(TradeTypeVO.FORWARD);tradeEntity.setAmount(new BigDecimal(rebateMessage.getRebateConfig()));tradeEntity.setOutBusinessNo(rebateMessage.getBizId());creditAdjustService.createOrder(tradeEntity);break;}} catch (AppException e) {if (ResponseCode.INDEX_DUP.getCode().equals(e.getCode())) {log.warn("監聽用戶行為返利消息,消費重復 topic: {} message: {}", topic, message, e);return;}throw e;} catch (Exception e) {log.error("監聽用戶行為返利消息,消費失敗 topic: {} message: {}", topic, message, e);throw e;}}
}
3 消費者示例
//這個方法用于保存用戶返利記錄,并在事務中插入用戶行為返利訂單和任務對象。
//在事務外,同步發送 MQ 消息。
//發送消息時,調用 eventPublisher.publish 方法發布消息到指定的 topic。public void saveUserRebateRecord(String userId, List<BehaviorRebateAggregate> behaviorRebateAggregates) {try {dbRouter.doRouter(userId);transactionTemplate.execute(status -> {try {for (BehaviorRebateAggregate behaviorRebateAggregate : behaviorRebateAggregates) {BehaviorRebateOrderEntity behaviorRebateOrderEntity = behaviorRebateAggregate.getBehaviorRebateOrderEntity();UserBehaviorRebateOrder userBehaviorRebateOrder = new UserBehaviorRebateOrder();userBehaviorRebateOrder.setUserId(behaviorRebateOrderEntity.getUserId());userBehaviorRebateOrder.setOrderId(behaviorRebateOrderEntity.getOrderId());userBehaviorRebateOrder.setBehaviorType(behaviorRebateOrderEntity.getBehaviorType());userBehaviorRebateOrder.setRebateDesc(behaviorRebateOrderEntity.getRebateDesc());userBehaviorRebateOrder.setRebateType(behaviorRebateOrderEntity.getRebateType());userBehaviorRebateOrder.setRebateConfig(behaviorRebateOrderEntity.getRebateConfig());userBehaviorRebateOrder.setOutBusinessNo(behaviorRebateOrderEntity.getOutBusinessNo());userBehaviorRebateOrder.setBizId(behaviorRebateOrderEntity.getBizId());userBehaviorRebateOrderDao.insert(userBehaviorRebateOrder);TaskEntity taskEntity = behaviorRebateAggregate.getTaskEntity();Task task = new Task();task.setUserId(taskEntity.getUserId());task.setTopic(taskEntity.getTopic());task.setMessageId(taskEntity.getMessageId());task.setMessage(JSON.toJSONString(taskEntity.getMessage()));task.setState(taskEntity.getState().getCode());taskDao.insert(task);}return 1;} catch (DuplicateKeyException e) {status.setRollbackOnly();log.error("寫入返利記錄,唯一索引沖突 userId: {}", userId, e);throw new AppException(ResponseCode.INDEX_DUP.getCode(), ResponseCode.INDEX_DUP.getInfo());}});} finally {dbRouter.clear();}for (BehaviorRebateAggregate behaviorRebateAggregate : behaviorRebateAggregates) {TaskEntity taskEntity = behaviorRebateAggregate.getTaskEntity();Task task = new Task();task.setUserId(taskEntity.getUserId());task.setMessageId(taskEntity.getMessageId());try {eventPublisher.publish(taskEntity.getTopic(), taskEntity.getMessage());taskDao.updateTaskSendMessageCompleted(task);} catch (Exception e) {log.error("寫入返利記錄,發送MQ消息失敗 userId: {} topic: {}", userId, task.getTopic());taskDao.updateTaskSendMessageFail(task);}}
}public List<String> createOrder(BehaviorEntity behaviorEntity) {// 1. 查詢返利配置List<DailyBehaviorRebateVO> dailyBehaviorRebateVOS = behaviorRebateRepository.queryDailyBehaviorRebateConfig(behaviorEntity.getBehaviorTypeVO());if (null == dailyBehaviorRebateVOS || dailyBehaviorRebateVOS.isEmpty()) return new ArrayList<>();// 2. 構建聚合對象List<String> orderIds = new ArrayList<>();List<BehaviorRebateAggregate> behaviorRebateAggregates = new ArrayList<>();for (DailyBehaviorRebateVO dailyBehaviorRebateVO : dailyBehaviorRebateVOS) {// 拼裝業務ID;用戶ID_返利類型_外部透徹業務IDString bizId = behaviorEntity.getUserId() + Constants.UNDERLINE + dailyBehaviorRebateVO.getRebateType() + Constants.UNDERLINE + behaviorEntity.getOutBusinessNo();BehaviorRebateOrderEntity behaviorRebateOrderEntity = BehaviorRebateOrderEntity.builder().userId(behaviorEntity.getUserId()).orderId(RandomStringUtils.randomNumeric(12)).behaviorType(dailyBehaviorRebateVO.getBehaviorType()).rebateDesc(dailyBehaviorRebateVO.getRebateDesc()).rebateType(dailyBehaviorRebateVO.getRebateType()).rebateConfig(dailyBehaviorRebateVO.getRebateConfig()).outBusinessNo(behaviorEntity.getOutBusinessNo()).bizId(bizId).build();orderIds.add(behaviorRebateOrderEntity.getOrderId());// MQ 消息對象SendRebateMessageEvent.RebateMessage rebateMessage = SendRebateMessageEvent.RebateMessage.builder().userId(behaviorEntity.getUserId()).rebateType(dailyBehaviorRebateVO.getRebateType()).rebateConfig(dailyBehaviorRebateVO.getRebateConfig()).bizId(bizId).build();// 構建事件消息BaseEvent.EventMessage<SendRebateMessageEvent.RebateMessage> rebateMessageEventMessage = sendRebateMessageEvent.buildEventMessage(rebateMessage);// 組裝任務對象TaskEntity taskEntity = new TaskEntity();taskEntity.setUserId(behaviorEntity.getUserId());taskEntity.setTopic(sendRebateMessageEvent.topic());taskEntity.setMessageId(rebateMessageEventMessage.getId());taskEntity.setMessage(rebateMessageEventMessage);taskEntity.setState(TaskStateVO.create);BehaviorRebateAggregate behaviorRebateAggregate = BehaviorRebateAggregate.builder().userId(behaviorEntity.getUserId()).behaviorRebateOrderEntity(behaviorRebateOrderEntity).taskEntity(taskEntity).build();behaviorRebateAggregates.add(behaviorRebateAggregate);}// 3. 存儲聚合對象數據behaviorRebateRepository.saveUserRebateRecord(behaviorEntity.getUserId(), behaviorRebateAggregates);// 4. 返回訂單ID集合return orderIds;}@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class TaskEntity {/** 活動ID */private String userId;/** 消息主題 */private String topic;/** 消息編號 */private String messageId;/** 消息主體 */private BaseEvent.EventMessage<SendRebateMessageEvent.RebateMessage> message;/** 任務狀態;create-創建、completed-完成、fail-失敗 */private TaskStateVO state;}@Data
public class Task {/** 自增ID */private String id;/** 活動ID */private String userId;/** 消息主題 */private String topic;/** 消息編號 */private String messageId;/** 消息主體 */private String message;/** 任務狀態;create-創建、completed-完成、fail-失敗 */private String state;/** 創建時間 */private Date createTime;/** 更新時間 */private Date updateTime;}
4 定時任務示例
// @Scheduled(cron = "0/5 * * * * ?")
public void exec_db01() {try {// 設置庫表dbRouter.setDBKey(1);dbRouter.setTBKey(0);// 查詢未發送的任務List<TaskEntity> taskEntities = taskService.queryNoSendMessageTaskList();if (taskEntities.isEmpty()) return;// 發送MQ消息for (TaskEntity taskEntity : taskEntities) {try {taskService.sendMessage(taskEntity);taskService.updateTaskSendMessageCompleted(taskEntity.getUserId(), taskEntity.getMessageId());} catch (Exception e) {log.error("定時任務,發送MQ消息失敗 userId: {} topic: {}", taskEntity.getUserId(), taskEntity.getTopic());taskService.updateTaskSendMessageFail(taskEntity.getUserId(), taskEntity.getMessageId());}}} catch (Exception e) {log.error("定時任務,掃描MQ任務表發送消息失敗。", e);} finally {dbRouter.clear();}
}@Data
public class TaskEntity {/** 活動ID */private String userId;/** 消息主題 */private String topic;/** 消息編號 */private String messageId;/** 消息主體 */private String message;}
三: 串聯流程
生產消息:saveUserRebateRecord 方法在事務中插入用戶行為返利訂單和任
務對象。
在事務外,調用 eventPublisher.publish 方法發布消息到指定的
topic。
消息構建和發布:SendRebateMessageEvent 類構建 EventMessage 對象,包含隨機
生成的 ID、當前時間戳和數據,并返回消息隊列的 topic。
消費消息:RebateMessageCustomer 類監聽指定隊列的消息,解析消息內容,
根據 rebateType 字段的不同,調用相應的服務方法處理消息。
定時任務補償:SendMessageTaskJob 類定時掃描數據庫中的任務表,發送未發送的
消息到 MQ 隊列,并更新任務狀態。如果發送失敗,記錄錯誤日志并
更新任務狀態為發送失敗。
四:配置文件
spring:rabbitmq:addresses: ****port: ***username: **password: **listener:simple:prefetch: 1 # 每次投遞n個消息,消費完在投遞n個topic:send_rebate: send_rebate
五: 消費失敗
消息發送:
生產者在發送消息時,會將消息的相關信息(如消息內容、發送狀態等)記錄到 task 表中。
如果消息發送成功,則更新 task 表中的狀態為“已發送”。
如果消息發送失敗,則更新 task 表中的狀態為“發送失敗”。
定時任務會掃描 task 表,查找狀態為“發送失敗”的消息,并重試發送。消息消費:
消費者在處理消息時,也會將消息的相關信息(如消息內容、處理狀態等)記錄到 task 表中。
如果消息處理成功,則更新 task 表中的狀態為“已處理”。
如果消息處理失敗,則更新 task 表中的狀態為“處理失敗”。
定時任務會掃描 task 表,查找狀態為“處理失敗”的消息,并重試處理。
通過這種方式,可以確保即使消息在發送或消費過程中出現失敗,也能夠通過重試機制最終成功發送或處理。
示例:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;@Service
public class MessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate TaskRepository taskRepository;@Transactionalpublic void sendMessage(String message) {try {rabbitTemplate.convertAndSend("send_rebate", message);TaskEntity task = new TaskEntity();task.setMessage(message);task.setStatus("SENT");taskRepository.save(task);} catch (Exception e) {TaskEntity task = new TaskEntity();task.setMessage(message);task.setStatus("SEND_FAILED");taskRepository.save(task);}}
}
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class MessageConsumer {@Autowiredprivate TaskRepository taskRepository;@RabbitListener(queues = "${spring.rabbitmq.topic.send_rebate}")public void handleMessage(String message) {try {// 處理消息的邏輯System.out.println("Processing message: " + message);// 假設處理成功TaskEntity task = new TaskEntity();task.setMessage(message);task.setStatus("PROCESSED");taskRepository.save(task);} catch (Exception e) {// 處理失敗的邏輯TaskEntity task = new TaskEntity();task.setMessage(message);task.setStatus("PROCESS_FAILED");taskRepository.save(task);}}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import java.util.List;@Component
public class RetryTaskJob {@Autowiredprivate TaskRepository taskRepository;@Autowiredprivate MessageService messageService;@Autowiredprivate MessageConsumer messageConsumer;@Scheduled(fixedRate = 60000) // 每分鐘執行一次public void retryFailedMessages() {// 重試發送失敗的消息List<TaskEntity> sendFailedTasks = taskRepository.findByStatus("SEND_FAILED");for (TaskEntity task : sendFailedTasks) {try {messageService.sendMessage(task.getMessage());task.setStatus("SENT");taskRepository.save(task);} catch (Exception e) {// 記錄日志或進行其他處理}}// 重試處理失敗的消息List<TaskEntity> processFailedTasks = taskRepository.findByStatus("PROCESS_FAILED");for (TaskEntity task : processFailedTasks) {try {messageConsumer.handleMessage(task.getMessage());task.setStatus("PROCESSED");taskRepository.save(task);} catch (Exception e) {// 記錄日志或進行其他處理}}}
}
六: 防止重復消費
保證消息消費的冪等性是確保消息系統可靠性的重要一環。冪等性意
味著無論消息被處理一次還是多次,結果都是相同的。以下是一些常見的策略來保證消息消費的冪等性:唯一標識符:為每條消息生成一個唯一的標識符(如 UUID),并在
處理消息時檢查該標識符是否已經被處理過。如果已經處理過,則忽略該消息。狀態檢查:在處理消息之前,檢查系統的狀態,確保該消息對應的操
作尚未執行。例如,如果消息是要更新某個資源,可以先檢查該資源的狀態,確保更新操作尚未執行。數據庫唯一約束:在數據庫中為消息處理結果創建唯一約束,確保相
同的消息不會被重復處理。冪等API設計:設計冪等的API,確保相同的請求多次執行不會產生不
同的結果。例如,使用“PUT”方法更新資源,而不是“POST”方法。
使用 Redis 來實現消息消費的冪等性是一個非常有效的方法。Redis 是一個高性能的內存數據庫,適合用于存儲臨時狀態信息。
1 消費者處理消息并記錄到 Redis
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;@Component
public class MessageConsumer {@Autowiredprivate StringRedisTemplate redisTemplate;@RabbitListener(queues = "${spring.rabbitmq.topic.send_rebate}")public void handleMessage(String message) {// 假設消息中包含唯一標識符String messageId = extractMessageId(message);// 檢查消息是否已經處理過或正在處理if (redisTemplate.hasKey(messageId)) {System.out.println("Message already processed or being processed: " + messageId);return;}// 將消息ID存入Redis,設置過期時間redisTemplate.opsForValue().set(messageId, "PROCESSING", 60, TimeUnit.SECONDS);try {// 處理消息的邏輯System.out.println("Processing message: " + message);// 假設處理成功redisTemplate.opsForValue().set(messageId, "PROCESSED", 60, TimeUnit.SECONDS);} catch (Exception e) {// 處理失敗的邏輯redisTemplate.delete(messageId); // 刪除Redis中的記錄,以便可以重試}}private String extractMessageId(String message) {// 假設消息中包含唯一標識符,例如 JSON 格式中的 "id" 字段// 這里只是一個示例,實際實現可能需要解析消息內容return message.substring(0, 36); // 假設 UUID 長度為 36}
}
spring:redis:host: localhostport: 6379
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
通過上述方法和配置,您可以確保消息消費的冪等性。消費者在處理消息之前會檢查消息的唯一標識符是否已經存在于 Redis 中,如果存在,則忽略該消息,從而避免重復處理。同時,通過設置過期時間,可以確保在處理過程中出現異常時,Redis 中的記錄會被刪除,從而允許消息重試。