RabbitMQ進階篇

文章目錄

  • 發送者的可靠性
    • 生產者重試機制
    • 實現生產者確認
  • MQ的可靠性
    • 數據持久化
      • 交換機持久化
      • 隊列持久化
      • 消息持久化
    • Lazy Queue(可配置~)
      • 控制臺配置Lazy模式
      • 代碼配置Lazy模式
      • 更新已有隊列為lazy模式
  • 消費者的可靠性
    • 消費者確認機制
    • 失敗重試機制
    • 失敗處理策略
  • 業務冪等性
    • 唯一消息ID
    • 業務判斷
    • 兜底方案
  • 延遲消息
    • 死信交換機(不推薦使用)
    • 延遲消息
    • DelayExchange插件(推薦)
      • 插件下載地址:
      • 安裝:
      • 聲明延遲交換機
      • 發送延遲消息
    • 超時訂單問題
      • 定義常量
      • 抽取共享mq配置
      • 改造下單業務
      • 編寫查詢支付狀態接口
      • 消息監聽

發送者的可靠性

保證一個消息發送出去,至少被消費一次。
image.png
可能在多個步驟中給消息弄丟了

生產者重試機制

不建議使用, 會增加網絡和資源的消耗

第一種情況,就是生產者發送消息時,出現了網絡故障,導致與MQ的連接中斷
當RabbitTemplate與MQ連接超時后,多次重試
修改publisher模塊的application.yaml文件,添加下面的內容:

spring:rabbitmq:connection-timeout: 1s # 設置MQ的連接超時時間template:retry:enabled: true # 開啟超時重試機制initial-interval: 1000ms # 失敗后的初始等待時間multiplier: 1 # 失敗后下次的等待時長倍數,下次等待時長 = initial-interval * multipliermax-attempts: 3 # 最大重試次數

如果對于業務性能有要求,建議禁用重試機制。如果一定要使用,請合理配置等待時長和重試次數,當然也可以考慮使用異步線程來執行發送消息的代碼。

實現生產者確認

image.png

  • ConfirmCallback為發送Exchange(交換器)時回調,成功或者失敗都會觸發;
  • ReturnCallback為路由不到隊列時觸發,成功則不觸發;

Rabbitmq之ConfirmCallback與ReturnCallback使用_rabbitmq returncallback-CSDN博客
在publisher模塊的application.yaml中添加配置:

spring:rabbitmq:publisher-confirm-type: correlated # 開啟publisher confirm機制,并設置confirm類型publisher-returns: true # 開啟publisher return機制

這里publisher-confirm-type有三種模式可選:

  • none:關閉confirm機制
  • simple:同步阻塞等待MQ的回執
  • correlated:MQ異步回調返回回執

定義ReturnCallback:
image.png

