1.內存級Spring_Event
1.1 控制器層:StringTextController
/*** 字符串文本管理控制器* 提供通過消息隊列異步獲取文本信息的接口*/
@RestController
@RequestMapping("/api/string-text")
public class StringTextController {@Resourceprivate StringTextProducer stringTextProducer;/*** 通過消息隊列異步查詢字符串文本信息* * 流程:* 1. 接收前端查詢參數* 2. 封裝為消息對象發送到消息隊列* 3. 立即返回"消息已發送"響應,不等待實際處理結果* * 優點:* - 避免長耗時查詢阻塞HTTP連接* - 支持水平擴展處理能力* * @param pageReqVO 分頁查詢參數* @return 統一響應結果(僅包含消息發送狀態)*/@GetMapping("/getStringTextByMQ")@Operation(summary = "通過消息隊列獲取test信息")public CommonResult<String> getStringTextByMQ(@Validated @ModelAttribute StringTextPageReqVO pageReqVO) {log.info("控制器接收到MQ請求,線程名: {}", Thread.currentThread().getName());try {// 發送消息到隊列(異步處理)stringTextProducer.sendStringTextQueryMessage(pageReqVO.getText(), pageReqVO.getPageNo(), pageReqVO.getPageSize());return CommonResult.success("消息已發送,異步處理中");} catch (Exception e) {log.error("消息發送異常", e);return CommonResult.error(500, "消息發送失敗");}}
}
1.2?消息生產者:StringTextProducer
/*** 字符串文本查詢消息生產者* 負責將查詢請求封裝為消息并發送到消息隊列* * 設計模式:* - 使用Spring事件機制作為輕量級消息隊列* - 可無縫切換為Kafka/RocketMQ等真正的MQ系統*/
@Slf4j
@Component
public class StringTextProducer {@Resourceprivate ApplicationContext applicationContext;/*** 發送字符串查詢消息* * @param text 查詢文本(用于模糊匹配)* @param pageNo 頁碼* @param pageSize 每頁大小* * 消息傳遞機制:* 1. 創建StringTextQueryMessage對象* 2. 通過Spring事件發布器將消息廣播到事件總線* 3. 由StringTextConsumer監聽并處理該消息*/public void sendStringTextQueryMessage(String text, Integer pageNo, Integer pageSize) {StringTextQueryMessage message = new StringTextQueryMessage();message.setText(text);message.setPageNo(pageNo);message.setPageSize(pageSize);log.info("[sendStringTextQueryMessage][發送消息: {}]", message);// 使用Spring事件機制發布消息// 注意:此處使用同步事件,通過@Async注解在Consumer端實現異步applicationContext.publishEvent(message);}
}
1.3?消息消費者:StringTextConsumer
/*** 字符串文本查詢消息消費者* 負責從消息隊列接收查詢請求并執行實際查詢操作* * 線程模型:* - 使用@Async注解指定專用線程池"curdAsyncExecutor"* - 實現請求處理與HTTP請求線程分離*/
@Slf4j
@Component
public class StringTextConsumer {@Resourceprivate StringTextService stringTextService;/*** 處理字符串查詢消息* * @param message 查詢消息對象* * 執行流程:* 1. 從消息中提取查詢參數* 2. 調用Service層執行實際查詢* 3. 記錄查詢結果(可擴展為保存到結果表或通知前端)*/@EventListener@Async("curdAsyncExecutor") // 使用專用異步線程池public void onMessage(StringTextQueryMessage message) {log.info("[onMessage][接收消息: {},線程名: {}]", message, Thread.currentThread().getName());// 1. 將消息轉換為請求對象StringTextPageReqVO reqVO = new StringTextPageReqVO();reqVO.setText(message.getText());reqVO.setPageNo(message.getPageNo());reqVO.setPageSize(message.getPageSize());// 2. 執行實際查詢(可能是數據庫或外部系統)PageResult<StringTextDO> pageResult = stringTextService.getUserPage(reqVO);log.info("[onMessage][處理結果: {}]", pageResult);// 3. 可擴展邏輯:// - 將結果保存到臨時表// - 推送WebSocket通知前端結果就緒// - 觸發后續處理流程}
}
1.4?消息模型:StringTextQueryMessage
/*** 字符串文本查詢消息* 用于在生產者和消費者之間傳遞查詢參數* * 設計特點:* - 實現Serializable接口以便于消息傳輸* - 使用JSR-303注解進行參數校驗* - 字段與StringTextPageReqVO保持語義一致*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class StringTextQueryMessage implements Serializable {private static final long serialVersionUID = 20230610L;/*** 查詢的文本內容(用于模糊查詢)*/private String text;/*** 頁碼(從1開始)*/@NotNull(message = "頁碼不能為空")private Integer pageNo;/*** 每頁大小*/@NotNull(message = "每頁大小不能為空")private Integer pageSize;
}
客戶端 → HTTP請求 → StringTextController → StringTextProducer → 消息隊列 → StringTextConsumer → StringTextService → 數據庫查詢
1.5 問題提出
1.5.1 生產者如何找到消費者?
- StringTextProducer 通過 applicationContext.publishEvent 發布 StringTextQueryMessage 事件。
- Spring 容器根據事件類型(StringTextQueryMessage)將事件分發給 StringTextConsumer 的 @EventListener 方法。
- 匹配基于 Java 類型系統,參數類型必須兼容事件類型。
1.5.2 假如添加多個新的接收參數為StringTextQueryMessage的消費者,還有幾個是接收的參數是StringTextQueryMessage及其子類,會全部接收到還是會匹配精度最高的?
- 事件類型匹配:
- Spring 根據事件對象的 運行時類型(StringTextQueryMessage 或其子類)查找所有 @EventListener 方法,檢查其參數類型是否與事件類型兼容。
- 兼容的規則是:監聽方法的參數類型可以是事件類型的 類本身、超類 或 接口。
- 例如,如果發布的事件是 StringTextQueryMessage,所有參數類型為 StringTextQueryMessage、其超類(如 Object)或接口的 @EventListener 方法都會被調用。
- 多消費者行為:
- Spring Event 不基于“精度最高”選擇單個消費者,而是 廣播 事件給所有匹配的監聽器。
- 如果有多個消費者監聽 StringTextQueryMessage 或其子類,Spring 會按順序調用所有匹配的 @EventListener 方法。
- 執行順序:
- 默認情況下,Spring 不保證 @EventListener 方法的調用順序。
- 可以通過 @Order 注解或實現 Ordered 接口指定執行順序(較低的 order 值優先執行)。
- 異步性:
- 如果 @EventListener 方法標注了 @Async(如您的 StringTextConsumer),每個消費者的處理會在異步線程中執行,互不阻塞。
1.5.3以上情況能不能指定一個consumer匹配?
方法 1:使用條件注解(@EventListener(condition = ...))
- 在 @EventListener 中添加 condition 屬性,通過 SpEL(Spring Expression Language)過濾事件。
- 示例:修改 StringTextConsumer1 和 StringTextConsumer2,添加條件:
@EventListener(condition = "#message.text == 'specific'")
如果 message.text == "specific",只有 Consumer1 處理。
方法 2:使用自定義事件路由
- 引入一個事件路由機制,通過事件對象的額外屬性指定目標消費者。
- 修改 StringTextQueryMessage,添加 consumerId 字段:
@Data
public class StringTextQueryMessage {
? ? private String text;
? ? @NotNull(message = "頁碼不能為空")
? ? private Integer pageNo;
? ? @NotNull(message = "每頁大小不能為空")
? ? private Integer pageSize;
? ? private String consumerId; // 新增:指定目標消費者
}
@Slf4j
@Component
public class StringTextConsumer1 {
? ? @Resource
? ? private StringTextService stringTextService;
? ? @EventListener
? ? @Async("curdAsyncExecutor")
? ? public void onMessage(StringTextQueryMessage message) {
? ? ? ? if (!"consumer1".equals(message.getConsumerId())) {
? ? ? ? ? ? return; // 忽略不匹配的 consumerId
? ? ? ? }
? ? ? ? log.info("[Consumer1][接收消息: {},線程名: {}]", message, Thread.currentThread().getName());
? ? ? ? StringTextPageReqVO reqVO = new StringTextPageReqVO();
? ? ? ? reqVO.setText(message.getText());
? ? ? ? reqVO.setPageNo(message.getPageNo());
? ? ? ? reqVO.setPageSize(message.getPageSize());
? ? ? ? PageResult<StringTextDO> pageResult = stringTextService.getUserPage(reqVO);
? ? ? ? log.info("[Consumer1][處理結果: {}]", pageResult);
? ? }
}
@Slf4j
@Component
public class StringTextConsumer2 {
? ? @EventListener
? ? @Async("curdAsyncExecutor")
? ? public void onMessage(StringTextQueryMessage message) {
? ? ? ? if (!"consumer2".equals(message.getConsumerId())) {
? ? ? ? ? ? return;
? ? ? ? }
? ? ? ? log.info("[Consumer2][接收消息: {},線程名: {}]", message, Thread.currentThread().getName());
? ? }
}
方法 3:自定義事件分發器
- 重寫 Spring 的事件分發邏輯,使用自定義 ApplicationEventMulticaster:
- 創建自定義 ApplicationEventMulticaster:
? ? @Component("applicationEventMulticaster")
? ? public class CustomEventMulticaster extends SimpleApplicationEventMulticaster {
? ? ? ? @Override
? ? ? ? public void multicastEvent(ApplicationEvent event) {
? ? ? ? ? ? if (event instanceof StringTextQueryMessage) {
? ? ? ? ? ? ? ? StringTextQueryMessage message = (StringTextQueryMessage) event;
? ? ? ? ? ? ? ? // 根據條件選擇特定消費者,例如 consumerId
? ? ? ? ? ? ? ? getApplicationListeners(event).stream()
? ? ? ? ? ? ? ? ? ? ? ? .filter(listener -> {
? ? ? ? ? ? ? ? ? ? ? ? ? ? // 假設 listener 的方法名或類名包含 consumerId
? ? ? ? ? ? ? ? ? ? ? ? ? ? return listener.getListenerId().contains(message.getConsumerId());
? ? ? ? ? ? ? ? ? ? ? ? })
? ? ? ? ? ? ? ? ? ? ? ? .forEach(listener -> listener.onApplicationEvent(event));
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? super.multicastEvent(event);
? ? ? ? ? ? }
? ? ? ? }
? ? }
執行結果如下:
2025-06-10 10:39:03.349 | INFO 16540 | http-nio-48080-exec-7 [TID: N/A] c.i.m.f.a.c.i.ApiAccessLogInterceptor | [preHandle][開始請求 URL(/admin-api/curd/stringtext/getStringTextByMQ) 參數({text=username})]Controller 方法路徑:cn.iocoder.moyun.module.curd.controller.admin.CurdTestController(CurdTestController.java:57)
2025-06-10 10:39:03.355 | INFO 16540 | http-nio-48080-exec-7 [TID: N/A] c.i.m.m.c.c.admin.CurdTestController | 控制器接收到MQ請求,線程名: http-nio-48080-exec-7
2025-06-10 10:39:03.355 | INFO 16540 | http-nio-48080-exec-7 [TID: N/A] c.i.m.m.c.m.p.sms.StringTextProducer | [sendStringTextQueryMessage][發送消息: StringTextQueryMessage(text=username, pageNo=1, pageSize=10)]
2025-06-10 10:39:03.357 | INFO 16540 | http-nio-48080-exec-7 [TID: N/A] c.i.m.f.a.c.i.ApiAccessLogInterceptor | [afterCompletion][完成請求 URL(/admin-api/curd/stringtext/getStringTextByMQ) 耗時(6 ms)]
2025-06-10 10:39:03.358 | INFO 16540 | curd-async-2 [TID: N/A] c.i.m.m.c.m.c.sms.StringTextConsumer | [onMessage][接收消息: StringTextQueryMessage(text=username, pageNo=1, pageSize=10),線程名: curd-async-2]
2025-06-10 10:39:03.374 | INFO 16540 | curd-async-2 [TID: N/A] c.i.m.m.c.s.s.StringTextServiceImpl | 同步查詢結果: PageResult(list=[StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-03T10:00, updateTime=null, creator=system, updater=null, deleted=false), tenantId=1), id=3, text={"username":"user002","nickname":"臨時用戶","password":"xyz789","status":1}, loginIp=192.168.1.102, loginDate=2023-05-03T10:20), StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-02T09:00, updateTime=2023-05-03T11:30, creator=system, updater=admin, deleted=false), tenantId=1), id=2, text={"username":"user001","nickname":"測試用戶","password":"abc123","email":"test@example.com","mobile":"13800138001"}, loginIp=192.168.1.101, loginDate=2023-05-02T09:15), StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-01T08:00, updateTime=2023-05-05T15:00, creator=system, updater=admin, deleted=false), tenantId=1), id=1, text={"username":"admin","nickname":"管理員","password":"123456","dept_id":1001,"post_ids":[101,102]}, loginIp=192.168.1.100, loginDate=2023-05-01T08:30)], total=3)
2025-06-10 10:39:03.375 | INFO 16540 | curd-async-2 [TID: N/A] c.i.m.m.c.m.c.sms.StringTextConsumer | [onMessage][處理結果: PageResult(list=[StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-03T10:00, updateTime=null, creator=system, updater=null, deleted=false), tenantId=1), id=3, text={"username":"user002","nickname":"臨時用戶","password":"xyz789","status":1}, loginIp=192.168.1.102, loginDate=2023-05-03T10:20), StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-02T09:00, updateTime=2023-05-03T11:30, creator=system, updater=admin, deleted=false), tenantId=1), id=2, text={"username":"user001","nickname":"測試用戶","password":"abc123","email":"test@example.com","mobile":"13800138001"}, loginIp=192.168.1.101, loginDate=2023-05-02T09:15), StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-01T08:00, updateTime=2023-05-05T15:00, creator=system, updater=admin, deleted=false), tenantId=1), id=1, text={"username":"admin","nickname":"管理員","password":"123456","dept_id":1001,"post_ids":[101,102]}, loginIp=192.168.1.100, loginDate=2023-05-01T08:30)], total=3)]
2. Redis消息隊列
第一部分:Redis完成MQ的詳細過程梳理
Yudao項目利用Redis實現了兩種消息隊列機制:
- Redis Stream:用于集群消費,消息持久化存儲,支持消費者組,適合需要可靠性和消息追蹤的場景。
- Redis Pub/Sub(Channel):用于廣播消費,消息不持久化,適合實時性要求高的場景,所有訂閱者都會收到消息。
我將模擬一個完整的消息發送和消費過程,分別講解Stream和Channel的實現,詳細說明每個類的作用、注冊方式以及它們如何協作。
2.1 Redis Stream消息隊列的完整過程
場景:通過/getStringTextByStream接口發送Stream消息
假設用戶通過HTTP請求調用/getStringTextByStream接口,傳入StringTextPageReqVO對象(包含text、pageNo、pageSize字段)。以下是消息從發送到消費的詳細流程:
步驟1:接收HTTP請求(Controller層)
- 相關類:@GetMapping("/getStringTextByStream")(未顯式定義類名,假設為StringTextController)
- 作用:這是一個Spring MVC控制器方法,負責接收前端的HTTP GET請求,處理通過Redis Stream發送消息的邏輯。
- 功能:
- 接收StringTextPageReqVO參數,包含分頁查詢的信息(如text、pageNo、pageSize)。
- 調用StringTextRedisProducer的sendStreamMessage方法發送消息。
- 返回CommonResult表示消息已發送,異步處理中。
- 框架注冊:
- 通過Spring的@GetMapping注解,自動注冊為一個HTTP端點。
- 依賴Spring Boot的Web模塊,控制器類通常標注@RestController,由Spring容器管理。
- 與其他類的交互:
- 依賴StringTextRedisProducer來發送Stream消息。
- 使用StringTextPageReqVO作為請求參數的VO(Value Object)。
- 代碼分析:
@GetMapping("/getStringTextByStream")
public CommonResult<String> getStringTextByStream(@Validated @ModelAttribute StringTextPageReqVO pageReqVO) {log.info("控制器接收到 Stream 請求,線程名: {}", Thread.currentThread().getName());try {stringTextRedisProducer.sendStreamMessage(pageReqVO.getText(), pageReqVO.getPageNo(), pageReqVO.getPageSize());return CommonResult.success("Stream 消息已發送,異步處理中");} catch (Exception e) {log.error("Stream 消息發送異常", e);return CommonResult.error(500, "Stream 消息發送失敗");}
}
- 控制器接收到請求后,提取pageReqVO中的字段,傳遞給StringTextRedisProducer。
- 異常處理確保即使發送失敗,也能返回錯誤響應。
步驟2:生產者發送Stream消息
- 相關類:StringTextRedisProducer
- 作用:這是一個生產者類,負責創建并發送Redis Stream消息。
- 功能:
- 提供sendStreamMessage方法,構造StringTextQueryStreamMessage對象,設置text、pageNo、pageSize字段。
- 使用RedisMQTemplate的send方法將消息發送到Redis Stream。
- 記錄日志,跟蹤消息發送情況。
- 框架注冊:
- 標注@Component,由Spring容器管理。
- 通過@Resource注入RedisMQTemplate依賴。
- 與其他類的交互:
- 依賴RedisMQTemplate執行實際的Redis Stream消息發送。
- 使用StringTextQueryStreamMessage作為消息載體。
- 被StringTextController調用。
- 代碼分析
@Component
public class StringTextRedisProducer {@Resourceprivate RedisMQTemplate redisMQTemplate;public void sendStreamMessage(String text, Integer pageNo, Integer pageSize) {StringTextQueryStreamMessage message = new StringTextQueryStreamMessage();message.setText(text);message.setPageNo(pageNo);message.setPageSize(pageSize);redisMQTemplate.send(message);log.info("[sendStreamMessage][發送 Stream 消息: {}]", message);}
}
- 創建StringTextQueryStreamMessage對象,填充請求參數。
- 調用redisMQTemplate.send(message),將消息發送到Redis Stream。
- 相關類:StringTextQueryStreamMessage
- 作用:Stream消息的載體,定義消息的結構和Stream Key。
- 功能:
- 繼承AbstractRedisStreamMessage,包含text、pageNo、pageSize字段。
- 重寫getStreamKey(),指定Stream的鍵為"string-text-query-stream"。
- 框架注冊:
- 作為普通Java類,無需Spring注冊,直接由StringTextRedisProducer實例化。
- 與其他類的交互:
- 被StringTextRedisProducer用于封裝消息內容。
- 被StringTextStreamConsumer用于解析消息內容。
- 代碼分析:
@Data
public class StringTextQueryStreamMessage extends AbstractRedisStreamMessage {private String text;@NotNull(message = "頁碼不能為空")private Integer pageNo;@NotNull(message = "每頁大小不能為空")private Integer pageSize;@Overridepublic String getStreamKey() {return "string-text-query-stream";}
}
- 定義消息結構,確保pageNo和pageSize不為空。
- 指定Stream Key,用于Redis Stream的消息存儲。
步驟3:RedisMQTemplate發送消息
- 相關類:RedisMQTemplate
- 作用:Redis MQ操作的核心模板類,封裝了Redis Stream和Pub/Sub的消息發送邏輯。
- 功能:
- 提供send方法,針對Stream消息,將消息序列化為JSON并添加到指定的Stream Key。
- 支持攔截器機制,在消息發送前后調用RedisMessageInterceptor的鉤子方法。
- 使用RedisTemplate執行底層的Redis操作。
- 框架注冊:
- 通過YudaoRedisMQProducerAutoConfiguration類注冊為Spring Bean。
- 依賴StringRedisTemplate和RedisMessageInterceptor列表。
- 與其他類的交互:
- 被StringTextRedisProducer調用,用于發送Stream消息。
- 依賴RedisTemplate執行Redis操作。
- 調用RedisMessageInterceptor處理消息發送前后的擴展邏輯。
- 被StringTextStreamConsumer用于ACK消息。
- 代碼分析:
public class RedisMQTemplate {private final RedisTemplate<String, ?> redisTemplate;private final List<RedisMessageInterceptor> interceptors = new ArrayList<>();public <T extends AbstractRedisStreamMessage> RecordId send(T message) {try {sendMessageBefore(message);return redisTemplate.opsForStream().add(StreamRecords.newRecord().ofObject(JsonUtils.toJsonString(message)).withStreamKey(message.getStreamKey()));} finally {sendMessageAfter(message);}}
}
- 調用sendMessageBefore執行攔截器前置邏輯。
- 使用redisTemplate.opsForStream().add將消息添加到Stream,返回RecordId。
- 調用sendMessageAfter執行攔截器后置邏輯。
- 相關類:YudaoRedisMQProducerAutoConfiguration
- 作用:生產者配置類,負責注冊RedisMQTemplate。
- 功能:
- 創建RedisMQTemplate實例,注入StringRedisTemplate和攔截器列表。
- 確保在YudaoRedisAutoConfiguration之后加載(依賴Redis配置)。
- 框架注冊:
- 標注@AutoConfiguration,由Spring Boot自動加載。
- 使用@Bean注冊RedisMQTemplate。
- 與其他類的交互:
- 為StringTextRedisProducer和StringTextStreamConsumer提供RedisMQTemplate。
- 依賴StringRedisTemplate和RedisMessageInterceptor。
- 代碼分析:
@AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
public class YudaoRedisMQProducerAutoConfiguration {@Beanpublic RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,List<RedisMessageInterceptor> interceptors) {RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate);interceptors.forEach(redisMQTemplate::addInterceptor);return redisMQTemplate;}
}
相關類:RedisMessageInterceptor
- 作用:消息攔截器接口,提供發送和消費消息前后的擴展點。
- 功能:
- 定義sendMessageBefore、sendMessageAfter、consumeMessageBefore、consumeMessageAfter方法。
- 適用于多租戶場景或其他擴展需求(如日志記錄、權限檢查)。
- 框架注冊:
- 實現類需標注@Component,由Spring容器管理。
- 由YudaoRedisMQProducerAutoConfiguration注入到RedisMQTemplate。
- 與其他類的交互:
- 被RedisMQTemplate調用,用于消息處理擴展。
步驟4:消費者監聽和處理Stream消息
- 相關類:StringTextStreamConsumer
- 作用:Stream消息的消費者,負責處理收到的StringTextQueryStreamMessage消息。
- 功能:
- 繼承AbstractRedisStreamMessageListener,實現onMessage方法。
- 將消息轉換為StringTextPageReqVO,調用StringTextService執行分頁查詢。
- 記錄處理結果日志。
- 框架注冊:
- 標注@Component,由Spring容器管理。
- 通過YudaoRedisMQConsumerAutoConfiguration注冊到StreamMessageListenerContainer。
- 與其他類的交互:
- 依賴StringTextService執行業務邏輯。
- 依賴RedisMQTemplate進行消息ACK。
- 接收StringTextQueryStreamMessage消息。
- 代碼分析:
@Component
public class StringTextStreamConsumer extends AbstractRedisStreamMessageListener<StringTextQueryStreamMessage> {@Resourceprivate StringTextService stringTextService;@Overridepublic void onMessage(StringTextQueryStreamMessage message) {log.info("[onMessage][Stream 消息: {},線程名: {}]", message, Thread.currentThread().getName());StringTextPageReqVO reqVO = new StringTextPageReqVO();reqVO.setText(message.getText());reqVO.setPageNo(message.getPageNo());reqVO.setPageSize(message.getPageSize());PageResult<StringTextDO> pageResult = stringTextService.getUserPage(reqVO);log.info("[onMessage][Stream 處理結果: {}]", pageResult);}
}
- 相關類:AbstractRedisStreamMessageListener
- 作用:Stream消息監聽器的抽象基類,封裝通用消費邏輯。
- 功能:
- 實現StreamListener接口,處理Redis Stream消息。
- 解析消息為指定類型(StringTextQueryStreamMessage)。
- 調用攔截器前置/后置邏輯,執行消息ACK。
- 框架注冊:
- 作為抽象類,不直接注冊,其子類(如StringTextStreamConsumer)注冊。
- 與其他類的交互:
- 被StringTextStreamConsumer繼承。
- 依賴RedisMQTemplate進行ACK。
- 被YudaoRedisMQConsumerAutoConfiguration用于注冊監聽器。
- 相關類:YudaoRedisMQConsumerAutoConfiguration
- 作用:消費者配置類,負責注冊Stream和Pub/Sub的監聽容器。
- 功能:
- 創建StreamMessageListenerContainer,配置Stream監聽。
- 注冊StringTextStreamConsumer到容器,綁定Stream Key和消費者組。
- 創建RedisPendingMessageResendJob處理未消費消息。
- 框架注冊:
- 標注@AutoConfiguration,在YudaoRedisAutoConfiguration之后加載。
- 使用@Bean注冊StreamMessageListenerContainer和RedisPendingMessageResendJob。
- 與其他類的交互:
- 依賴RedisMQTemplate和StringTextStreamConsumer。
- 為RedisPendingMessageResendJob提供監聽器列表。
- 代碼分析:
@AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
public class YudaoRedisMQConsumerAutoConfiguration {@Beanpublic StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners) {// 配置容器,注冊監聽器}
}
相關類:RedisPendingMessageResendJob
- 作用:定時任務,重新投遞Stream中未消費的超時消息。
- 功能:
- 每分鐘檢查Pending消息,超時(默認5分鐘)后重新投遞。
- 使用分布式鎖(Redisson)確保單實例執行。
- 框架注冊:
- 由YudaoRedisMQConsumerAutoConfiguration注冊為Bean。
- 標注@Scheduled啟用定時任務。
- 與其他類的交互:
- 依賴RedisMQTemplate和StringTextStreamConsumer的監聽器列表。
- 使用RedissonClient實現分布式鎖。
步驟5:業務邏輯處理
- 相關類:StringTextService
- 作用:業務服務層,執行分頁查詢邏輯。
- 功能:
- 提供getUserPage方法,根據StringTextPageReqVO查詢分頁數據。
- 返回PageResult<StringTextDO>。
- 框架注冊:
- 通常標注@Service,由Spring容器管理。
- 與其他類的交互:
- 被StringTextStreamConsumer調用。
- 相關類:StringTextPageReqVO
- 作用:分頁請求的VO,定義查詢參數。
- 功能:
- 包含text、pageNo、pageSize等字段。
- 繼承PageParam,支持分頁參數。
- 框架注冊:
- 作為普通Java類,無需注冊。
- 與其他類的交互:
- 被StringTextController接收前端參數。
- 被StringTextStreamConsumer用于構造查詢參數。
第二部分:具體類解析
2.2 RedisMQTemplate類
/*** Redis MQ 操作模板類* 封裝了基于Redis的兩種消息模型的發送操作:* 1. Pub/Sub模式:發布訂閱模式,適合廣播消息* 2. Stream模式:消息流模式,適合需要消息順序和可靠性的場景* 同時提供了消息攔截器機制,允許在消息發送前后執行自定義邏輯*/
@AllArgsConstructor
public class RedisMQTemplate {@Getterprivate final RedisTemplate<String, ?> redisTemplate;/*** 攔截器數組* 用于在消息發送前后執行自定義邏輯,如日志記錄、指標收集、分布式追蹤等*/@Getterprivate final List<RedisMessageInterceptor> interceptors = new ArrayList<>();/*** 發送 Redis 消息,基于 Redis pub/sub 實現* * @param message 繼承自AbstractRedisChannelMessage的消息對象* @return void 無返回值* @功能 基于Redis的Pub/Sub模式發布消息,在發送前后分別執行攔截器的前置和后置處理* 使用JSON序列化消息內容,通過message.getChannel()獲取消息發布的頻道名稱*/public <T extends AbstractRedisChannelMessage> void send(T message) {try {sendMessageBefore(message);// 發送消息redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message));} finally {sendMessageAfter(message);}}/*** 發送 Redis 消息,基于 Redis Stream 實現* * @param message 繼承自AbstractRedisStreamMessage的消息對象* @return RecordId Redis Stream生成的消息ID,格式如"1686475200000-0"* @功能 基于Redis的Stream模式發送消息,在發送前后分別執行攔截器的前置和后置處理* 使用JSON序列化消息內容,通過message.getStreamKey()獲取Stream的鍵名*/public <T extends AbstractRedisStreamMessage> RecordId send(T message) {try {sendMessageBefore(message);// 發送消息return redisTemplate.opsForStream().add(StreamRecords.newRecord().ofObject(JsonUtils.toJsonString(message)) // 設置內容.withStreamKey(message.getStreamKey())); // 設置 stream key} finally {sendMessageAfter(message);}}/*** 添加攔截器* * @param interceptor 消息攔截器對象* @return void 無返回值* @功能 向攔截器列表中添加一個消息攔截器,攔截器將在消息發送前后執行自定義邏輯*/public void addInterceptor(RedisMessageInterceptor interceptor) {interceptors.add(interceptor);}/*** 消息發送前執行攔截器邏輯* * @param message 消息對象* @return void 無返回值* @功能 在消息發送前調用所有攔截器的前置處理方法,按攔截器添加的順序執行(正序)*/private void sendMessageBefore(AbstractRedisMessage message) {// 正序執行攔截器的前置處理方法interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message));}/*** 消息發送后執行攔截器邏輯* * @param message 消息對象* @return void 無返回值* @功能 在消息發送后調用所有攔截器的后置處理方法,按攔截器添加順序的逆序執行(倒序)*/private void sendMessageAfter(AbstractRedisMessage message) {// 倒序執行攔截器的后置處理方法for (int i = interceptors.size() - 1; i >= 0; i--) {interceptors.get(i).sendMessageAfter(message);}}}
2.3 消息父類及其子類
AbstractRedisChannelMessageListener自定義監聽類父類
/*** Redis Pub/Sub 監聽器抽象類,用于實現廣播消費** @param <T> 消息類型。一定要填寫噢,不然會報錯*/
public abstract class AbstractRedisChannelMessageListener<T extends AbstractRedisChannelMessage> implements MessageListener {/*** 消息類型*/private final Class<T> messageType;/*** Redis Channel*/private final String channel;/*** RedisMQTemplate*/@Setterprivate RedisMQTemplate redisMQTemplate;/*** messageType 在 onMessage 方法中用于將 Redis 收到的消息(JSON 字符串)反序列化為 T 類型的對象* 通過 JsonUtils.parseObject(message.getBody(), messageType),明確的消息類型確保了正確的反序列化* channel 字段用于指定監聽器訂閱的 Redis Pub/Sub 通道* getChannel() 方法允許每個消息類型定義自己的通道,使監聽器能夠靈活適應不同的消息類型和通道* messageType 確定了后續消息反序列化的目標類型,確保消息被解析為 StringTextQueryChannelMessage。* channel 確定了監聽器訂閱的 Redis 通道,Spring Data Redis 會使用 getChannel() 的返回值("string.text.query.channel")訂閱該通道。* @SneakyThrows 隱藏了反射相關的異常(如 NoSuchMethodException),假設 StringTextQueryChannelMessage 有無參構造函數且 getChannel() 可訪問。*/@SneakyThrowsprotected AbstractRedisChannelMessageListener() {//通過反射獲取子類的泛型參數this.messageType = getMessageClass();/*** 具體案例:* StringTextQueryChannelMessage 的無參構造函數* 調用 newInstance() 創建一個 StringTextQueryChannelMessage 實例。* 調用實例的 getChannel() 方法,獲取通道名稱(如 "string.text.query.channel")*/this.channel = messageType.getDeclaredConstructor().newInstance().getChannel();}/*** 獲得 Sub 訂閱的 Redis Channel 通道** @return channel*/public final String getChannel() {return channel;}/*** Message message:Redis 傳遞的原始消息對象,包含消息體(message.getBody())和通道信息* T messageObj = JsonUtils.parseObject(message.getBody(), messageType)* 使用 JsonUtils.parseObject 將消息體(message.getBody(),字節數組)反序列化為 T 類型的對象* messageType 是構造函數中初始化的消息類(Class<T>),確保消息被正確解析為指定類型(如 OrderMessage)* 調用 this.onMessage(messageObj),這是抽象方法,子類必須實現以定義具體的消息處理* messageType 是 StringTextQueryChannelMessage.class(由構造函數設置)。* 具體案例:* JsonUtils.parseObject(message.getBody(), messageType)* 將 JSON 字節數組解析為 StringTextQueryChannelMessage 對象:* @param message* @param bytes*/@Overridepublic final void onMessage(Message message, byte[] bytes) {T messageObj = JsonUtils.parseObject(message.getBody(), messageType);try {consumeMessageBefore(messageObj);// 消費消息this.onMessage(messageObj);} finally {consumeMessageAfter(messageObj);}}/*** 處理消息** @param message 消息*/public abstract void onMessage(T message);/*** 通過解析類上的泛型,獲得消息類型** @return 消息類型*/@SuppressWarnings("unchecked")private Class<T> getMessageClass() {Type type = TypeUtil.getTypeArgument(getClass(), 0);if (type == null) {throw new IllegalStateException(String.format("類型(%s) 需要設置消息類型", getClass().getName()));}return (Class<T>) type;}private void consumeMessageBefore(AbstractRedisMessage message) {assert redisMQTemplate != null;List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();// 正序interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));}private void consumeMessageAfter(AbstractRedisMessage message) {assert redisMQTemplate != null;List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();// 倒序for (int i = interceptors.size() - 1; i >= 0; i--) {interceptors.get(i).consumeMessageAfter(message);}}
StringTextChannelConsumerListener自定義監聽器實現類
@Slf4j
@Component
public class StringTextChannelConsumerListener extends AbstractRedisChannelMessageListener<StringTextQueryChannelMessage> {@Resourceprivate StringTextService stringTextService;/*** 在 StringTextChannelConsumerListener 實例化時(由 Spring 容器管理)* 會調用父類 AbstractRedisChannelMessageListener 的構造函數* 當 Redis 通道 "string.text.query.channel" 收到消息時* Spring Data Redis 調用 StringTextChannelConsumerListener 的 onMessage(Message, byte[]) 方法(繼承自父類)* @param message 消息*/@Overridepublic void onMessage(StringTextQueryChannelMessage message) {log.info("[onMessage][Channel 消息: {},線程名: {}]", message, Thread.currentThread().getName());StringTextPageReqVO reqVO = new StringTextPageReqVO();reqVO.setText(message.getText());reqVO.setPageNo(message.getPageNo());reqVO.setPageSize(message.getPageSize());PageResult<StringTextDO> pageResult = stringTextService.getUserPage(reqVO);log.info("[onMessage][Channel 處理結果: {}]", pageResult);}
}
監聽器執行邏輯:
- Spring Data Redis 的調用:
- 當通道有消息時,Spring Data Redis 調用 MessageListener 的 onMessage(Message, byte[]),即 AbstractRedisChannelMessageListener 的實現。
- 父類到子類的調用:
- 父類的 onMessage(Message, byte[]) 處理消息反序列化和攔截器邏輯,然后通過 this.onMessage(T) 調用子類的 onMessage(StringTextQueryChannelMessage)。
- 子類沒有直接調用父類:
- 子類的 onMessage 是父類調用的結果,不是子類主動調用父類。
- 子類的 onMessage 只處理業務邏輯,依賴父類的預處理。
- 構造函數的作用:
- 初始化 messageType 和 channel,確保消息反序列化和通道訂閱正確,直接影響子類 onMessage 的執行。
這種設計通過模板方法模式實現了通用邏輯和業務邏輯的分離,父類控制流程,子類提供具體實現。
2.4 配置類
YudaoRabbitMQAutoConfiguration
/*** RabbitMQ 消息隊列配置類* 配置RabbitMQ的消息轉換器,使用Jackson進行JSON序列化* 當生產者發送消息時,將Java對象轉換為JSON格式的字節數組* 當消費者接收消息時,將JSON格式的消息轉換回Java對象*/
@AutoConfiguration
@Slf4j
@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
public class YudaoRabbitMQAutoConfiguration {/*** Jackson2JsonMessageConverter Bean:使用 jackson 序列化消息*/@Beanpublic MessageConverter createMessageConverter() {return new Jackson2JsonMessageConverter();}
}
YudaoRabbitMQAutoConfiguration:配置 RabbitMQ 消息轉換器,使用 Jackson 進行 JSON 序列化。
YudaoRedisMQConsumerAutoConfiguration
/*** Redis 消息隊列 Consumer 配置類* redisMessageListenerContainer(): 創建Pub/Sub消息監聽容器* redisStreamMessageListenerContainer(): 創建Stream消息監聽容器* redisPendingMessageResendJob(): 創建待處理消息重發任務*/
@Slf4j
@EnableScheduling // 啟用定時任務,用于 RedisPendingMessageResendJob 重發消息
@AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
public class YudaoRedisMQConsumerAutoConfiguration {/*** 創建 Redis Pub/Sub 廣播消費的容器*/@Bean@ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的時候,才需要注冊 Redis pubsub 監聽public RedisMessageListenerContainer redisMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners) {// 創建 RedisMessageListenerContainer 對象RedisMessageListenerContainer container = new RedisMessageListenerContainer();// 設置 RedisConnection 工廠。container.setConnectionFactory(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory());// 添加監聽器listeners.forEach(listener -> {listener.setRedisMQTemplate(redisMQTemplate);container.addMessageListener(listener, new ChannelTopic(listener.getChannel()));log.info("[redisMessageListenerContainer][注冊 Channel({}) 對應的監聽器({})]",listener.getChannel(), listener.getClass().getName());});return container;}/*** 創建 Redis Stream 重新消費的任務*/@Bean@ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的時候,才需要注冊 Redis pubsub 監聽public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> listeners,RedisMQTemplate redisTemplate,@Value("${spring.application.name}") String groupName,RedissonClient redissonClient) {return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient);}/*** 創建 Redis Stream 集群消費的容器** 基礎知識:<a href="https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html">Redis Stream 的 xreadgroup 命令</a>*/@Bean(initMethod = "start", destroyMethod = "stop")@ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的時候,才需要注冊 Redis pubsub 監聽public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners) {RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate();checkRedisVersion(redisTemplate);// 第一步,創建 StreamMessageListenerContainer 容器// 創建 options 配置StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions =StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().batchSize(10) // 一次性最多拉取多少條消息.targetType(String.class) // 目標類型。統一使用 String,通過自己封裝的 AbstractStreamMessageListener 去反序列化.build();// 創建 container 對象StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =StreamMessageListenerContainer.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions);// 第二步,注冊監聽器,消費對應的 Stream 主題String consumerName = buildConsumerName();listeners.parallelStream().forEach(listener -> {log.info("[redisStreamMessageListenerContainer][開始注冊 StreamKey({}) 對應的監聽器({})]",listener.getStreamKey(), listener.getClass().getName());// 創建 listener 對應的消費者分組try {redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());} catch (Exception ignore) {}// 設置 listener 對應的 redisTemplatelistener.setRedisMQTemplate(redisMQTemplate);// 創建 Consumer 對象Consumer consumer = Consumer.from(listener.getGroup(), consumerName);// 設置 Consumer 消費進度,以最小消費進度為準StreamOffset<String> streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed());// 設置 Consumer 監聽StreamMessageListenerContainer.StreamReadRequestBuilder<String> builder = StreamMessageListenerContainer.StreamReadRequest.builder(streamOffset).consumer(consumer).autoAcknowledge(false) // 不自動 ack.cancelOnError(throwable -> false); // 默認配置,發生異常就取消消費,顯然不符合預期;因此,我們設置為 falsecontainer.register(builder.build(), listener);log.info("[redisStreamMessageListenerContainer][完成注冊 StreamKey({}) 對應的監聽器({})]",listener.getStreamKey(), listener.getClass().getName());});return container;}/*** 構建消費者名字,使用本地 IP + 進程編號的方式。* 參考自 RocketMQ clientId 的實現** @return 消費者名字*/private static String buildConsumerName() {return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());}/*** 校驗 Redis 版本號,是否滿足最低的版本號要求!*/private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {// 獲得 Redis 版本Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);String version = MapUtil.getStr(info, "redis_version");// 校驗最低版本必須大于等于 5.0.0int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false));if (majorVersion < 5) {throw new IllegalStateException(StrUtil.format("您當前的 Redis 版本為 {},小于最低要求的 5.0.0 版本!" +"請參考 {} 文檔進行安裝。", version, DocumentEnum.REDIS_INSTALL.getUrl()));}}}
方法 1: redisMessageListenerContainer
- 作用:
- 創建并配置 RedisMessageListenerContainer,用于監聽 Redis Pub/Sub 通道消息。
- 注冊所有 AbstractRedisChannelMessageListener 子類的監聽器(如 StringTextChannelConsumerListener)。
方法 2: redisPendingMessageResendJob
作用:
- 創建 RedisPendingMessageResendJob Bean,定時重發 Redis Stream 的待處理(pending)消息。
方法 3: redisStreamMessageListenerContainer
- 作用:
- 創建并配置 StreamMessageListenerContainer,用于監聽 Redis Stream 消息。
- 注冊所有 AbstractRedisStreamMessageListener 子類的監聽器。
- 輸入參數:
- RedisMQTemplate redisMQTemplate:提供 Redis 連接和攔截器。
- List<AbstractRedisStreamMessageListener<?>> listeners:所有 Stream 監聽器。
- 輸出:StreamMessageListenerContainer<String, ObjectRecord<String, String>>,用于 Stream 消息消費。
- 實現邏輯:
- 檢查 Redis 版本:
- checkRedisVersion(redisTemplate); 確保 Redis 版本 ≥ 5.0.0(支持 Stream)。
- 創建容器:
- 配置 containerOptions:
- batchSize(10):每次拉取最多 10 條消息。
- targetType(String.class):消息體為 String 類型(JSON 字符串),由監聽器反序列化。
- 創建 StreamMessageListenerContainer。
- 配置 containerOptions:
- 注冊監聽器:
- 使用 parallelStream 并行處理監聽器,提高效率。
- 對每個監聽器:
- 創建消費者組(createGroup)。
- 注入 redisMQTemplate。
- 創建 Consumer(組名 + 消費者名)。
- 設置 StreamOffset(從最后消費位置讀取)。
- 注冊監聽器,配置手動 ACK 和錯誤處理。
- 日志記錄注冊過程。
- 檢查 Redis 版本:
2.5 RedisPendingMessageResendJob(定時任務類)
/*** 這個任務用于處理,crash 之后的消費者未消費完的消息* 重新分發信息(清理員)*/
@Slf4j
@AllArgsConstructor
public class RedisPendingMessageResendJob {private static final String LOCK_KEY = "redis:pending:msg:lock";/*** 消息超時時間,默認 5 分鐘** 1. 超時的消息才會被重新投遞* 2. 由于定時任務 1 分鐘一次,消息超時后不會被立即重投,極端情況下消息5分鐘過期后,再等 1 分鐘才會被掃瞄到*/private static final int EXPIRE_TIME = 5 * 60;private final List<AbstractRedisStreamMessageListener<?>> listeners;private final RedisMQTemplate redisTemplate;private final String groupName;private final RedissonClient redissonClient;/*** 一分鐘執行一次,這里選擇每分鐘的35秒執行,是為了避免整點任務過多的問題*/@Scheduled(cron = "35 * * * * ?")public void messageResend() {RLock lock = redissonClient.getLock(LOCK_KEY);// 嘗試加鎖if (lock.tryLock()) {try {execute();} catch (Exception ex) {log.error("[messageResend][執行異常]", ex);} finally {lock.unlock();}}}/*** 執行清理邏輯** @see <a href="https://gitee.com/zhijiantianya/ruoyi-vue-pro/pulls/480/files">討論</a>*/private void execute() {//redisTemplate.getRedisTemplate().opsForStream() 獲取 Redis Stream 的操作接口,用于執行 pending、range、add、acknowledge 等操作。StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream();//檢查每條傳送帶listeners.forEach(listener -> {//ops.pending(listener.getStreamKey(), groupName):查詢 Stream 和消費者組的 pending 消息概況。PendingMessagesSummary pendingMessagesSummary = Objects.requireNonNull(ops.pending(listener.getStreamKey(), groupName));// 每個消費者的 pending 隊列消息數量(檢查每個傳送帶(Stream)上有哪些卡住的包裹(pending 消息))Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> {log.info("[processPendingMessage][消費者({}) 消息數量({})]", consumerName, pendingMessageCount);// 每個消費者的 pending消息的詳情信息,getStreamKey():返回 Stream 名稱(如 "string.text.query.stream"),pendingMessagesPerConsumer:Map,鍵是消費者名稱,值是 pending 消息數量PendingMessages pendingMessages = ops.pending(listener.getStreamKey(), Consumer.from(groupName, consumerName), Range.unbounded(), pendingMessageCount);if (pendingMessages.isEmpty()) {return;}pendingMessages.forEach(pendingMessage -> {// 獲取消息上一次傳遞到 consumer 的時間,long lastDelivery = pendingMessage.getElapsedTimeSinceLastDelivery().getSeconds();if (lastDelivery < EXPIRE_TIME){return;}// 獲取指定 id 的消息體(對于超時的包裹,清理員去傳送帶上找到它的具體內容(records),比如包裹里裝了啥(JSON 數據)。如果沒找到(比如包裹被刪了),就跳過)List<MapRecord<String, Object, Object>> records = ops.range(listener.getStreamKey(),Range.of(Range.Bound.inclusive(pendingMessage.getIdAsString()), Range.Bound.inclusive(pendingMessage.getIdAsString())));if (CollUtil.isEmpty(records)) {return;}// 重新投遞消息redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord().ofObject(records.get(0).getValue()) // 設置內容.withStreamKey(listener.getStreamKey()));// ack 消息消費完成redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, records.get(0));log.info("[processPendingMessage][消息({})重新投遞成功]", records.get(0).getId());});});});}
}
- RedisPendingMessageResendJob 是一個定時任務類,專門用于處理 Redis Stream 消息隊列中因消費者異常(如崩潰)而未確認(ACK)的待處理(pending)消息。
- 它通過定期掃描消費者組的 pending 消息,檢查消息是否超時(默認 5 分鐘),并將超時的消息重新投遞到 Stream,同時確認(ACK)原消息,以避免消息丟失或重復處理。
3. 總結
3.1?整體背景
想象你在一家快遞公司(你的系統)工作,Redis 消息隊列就像公司的兩條傳送帶:
- Pub/Sub 傳送帶(廣播模式):像廣播電臺,包裹(消息)發出去,所有聽眾(消費者)都能收到,但包裹不會保存。
- Stream 傳送帶(集群模式):像物流流水線,包裹有編號(消息 ID),可以保存、追蹤,多個快遞員(消費者)分工處理。
實現代碼CurdTestController 負責把包裹放上傳送帶,框架類代碼負責管理傳送帶、派送包裹、處理異常(比如包裹卡住)。下面,我會先介紹每個角色的作用(類分析),然后講一個包裹從發出到送達的故事(全過程)。
3.2?類結構與作用分析
3.2.1 框架的 Redis MQ 相關類
- YudaoRedisMQProducerAutoConfiguration
- 作用:像快遞公司的“生產設備管理員”,負責初始化消息發送工具(RedisMQTemplate)。
- 職責:
- 創建 RedisMQTemplate Bean,注入 StringRedisTemplate 和攔截器(RedisMessageInterceptor)。
- 配置攔截器,增強消息發送的擴展性(如日志、租戶處理)。
- 關鍵方法:
- redisMQTemplate:構造 RedisMQTemplate,添加攔截器。
- 使用場景:Spring 啟動時,自動配置 RedisMQTemplate,供生產者(如 StringTextRedisProducer)使用。
- YudaoRedisMQConsumerAutoConfiguration
- 作用:像“派送中心管理員”,負責配置 Pub/Sub 和 Stream 的消息監聽容器,以及異常包裹清理任務。
- 職責:
- 配置 RedisMessageListenerContainer:監聽 Pub/Sub 通道(如 string-text-query-channel)。
- 配置 StreamMessageListenerContainer:監聽 Stream 隊列(如 string-text-query-stream)。
- 創建 RedisPendingMessageResendJob:定時清理 Stream 的待處理(pending)包裹。
- 檢查 Redis 版本,確保支持 Stream(≥5.0.0)。
- 生成消費者名稱(如 192.168.1.1@1234)。
- 關鍵方法:
- redisMessageListenerContainer:注冊 Pub/Sub 監聽器。
- redisStreamMessageListenerContainer:注冊 Stream 監聽器,設置批處理(10 條/次)、手動 ACK。
- redisPendingMessageResendJob:創建定時任務。
- buildConsumerName:生成唯一消費者名稱。
- checkRedisVersion:驗證 Redis 版本。
- 使用場景:Spring 啟動時,自動配置消費者容器,監聽消息并處理異常。
- RedisMQTemplate
- 作用:像“傳送帶操作員”,負責把包裹放上 Pub/Sub 或 Stream 傳送帶,并支持攔截器擴展。
- 職責:
- 發送 Pub/Sub 消息:convertAndSend 到指定通道。
- 發送 Stream 消息:opsForStream().add 到指定 Stream,返回消息 ID。
- 執行攔截器:發送前后調用 sendMessageBefore/sendMessageAfter。
- 提供 opsForStream() 等接口,供定時任務(如 RedisPendingMessageResendJob)操作 Stream。
- 關鍵方法:
- send(T extends AbstractRedisChannelMessage):發送 Pub/Sub 消息。
- send(T extends AbstractRedisStreamMessage):發送 Stream 消息。
- addInterceptor:添加攔截器。
- 使用場景:被生產者(如 StringTextRedisProducer)調用發送消息,被定時任務調用操作 Stream。
- RedisMessageInterceptor
- 作用:像“包裹檢查員”,在消息發送或消費前后進行額外處理(如日志、租戶隔離)。
- 職責:
- 定義接口:sendMessageBefore/After、consumeMessageBefore/After。
- 默認空實現,允許自定義擴展。
- 使用場景:由 RedisMQTemplate 在消息發送/消費時調用,典型用于多租戶場景或日志記錄。
- RedisPendingMessageResendJob
- 作用:像“包裹清理員”,定時(每分鐘 35 秒)檢查 Stream 傳送帶上卡住的包裹(pending 消息),重新投遞超時的包裹。
- 職責:
- 使用分布式鎖(RedissonClient)確保單一實例執行。
- 掃描每個 Stream 的 pending 消息,檢查超時(5 分鐘)。
- 重新投遞超時消息,確認(ACK)原消息。
- 關鍵方法:
- messageResend:定時任務入口,加鎖調用 execute。
- execute:掃描 pending 消息,重新投遞并 ACK。
- 使用場景:消費者崩潰未確認消息時,定時重發確保消息不丟失。
- AbstractRedisMessage
- 作用:像“包裹模板”,定義消息的基本結構。
- 職責:
- 提供 headers(鍵值對),存儲元數據(如租戶 ID)。
- 抽象基類,供具體消息類繼承。
- 使用場景:被 AbstractRedisChannelMessage 和 AbstractRedisStreamMessage 繼承。
- AbstractRedisChannelMessage
- 作用:像“Pub/Sub 包裹模板”,定義 Pub/Sub 消息的結構。
- 職責:
- 提供 getChannel(),默認返回類名,子類可自定義通道(如 string-text-query-channel)。
- 忽略序列化通道名(@JsonIgnore)。
- 使用場景:被你的 StringTextQueryChannelMessage 繼承。
- AbstractRedisStreamMessage
- 作用:像“Stream 包裹模板”,定義 Stream 消息的結構。
- 職責:
- 提供 getStreamKey(),默認返回類名,子類可自定義 Stream(如 string-text-query-stream)。
- 忽略序列化 Stream 鍵。
- 使用場景:被你的 StringTextQueryStreamMessage 繼承。
- AbstractRedisChannelMessageListener<T>
- 作用:像“Pub/Sub 快遞員”,監聽 Pub/Sub 通道,處理消息。
- 職責:
- 自動獲取消息類型(messageType)和通道(channel)。
- 反序列化消息(JsonUtils.parseObject)。
- 執行攔截器(consumeMessageBefore/After)。
- 調用子類的 onMessage 處理消息。
- 關鍵方法:
- onMessage(Message, byte[]):處理原始 Redis 消息。
- onMessage(T):抽象方法,子類實現具體邏輯。
- 使用場景:被你的 StringTextChannelConsumerListener 繼承。
- AbstractRedisStreamMessageListener<T>
- 作用:像“Stream 快遞員”,監聽 Stream 隊列,處理消息。
- 職責:
- 自動獲取消息類型(messageType)、Stream 鍵(streamKey)、消費者組(group)。
- 反序列化消息,執行攔截器,調用子類 onMessage。
- 手動 ACK 消息(opsForStream().acknowledge)。
- 關鍵方法:
- onMessage(ObjectRecord):處理 Stream 消息。
- onMessage(T):抽象方法,子類實現。
- 使用場景:被你的 StringTextStreamConsumerListener 繼承。
3.2.2 實現定義類
- CurdTestController
- 作用:像“客戶服務中心”,接收用戶請求,觸發消息發送或異步查詢。
- 職責:
- 提供 REST 接口:
- /getStringText:異步查詢(StringTextService.getUserPageAsync)。
- /getStringTextByMQ:觸發 MQ 消息(未實現具體生產者)。
- /getStringTextByStream:發送 Stream 消息(StringTextRedisProducer.sendStreamMessage)。
- /getStringTextByChannel:發送 Pub/Sub 消息(StringTextRedisProducer.sendChannelMessage)。
- 記錄日志,處理異常。
- 提供 REST 接口:
- 使用場景:用戶通過 HTTP 請求觸發消息隊列或異步處理。
- StringTextRedisProducer
- 作用:像“包裹打包員”,創建并發送 Pub/Sub 和 Stream 消息。
- 職責:
- sendStreamMessage:發送 StringTextQueryStreamMessage 到 string-text-query-stream。
- sendChannelMessage:發送 StringTextQueryChannelMessage 到 string-text-query-channel。
- 使用 RedisMQTemplate 發送消息,記錄日志。
- 使用場景:被 CurdTestController 調用,發送查詢請求到消息隊列。
- StringTextChannelConsumerListener
- 作用:像“Pub/Sub 派送員”,監聽 string-text-query-channel,處理消息。
- 職責:
- 接收 StringTextQueryChannelMessage,轉換為 StringTextPageReqVO。
- 調用 StringTextService.getUserPage 查詢數據。
- 記錄處理日志。
- 使用場景:處理 Pub/Sub 廣播消息,適合實時通知場景。
- StringTextStreamConsumerListener
- 作用:像“Stream 派送員”,監聽 string-text-query-stream,處理消息。
- 職責:
- 接收 StringTextQueryStreamMessage,轉換為 StringTextPageReqVO。
- 調用 StringTextService.getUserPage 查詢數據。
- 手動 ACK 消息,記錄日志。
- 使用場景:處理 Stream 集群消息,適合持久化、可靠投遞場景。
- StringTextQueryChannelMessage
- 作用:像“Pub/Sub 包裹”,定義 Pub/Sub 消息結構。
- 職責:
- 包含 text、pageNo、pageSize,校驗非空。
- 指定通道 string-text-query-channel。
- 使用場景:由 StringTextRedisProducer 發送,StringTextChannelConsumerListener 消費。
- StringTextQueryStreamMessage
- 作用:像“Stream 包裹”,定義 Stream 消息結構。
- 職責:
- 包含 text、pageNo、pageSize,校驗非空。
- 指定 Stream string-text-query-stream。
- 使用場景:由 StringTextRedisProducer 發送,StringTextStreamConsumerListener 消費。
- StringTextServiceImpl
- 作用:像“數據倉庫”,提供數據查詢服務。
- 職責:
- getUserPage:同步查詢分頁數據。
- getUserPageAsync:異步查詢,使用線程池(curdAsyncExecutor)。
- 調用 StringTextMapper 訪問數據庫。
- 使用場景:被控制器和消費者調用,處理查詢邏輯。
3.3 文字圖總結
3.3.1 圖示總結
YudaoRedisMQProducerAutoConfiguration
└─ redisMQTemplate(StringRedisTemplate redisTemplate, List<RedisMessageInterceptor> interceptors)│ │ // 初始化消息發送工具,注入 Redis 模板和攔截器(默認空)│ ├─ Parameters:│ ├─ redisTemplate: StringRedisTemplate // Redis 操作模板│ └─ interceptors: List<RedisMessageInterceptor> // 攔截器列表(默認空)│ └─ Returns: RedisMQTemplate│ └─ RedisMQTemplate(redisTemplate)│ └─ addInterceptor(interceptor) // 添加攔截器(默認空,支持鏈式調用)
YudaoRedisMQConsumerAutoConfiguration
├─ checkRedisVersion(StringRedisTemplate redisTemplate)
│ │
│ │ // 校驗 Redis 版本 ≥ 5.0.0
│ │
│ ├─ Parameters: redisTemplate: StringRedisTemplate
│ └─ Returns: void
│
├─ redisMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners)
│ │
│ │ // 配置 Pub/Sub 監聽容器(基于 Redis 發布訂閱模式)
│ │
│ ├─ Parameters:
│ │ ├─ redisMQTemplate: RedisMQTemplate
│ │ └─ listeners: List<AbstractRedisChannelMessageListener<?>>
│ │ // 包含 StringTextChannelConsumerListener 實現類
│ │
│ └─ Returns: RedisMessageListenerContainer
│ │
│ └─ addMessageListener(StringTextChannelConsumerListener listener, ChannelTopic("string-text-query-channel"))
│ │
│ │ // 綁定監聽器到指定通道(channel)
│ │
│ ├─ Parameters:
│ │ ├─ listener: StringTextChannelConsumerListener
│ │ └─ topic: ChannelTopic("string-text-query-channel")
│ │ // 通道名稱固定為 string-text-query-channel
│ │
│ └─ StringTextChannelConsumerListener.setRedisMQTemplate(redisMQTemplate)
│ // 注入消息發送模板,用于消費時的響應邏輯
│
├─ redisStreamMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners)
│ │
│ │ // 配置 Stream 監聽容器(基于 Redis Stream 數據結構)
│ │
│ ├─ Parameters:
│ │ ├─ redisMQTemplate: RedisMQTemplate
│ │ └─ listeners: List<AbstractRedisStreamMessageListener<?>>
│ │ // 包含 StringTextStreamConsumerListener 實現類
│ │
│ └─ Returns: StreamMessageListenerContainer
│ │
│ └─ register(StreamReadRequest("string-text-query-stream", Consumer("my-app", "192.168.1.1@1234"), StringTextStreamConsumerListener listener))
│ │
│ │ // 綁定監聽器到指定 Stream(支持消費者組模式)
│ │
│ ├─ Parameters:
│ │ ├─ request: StreamReadRequest
│ │ │ // StreamKey: string-text-query-stream
│ │ │ // Consumer: my-app@192.168.1.1@1234(消費者組名+實例標識)
│ │ └─ listener: StringTextStreamConsumerListener
│ │
│ ├─ redisTemplate.opsForStream().createGroup("string-text-query-stream", "my-app")
│ │ // 自動創建消費者組(若不存在),組名與應用名一致
│ │
│ └─ StringTextStreamConsumerListener.setRedisMQTemplate(redisMQTemplate)
│ // 注入消息發送模板,用于消費時的響應邏輯
│
└─ redisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> listeners, RedisMQTemplate redisTemplate, String groupName, RedissonClient redissonClient)│ │ // 創建定時任務,處理 Stream 中未確認的 pending 消息│ // 防止消息堆積,保證至少一次投遞語義│ ├─ Parameters:│ ├─ listeners: List<AbstractRedisStreamMessageListener<?>> │ │ // 包含 StringTextStreamConsumerListener 實現類│ ├─ redisTemplate: RedisMQTemplate│ ├─ groupName: String // 默認使用 spring.application.name("my-app")│ └─ redissonClient: RedissonClient // 分布式鎖,保證任務單點執行│ └─ Returns: RedisPendingMessageResendJob// 定時任務會定期掃描 pending 消息并重新投遞
通過 Stream 發送消息的流程
CurdTestController
└─ getStringTextByStream(StringTextPageReqVO pageReqVO)│ │ // 處理 Stream 消息請求│ Parameters: pageReqVO: {text="example", pageNo=1, pageSize=10}│ Returns: "Stream 消息已發送,異步處理中"│ └─ StringTextRedisProducer.sendStreamMessage(text, pageNo, pageSize)│ │ // 發送 Stream 消息(參數:example, 1, 10)│ Returns: void│ └─ RedisMQTemplate.send(StringTextQueryStreamMessage message)│ │ // 發送到 Stream(消息內容包含 text/pageNo/pageSize)│ Returns: RecordId(如 "1234567890-0")│ ├─ sendMessageBefore(message)│ // 調用消息發送前的攔截器(默認無操作)│ ├─ JsonUtils.toJsonString(message)│ // 序列化消息為 JSON│ Returns: "{\"text\":\"example\",\"pageNo\":1,\"pageSize\":10}"│ ├─ redisTemplate.opsForStream().add(StreamRecords)│ // 添加到 Stream(StreamKey: string-text-query-stream)│ Parameters: JSON 字符串 + StreamKey│ Returns: RecordId(消息唯一標識)│ └─ sendMessageAfter(message)// 調用消息發送后的攔截器(默認無操作)
StreamMessageListenerContainer
└─ StringTextStreamConsumerListener.onMessage(ObjectRecord<String, String> message)│ │ // 處理 Stream 消息(參數:包含 StreamKey 和 JSON 內容的記錄)│ Returns: void│ ├─ JsonUtils.parseObject(message.getValue(), StringTextQueryStreamMessage.class)│ // 反序列化 JSON 為消息對象│ Returns: {text="example", pageNo=1, pageSize=10}│ ├─ consumeMessageBefore(messageObj)│ // 調用消息消費前的攔截器(默認無操作)│ ├─ onMessage(StringTextQueryStreamMessage message)│ // 子類自定義處理邏輯(由業務實現)│ Parameters: 反序列化后的消息對象│ Returns: void│ │ └─ StringTextService.getUserPage(StringTextPageReqVO reqVO)│ │ │ │ // 業務邏輯:查詢數據│ │ Parameters: {text="example", pageNo=1, pageSize=10}│ │ Returns: 分頁結果(包含查詢到的數據)│ │ │ └─ StringTextMapper.selectPage(reqVO)│ // 數據庫層查詢│ Returns: 從數據庫獲取的分頁結果│ ├─ redisMQTemplate.getRedisTemplate().opsForStream().acknowledge("my-app", message)│ // 關鍵步驟:確認消息已消費│ Parameters: 消費者組名 + 消息記錄│ Returns: void│ (若不確認,消息會被視為 pending 并由定時任務重新投遞)│ └─ consumeMessageAfter(messageObj)// 調用消息消費后的攔截器(默認無操作)
通過 Pub/Sub 發送消息的流程
CurdTestController
└─ getStringTextByChannel(StringTextPageReqVO pageReqVO)│ │ // 處理 Pub/Sub 消息請求│ Parameters: pageReqVO: {text="example", pageNo=1, pageSize=10}│ Returns: "Channel 消息已發送,異步處理中"│ └─ StringTextRedisProducer.sendChannelMessage(text, pageNo, pageSize)│ │ // 發送 Pub/Sub 消息(參數:example, 1, 10)│ Returns: void│ └─ RedisMQTemplate.send(StringTextQueryChannelMessage message)│ │ // 發送到通道(channel: string-text-query-channel)│ Returns: void│ ├─ sendMessageBefore(message)│ // 調用消息發送前的攔截器(默認無操作)│ ├─ JsonUtils.toJsonString(message)│ // 序列化消息為 JSON│ Returns: "{\"text\":\"example\",\"pageNo\":1,\"pageSize\":10}"│ ├─ redisTemplate.convertAndSend("string-text-query-channel", json)│ // 發布到 Redis 通道│ Parameters: 通道名 + JSON 字符串│ Returns: void(無返回值,Fire-and-Forget 模式)│ └─ sendMessageAfter(message)// 調用消息發送后的攔截器(默認無操作)
RedisMessageListenerContainer
└─ StringTextChannelConsumerListener.onMessage(Message message, byte[] bytes)│ │ // 處理 Pub/Sub 消息(參數:消息對象+通道字節數組)│ Returns: void(無返回值,Fire-and-Forget 模式)│ ├─ JsonUtils.parseObject(message.getBody(), StringTextQueryChannelMessage.class)│ // 反序列化 JSON 為消息對象│ Returns: {text="example", pageNo=1, pageSize=10}│ ├─ consumeMessageBefore(messageObj)│ // 調用消息消費前的攔截器(默認無操作)│ ├─ onMessage(StringTextQueryChannelMessage message)│ // 子類自定義處理邏輯(由業務實現)│ Parameters: 反序列化后的消息對象│ Returns: void│ │ └─ StringTextService.getUserPage(StringTextPageReqVO reqVO)│ │ │ │ // 業務邏輯:查詢數據│ │ Parameters: {text="example", pageNo=1, pageSize=10}│ │ Returns: 分頁結果(包含查詢到的數據)│ │ │ └─ StringTextMapper.selectPage(reqVO)│ // 數據庫層查詢│ Returns: 從數據庫獲取的分頁結果│ └─ consumeMessageAfter(messageObj)// 調用消息消費后的攔截器(默認無操作)
RedisPendingMessageResendJob 定時任務流程
3.3.2?類作用總結
- 框架類:
- YudaoRedisMQProducerAutoConfiguration:初始化生產工具。
- YudaoRedisMQConsumerAutoConfiguration:配置消費容器和定時任務。
- RedisMQTemplate:發送消息,操作 Stream.
- RedisMessageInterceptor:擴展點(未使用)。
- RedisPendingMessageResendJob:重發 pending 消息。
- AbstractRedisMessage:消息基類。
- AbstractRedisChannelMessage:Pub/Sub 消息模板。
- AbstractRedisStreamMessage:Stream 消息模板。
- AbstractRedisChannelMessageListener:Pub/Sub 消費者基類。
- AbstractRedisStreamMessageListener:Stream 消費者基類。
- 具體業務實現類:
- CurdTestController:觸發消息發送。
- StringTextRedisProducer:發送消息。
- StringTextChannelConsumerListener:處理 Pub/Sub 消息。
- StringTextStreamConsumerListener:處理 Stream 消息。
- StringTextQueryChannelMessage:Pub/Sub 消息。
- StringTextQueryStreamMessage:Stream 消息。
- StringTextServiceImpl:查詢數據。