在上個章節中我實現了創建優惠券模板的功能,但是,優惠券總會有過期時間,我們怎么去解決到期自動修改優惠券狀態這樣一個功能呢?我們可以使用RocketMQ5.x新出的任意定時發送消息功能來解決。
初始方案:
- 首先在創建優惠券模板下面繼續添加代碼。首先先創建topic
- 接著去定義消息體
// 執行 RocketMQ5.x 消息隊列發送&異常處理邏輯SendResult sendResult;try {sendResult = rocketMQTemplate.syncSendDeliverTimeMills(couponTemplateDelayCloseTopic, message, deliverTimeStamp);log.info("[生產者] 優惠券模板延時關閉 - 發送結果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), messageKeys);} catch (Exception ex) {log.error("[生產者] 優惠券模板延時關閉 - 消息發送失敗,消息體:{}", couponTemplateDO.getId(), ex);}
- 接著去調用相關方法去發送消息
這樣一個延時發送的消息生產者就定義好了,接著我們去定義消費者。
- 我們可以創建一個消費者類讓他去實現RocketMQListener然后添加 @RocketMQMessageListener 注解,其中加上 Topic 和消費者組定義
- 接著我們可以在onMessage方法內去定義我們接收到消息以后需要去做什么
- 這里我們需要去修改數據庫當中的優惠券的狀態
這樣就已經解決了定時修改優惠券模板狀態這樣的功能,但是這樣直接寫在代碼當中不是太優雅,而且耦合度太高。我們可以通過去定義一個生產者的抽象類(模板),然后去通過定義具體的類來繼承這些抽象類。然后在創建優惠券模板的方法中我們只需要去定義一個事件,然后調用該類的方法去發送消息即可。
@RequiredArgsConstructor
@Slf4j(topic = "CommonSendProduceTemplate")
public abstract class AbstractCommonSendProduceTemplate<T> {private final RocketMQTemplate rocketMQTemplate;/*** 構建消息發送事件基礎擴充屬性實體** @param messageSendEvent 消息發送事件* @return 擴充屬性實體*/protected abstract BaseSendExtendDTO buildBaseSendExtendParam(T messageSendEvent);/*** 構建消息基本參數,請求頭、Keys...** @param messageSendEvent 消息發送事件* @param requestParam 擴充屬性實體* @return 消息基本參數*/protected abstract Message<?> buildMessage(T messageSendEvent, BaseSendExtendDTO requestParam);/*** 消息事件通用發送** @param messageSendEvent 消息發送事件* @return 消息發送返回結果*/public SendResult sendMessage(T messageSendEvent) {BaseSendExtendDTO baseSendExtendDTO = buildBaseSendExtendParam(messageSendEvent);SendResult sendResult;try {// 構建 Topic 目標落點 formats: `topicName:tags`StringBuilder destinationBuilder = StrUtil.builder().append(baseSendExtendDTO.getTopic());if (StrUtil.isNotBlank(baseSendExtendDTO.getTag())) {destinationBuilder.append(":").append(baseSendExtendDTO.getTag());}// 延遲時間不為空,發送任意延遲消息,否則發送普通消息if (baseSendExtendDTO.getDelayTime() != null) {sendResult = rocketMQTemplate.syncSendDeliverTimeMills(destinationBuilder.toString(),buildMessage(messageSendEvent, baseSendExtendDTO),baseSendExtendDTO.getDelayTime());} else {sendResult = rocketMQTemplate.syncSend(destinationBuilder.toString(),buildMessage(messageSendEvent, baseSendExtendDTO),baseSendExtendDTO.getSentTimeout());}log.info("[生產者] {} - 發送結果:{},消息ID:{},消息Keys:{}", baseSendExtendDTO.getEventName(), sendResult.getSendStatus(), sendResult.getMsgId(), baseSendExtendDTO.getKeys());} catch (Throwable ex) {log.error("[生產者] {} - 消息發送失敗,消息體:{}", baseSendExtendDTO.getEventName(), JSON.toJSONString(messageSendEvent), ex);throw ex;}return sendResult;}
}
在該抽象類中定義了兩個抽象方法,
-
protected abstract BaseSendExtendDTO buildBaseSendExtendParam(T messageSendEvent);該方法用來補充事件的相關屬性
-
protected abstract Message<?> buildMessage(T messageSendEvent, BaseSendExtendDTO requestParam);該方法是用來補充消息的屬性
-
sendMessage方法中首先會通過buildBaseSendExtendParam方法把事件的屬性進行補充,比如延時時間等。
-
然后會獲取這個時間是否存在延時時間這個屬性,如果有則發送延時消息,如果沒有,就發送普通消息。
這就是延時發送消息的生產者的一個實現類
這是消費者沒有太大變化。
我們現在梳理一下它的執行流程
- 創建完優惠券模板之后,我們根據優惠券的相關信息構建出一個發送事件templateDelayEvent
- 然后通過自動注入的couponTemplateDelayExecuteStatusProducer延時發送消息的生產者,調用它的發送消息方法進行延時發送消息。
- 這個生產者是去實現了生產者的一個抽象類,并實現了補充事件屬性,以及構建消息體的抽象方法。
- 然后消費者通過監聽該消息來完成消費。