package com.itheima.publisher.config;import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("觸發return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}

image.png
image.png
定義ConfirmCallback:
由于每個消息發送時的處理邏輯不一定相同,因此**ConfirmCallback需要在每次發消息時定義。**具體來說,是在調用RabbitTemplate中的convertAndSend方法 時,多傳遞一個參數:

image.png

這里的CorrelationData中包含兩個核心的東西:

  • id:消息的唯一標示,MQ對不同的消息的回執以此做判斷,避免混淆
  • SettableListenableFuture:回執結果的Future對象

將來MQ的回執就會通過這個Future來返回,我們可以提前給CorrelationData中的Future添加回調函數來處理消息回執:

image.png

我們新建一個測試,向系統自帶的交換機發送消息,并且添加ConfirmCallback:

@Test
void testPublisherConfirm() {// 1.創建CorrelationData,需要一個UUID,回調的時候通過id識別CorrelationData cd = new CorrelationData();// 2.給Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future發生異常時的處理邏輯,基本不會觸發log.error("send message fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回執的處理邏輯,參數中的result就是回執內容if(result.isAck()){ // result.isAck(),boolean類型,true代表ack回執,false 代表 nack回執log.debug("發送消息成功,收到 ack!");}else{ // result.getReason(),String類型,返回nack時的異常描述log.error("發送消息失敗,收到 nack, reason : {}", result.getReason());}}});// 3.發送消息rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}

image.png

開啟生產者確認比較消耗MQ性能,一般不建議開啟。而且大家思考一下觸發確認的幾種情況:

  • 路由失敗:一般是因為RoutingKey錯誤導致,往往是編程導致
  • 交換機名稱錯誤:同樣是編程錯誤導致
  • MQ內部故障:這種需要處理,但概率往往較低。因此只有對消息可靠性要求非常高的業務才需要開啟,而且僅僅需要開啟ConfirmCallback處理nack就可以了

image.png

MQ的可靠性

為了提升性能,默認情況下MQ的數據都是在內存存儲的臨時數據,重啟后就會消失。為了保證數據的可靠性,必須配置數據持久化
image.png

  • 交換機持久化
  • 隊列持久化
  • 消息持久化

數據持久化

交換機持久化

image.png

設置為Durable就是持久化模式,Transient就是臨時模式。

隊列持久化

image.png

消息持久化

image.png
image.png

說明:在開啟持久化機制以后,如果同時還開啟了生產者確認,那么MQ會在消息持久化以后才發送ACK回執,進一步確保消息的可靠性。
不過出于性能考慮,為了減少IO次數,發送到MQ的消息并不是逐條持久化到數據庫的,而是每隔一段時間批量持久化。一般間隔在100毫秒左右,這就會導致ACK有一定的延遲,因此建議生產者確認全部采用異步方式。

當內存占滿, page out會影響MQ阻塞

Lazy Queue(可配置~)

image.png

  • 接收到消息后直接存入磁盤而非內存
  • 消費者要消費消息時才會從磁盤中讀取并加載到內存(也就是懶加載)
  • 支持數百萬條的消息存儲

而在3.12版本之后,LazyQueue已經成為所有隊列的默認格式。官方推薦所有隊列都為LazyQueue模式。

控制臺配置Lazy模式

image.png

代碼配置Lazy模式

在利用SpringAMQP聲明隊列的時候,添加x-queue-mod=lazy參數也可設置隊列為Lazy模式:

@Bean
public Queue lazyQueue(){return QueueBuilder.durable("lazy.queue").lazy() // 開啟Lazy模式.build();
}

當然,我們也可以基于注解來聲明隊列并設置為Lazy模式:

@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",					arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
log.info("接收到 lazy.queue的消息:{}", msg);
}

更新已有隊列為lazy模式

可以基于命令行設置policy:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues

當然,也可以在控制臺配置policy,進入在控制臺的Admin頁面,點擊Policies,即可添加配置:
image.png

消費者的可靠性

  • 消息投遞的過程中出現了網絡故障
  • 消費者接收到消息后突然宕機
  • 消費者接收到消息后,因處理不當導致異常

消費者確認機制

當消費者處理消息結束后,應該向RabbitMQ發送一個回執,告知RabbitMQ自己消息處理狀態

  • ack:成功處理消息,RabbitMQ從隊列中刪除該消息
  • nack:消息處理失敗,RabbitMQ需要再次投遞消息
  • reject:消息處理失敗并拒絕該消息,RabbitMQ從隊列中刪除該消息

SpringAMQP幫我們實現了消息確認。并允許我們通過配置文件設置ACK處理方式

  • none:不處理。即消息投遞給消費者后立刻ack,消息會立刻從MQ刪除。非常不安全,不建議使用

  • manual:手動模式。需要自己在業務代碼中調用api,發送ack或reject,存在業務入侵,但更靈活

  • auto:自動模式。SpringAMQP利用AOP對我們的消息處理邏輯做了環繞增強,當業務正常執行時則自動返回

    • 如果是業務異常,會自動返回nack;
    • 如果是消息處理或校驗異常,自動返回reject;

通過下面的配置可以修改SpringAMQP的ACK處理方式:

spring:rabbitmq:listener:simple:acknowledge-mode: auto # 不做處理

失敗重試機制

當消費者出現異常后,消息會不斷requeue(重入隊)到隊列,再重新發送給消費者。如果消費者再次執行依然出錯,消息會再次requeue到隊列,再次投遞,直到消息處理成功為止。

當然,上述極端情況發生的概率還是非常低的,不過不怕一萬就怕萬一。為了應對上述情況Spring又提供了消費者失敗重試機制:在消費者出現異常時利用本地重試,而不是無限制的requeue到mq隊列。

修改consumer服務的application.yml文件,添加內容:

spring:rabbitmq:listener:simple:retry:enabled: true # 開啟消費者失敗重試initial-interval: 1000ms # 初識的失敗等待時長為1秒multiplier: 1 # 失敗的等待時長倍數,下次等待時長 = multiplier * last-intervalmax-attempts: 3 # 最大重試次數stateless: true # true無狀態;false有狀態。如果業務中包含事務,這里改為false

重啟consumer服務,重復之前的測試。可以發現:

  • 消費者在失敗后消息沒有重新回到MQ無限重新投遞,而是在本地重試了3次
  • 本地重試3次以后,拋出了AmqpRejectAndDontRequeueException異常。查看RabbitMQ控制臺,發現消息被刪除了,說明最后SpringAMQP返回的是reject

結論:

  • 開啟本地重試時,消息處理過程中拋出異常,不會requeue到隊列,而是在消費者本地重試
  • 重試達到最大次數后,Spring會返回reject,消息會被丟棄

失敗處理策略

Spring允許我們自定義重試次數耗盡后的消息處理策略,這個策略是由MessageRecovery接口來定義的

  • RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認就是這種方式
  • ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊
  • RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機(推薦)

比較優雅的一種處理方案是RepublishMessageRecoverer,失敗后將消息投遞到一個指定的,專門存放異常消息的隊列,后續由人工集中處理。

在consumer服務中定義處理失敗消息的交換機和隊列
定義一個RepublishMessageRecoverer,關聯隊列和交換機
image.png

/*** 錯誤消息配置類,用于配置 RabbitMQ 錯誤消息處理相關的 Bean。* 當前類會根據 spring.rabbitmq.listener.simple.retry.enabled 屬性的值來決定是否創建相關的 Bean。* 如果該屬性值為 true,則會創建錯誤消息交換機、錯誤隊列和綁定關系,并配置消息恢復器。*/
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {/*** 創建錯誤消息交換機*/@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}/*** 創建錯誤隊列*/@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}/*** 創建錯誤隊列與錯誤消息交換機的綁定關系,"error"是路由鍵*/@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}/*** 創建消息恢復器*/@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

