文章目錄
- 發送者的可靠性
- 生產者重試機制
- 實現生產者確認
- MQ的可靠性
- 數據持久化
- 交換機持久化
- 隊列持久化
- 消息持久化
- Lazy Queue(可配置~)
- 控制臺配置Lazy模式
- 代碼配置Lazy模式
- 更新已有隊列為lazy模式
- 消費者的可靠性
- 消費者確認機制
- 失敗重試機制
- 失敗處理策略
- 業務冪等性
- 唯一消息ID
- 業務判斷
- 兜底方案
- 延遲消息
- 死信交換機(不推薦使用)
- 延遲消息
- DelayExchange插件(推薦)
- 插件下載地址:
- 安裝:
- 聲明延遲交換機
- 發送延遲消息
- 超時訂單問題
- 定義常量
- 抽取共享mq配置
- 改造下單業務
- 編寫查詢支付狀態接口
- 消息監聽
發送者的可靠性
保證一個消息發送出去,至少被消費一次。
可能在多個步驟中給消息弄丟了
生產者重試機制
不建議使用, 會增加網絡和資源的消耗
第一種情況,就是生產者發送消息時,出現了網絡故障,導致與MQ的連接中斷
當RabbitTemplate與MQ連接超時后,多次重試
修改publisher模塊的application.yaml文件,添加下面的內容:
spring:rabbitmq:connection-timeout: 1s # 設置MQ的連接超時時間template:retry:enabled: true # 開啟超時重試機制initial-interval: 1000ms # 失敗后的初始等待時間multiplier: 1 # 失敗后下次的等待時長倍數,下次等待時長 = initial-interval * multipliermax-attempts: 3 # 最大重試次數
如果對于業務性能有要求,建議禁用重試機制。如果一定要使用,請合理配置等待時長和重試次數,當然也可以考慮使用異步線程來執行發送消息的代碼。
實現生產者確認
- ConfirmCallback為發送Exchange(交換器)時回調,成功或者失敗都會觸發;
- ReturnCallback為路由不到隊列時觸發,成功則不觸發;
Rabbitmq之ConfirmCallback與ReturnCallback使用_rabbitmq returncallback-CSDN博客
在publisher模塊的application.yaml中添加配置:
spring:rabbitmq:publisher-confirm-type: correlated # 開啟publisher confirm機制,并設置confirm類型publisher-returns: true # 開啟publisher return機制
這里publisher-confirm-type有三種模式可選:
- none:關閉confirm機制
- simple:同步阻塞等待MQ的回執
- correlated:MQ異步回調返回回執
定義ReturnCallback:
package com.itheima.publisher.config;import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("觸發return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}
定義ConfirmCallback:
由于每個消息發送時的處理邏輯不一定相同,因此**ConfirmCallback需要在每次發消息時定義。**具體來說,是在調用RabbitTemplate中的convertAndSend方法 時,多傳遞一個參數:
這里的CorrelationData中包含兩個核心的東西:
- id:消息的唯一標示,MQ對不同的消息的回執以此做判斷,避免混淆
- SettableListenableFuture:回執結果的Future對象
將來MQ的回執就會通過這個Future來返回,我們可以提前給CorrelationData中的Future添加回調函數來處理消息回執:
我們新建一個測試,向系統自帶的交換機發送消息,并且添加ConfirmCallback:
@Test
void testPublisherConfirm() {// 1.創建CorrelationData,需要一個UUID,回調的時候通過id識別CorrelationData cd = new CorrelationData();// 2.給Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future發生異常時的處理邏輯,基本不會觸發log.error("send message fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回執的處理邏輯,參數中的result就是回執內容if(result.isAck()){ // result.isAck(),boolean類型,true代表ack回執,false 代表 nack回執log.debug("發送消息成功,收到 ack!");}else{ // result.getReason(),String類型,返回nack時的異常描述log.error("發送消息失敗,收到 nack, reason : {}", result.getReason());}}});// 3.發送消息rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}
開啟生產者確認比較消耗MQ性能,一般不建議開啟。而且大家思考一下觸發確認的幾種情況:
- 路由失敗:一般是因為RoutingKey錯誤導致,往往是編程導致
- 交換機名稱錯誤:同樣是編程錯誤導致
- MQ內部故障:這種需要處理,但概率往往較低。因此只有對消息可靠性要求非常高的業務才需要開啟,而且僅僅需要開啟ConfirmCallback處理nack就可以了
MQ的可靠性
為了提升性能,默認情況下MQ的數據都是在內存存儲的臨時數據,重啟后就會消失。為了保證數據的可靠性,必須配置數據持久化
- 交換機持久化
- 隊列持久化
- 消息持久化
數據持久化
交換機持久化
設置為Durable就是持久化模式,Transient就是臨時模式。
隊列持久化
消息持久化
說明:在開啟持久化機制以后,如果同時還開啟了生產者確認,那么MQ會在消息持久化以后才發送ACK回執,進一步確保消息的可靠性。
不過出于性能考慮,為了減少IO次數,發送到MQ的消息并不是逐條持久化到數據庫的,而是每隔一段時間批量持久化。一般間隔在100毫秒左右,這就會導致ACK有一定的延遲,因此建議生產者確認全部采用異步方式。
當內存占滿, page out會影響MQ阻塞
Lazy Queue(可配置~)
- 接收到消息后直接存入磁盤而非內存
- 消費者要消費消息時才會從磁盤中讀取并加載到內存(也就是懶加載)
- 支持數百萬條的消息存儲
而在3.12版本之后,LazyQueue已經成為所有隊列的默認格式。官方推薦所有隊列都為LazyQueue模式。
控制臺配置Lazy模式
代碼配置Lazy模式
在利用SpringAMQP聲明隊列的時候,添加x-queue-mod=lazy參數也可設置隊列為Lazy模式:
@Bean
public Queue lazyQueue(){return QueueBuilder.durable("lazy.queue").lazy() // 開啟Lazy模式.build();
}
當然,我們也可以基于注解來聲明隊列并設置為Lazy模式:
@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true", arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
log.info("接收到 lazy.queue的消息:{}", msg);
}
更新已有隊列為lazy模式
可以基于命令行設置policy:
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
當然,也可以在控制臺配置policy,進入在控制臺的Admin頁面,點擊Policies,即可添加配置:
消費者的可靠性
- 消息投遞的過程中出現了網絡故障
- 消費者接收到消息后突然宕機
- 消費者接收到消息后,因處理不當導致異常
消費者確認機制
當消費者處理消息結束后,應該向RabbitMQ發送一個回執,告知RabbitMQ自己消息處理狀態
- ack:成功處理消息,RabbitMQ從隊列中刪除該消息
- nack:消息處理失敗,RabbitMQ需要再次投遞消息
- reject:消息處理失敗并拒絕該消息,RabbitMQ從隊列中刪除該消息
SpringAMQP幫我們實現了消息確認。并允許我們通過配置文件設置ACK處理方式
-
none:不處理。即消息投遞給消費者后立刻ack,消息會立刻從MQ刪除。非常不安全,不建議使用
-
manual:手動模式。需要自己在業務代碼中調用api,發送ack或reject,存在業務入侵,但更靈活
-
auto:自動模式。SpringAMQP利用AOP對我們的消息處理邏輯做了環繞增強,當業務正常執行時則自動返回
- 如果是業務異常,會自動返回nack;
- 如果是消息處理或校驗異常,自動返回reject;
通過下面的配置可以修改SpringAMQP的ACK處理方式:
spring:rabbitmq:listener:simple:acknowledge-mode: auto # 不做處理
失敗重試機制
當消費者出現異常后,消息會不斷requeue(重入隊)到隊列,再重新發送給消費者。如果消費者再次執行依然出錯,消息會再次requeue到隊列,再次投遞,直到消息處理成功為止。
當然,上述極端情況發生的概率還是非常低的,不過不怕一萬就怕萬一。為了應對上述情況Spring又提供了消費者失敗重試機制:在消費者出現異常時利用本地重試,而不是無限制的requeue到mq隊列。
修改consumer服務的application.yml文件,添加內容:
spring:rabbitmq:listener:simple:retry:enabled: true # 開啟消費者失敗重試initial-interval: 1000ms # 初識的失敗等待時長為1秒multiplier: 1 # 失敗的等待時長倍數,下次等待時長 = multiplier * last-intervalmax-attempts: 3 # 最大重試次數stateless: true # true無狀態;false有狀態。如果業務中包含事務,這里改為false
重啟consumer服務,重復之前的測試。可以發現:
- 消費者在失敗后消息沒有重新回到MQ無限重新投遞,而是在本地重試了3次
- 本地重試3次以后,拋出了AmqpRejectAndDontRequeueException異常。查看RabbitMQ控制臺,發現消息被刪除了,說明最后SpringAMQP返回的是reject
結論:
- 開啟本地重試時,消息處理過程中拋出異常,不會requeue到隊列,而是在消費者本地重試
- 重試達到最大次數后,Spring會返回reject,消息會被丟棄
失敗處理策略
Spring允許我們自定義重試次數耗盡后的消息處理策略,這個策略是由MessageRecovery接口來定義的
- RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認就是這種方式
- ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊
- RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機(推薦)
比較優雅的一種處理方案是RepublishMessageRecoverer,失敗后將消息投遞到一個指定的,專門存放異常消息的隊列,后續由人工集中處理。
在consumer服務中定義處理失敗消息的交換機和隊列
定義一個RepublishMessageRecoverer,關聯隊列和交換機
/*** 錯誤消息配置類,用于配置 RabbitMQ 錯誤消息處理相關的 Bean。* 當前類會根據 spring.rabbitmq.listener.simple.retry.enabled 屬性的值來決定是否創建相關的 Bean。* 如果該屬性值為 true,則會創建錯誤消息交換機、錯誤隊列和綁定關系,并配置消息恢復器。*/
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {/*** 創建錯誤消息交換機*/@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}/*** 創建錯誤隊列*/@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}/*** 創建錯誤隊列與錯誤消息交換機的綁定關系,"error"是路由鍵*/@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}/*** 創建消息恢復器*/@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}
消費者如何保證消息一定被消費?
- 開啟消費者確認機制為auto, 由spring確認消息處理成功后返回ack, 異常時返回nack
- 開啟消費者失敗重試機制, 并設置MessageRecoverer, 多次重試失敗后將信息投遞到異常交換機
業務冪等性
保證消息處理的冪等性。這里給出兩種方案:
- 唯一消息ID
- 業務狀態判斷
唯一消息ID
- 每一條消息都生成一個唯一的id,與消息一起投遞給消費者。
- 消費者接收到消息后處理自己的業務,業務處理成功后將消息ID保存到數據庫
- 如果下次又收到相同消息,去數據庫查詢判斷是否存在,存在則為重復消息放棄處理。
SpringAMQP的MessageConverter自帶了MessageID的功能,我們只要開啟這個功能即可
以Jackson的消息轉換器為例( 加在啟動類 ):
@Bean
public MessageConverter messageConverter(){// 1.定義消息轉換器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自動創建消息id,用于識別不同消息,也可以在業務中基于ID判斷是否是重復消息jjmc.setCreateMessageIds(true);// 在底層會自動創建一個UUIDreturn jjmc;
}
在Spring AMQP中,當使用Jackson2JsonMessageConverter并開啟setCreateMessageIds(true)功能時,底層會自動在消息的屬性中添加一個名為amqp_messageId的字段,其值為自動生成的UUID。
具體來說,UUID會成為消息的一部分,保存在消息的AMQP(Advanced Message Queuing Protocol)屬性中。這些屬性是與消息一起傳遞的元數據,包含了關于消息的一些信息。amqp_messageId字段就是用于唯一標識消息的UUID。
當消息被發送給消費者時,消費者可以通過**message.getMessageProperties().getMessageId()**方法來獲取消息的ID,然后根據業務需求將該ID保存到數據庫中。在處理相同消息時,消費者可以在數據庫中查詢是否存在相同的消息ID,以判斷是否為重復消息。
業務判斷
相比較而言,消息ID的方案需要改造原有的數據庫,所以我更推薦使用業務判斷的方案。
@Override
public void markOrderPaySuccess(Long orderId) {
// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
lambdaUpdate()
.set(Order::getStatus, 2)
.set(Order::getPayTime, LocalDateTime.now())
.eq(Order::getId, orderId)
.eq(Order::getStatus, 1)
.update();
}
上述代碼邏輯上符合了冪等判斷的需求,但是由于判斷和更新是兩步動作,因此在極小概率下可能存在線程安全問題。
@Override
public void markOrderPaySuccess(Long orderId) {
// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
lambdaUpdate()
.set(Order::getStatus, 2)
.set(Order::getPayTime, LocalDateTime.now())
.eq(Order::getId, orderId)
.eq(Order::getStatus, 1)
.update();
}
兜底方案
既然MQ通知不一定發送到交易服務,那么交易服務就必須自己主動去查詢支付狀態。這樣即便支付服務的MQ通知失敗,我們依然能通過主動查詢來保證訂單狀態的一致。
圖中黃色線圈起來的部分就是MQ通知失敗后的兜底處理方案,由交易服務自己主動去查詢支付狀態。
綜上,支付服務與交易服務之間的訂單狀態一致性是如何保證的?
- 首先,支付服務會正在用戶支付成功以后利用MQ消息通知交易服務,完成訂單狀態同步。
- 其次,為了保證MQ消息的可靠性,我們采用了生產者確認機制、消費者確認、消費者失敗重試等策略,確保消息投遞的可靠性
- 最后,我們還在交易服務設置了定時任務,定期查詢訂單支付狀態。這樣即便MQ通知失敗,還可以利用定時任務作為兜底方案,確保訂單支付狀態的最終一致性。
延遲消息
死信交換機(不推薦使用)
當一個隊列中的消息滿足下列情況之一時,可以成為死信(dead letter):
- 消費者使用basic.reject或 basic.nack聲明消費失敗,并且消息的requeue參數設置為false
- 消息是一個過期消息,超時無人消費
- 要投遞的隊列消息滿了,無法投遞
死信交換機有什么作用呢?
- 收集那些因處理失敗而被拒絕的消息
- 收集那些因隊列滿了而被拒絕的消息
- 收集因TTL(有效期)到期的消息
延遲消息
RabbitMQ的消息過期是基于追溯方式來實現的,也就是說當一個消息的TTL到期以后不一定會被移除或投遞到死信交換機,而是在消息恰好處于隊首時才會被處理。
當隊列中消息堆積很多的時候,過期消息可能不會被按時處理,因此你設置的TTL時間不一定準確。
DelayExchange插件(推薦)
插件下載地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
安裝:
因為我們是基于Docker安裝,所以需要先查看RabbitMQ的插件目錄對應的數據卷。
docker volume inspect mq-plugins
結果如下:
[{"CreatedAt": "2024-01-19T09:22:59+08:00","Driver": "local","Labels": null,"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data","Name": "mq-plugins","Options": null,"Scope": "local"}
]
插件目錄被掛載到了/var/lib/docker/volumes/mq-plugins/_data這個目錄,我們上傳插件到該目錄下。
接下來執行命令,啟用插件:
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
聲明延遲交換機
基于注解方式:
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延遲消息:{}", msg);
}
基于@Bean的方式:
package com.itheima.consumer.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class DelayExchangeConfig {@Beanpublic DirectExchange delayExchange(){return ExchangeBuilder.directExchange("delay.direct") // 指定交換機類型和名稱.delayed() // 設置delay的屬性為true.durable(true) // 持久化.build();}@Beanpublic Queue delayedQueue(){return new Queue("delay.queue");}@Beanpublic Binding delayQueueBinding(){return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");}
}
發送延遲消息
發送消息時,必須通過x-delay屬性設定延遲時間:
@Test
void testPublisherDelayMessage() {// 1.創建消息String message = "hello, delayed message";// 2.發送消息,利用消息后置處理器添加消息頭rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 添加延遲消息屬性message.getMessageProperties().setDelay(5000);return message;}});
}
可以寫一個延遲時間的類, 不用每次都new一個,具體代碼如下:
/*** @author Ccoo* 2024/1/22*/
@RequiredArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {private final int delay;@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(delay);return message;}
}
最后上面的業務代碼變為:
@Test
void testPublisherDelayMessage() {// 1.創建消息String message = "hello, delayed message";// 2.發送消息,利用消息后置處理器添加消息頭rabbitTemplate.convertAndSend("delay.direct", "delay", message,new DelayMessageProcessor(message.removeNextDelay().intValue());
}
延遲消息插件內部會維護一個本地數據庫表,同時使用Elang Timers功能實現計時。如果消息的延遲時間設置較長,可能會導致堆積的延遲消息非常多,會帶來較大的CPU開銷,同時延遲消息的時間會存在誤差。因此,不建議設置延遲時間過長的延遲消息。
超時訂單問題
由于我們要多次發送延遲消息,因此需要先定義一個記錄消息延遲時間的消息體,處于通用性考慮,我們將其定義到hm-common模塊下:
@Data
public class MultiDelayMessage<T> {/*** 消息體*/private T data;/*** 記錄延遲時間的集合*/private List<Long> delayMillis;public MultiDelayMessage(T data, List<Long> delayMillis) {this.data = data;this.delayMillis = delayMillis;}public static <T> MultiDelayMessage<T> of(T data, Long ... delayMillis){return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));}/*** 獲取并移除下一個延遲時間* @return 隊列中的第一個延遲時間*/public Long removeNextDelay(){return delayMillis.remove(0);}/*** 是否還有下一個延遲時間*/public boolean hasNextDelay(){return !delayMillis.isEmpty();}
}
定義常量
/*** @author Ccoo* 2024/1/22*/
public interface MqConstants {String DELAY_EXCHANGE = "trade.delay.topic";String DELAY_ORDER_QUEUE = "trade.order.delay.queue";String DELAY_ORDER_ROUTING_KEY = "order.query";
}
抽取共享mq配置
在nacos中定義一個名為shared-mq.xml的配置文件,內容如下:
spring: rabbitmq:host: ${hm.mq.host:192.168.164.128} # 主機名port: ${hm.mq.port:5672} # 端口virtual-host: ${hm.mq.vhost:/hmall} # 虛擬主機username: ${hm.mq.un:itheima} # 用戶名password: ${hm.mq.pw:123321} # 密碼listener:simple:prefetch: 1 # 每次只能獲取一條消息,處理完成才能獲取下一個消息
在trade-service模塊添加共享配置:
改造下單業務
- 引入依賴
在trade-service模塊的pom.xml中引入amqp的依賴:
<!--amqp--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
- 改造下單業務
編寫查詢支付狀態接口
首先,在hm-api模塊定義三個類:
說明:
- PayOrderDTO:支付單的數據傳輸實體
- PayClient:支付系統的Feign客戶端
- PayClientFallback:支付系統的fallback邏輯
@Data
@ApiModel(description = "支付單數據傳輸實體")
public class PayOrderDTO {@ApiModelProperty("id")private Long id;@ApiModelProperty("業務訂單號")private Long bizOrderNo;@ApiModelProperty("支付單號")private Long payOrderNo;@ApiModelProperty("支付用戶id")private Long bizUserId;@ApiModelProperty("支付渠道編碼")private String payChannelCode;@ApiModelProperty("支付金額,單位分")private Integer amount;@ApiModelProperty("付類型,1:h5,2:小程序,3:公眾號,4:掃碼,5:余額支付")private Integer payType;@ApiModelProperty("付狀態,0:待提交,1:待支付,2:支付超時或取消,3:支付成功")private Integer status;@ApiModelProperty("拓展字段,用于傳遞不同渠道單獨處理的字段")private String expandJson;@ApiModelProperty("第三方返回業務碼")private String resultCode;@ApiModelProperty("第三方返回提示信息")private String resultMsg;@ApiModelProperty("支付成功時間")private LocalDateTime paySuccessTime;@ApiModelProperty("支付超時時間")private LocalDateTime payOverTime;@ApiModelProperty("支付二維碼鏈接")private String qrCodeUrl;@ApiModelProperty("創建時間")private LocalDateTime createTime;@ApiModelProperty("更新時間")private LocalDateTime updateTime;
}
@FeignClient(value = "pay-service", fallbackFactory = PayClientFallback.class)
public interface PayClient {/*** 根據交易訂單id查詢支付單* @param id 業務訂單id* @return 支付單信息*/@GetMapping("/pay-orders/biz/{id}")PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id);
}
@Slf4j
public class PayClientFallback implements FallbackFactory<PayClient> {@Overridepublic PayClient create(Throwable cause) {return new PayClient() {@Overridepublic PayOrderDTO queryPayOrderByBizOrderNo(Long id) {return null;}};}
}
最后,在pay-service模塊的PayController中實現該接口:
@ApiOperation("根據id查詢支付單")
@GetMapping("/biz/{id}")
public PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id){PayOrder payOrder = payOrderService.lambdaQuery().eq(PayOrder::getBizOrderNo, id).one();return BeanUtils.copyBean(payOrder, PayOrderDTO.class);
}
消息監聽
接下來,我們在trader-service編寫一個監聽器,監聽延遲消息,查詢訂單支付狀態:
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderStatusListener {private final IOrderService orderService;private final PayClient payClient;private final RabbitTemplate rabbitTemplate;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = MqConstants.DELAY_ORDER_QUEUE, durable = "true"),exchange = @Exchange(name = MqConstants.DELAY_EXCHANGE, type = ExchangeTypes.TOPIC),key = MqConstants.DELAY_ORDER_ROUTING_KEY))public void listenOrderCheckDelayMessage(MultiDelayMessage<Long> msg) {// 1.獲取消息中的訂單idLong orderId = msg.getData();// 2.查詢訂單,判斷狀態:1是未支付,大于1則是已支付或已關閉Order order = orderService.getById(orderId);if (order == null || order.getStatus() > 1) {// 訂單不存在或交易已經結束,放棄處理return;}// 3.可能是未支付,查詢支付服務PayOrderDTO payOrder = payClient.queryPayOrderByBizOrderNo(orderId);if (payOrder != null && payOrder.getStatus() == 3) {// 支付成功,更新訂單狀態orderService.markOrderPaySuccess(orderId);return;}// 4.確定未支付,判斷是否還有剩余延遲時間if (msg.hasNextDelay()) {// 4.1.有延遲時間,需要重發延遲消息,先獲取延遲時間的int值int delayVal = msg.removeNextDelay().intValue();// 4.2.發送延遲消息rabbitTemplate.convertAndSend(MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg,message -> {message.getMessageProperties().setDelay(delayVal);return message;});return;}// 5.沒有剩余延遲時間了,說明訂單超時未支付,需要取消訂單orderService.cancelOrder(orderId);}
}