1.概述
延遲隊列其實就是隊列里的消息是希望在指定時間到了以后或之前取出和處理,簡單來說,延時隊列就是用來存放需要在指定時間被處理的元素的隊列。
延時隊列的使用場景:
1.訂單在十分鐘之內未支付則自動取消
2.新創建的店鋪,如果在十天內都沒有上傳過商品,則自動發送消息提醒。
3.用戶注冊成功后,如果三天內沒有登陸則進行短信提醒。
4.用戶發起退款,如果三天內沒有得到處理則通知相關運營人員。
5.預定會議后,需要在預定的時間點前十分鐘通知各個與會人員參加會議
2.代碼演示?
代碼是用springboot整合的。
先導入依賴
<dependencies><!--RabbitMQ 依賴--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--RabbitMQ 測試依賴--><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency>
</dependencies>
配置文件
spring.rabbitmq.host=192.168.10.137
spring.rabbitmq.port=5672
spring.rabbitmq.username=rabbit
spring.rabbitmq.password=rabbit
spring.rabbitmq.virtual-host=/
啟動器
@SpringBootApplicationpublic class App {public static void main(String[] args) {SpringApplication.run(App.class,args);}}
?需求如下:創建兩個隊列 QA 和 QB,兩者隊列 TTL 分別設置為 10S 和 40S,然后在創建一個交 換機 X 和死信交 換機 Y,它們的類型都是 direct,創建一個死信隊列 QD,它們的綁定關系如下:
定義配置類,描述上圖的隊列,交換機以及隊列和交換機之間的關系
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;// 使用@Configuration注解表明這是一個配置類,Spring容器會掃描該類來獲取Bean的定義信息
@Configuration
public class DelayedQueueConfig {// 定義直連類型(Direct)的交換機,名稱為xExchange// @Bean注解用于將方法返回的對象注冊為Spring容器中的一個Bean,"xExchange"是該Bean的名稱@Bean("xExchange")public DirectExchange xExchange() {// 創建并返回一個名為"X"的DirectExchange實例,DirectExchange類型的交換機根據路由鍵直接轉發消息return new DirectExchange("X");}// 聲明另一個直連類型的交換機,名稱為yExchange@Bean("yExchange")public DirectExchange yExchange() {// 創建并返回一個名為"Y"的DirectExchange實例return new DirectExchange("Y");}// 聲明隊列queueA@Bean("queueA")public Queue queueA() {// 創建一個HashMap用于存儲隊列的屬性參數Map<String, Object> args = new HashMap<>();// 設置死信交換機(當消息在隊列中過期或被否定確認等情況時,消息會被轉發到這個交換機)為yExchangeargs.put("x-dead-letter-exchange", "Y");// 設置死信路由鍵,當消息進入死信交換機后,根據這個路由鍵來路由到對應的死信隊列args.put("x-dead-letter-routing-key", "YD");// 設置消息在隊列中的存活時間(即延時時間)為10000毫秒,也就是10秒,這使得queueA成為一個延時隊列args.put("x-message-ttl", 10000);// 創建一個持久化的隊列(QueueBuilder.durable方法),名稱為"QA",并帶上前面設置的屬性參數return QueueBuilder.durable("QA").withArguments(args).build();}// 綁定交換機xExchange和隊列queueA// @Qualifier注解用于根據指定的Bean名稱來注入對應的Bean實例,這里分別指定了要注入的隊列和交換機實例@Beanpublic Binding queueQABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange) {// 使用BindingBuilder將隊列queueA綁定到交換機xExchange上,綁定的路由鍵為"XA"return BindingBuilder.bind(queueA).to(xExchange).with("XA");}// 聲明隊列queueB,設置其延時時間為40秒@Bean("queueB")public Queue queueB() {Map<String, Object> args = new HashMap<>();// 同樣設置死信交換機為yExchangeargs.put("x-dead-letter-exchange", "Y");// 設置死信路由鍵為"YD"args.put("x-dead-letter-routing-key", "YD");// 設置消息在隊列中的存活時間為40000毫秒,即40秒,使queueB成為延時隊列args.put("x-message-ttl", 40000);// 創建一個持久化的隊列,名稱為"QB",并帶上相關屬性參數return QueueBuilder.durable("QB").withArguments(args).build();}// 綁定交換機xExchange和隊列queueB@Beanpublic Binding queueQBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange) {// 將隊列queueB綁定到交換機xExchange上,綁定的路由鍵為"XB"return BindingBuilder.bind(queueB).to(xExchange).with("XB");}// 聲明死信隊列queueD@Bean("queueD")public Queue queueD() {// 創建一個名稱為"QD"的隊列,用于接收從延時隊列中過期轉移過來的消息return new Queue("QD");}// 聲明死信交換機yExchange和死信隊列queueD的綁定關系@Beanpublic Binding deadQueueBindingQD(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange) {// 使用BindingBuilder將死信隊列queueD綁定到死信交換機yExchange上,綁定的路由鍵為"YD"return BindingBuilder.bind(queueD).to(yExchange).with("YD");}
}
?生產者
// @RestController注解表明該類是一個RESTful風格的控制器,用于處理HTTP請求并返回JSON等格式的數據
@RestController
// @RequestMapping("ttl")注解用于映射請求路徑,所有以"/ttl"開頭的請求會被該控制器處理
@RequestMapping("ttl")
public class SendMessageController { // @Resource注解用于自動裝配RabbitTemplate實例,RabbitTemplate是Spring AMQP提供的用于操作RabbitMQ的工具類@Resource RabbitTemplate rabbitTemplate; // 該方法用于處理"/sendMsg/{message}"路徑的請求,是消息發送的邏輯所在// @RequestMapping("sendMsg/{message}")注解將該方法映射到指定的請求路徑,其中{message}是一個路徑變量@RequestMapping("sendMsg/{message}") public void sendMessage(@PathVariable("message") String message){ // 使用rabbitTemplate的convertAndSend方法向RabbitMQ發送消息// 第一個參數"X"指定交換機名稱,對應前面配置的xExchange// 第二個參數"XA"指定路由鍵,用于將消息路由到綁定了該路由鍵的隊列(這里是queueA)// 消息內容是拼接后的字符串,表明消息來自ttl為10秒鐘的延時隊列,并帶上傳入的參數messagerabbitTemplate.convertAndSend("X", "XA", "消息來自ttl為10秒鐘的延時隊列" + message); // 同理,這條消息發送到交換機"X",通過路由鍵"XB"路由到queueB// 消息內容表明來自ttl為40秒鐘的延時隊列rabbitTemplate.convertAndSend("X", "XB", "消息來自ttl為40秒鐘的延時隊列" + message); }
}
消費者
// @Component注解將該類標記為一個Spring組件,使其能被Spring容器掃描并管理
@Component
// @Slf4j注解是Lombok提供的,用于自動生成日志對象log,方便在類中記錄日志
@Slf4j
public class MessageConsumerListener { // @RabbitListener(queues = "QD")注解表明該方法是一個RabbitMQ消息監聽器,監聽名為"QD"的隊列// 當隊列"QD"(即前面配置的死信隊列)中有消息時,該方法會被觸發執行@RabbitListener(queues = "QD") public void getMessage(Message message, Channel channel) throws Exception{ // 獲取消息體內容,將消息的字節數組轉換為字符串String msg = new String(message.getBody()); // 使用日志對象log記錄信息,輸出當前時間以及從死信隊列收到的消息內容log.info("當前時間是:{},收到死信隊列的消息{}",new Date().toString(),msg); }
}
我們上面構建的延時隊列太局限性了,因為我們直接寫死了延時隊列的時間,但我們實際的應用中很多情況都是根據客戶端動態設置時間,比如騰訊會議我們要預定多久的會。
所以下面這個案例新增了一個隊列QC,他不設置TTL,而是根據傳送的數據來動態設定。
我們在配置類中加上QC和交換機x交換機y之間的綁定關系
@Configuration
public class DelayedQueueConfig {@Bean("queueC")public Queue queueC() {Map<String, Object> args = new HashMap<>();//設置綁定死信交換機的屬性args.put("x-dead-letter-exchange", "Y");args.put("x-dead-letter-routing-key", "YD");return QueueBuilder.durable("QC").withArguments(args).build();}//綁定隊列QC和交換機X之間的關系@Beanpublic Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueC).to(xExchange).with("XC");}
}
?定義生產者
// @RestController注解表明該類是一個RESTful風格的控制器,用于處理HTTP請求并返回JSON等格式的數據
@RestController
// @RequestMapping("ttl")注解用于映射請求路徑,所有以"/ttl"開頭的請求會被該控制器處理
@RequestMapping("ttl")
public class SendMessageController { // @Resource注解用于自動裝配RabbitTemplate實例,RabbitTemplate是Spring AMQP提供的用于操作RabbitMQ的工具類@Resource RabbitTemplate rabbitTemplate; // 該方法用于處理"/sendttlMessage/{message}/{ttl}"路徑的請求,是消息發送的邏輯所在// @RequestMapping("sendttlMessage/{message}/{ttl}")注解將該方法映射到指定的請求路徑,其中{message}和{ttl}是路徑變量@RequestMapping("sendttlMessage/{message}/{ttl}") public void sendTtlMessage(@PathVariable("message") String message, @PathVariable("ttl") String ttl) { // 使用rabbitTemplate的convertAndSend方法向RabbitMQ發送消息// 第一個參數"X"指定交換機名稱// 第二個參數"XC"指定路由鍵,用于將消息路由到綁定了該路由鍵的隊列// 第三個參數message是消息內容// 第四個參數是一個Lambda表達式,用于在發送消息前設置消息的過期時間(通過setExpiration方法)// 設置的過期時間由路徑變量ttl傳入,單位為毫秒rabbitTemplate.convertAndSend("X", "XC", message, msg -> { msg.getMessageProperties().setExpiration(ttl); return msg; }); }
}