消費者如何保證消息一定被消費?

  • 開啟消費者確認機制為auto, 由spring確認消息處理成功后返回ack, 異常時返回nack
  • 開啟消費者失敗重試機制, 并設置MessageRecoverer, 多次重試失敗后將信息投遞到異常交換機

業務冪等性

image.png
保證消息處理的冪等性。這里給出兩種方案:

  • 唯一消息ID
  • 業務狀態判斷

唯一消息ID

  • 每一條消息都生成一個唯一的id,與消息一起投遞給消費者。
  • 消費者接收到消息后處理自己的業務,業務處理成功后將消息ID保存到數據庫
  • 如果下次又收到相同消息,去數據庫查詢判斷是否存在,存在則為重復消息放棄處理。

SpringAMQP的MessageConverter自帶了MessageID的功能,我們只要開啟這個功能即可
以Jackson的消息轉換器為例( 加在啟動類 ):

@Bean
public MessageConverter messageConverter(){// 1.定義消息轉換器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自動創建消息id,用于識別不同消息,也可以在業務中基于ID判斷是否是重復消息jjmc.setCreateMessageIds(true);// 在底層會自動創建一個UUIDreturn jjmc;
}

在Spring AMQP中,當使用Jackson2JsonMessageConverter并開啟setCreateMessageIds(true)功能時,底層會自動在消息的屬性中添加一個名為amqp_messageId的字段,其值為自動生成的UUID。
具體來說,UUID會成為消息的一部分,保存在消息的AMQP(Advanced Message Queuing Protocol)屬性中。這些屬性是與消息一起傳遞的元數據,包含了關于消息的一些信息。amqp_messageId字段就是用于唯一標識消息的UUID。
當消息被發送給消費者時,消費者可以通過**message.getMessageProperties().getMessageId()**方法來獲取消息的ID,然后根據業務需求將該ID保存到數據庫中。在處理相同消息時,消費者可以在數據庫中查詢是否存在相同的消息ID,以判斷是否為重復消息。

