RabbitMQ延遲消息
文章目錄
- RabbitMQ延遲消息
- 一、延遲消息介紹
- 二、實現
- 2.1 死信交換機
- 2.2 延遲消息插件
- 2.3 取消超時訂單
一、延遲消息介紹
延遲消息:生產者發送消息時指定一個時間,消費者不會立刻收到消息,而是在指定時間后才收到消息
用戶下單搶購,搶到了但是沒有付款,此時其實庫存的數量已經扣減了
如果用戶遲遲沒有付款,超過一定的時間,就會將此訂單取消掉,庫存的數量也會重新加回來
我們可以定義一個定時任務掃描數據中訂單的狀態,超過一定時間沒有付款的,我們就將訂單取消
延遲任務:設置在一定時間之后才執行的任務
當用戶下單成功后,立刻向MQ中發送一條延遲消息,設定延遲時間30分鐘,30分鐘到了之后就可以收到此消息,檢查訂單狀態,如果發現未支付,則訂單直接取消。
這樣解決了實效性的問題,同時對數據庫的壓力也很小
二、實現
2.1 死信交換機
當隊列滿足下列的條件之一時就會稱為死信(dead letter)
-
消費者使用basic.reject或basic.nack聲明消費失敗,并且消息的requeue參數設置為false
消費者不要這個消息了
-
消息是一個過期的消息(達到了隊列或消息本身設置的過期時間),超時無人消費
-
要投遞的隊列消息堆積滿了,最早的消息可能成為死信
如果隊列通過dead-letter-exchange屬性指定了一個交換機,那么該隊列中的死信就會投遞到這個交換機中。這個交換機稱為死信交換機(Dead Letter Exchange,簡稱DLX)
死信交換機只是一種稱呼,和普通的交換機其實是一樣的
我們不給simple.queue隊列綁定消費者,給dlx.queue綁定一個消費者
因為simple.queue隊列沒有消費者,所以不會有人來消費,當有人通過simple.direct交換機向simple.queue隊列發送一條過期時間為30秒的消息,此消息就會在simple.queue隊列卡主
過了30s后,消息就會自動投遞到dlx.direct死信交換機,然后進入dlx.queue隊列,最終消費者拿到后會進行消費
利用死信交換機、死信隊列。過期時間的方式,模擬出了延遲消息的效果
驗證一下
- 在控制臺創建simple.direct交換機
- 將此交換機與simple.queue綁定
注意!simple.queue并沒有綁定到消費者,進入到simple.queue隊列的消息都會變成死信
- 創建隊列dlx.queue和dlx.direct并將其綁定
創建隊列
創建交換機
進行綁定
- 給simple.queue隊列設定死信交換機
注意,這個地方只能是在創建隊列的時候進行綁定
- 在消費者模塊代碼中定義兩個隊列simple.queue、dlx.queue
//檢查一下,一定不要有simple.queue的消費者
//@RabbitListener(queues = "simple.queue")
//public void listenSimpleQueue(String msg){
// System.out.println("消費者收到了simple.queue的消息:【" + msg +"】");
// throw new RuntimeException("拋出異常了");
//}@RabbitListener(queues = "dlx.queue")
public void listenDlxQueue(String msg){log.info("消費者收到了dlx.queue的消息:【" + msg +"】");
}
- 發送消息
在控制臺中下面的這個屬性是帶過期時間的屬性
Java代碼中的發送消息如下所示
@Test
void testSendTTLMessage() {Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8)).setExpiration("10000") //過期時間10s.build();//發送到死信隊列rabbitTemplate.convertAndSend("simple.direct", "hi", message);//直接向隊列發送消息log.info("消息發送成功!");
}
simple.queue隊列中10s內始終存在下面的一條消息
dlx.queue隊列的消費者在10s后會接收到消息
2.2 延遲消息插件
這種定時功能,都是有一定的性能損耗的(Redis除外)
MQ或者Spring的定時功能是在程序內部維護一個時鐘,比如每隔一秒就往前跳一次,這種時鐘的運行過程中CPU就需要不停地計算,定時任務越多,對于CPU的占用越大,定時任務屬于一種CPU密集型的任務
采用延遲消息帶來的弊端就是給服務器CPU造成的額外壓力比較大
使用交換機實現延遲消息非常的繁瑣,需要定義很多的交換機和隊列,而且死信交換機的目的是為了讓我們人工處理死信消息,并不是為了延遲消息而生的
延遲消息的插件能自動實現延遲效果
RabbitMQ官方也推出了一個插件,原生支持延遲消息功能。
該插件的原理是設計了一種支持延遲消息功能的交換機,當消息投遞到交換機后可以暫存一定時間,到期后再投遞到隊列。
暫存的時間取決于發消息時配置的時間(也就是延遲時間)
在Java代碼中配置延遲交換機的兩種方式
在聲明交換機的時候,需要多添加一個參數delayed=“true”
-
注解的方式
在消費者模塊聲明交換機、隊列
@RabbitListener(bindings = @QueueBinding(//隊列value = @Queue(name = "delay.queue", durable = "true"),//交換機exchange = @Exchange(name = "hmall.direct", delayed = "true"),//Routing keykey = "delay"
))
public void listenDelayMessage(String msg) {log.info("接收到delay.queue的延遲消息【" + msg + "】");
}
-
注入Bean的方式
這種方式只聲明了交換機
@Bean
public DirectExchange delayExchange(){return ExchangeBuilder.directExchange("delay.direct").delayed() //設置delay的屬性為true 主要是這個.durable(true) //持久化.build();
}
發送延遲消息的Java代碼
@Testvoid testSendDelayMessage() {
// Message message = MessageBuilder
// .withBody("hello".getBytes(StandardCharsets.UTF_8))
// .setExpiration("10000") //過期時間10s
// .build();
// //發送到死信隊列rabbitTemplate.convertAndSend("dela.direct", "hi", "hello", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("10000");//延遲十秒return message;}});//直接向隊列發送消息log.info("消息發送成功!");}
2.3 取消超時訂單
設置三十分鐘后檢測訂單支付狀態,存在兩個問題
-
如果并發比較高,30分鐘可能堆積消息過多,對MQ的壓力很大
-
大多數訂單在下單后1分鐘內就會支付,但是卻需要再MQ內等待30分鐘,浪費資源
30分鐘太長,可以縮短為10s,10s后立刻來檢查有沒有支付
假如10s后沒有支付,可以再發一個10s的延遲消息,直到成功后不再發送延遲消息
這樣的話MQ的壓力會減少很多
處理如下所示:
查詢支付狀態的時候,需要先查詢本地,之后再查詢支付服務,查完之后判斷支付狀態
定義延時消息時間數組
package com.hmall.common.domain;import com.hmall.common.utils.CollUtils;
import lombok.Data;import java.util.List;@Data
public class MultiDelayMessage<T> {/*** 消息體*/private T data;/*** 記錄延遲時間的集合*/private List<Long> delayMillis;public MultiDelayMessage(T data, List<Long> delayMillis) {this.data = data;this.delayMillis = delayMillis;}public static <T> MultiDelayMessage<T> of(T data, Long ... delayMillis){return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));}/*** 獲取并移除下一個延遲時間* @return 隊列中的第一個延遲時間*/public Long removeNextDelay(){return delayMillis.remove(0);}/*** 是否還有下一個延遲時間*/public boolean hasNextDelay(){return !delayMillis.isEmpty();}
}
定義好對應的交換機和隊列
@Component
@RequiredArgsConstructor
public class PayStatusListener {private final IOrderService orderService;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "mark.order.pay.queue", durable = "true"),exchange = @Exchange(name = "pay.topic", type = ExchangeTypes.TOPIC),key = "pay.success"))public void listenOrderPay(Long orderId) {/* // 1.查詢訂單Order order = orderService.getById(orderId);// 2.判斷訂單狀態是否為未支付if(order == null || order.getStatus() != 1){// 訂單不存在,或者狀態異常return;}// 3.如果未支付,標記訂單狀態為已支付orderService.markOrderPaySuccess(orderId);*/// update order set status = 2 where id = ? AND status = 1orderService.lambdaUpdate().set(Order::getStatus, 2).set(Order::getPayTime, LocalDateTime.now()).eq(Order::getId, orderId).eq(Order::getStatus, 1).update();}
}
在方法中發送延遲檢查訂單狀態的消息
@Override@GlobalTransactionalpublic Long createOrder(OrderFormDTO orderFormDTO) {// 1.訂單數據Order order = new Order();// 1.1.查詢商品List<OrderDetailDTO> detailDTOS = orderFormDTO.getDetails();// 1.2.獲取商品id和數量的MapMap<Long, Integer> itemNumMap = detailDTOS.stream().collect(Collectors.toMap(OrderDetailDTO::getItemId, OrderDetailDTO::getNum));Set<Long> itemIds = itemNumMap.keySet();// 1.3.查詢商品List<ItemDTO> items = itemClient.queryItemByIds(itemIds);if (items == null || items.size() < itemIds.size()) {throw new BadRequestException("商品不存在");}// 1.4.基于商品價格、購買數量計算商品總價:totalFeeint total = 0;for (ItemDTO item : items) {total += item.getPrice() * itemNumMap.get(item.getId());}order.setTotalFee(total);// 1.5.其它屬性order.setPaymentType(orderFormDTO.getPaymentType());order.setUserId(UserContext.getUser());order.setStatus(1);// 1.6.將Order寫入數據庫order表中save(order);// 2.保存訂單詳情List<OrderDetail> details = buildDetails(order.getId(), items, itemNumMap);detailService.saveBatch(details);// 3.扣減庫存try {itemClient.deductStock(detailDTOS);} catch (Exception e) {throw new RuntimeException("庫存不足!");}// 4.清理購物車商品// cartClient.deleteCartItemByIds(itemIds);try {rabbitTemplate.convertAndSend(MqConstants.TRADE_EXCHANGE_NAME, MqConstants.ORDER_CREATE_KEY,itemIds/*,new RelyUserInfoMessageProcessor()*/);} catch (AmqpException e) {log.error("清理購物車的消息發送異常", e);}// 5.延遲檢測訂單狀態消息try {MultiDelayMessage<Long> msg = MultiDelayMessage.of(order.getId(), 10000L, 10000L, 10000L, 15000L, 15000L, 30000L, 30000L);rabbitTemplate.convertAndSend(MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg,new DelayMessageProcessor(msg.removeNextDelay().intValue()));} catch (AmqpException e) {log.error("延遲消息發送異常!", e);}return order.getId();}
將MessagePostProcessMessage對象提取出來了,不用每次都new了
@RequiredArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {private final int delay;@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(delay);return message;}
}
或者是使用下面視頻里面的代碼
但是下面的代碼每次都要使用一個內部類