業務判斷

image.png
image.png
相比較而言,消息ID的方案需要改造原有的數據庫,所以我更推薦使用業務判斷的方案。

@Override
public void markOrderPaySuccess(Long orderId) {
// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
lambdaUpdate()
.set(Order::getStatus, 2)
.set(Order::getPayTime, LocalDateTime.now())
.eq(Order::getId, orderId)
.eq(Order::getStatus, 1)
.update();
}

上述代碼邏輯上符合了冪等判斷的需求,但是由于判斷和更新是兩步動作,因此在極小概率下可能存在線程安全問題。

@Override
public void markOrderPaySuccess(Long orderId) {
// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
lambdaUpdate()
.set(Order::getStatus, 2)
.set(Order::getPayTime, LocalDateTime.now())
.eq(Order::getId, orderId)
.eq(Order::getStatus, 1)
.update();
}

image.png

兜底方案

既然MQ通知不一定發送到交易服務,那么交易服務就必須自己主動去查詢支付狀態。這樣即便支付服務的MQ通知失敗,我們依然能通過主動查詢來保證訂單狀態的一致。

image.png

圖中黃色線圈起來的部分就是MQ通知失敗后的兜底處理方案,由交易服務自己主動去查詢支付狀態。

綜上,支付服務與交易服務之間的訂單狀態一致性是如何保證的?

  • 首先,支付服務會正在用戶支付成功以后利用MQ消息通知交易服務,完成訂單狀態同步。
  • 其次,為了保證MQ消息的可靠性,我們采用了生產者確認機制、消費者確認、消費者失敗重試等策略,確保消息投遞的可靠性
  • 最后,我們還在交易服務設置了定時任務,定期查詢訂單支付狀態。這樣即便MQ通知失敗,還可以利用定時任務作為兜底方案,確保訂單支付狀態的最終一致性。

延遲消息

image.png

死信交換機(不推薦使用)

當一個隊列中的消息滿足下列情況之一時,可以成為死信(dead letter):

  • 消費者使用basic.reject或 basic.nack聲明消費失敗,并且消息的requeue參數設置為false
  • 消息是一個過期消息,超時無人消費
  • 要投遞的隊列消息滿了,無法投遞

死信交換機有什么作用呢?

  • 收集那些因處理失敗而被拒絕的消息
  • 收集那些因隊列滿了而被拒絕的消息
  • 收集因TTL(有效期)到期的消息

延遲消息

RabbitMQ的消息過期是基于追溯方式來實現的,也就是說當一個消息的TTL到期以后不一定會被移除或投遞到死信交換機,而是在消息恰好處于隊首時才會被處理。

當隊列中消息堆積很多的時候,過期消息可能不會被按時處理,因此你設置的TTL時間不一定準確。

DelayExchange插件(推薦)

插件下載地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

安裝:

因為我們是基于Docker安裝,所以需要先查看RabbitMQ的插件目錄對應的數據卷。

docker volume inspect mq-plugins

結果如下:
image.png

[{"CreatedAt": "2024-01-19T09:22:59+08:00","Driver": "local","Labels": null,"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data","Name": "mq-plugins","Options": null,"Scope": "local"}
]

插件目錄被掛載到了/var/lib/docker/volumes/mq-plugins/_data這個目錄,我們上傳插件到該目錄下。

接下來執行命令,啟用插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

聲明延遲交換機

基于注解方式:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"	
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延遲消息:{}", msg);
}

基于@Bean的方式:

package com.itheima.consumer.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class DelayExchangeConfig {@Beanpublic DirectExchange delayExchange(){return ExchangeBuilder.directExchange("delay.direct") // 指定交換機類型和名稱.delayed() // 設置delay的屬性為true.durable(true) // 持久化.build();}@Beanpublic Queue delayedQueue(){return new Queue("delay.queue");}@Beanpublic Binding delayQueueBinding(){return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");}
}

發送延遲消息

發送消息時,必須通過x-delay屬性設定延遲時間:

@Test
void testPublisherDelayMessage() {// 1.創建消息String message = "hello, delayed message";// 2.發送消息,利用消息后置處理器添加消息頭rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 添加延遲消息屬性message.getMessageProperties().setDelay(5000);return message;}});
}

可以寫一個延遲時間的類, 不用每次都new一個,具體代碼如下:
image.png

/*** @author Ccoo* 2024/1/22*/
@RequiredArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {private final int delay;@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(delay);return message;}
}

最后上面的業務代碼變為:

@Test
void testPublisherDelayMessage() {// 1.創建消息String message = "hello, delayed message";// 2.發送消息,利用消息后置處理器添加消息頭rabbitTemplate.convertAndSend("delay.direct", "delay", message,new DelayMessageProcessor(message.removeNextDelay().intValue());
} 

延遲消息插件內部會維護一個本地數據庫表,同時使用Elang Timers功能實現計時。如果消息的延遲時間設置較長,可能會導致堆積的延遲消息非常多,會帶來較大的CPU開銷,同時延遲消息的時間會存在誤差。因此,不建議設置延遲時間過長的延遲消息

超時訂單問題

image.png

由于我們要多次發送延遲消息,因此需要先定義一個記錄消息延遲時間的消息體,處于通用性考慮,我們將其定義到hm-common模塊下:

image.png

@Data
public class MultiDelayMessage<T> {/*** 消息體*/private T data;/*** 記錄延遲時間的集合*/private List<Long> delayMillis;public MultiDelayMessage(T data, List<Long> delayMillis) {this.data = data;this.delayMillis = delayMillis;}public static <T> MultiDelayMessage<T> of(T data, Long ... delayMillis){return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));}/*** 獲取并移除下一個延遲時間* @return 隊列中的第一個延遲時間*/public Long removeNextDelay(){return delayMillis.remove(0);}/*** 是否還有下一個延遲時間*/public boolean hasNextDelay(){return !delayMillis.isEmpty();}
}

定義常量

image.png

/*** @author Ccoo* 2024/1/22*/
public interface MqConstants {String DELAY_EXCHANGE = "trade.delay.topic";String DELAY_ORDER_QUEUE = "trade.order.delay.queue";String DELAY_ORDER_ROUTING_KEY = "order.query";
}

抽取共享mq配置

在nacos中定義一個名為shared-mq.xml的配置文件,內容如下:

spring: rabbitmq:host: ${hm.mq.host:192.168.164.128} # 主機名port: ${hm.mq.port:5672} # 端口virtual-host: ${hm.mq.vhost:/hmall} # 虛擬主機username: ${hm.mq.un:itheima} # 用戶名password: ${hm.mq.pw:123321} # 密碼listener:simple:prefetch: 1 # 每次只能獲取一條消息,處理完成才能獲取下一個消息

在trade-service模塊添加共享配置:
image.png

改造下單業務

  1. 引入依賴

在trade-service模塊的pom.xml中引入amqp的依賴:

<!--amqp--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
  1. 改造下單業務

image.png

編寫查詢支付狀態接口

首先,在hm-api模塊定義三個類:

image.png

說明:

  • PayOrderDTO:支付單的數據傳輸實體
  • PayClient:支付系統的Feign客戶端
  • PayClientFallback:支付系統的fallback邏輯
@Data
@ApiModel(description = "支付單數據傳輸實體")
public class PayOrderDTO {@ApiModelProperty("id")private Long id;@ApiModelProperty("業務訂單號")private Long bizOrderNo;@ApiModelProperty("支付單號")private Long payOrderNo;@ApiModelProperty("支付用戶id")private Long bizUserId;@ApiModelProperty("支付渠道編碼")private String payChannelCode;@ApiModelProperty("支付金額,單位分")private Integer amount;@ApiModelProperty("付類型,1:h5,2:小程序,3:公眾號,4:掃碼,5:余額支付")private Integer payType;@ApiModelProperty("付狀態,0:待提交,1:待支付,2:支付超時或取消,3:支付成功")private Integer status;@ApiModelProperty("拓展字段,用于傳遞不同渠道單獨處理的字段")private String expandJson;@ApiModelProperty("第三方返回業務碼")private String resultCode;@ApiModelProperty("第三方返回提示信息")private String resultMsg;@ApiModelProperty("支付成功時間")private LocalDateTime paySuccessTime;@ApiModelProperty("支付超時時間")private LocalDateTime payOverTime;@ApiModelProperty("支付二維碼鏈接")private String qrCodeUrl;@ApiModelProperty("創建時間")private LocalDateTime createTime;@ApiModelProperty("更新時間")private LocalDateTime updateTime;
}
@FeignClient(value = "pay-service", fallbackFactory = PayClientFallback.class)
public interface PayClient {/*** 根據交易訂單id查詢支付單* @param id 業務訂單id* @return 支付單信息*/@GetMapping("/pay-orders/biz/{id}")PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id);
}
@Slf4j
public class PayClientFallback implements FallbackFactory<PayClient> {@Overridepublic PayClient create(Throwable cause) {return new PayClient() {@Overridepublic PayOrderDTO queryPayOrderByBizOrderNo(Long id) {return null;}};}
}

最后,在pay-service模塊的PayController中實現該接口:

@ApiOperation("根據id查詢支付單")
@GetMapping("/biz/{id}")
public PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id){PayOrder payOrder = payOrderService.lambdaQuery().eq(PayOrder::getBizOrderNo, id).one();return BeanUtils.copyBean(payOrder, PayOrderDTO.class);
}

消息監聽

接下來,我們在trader-service編寫一個監聽器,監聽延遲消息,查詢訂單支付狀態:
image.png

@Slf4j
@Component
@RequiredArgsConstructor
public class OrderStatusListener {private final IOrderService orderService;private final PayClient payClient;private final RabbitTemplate rabbitTemplate;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = MqConstants.DELAY_ORDER_QUEUE, durable = "true"),exchange = @Exchange(name = MqConstants.DELAY_EXCHANGE, type = ExchangeTypes.TOPIC),key = MqConstants.DELAY_ORDER_ROUTING_KEY))public void listenOrderCheckDelayMessage(MultiDelayMessage<Long> msg) {// 1.獲取消息中的訂單idLong orderId = msg.getData();// 2.查詢訂單,判斷狀態:1是未支付,大于1則是已支付或已關閉Order order = orderService.getById(orderId);if (order == null || order.getStatus() > 1) {// 訂單不存在或交易已經結束,放棄處理return;}// 3.可能是未支付,查詢支付服務PayOrderDTO payOrder = payClient.queryPayOrderByBizOrderNo(orderId);if (payOrder != null && payOrder.getStatus() == 3) {// 支付成功,更新訂單狀態orderService.markOrderPaySuccess(orderId);return;}// 4.確定未支付,判斷是否還有剩余延遲時間if (msg.hasNextDelay()) {// 4.1.有延遲時間,需要重發延遲消息,先獲取延遲時間的int值int delayVal = msg.removeNextDelay().intValue();// 4.2.發送延遲消息rabbitTemplate.convertAndSend(MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg,message -> {message.getMessageProperties().setDelay(delayVal);return message;});return;}// 5.沒有剩余延遲時間了,說明訂單超時未支付,需要取消訂單orderService.cancelOrder(orderId);}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/38427.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/38427.shtml
英文地址,請注明出處:http://en.pswp.cn/web/38427.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

西部智慧健身小程序+華為運動健康服務

1、 應用介紹 西部智慧健身小程序為用戶提供一站式全流程科學健身綜合服務。用戶通過登錄微信小程序&#xff0c;可享用健康篩查、運動風險評估、體質檢測評估、運動處方推送、個人運動數據監控與評估等公益服務。 2、 體驗介紹西部智慧健身小程序華為運動健康服務核心體驗如…

idea xml ctrl+/ 注釋格式不對齊

處理前 處理后 解決辦法 取消這兩個勾選

核方法總結(三)———核主成分(kernel PCA)學習筆記

一、核主成分 1.1 和PCA的區別 PCA &#xff08;主成分分析&#xff09;對應一個線性高斯模型&#xff08;參考書的第二章&#xff09;&#xff0c;其基本假設是數據由一個符合正態分布的隱變量通過一個線性映射得到&#xff0c;因此可很好描述符合高斯分布的數據。然而在很多實…

ViewBinding的使用(因為kotlin-android-extensions插件的淘汰)

書籍&#xff1a; 《第一行代碼 Android》第三版 開發環境&#xff1a; Android Studio Jellyfish | 2023.3.1 問題&#xff1a; 3.2.4在Activity中使用Toast章節中使用到了kotlin-android-extensions插件,但是該插件已經淘汰,根據網上了解,目前使用了新的技術VewBinding替…

UE4_材質_材質節點_DepthFade

一、DepthFade參數 DepthFade&#xff08;深度消退&#xff09;表達式用來隱藏半透明對象與不透明對象相交時出現的不美觀接縫。 項目說明屬性消退距離&#xff08;Fade Distance&#xff09;這是應該發生消退的全局空間距離。未連接 FadeDistance&#xff08;FadeDistance&a…

【數據分析“三劍客”】—— Pandas

Pandas Pandas 是基于NumPy的一種工具&#xff0c;該工具是為解決數據分析任務而創建的, Pandas提供了大量能使我們快速便捷地處理數據的函數和方法。Pandas與出色的 Jupyter工具包和其他庫相結合&#xff0c;Python中用于進行數據分析的環境在性能、生產率和協作能力方面都是…

光照老化試驗箱:材料耐久性的“時間加速器”

光照老化試驗箱&#xff1a;材料耐久性的“時間加速器”概述 光照老化試驗箱是一種模擬自然光照條件下材料老化過程的設備&#xff0c;廣泛應用于材料科學領域的耐久性能評估。通過模擬日光中的紫外線、熱輻射等環境因素&#xff0c;加速材料老化過程&#xff0c;以此來驗證材…

redhawk:tech file與lefdef layer name不匹配問題

我正在「拾陸樓」和朋友們討論有趣的話題&#xff0c;你?起來吧&#xff1f; 拾陸樓知識星球入口 一些工藝廠商給的redhawk tech file是加密的&#xff0c;讀完tech file再讀lef/def會報錯&#xff0c;根本不知道問題在哪&#xff0c;他們一般會搭配給一個layer map&#xff…

分解+降維+預測!多重創新!直接寫核心!EMD-KPCA-Transformer多變量時間序列光伏功率預測

分解降維預測&#xff01;多重創新&#xff01;直接寫核心&#xff01;EMD-KPCA-Transformer多變量時間序列光伏功率預測 目錄 分解降維預測&#xff01;多重創新&#xff01;直接寫核心&#xff01;EMD-KPCA-Transformer多變量時間序列光伏功率預測效果一覽基本介紹程序設計參…

【簡單講解神經網絡訓練中batch的作用】

&#x1f3a5;博主&#xff1a;程序員不想YY啊 &#x1f4ab;CSDN優質創作者&#xff0c;CSDN實力新星&#xff0c;CSDN博客專家 &#x1f917;點贊&#x1f388;收藏?再看&#x1f4ab;養成習慣 ?希望本文對您有所裨益&#xff0c;如有不足之處&#xff0c;歡迎在評論區提出…

Maven依賴解析過程詳細講解

Maven依賴解析是一個遞歸的過程&#xff0c;涉及從項目的POM文件開始&#xff0c;逐步解析直接依賴和傳遞依賴。以下是詳細的解析過程&#xff1a; 讀取項目的POM文件&#xff1a; Maven首先讀取項目的POM文件 (pom.xml)&#xff0c;該文件定義了項目的直接依賴。 解析直接依賴…

STM32第十五課:LCD屏幕及應用

文章目錄 需求一、LCD顯示屏二、全屏圖片三、數據顯示1.顯示歡迎詞2.顯示溫濕度3.顯示當前時間 四、需求實現代碼 需求 1.在LCD屏上顯示一張全屏圖片。 2.在LCD屏上顯示當前時間&#xff0c;溫度&#xff0c;濕度。 一、LCD顯示屏 液晶顯示器&#xff0c;簡稱 LCD(Liquid Cry…

node_sqlite3.node is not a valid win32 application

electron打包報錯 使用electron在linux平臺打包含有sqlite3的項目為win32應用時在運行階段&#xff0c;有時會遇到下面的錯誤 node_sqlite3.node is not a valid win32 application出現該錯誤的原因主要是在64bit的linux環境中&#xff0c;sqlite3默認build出來的二進制文件不…

【Windows】Visual Studio Installer下載緩慢解決辦法

【Windows】Visual Studio Installer下載緩慢解決辦法 1.背景2.分析3.結果 1.背景 使用visual studio在線安裝包進行IDE安裝&#xff0c;發現下載幾乎停滯&#xff0c;網速幾乎為零。 經過排查并不是因為實際網絡帶寬導致。 這里涉及DNS知識&#xff1b; DNS&#xff08;Dom…

消防認證-防火卷簾

一、消防認證 消防認證是指消防產品符合國家相關技術要求和標準&#xff0c;且通過了國家認證認可監督管理委員會審批&#xff0c;獲得消防認證資質的認證機構頒發的證書&#xff0c;消防產品具有完好的防火功能&#xff0c;是住房和城鄉建設領域驗收的重要指標。 二、認證依據…

TP8 JS(html2canvas) 把DIV內容生成二維碼并與背景圖、文字組合生成分享海報

方法一&#xff1a;前端JS生成(推薦) 注意&#xff1a; 1.這個網頁只能截圖圖片效果代碼&#xff0c;其它任何html效果都不能有&#xff0c;不然截圖就不準確 2.如果要生成的圖片DIV內容中引用了第三個方的圖片&#xff0c;就是不使用同一個域名下的圖片&#xff0c;需要把后…

Python爬取淘寶商品評價信息實戰:從零到一的高效教程

引言&#xff1a;揭秘淘寶數據金礦 在電商領域&#xff0c;用戶評價是衡量產品優劣的金標準。作為Python爬蟲工程師&#xff0c;掌握從淘寶這座數據金礦中挖掘寶貴評價信息的技能至關重要。本文將帶你手把手實操&#xff0c;用Python爬蟲技術獲取淘寶商品的評價信息&#xff0…

Docker多階段構建Node.js應用程序

Node.js 應用程序 創建一個目錄來存放你的項目文件&#xff0c;然后在該目錄下創建以下文件。 package.json {"name": "docker-node-test","version": "1.0.0","description": "A simple Node.js app for Docker mu…

【折騰筆記】使用 PicList + Lsky Pro 對圖片進行雙重壓縮

前言 因為服務器的帶寬比較小,為了提高網站的訪問速度,網站內的圖片進行壓縮是必不可少的。另外將圖片轉換成WebP的格式可以減小文件大小、加快加載速度、支持高級特性(如透明度和動畫),以及減少存儲需求,為網站提供了顯著的性能。需要提前安裝好PicList客戶端和Lsky Pr…

最快33天錄用!一投就中的醫學4區SCI,幾乎不退稿~

【SciencePub學術】今天小編給大家推薦2本生物醫學領域的SCI&#xff0c;此期刊為我處目前合作的重點期刊&#xff01;影響因子0-3.0之間&#xff0c;最重要的是審稿周期較短&#xff0c;對急投的學者較為友好&#xff01; 醫學醫藥類SCI 01 / 期刊概況 【期刊簡介】IF&…