java每日精進 6.11【消息隊列】

1.內存級Spring_Event

1.1 控制器層:StringTextController

/*** 字符串文本管理控制器* 提供通過消息隊列異步獲取文本信息的接口*/
@RestController
@RequestMapping("/api/string-text")
public class StringTextController {@Resourceprivate StringTextProducer stringTextProducer;/*** 通過消息隊列異步查詢字符串文本信息* * 流程:* 1. 接收前端查詢參數* 2. 封裝為消息對象發送到消息隊列* 3. 立即返回"消息已發送"響應,不等待實際處理結果* * 優點:* - 避免長耗時查詢阻塞HTTP連接* - 支持水平擴展處理能力* * @param pageReqVO 分頁查詢參數* @return 統一響應結果(僅包含消息發送狀態)*/@GetMapping("/getStringTextByMQ")@Operation(summary = "通過消息隊列獲取test信息")public CommonResult<String> getStringTextByMQ(@Validated @ModelAttribute StringTextPageReqVO pageReqVO) {log.info("控制器接收到MQ請求,線程名: {}", Thread.currentThread().getName());try {// 發送消息到隊列(異步處理)stringTextProducer.sendStringTextQueryMessage(pageReqVO.getText(), pageReqVO.getPageNo(), pageReqVO.getPageSize());return CommonResult.success("消息已發送,異步處理中");} catch (Exception e) {log.error("消息發送異常", e);return CommonResult.error(500, "消息發送失敗");}}
}

1.2?消息生產者:StringTextProducer

/*** 字符串文本查詢消息生產者* 負責將查詢請求封裝為消息并發送到消息隊列* * 設計模式:* - 使用Spring事件機制作為輕量級消息隊列* - 可無縫切換為Kafka/RocketMQ等真正的MQ系統*/
@Slf4j
@Component
public class StringTextProducer {@Resourceprivate ApplicationContext applicationContext;/*** 發送字符串查詢消息* * @param text 查詢文本(用于模糊匹配)* @param pageNo 頁碼* @param pageSize 每頁大小* * 消息傳遞機制:* 1. 創建StringTextQueryMessage對象* 2. 通過Spring事件發布器將消息廣播到事件總線* 3. 由StringTextConsumer監聽并處理該消息*/public void sendStringTextQueryMessage(String text, Integer pageNo, Integer pageSize) {StringTextQueryMessage message = new StringTextQueryMessage();message.setText(text);message.setPageNo(pageNo);message.setPageSize(pageSize);log.info("[sendStringTextQueryMessage][發送消息: {}]", message);// 使用Spring事件機制發布消息// 注意:此處使用同步事件,通過@Async注解在Consumer端實現異步applicationContext.publishEvent(message);}
}

1.3?消息消費者:StringTextConsumer

/*** 字符串文本查詢消息消費者* 負責從消息隊列接收查詢請求并執行實際查詢操作* * 線程模型:* - 使用@Async注解指定專用線程池"curdAsyncExecutor"* - 實現請求處理與HTTP請求線程分離*/
@Slf4j
@Component
public class StringTextConsumer {@Resourceprivate StringTextService stringTextService;/*** 處理字符串查詢消息* * @param message 查詢消息對象* * 執行流程:* 1. 從消息中提取查詢參數* 2. 調用Service層執行實際查詢* 3. 記錄查詢結果(可擴展為保存到結果表或通知前端)*/@EventListener@Async("curdAsyncExecutor") // 使用專用異步線程池public void onMessage(StringTextQueryMessage message) {log.info("[onMessage][接收消息: {},線程名: {}]", message, Thread.currentThread().getName());// 1. 將消息轉換為請求對象StringTextPageReqVO reqVO = new StringTextPageReqVO();reqVO.setText(message.getText());reqVO.setPageNo(message.getPageNo());reqVO.setPageSize(message.getPageSize());// 2. 執行實際查詢(可能是數據庫或外部系統)PageResult<StringTextDO> pageResult = stringTextService.getUserPage(reqVO);log.info("[onMessage][處理結果: {}]", pageResult);// 3. 可擴展邏輯:// - 將結果保存到臨時表// - 推送WebSocket通知前端結果就緒// - 觸發后續處理流程}
}

1.4?消息模型:StringTextQueryMessage

/*** 字符串文本查詢消息* 用于在生產者和消費者之間傳遞查詢參數* * 設計特點:* - 實現Serializable接口以便于消息傳輸* - 使用JSR-303注解進行參數校驗* - 字段與StringTextPageReqVO保持語義一致*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class StringTextQueryMessage implements Serializable {private static final long serialVersionUID = 20230610L;/*** 查詢的文本內容(用于模糊查詢)*/private String text;/*** 頁碼(從1開始)*/@NotNull(message = "頁碼不能為空")private Integer pageNo;/*** 每頁大小*/@NotNull(message = "每頁大小不能為空")private Integer pageSize;
}

客戶端 → HTTP請求 → StringTextController → StringTextProducer → 消息隊列 → StringTextConsumer → StringTextService → 數據庫查詢

1.5 問題提出

1.5.1 生產者如何找到消費者?

  • StringTextProducer 通過 applicationContext.publishEvent 發布 StringTextQueryMessage 事件。
  • Spring 容器根據事件類型(StringTextQueryMessage)將事件分發給 StringTextConsumer 的 @EventListener 方法。
  • 匹配基于 Java 類型系統,參數類型必須兼容事件類型。


1.5.2 假如添加多個新的接收參數為StringTextQueryMessage的消費者,還有幾個是接收的參數是StringTextQueryMessage及其子類,會全部接收到還是會匹配精度最高的?

  • 事件類型匹配
    • Spring 根據事件對象的 運行時類型(StringTextQueryMessage 或其子類)查找所有 @EventListener 方法,檢查其參數類型是否與事件類型兼容。
    • 兼容的規則是:監聽方法的參數類型可以是事件類型的 類本身超類接口
    • 例如,如果發布的事件是 StringTextQueryMessage,所有參數類型為 StringTextQueryMessage、其超類(如 Object)或接口的 @EventListener 方法都會被調用。
  • 多消費者行為
    • Spring Event 不基于“精度最高”選擇單個消費者,而是 廣播 事件給所有匹配的監聽器。
    • 如果有多個消費者監聽 StringTextQueryMessage 或其子類,Spring 會按順序調用所有匹配的 @EventListener 方法。
  • 執行順序
    • 默認情況下,Spring 不保證 @EventListener 方法的調用順序。
    • 可以通過 @Order 注解或實現 Ordered 接口指定執行順序(較低的 order 值優先執行)。
  • 異步性
    • 如果 @EventListener 方法標注了 @Async(如您的 StringTextConsumer),每個消費者的處理會在異步線程中執行,互不阻塞。

1.5.3以上情況能不能指定一個consumer匹配?

方法 1:使用條件注解(@EventListener(condition = ...))
  • 在 @EventListener 中添加 condition 屬性,通過 SpEL(Spring Expression Language)過濾事件。
  • 示例:修改 StringTextConsumer1 和 StringTextConsumer2,添加條件:
  • @EventListener(condition = "#message.text == 'specific'")

  • 如果 message.text == "specific",只有 Consumer1 處理。

方法 2:使用自定義事件路由
  • 引入一個事件路由機制,通過事件對象的額外屬性指定目標消費者。
  • 修改 StringTextQueryMessage,添加 consumerId 字段:
  • @Data

    public class StringTextQueryMessage {

    ? ? private String text;

    ? ? @NotNull(message = "頁碼不能為空")

    ? ? private Integer pageNo;

    ? ? @NotNull(message = "每頁大小不能為空")

    ? ? private Integer pageSize;

    ? ? private String consumerId; // 新增:指定目標消費者

    }

  • @Slf4j

    @Component

    public class StringTextConsumer1 {

    ? ? @Resource

    ? ? private StringTextService stringTextService;

    ? ? @EventListener

    ? ? @Async("curdAsyncExecutor")

    ? ? public void onMessage(StringTextQueryMessage message) {

    ? ? ? ? if (!"consumer1".equals(message.getConsumerId())) {

    ? ? ? ? ? ? return; // 忽略不匹配的 consumerId

    ? ? ? ? }

    ? ? ? ? log.info("[Consumer1][接收消息: {},線程名: {}]", message, Thread.currentThread().getName());

    ? ? ? ? StringTextPageReqVO reqVO = new StringTextPageReqVO();

    ? ? ? ? reqVO.setText(message.getText());

    ? ? ? ? reqVO.setPageNo(message.getPageNo());

    ? ? ? ? reqVO.setPageSize(message.getPageSize());

    ? ? ? ? PageResult<StringTextDO> pageResult = stringTextService.getUserPage(reqVO);

    ? ? ? ? log.info("[Consumer1][處理結果: {}]", pageResult);

    ? ? }

    }

    @Slf4j

    @Component

    public class StringTextConsumer2 {

    ? ? @EventListener

    ? ? @Async("curdAsyncExecutor")

    ? ? public void onMessage(StringTextQueryMessage message) {

    ? ? ? ? if (!"consumer2".equals(message.getConsumerId())) {

    ? ? ? ? ? ? return;

    ? ? ? ? }

    ? ? ? ? log.info("[Consumer2][接收消息: {},線程名: {}]", message, Thread.currentThread().getName());

    ? ? }

    }

方法 3:自定義事件分發器
  • 重寫 Spring 的事件分發邏輯,使用自定義 ApplicationEventMulticaster:
    • 創建自定義 ApplicationEventMulticaster:
    • ? ? @Component("applicationEventMulticaster")

      ? ? public class CustomEventMulticaster extends SimpleApplicationEventMulticaster {

      ? ? ? ? @Override

      ? ? ? ? public void multicastEvent(ApplicationEvent event) {

      ? ? ? ? ? ? if (event instanceof StringTextQueryMessage) {

      ? ? ? ? ? ? ? ? StringTextQueryMessage message = (StringTextQueryMessage) event;

      ? ? ? ? ? ? ? ? // 根據條件選擇特定消費者,例如 consumerId

      ? ? ? ? ? ? ? ? getApplicationListeners(event).stream()

      ? ? ? ? ? ? ? ? ? ? ? ? .filter(listener -> {

      ? ? ? ? ? ? ? ? ? ? ? ? ? ? // 假設 listener 的方法名或類名包含 consumerId

      ? ? ? ? ? ? ? ? ? ? ? ? ? ? return listener.getListenerId().contains(message.getConsumerId());

      ? ? ? ? ? ? ? ? ? ? ? ? })

      ? ? ? ? ? ? ? ? ? ? ? ? .forEach(listener -> listener.onApplicationEvent(event));

      ? ? ? ? ? ? } else {

      ? ? ? ? ? ? ? ? super.multicastEvent(event);

      ? ? ? ? ? ? }

      ? ? ? ? }

      ? ? }

執行結果如下:

2025-06-10 10:39:03.349 |  INFO 16540 | http-nio-48080-exec-7 [TID: N/A] c.i.m.f.a.c.i.ApiAccessLogInterceptor    | [preHandle][開始請求 URL(/admin-api/curd/stringtext/getStringTextByMQ) 參數({text=username})]Controller 方法路徑:cn.iocoder.moyun.module.curd.controller.admin.CurdTestController(CurdTestController.java:57)
2025-06-10 10:39:03.355 |  INFO 16540 | http-nio-48080-exec-7 [TID: N/A] c.i.m.m.c.c.admin.CurdTestController     | 控制器接收到MQ請求,線程名: http-nio-48080-exec-7
2025-06-10 10:39:03.355 |  INFO 16540 | http-nio-48080-exec-7 [TID: N/A] c.i.m.m.c.m.p.sms.StringTextProducer     | [sendStringTextQueryMessage][發送消息: StringTextQueryMessage(text=username, pageNo=1, pageSize=10)]
2025-06-10 10:39:03.357 |  INFO 16540 | http-nio-48080-exec-7 [TID: N/A] c.i.m.f.a.c.i.ApiAccessLogInterceptor    | [afterCompletion][完成請求 URL(/admin-api/curd/stringtext/getStringTextByMQ) 耗時(6 ms)]
2025-06-10 10:39:03.358 |  INFO 16540 | curd-async-2 [TID: N/A] c.i.m.m.c.m.c.sms.StringTextConsumer     | [onMessage][接收消息: StringTextQueryMessage(text=username, pageNo=1, pageSize=10),線程名: curd-async-2]
2025-06-10 10:39:03.374 |  INFO 16540 | curd-async-2 [TID: N/A] c.i.m.m.c.s.s.StringTextServiceImpl      | 同步查詢結果: PageResult(list=[StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-03T10:00, updateTime=null, creator=system, updater=null, deleted=false), tenantId=1), id=3, text={"username":"user002","nickname":"臨時用戶","password":"xyz789","status":1}, loginIp=192.168.1.102, loginDate=2023-05-03T10:20), StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-02T09:00, updateTime=2023-05-03T11:30, creator=system, updater=admin, deleted=false), tenantId=1), id=2, text={"username":"user001","nickname":"測試用戶","password":"abc123","email":"test@example.com","mobile":"13800138001"}, loginIp=192.168.1.101, loginDate=2023-05-02T09:15), StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-01T08:00, updateTime=2023-05-05T15:00, creator=system, updater=admin, deleted=false), tenantId=1), id=1, text={"username":"admin","nickname":"管理員","password":"123456","dept_id":1001,"post_ids":[101,102]}, loginIp=192.168.1.100, loginDate=2023-05-01T08:30)], total=3)
2025-06-10 10:39:03.375 |  INFO 16540 | curd-async-2 [TID: N/A] c.i.m.m.c.m.c.sms.StringTextConsumer     | [onMessage][處理結果: PageResult(list=[StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-03T10:00, updateTime=null, creator=system, updater=null, deleted=false), tenantId=1), id=3, text={"username":"user002","nickname":"臨時用戶","password":"xyz789","status":1}, loginIp=192.168.1.102, loginDate=2023-05-03T10:20), StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-02T09:00, updateTime=2023-05-03T11:30, creator=system, updater=admin, deleted=false), tenantId=1), id=2, text={"username":"user001","nickname":"測試用戶","password":"abc123","email":"test@example.com","mobile":"13800138001"}, loginIp=192.168.1.101, loginDate=2023-05-02T09:15), StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-01T08:00, updateTime=2023-05-05T15:00, creator=system, updater=admin, deleted=false), tenantId=1), id=1, text={"username":"admin","nickname":"管理員","password":"123456","dept_id":1001,"post_ids":[101,102]}, loginIp=192.168.1.100, loginDate=2023-05-01T08:30)], total=3)]

2. Redis消息隊列

第一部分:Redis完成MQ的詳細過程梳理

Yudao項目利用Redis實現了兩種消息隊列機制:

  • Redis Stream:用于集群消費,消息持久化存儲,支持消費者組,適合需要可靠性和消息追蹤的場景。
  • Redis Pub/Sub(Channel):用于廣播消費,消息不持久化,適合實時性要求高的場景,所有訂閱者都會收到消息。

我將模擬一個完整的消息發送和消費過程,分別講解Stream和Channel的實現,詳細說明每個類的作用、注冊方式以及它們如何協作。

2.1 Redis Stream消息隊列的完整過程

場景:通過/getStringTextByStream接口發送Stream消息

假設用戶通過HTTP請求調用/getStringTextByStream接口,傳入StringTextPageReqVO對象(包含text、pageNo、pageSize字段)。以下是消息從發送到消費的詳細流程:


步驟1:接收HTTP請求(Controller層)
  • 相關類:@GetMapping("/getStringTextByStream")(未顯式定義類名,假設為StringTextController)
    • 作用:這是一個Spring MVC控制器方法,負責接收前端的HTTP GET請求,處理通過Redis Stream發送消息的邏輯。
    • 功能
      • 接收StringTextPageReqVO參數,包含分頁查詢的信息(如text、pageNo、pageSize)。
      • 調用StringTextRedisProducer的sendStreamMessage方法發送消息。
      • 返回CommonResult表示消息已發送,異步處理中。
    • 框架注冊
      • 通過Spring的@GetMapping注解,自動注冊為一個HTTP端點。
      • 依賴Spring Boot的Web模塊,控制器類通常標注@RestController,由Spring容器管理。
    • 與其他類的交互
      • 依賴StringTextRedisProducer來發送Stream消息。
      • 使用StringTextPageReqVO作為請求參數的VO(Value Object)。
  • 代碼分析
@GetMapping("/getStringTextByStream")
public CommonResult<String> getStringTextByStream(@Validated @ModelAttribute StringTextPageReqVO pageReqVO) {log.info("控制器接收到 Stream 請求,線程名: {}", Thread.currentThread().getName());try {stringTextRedisProducer.sendStreamMessage(pageReqVO.getText(), pageReqVO.getPageNo(), pageReqVO.getPageSize());return CommonResult.success("Stream 消息已發送,異步處理中");} catch (Exception e) {log.error("Stream 消息發送異常", e);return CommonResult.error(500, "Stream 消息發送失敗");}
}
  • 控制器接收到請求后,提取pageReqVO中的字段,傳遞給StringTextRedisProducer。
  • 異常處理確保即使發送失敗,也能返回錯誤響應。
步驟2:生產者發送Stream消息
  • 相關類:StringTextRedisProducer
    • 作用:這是一個生產者類,負責創建并發送Redis Stream消息。
    • 功能
      • 提供sendStreamMessage方法,構造StringTextQueryStreamMessage對象,設置text、pageNo、pageSize字段。
      • 使用RedisMQTemplate的send方法將消息發送到Redis Stream。
      • 記錄日志,跟蹤消息發送情況。
    • 框架注冊
      • 標注@Component,由Spring容器管理。
      • 通過@Resource注入RedisMQTemplate依賴。
    • 與其他類的交互
      • 依賴RedisMQTemplate執行實際的Redis Stream消息發送。
      • 使用StringTextQueryStreamMessage作為消息載體。
      • 被StringTextController調用。
  • 代碼分析
@Component
public class StringTextRedisProducer {@Resourceprivate RedisMQTemplate redisMQTemplate;public void sendStreamMessage(String text, Integer pageNo, Integer pageSize) {StringTextQueryStreamMessage message = new StringTextQueryStreamMessage();message.setText(text);message.setPageNo(pageNo);message.setPageSize(pageSize);redisMQTemplate.send(message);log.info("[sendStreamMessage][發送 Stream 消息: {}]", message);}
}
  • 創建StringTextQueryStreamMessage對象,填充請求參數。
  • 調用redisMQTemplate.send(message),將消息發送到Redis Stream。
  • 相關類:StringTextQueryStreamMessage
    • 作用:Stream消息的載體,定義消息的結構和Stream Key。
    • 功能
      • 繼承AbstractRedisStreamMessage,包含text、pageNo、pageSize字段。
      • 重寫getStreamKey(),指定Stream的鍵為"string-text-query-stream"。
    • 框架注冊
      • 作為普通Java類,無需Spring注冊,直接由StringTextRedisProducer實例化。
    • 與其他類的交互
      • 被StringTextRedisProducer用于封裝消息內容。
      • 被StringTextStreamConsumer用于解析消息內容。
  • 代碼分析
@Data
public class StringTextQueryStreamMessage extends AbstractRedisStreamMessage {private String text;@NotNull(message = "頁碼不能為空")private Integer pageNo;@NotNull(message = "每頁大小不能為空")private Integer pageSize;@Overridepublic String getStreamKey() {return "string-text-query-stream";}
}
  • 定義消息結構,確保pageNo和pageSize不為空。
  • 指定Stream Key,用于Redis Stream的消息存儲。
步驟3:RedisMQTemplate發送消息
  • 相關類:RedisMQTemplate
    • 作用:Redis MQ操作的核心模板類,封裝了Redis Stream和Pub/Sub的消息發送邏輯。
    • 功能
      • 提供send方法,針對Stream消息,將消息序列化為JSON并添加到指定的Stream Key。
      • 支持攔截器機制,在消息發送前后調用RedisMessageInterceptor的鉤子方法。
      • 使用RedisTemplate執行底層的Redis操作。
    • 框架注冊
      • 通過YudaoRedisMQProducerAutoConfiguration類注冊為Spring Bean。
      • 依賴StringRedisTemplate和RedisMessageInterceptor列表。
    • 與其他類的交互
      • 被StringTextRedisProducer調用,用于發送Stream消息。
      • 依賴RedisTemplate執行Redis操作。
      • 調用RedisMessageInterceptor處理消息發送前后的擴展邏輯。
      • 被StringTextStreamConsumer用于ACK消息。
  • 代碼分析
public class RedisMQTemplate {private final RedisTemplate<String, ?> redisTemplate;private final List<RedisMessageInterceptor> interceptors = new ArrayList<>();public <T extends AbstractRedisStreamMessage> RecordId send(T message) {try {sendMessageBefore(message);return redisTemplate.opsForStream().add(StreamRecords.newRecord().ofObject(JsonUtils.toJsonString(message)).withStreamKey(message.getStreamKey()));} finally {sendMessageAfter(message);}}
}
  • 調用sendMessageBefore執行攔截器前置邏輯。
  • 使用redisTemplate.opsForStream().add將消息添加到Stream,返回RecordId。
  • 調用sendMessageAfter執行攔截器后置邏輯。
  • 相關類:YudaoRedisMQProducerAutoConfiguration
    • 作用:生產者配置類,負責注冊RedisMQTemplate。
    • 功能
      • 創建RedisMQTemplate實例,注入StringRedisTemplate和攔截器列表。
      • 確保在YudaoRedisAutoConfiguration之后加載(依賴Redis配置)。
    • 框架注冊
      • 標注@AutoConfiguration,由Spring Boot自動加載。
      • 使用@Bean注冊RedisMQTemplate。
    • 與其他類的交互
      • 為StringTextRedisProducer和StringTextStreamConsumer提供RedisMQTemplate。
      • 依賴StringRedisTemplate和RedisMessageInterceptor。
  • 代碼分析
@AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
public class YudaoRedisMQProducerAutoConfiguration {@Beanpublic RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,List<RedisMessageInterceptor> interceptors) {RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate);interceptors.forEach(redisMQTemplate::addInterceptor);return redisMQTemplate;}
}

相關類:RedisMessageInterceptor

  • 作用:消息攔截器接口,提供發送和消費消息前后的擴展點。
  • 功能
    • 定義sendMessageBefore、sendMessageAfter、consumeMessageBefore、consumeMessageAfter方法。
    • 適用于多租戶場景或其他擴展需求(如日志記錄、權限檢查)。
  • 框架注冊
    • 實現類需標注@Component,由Spring容器管理。
    • 由YudaoRedisMQProducerAutoConfiguration注入到RedisMQTemplate。
  • 與其他類的交互
    • 被RedisMQTemplate調用,用于消息處理擴展。
步驟4:消費者監聽和處理Stream消息
  • 相關類:StringTextStreamConsumer
    • 作用:Stream消息的消費者,負責處理收到的StringTextQueryStreamMessage消息。
    • 功能
      • 繼承AbstractRedisStreamMessageListener,實現onMessage方法。
      • 將消息轉換為StringTextPageReqVO,調用StringTextService執行分頁查詢。
      • 記錄處理結果日志。
    • 框架注冊
      • 標注@Component,由Spring容器管理。
      • 通過YudaoRedisMQConsumerAutoConfiguration注冊到StreamMessageListenerContainer。
    • 與其他類的交互
      • 依賴StringTextService執行業務邏輯。
      • 依賴RedisMQTemplate進行消息ACK。
      • 接收StringTextQueryStreamMessage消息。
  • 代碼分析
@Component
public class StringTextStreamConsumer extends AbstractRedisStreamMessageListener<StringTextQueryStreamMessage> {@Resourceprivate StringTextService stringTextService;@Overridepublic void onMessage(StringTextQueryStreamMessage message) {log.info("[onMessage][Stream 消息: {},線程名: {}]", message, Thread.currentThread().getName());StringTextPageReqVO reqVO = new StringTextPageReqVO();reqVO.setText(message.getText());reqVO.setPageNo(message.getPageNo());reqVO.setPageSize(message.getPageSize());PageResult<StringTextDO> pageResult = stringTextService.getUserPage(reqVO);log.info("[onMessage][Stream 處理結果: {}]", pageResult);}
}
  • 相關類:AbstractRedisStreamMessageListener
    • 作用:Stream消息監聽器的抽象基類,封裝通用消費邏輯。
    • 功能
      • 實現StreamListener接口,處理Redis Stream消息。
      • 解析消息為指定類型(StringTextQueryStreamMessage)。
      • 調用攔截器前置/后置邏輯,執行消息ACK。
    • 框架注冊
      • 作為抽象類,不直接注冊,其子類(如StringTextStreamConsumer)注冊。
    • 與其他類的交互
      • 被StringTextStreamConsumer繼承。
      • 依賴RedisMQTemplate進行ACK。
      • 被YudaoRedisMQConsumerAutoConfiguration用于注冊監聽器。
  • 相關類:YudaoRedisMQConsumerAutoConfiguration
    • 作用:消費者配置類,負責注冊Stream和Pub/Sub的監聽容器。
    • 功能
      • 創建StreamMessageListenerContainer,配置Stream監聽。
      • 注冊StringTextStreamConsumer到容器,綁定Stream Key和消費者組。
      • 創建RedisPendingMessageResendJob處理未消費消息。
    • 框架注冊
      • 標注@AutoConfiguration,在YudaoRedisAutoConfiguration之后加載。
      • 使用@Bean注冊StreamMessageListenerContainer和RedisPendingMessageResendJob。
    • 與其他類的交互
      • 依賴RedisMQTemplate和StringTextStreamConsumer。
      • 為RedisPendingMessageResendJob提供監聽器列表。
  • 代碼分析
@AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
public class YudaoRedisMQConsumerAutoConfiguration {@Beanpublic StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners) {// 配置容器,注冊監聽器}
}

相關類:RedisPendingMessageResendJob

  • 作用:定時任務,重新投遞Stream中未消費的超時消息。
  • 功能
    • 每分鐘檢查Pending消息,超時(默認5分鐘)后重新投遞。
    • 使用分布式鎖(Redisson)確保單實例執行。
  • 框架注冊
    • 由YudaoRedisMQConsumerAutoConfiguration注冊為Bean。
    • 標注@Scheduled啟用定時任務。
  • 與其他類的交互
    • 依賴RedisMQTemplate和StringTextStreamConsumer的監聽器列表。
    • 使用RedissonClient實現分布式鎖。
步驟5:業務邏輯處理
  • 相關類:StringTextService
    • 作用:業務服務層,執行分頁查詢邏輯。
    • 功能
      • 提供getUserPage方法,根據StringTextPageReqVO查詢分頁數據。
      • 返回PageResult<StringTextDO>。
    • 框架注冊
      • 通常標注@Service,由Spring容器管理。
    • 與其他類的交互
      • 被StringTextStreamConsumer調用。
  • 相關類:StringTextPageReqVO
    • 作用:分頁請求的VO,定義查詢參數。
    • 功能
      • 包含text、pageNo、pageSize等字段。
      • 繼承PageParam,支持分頁參數。
    • 框架注冊
      • 作為普通Java類,無需注冊。
    • 與其他類的交互
      • 被StringTextController接收前端參數。
      • 被StringTextStreamConsumer用于構造查詢參數。

第二部分:具體類解析

2.2 RedisMQTemplate類
/*** Redis MQ 操作模板類* 封裝了基于Redis的兩種消息模型的發送操作:* 1. Pub/Sub模式:發布訂閱模式,適合廣播消息* 2. Stream模式:消息流模式,適合需要消息順序和可靠性的場景* 同時提供了消息攔截器機制,允許在消息發送前后執行自定義邏輯*/
@AllArgsConstructor
public class RedisMQTemplate {@Getterprivate final RedisTemplate<String, ?> redisTemplate;/*** 攔截器數組* 用于在消息發送前后執行自定義邏輯,如日志記錄、指標收集、分布式追蹤等*/@Getterprivate final List<RedisMessageInterceptor> interceptors = new ArrayList<>();/*** 發送 Redis 消息,基于 Redis pub/sub 實現* * @param message 繼承自AbstractRedisChannelMessage的消息對象* @return void 無返回值* @功能 基于Redis的Pub/Sub模式發布消息,在發送前后分別執行攔截器的前置和后置處理*       使用JSON序列化消息內容,通過message.getChannel()獲取消息發布的頻道名稱*/public <T extends AbstractRedisChannelMessage> void send(T message) {try {sendMessageBefore(message);// 發送消息redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message));} finally {sendMessageAfter(message);}}/*** 發送 Redis 消息,基于 Redis Stream 實現* * @param message 繼承自AbstractRedisStreamMessage的消息對象* @return RecordId Redis Stream生成的消息ID,格式如"1686475200000-0"* @功能 基于Redis的Stream模式發送消息,在發送前后分別執行攔截器的前置和后置處理*       使用JSON序列化消息內容,通過message.getStreamKey()獲取Stream的鍵名*/public <T extends AbstractRedisStreamMessage> RecordId send(T message) {try {sendMessageBefore(message);// 發送消息return redisTemplate.opsForStream().add(StreamRecords.newRecord().ofObject(JsonUtils.toJsonString(message)) // 設置內容.withStreamKey(message.getStreamKey())); // 設置 stream key} finally {sendMessageAfter(message);}}/*** 添加攔截器* * @param interceptor 消息攔截器對象* @return void 無返回值* @功能 向攔截器列表中添加一個消息攔截器,攔截器將在消息發送前后執行自定義邏輯*/public void addInterceptor(RedisMessageInterceptor interceptor) {interceptors.add(interceptor);}/*** 消息發送前執行攔截器邏輯* * @param message 消息對象* @return void 無返回值* @功能 在消息發送前調用所有攔截器的前置處理方法,按攔截器添加的順序執行(正序)*/private void sendMessageBefore(AbstractRedisMessage message) {// 正序執行攔截器的前置處理方法interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message));}/*** 消息發送后執行攔截器邏輯* * @param message 消息對象* @return void 無返回值* @功能 在消息發送后調用所有攔截器的后置處理方法,按攔截器添加順序的逆序執行(倒序)*/private void sendMessageAfter(AbstractRedisMessage message) {// 倒序執行攔截器的后置處理方法for (int i = interceptors.size() - 1; i >= 0; i--) {interceptors.get(i).sendMessageAfter(message);}}}
2.3 消息父類及其子類
AbstractRedisChannelMessageListener自定義監聽類父類
/*** Redis Pub/Sub 監聽器抽象類,用于實現廣播消費** @param <T> 消息類型。一定要填寫噢,不然會報錯*/
public abstract class AbstractRedisChannelMessageListener<T extends AbstractRedisChannelMessage> implements MessageListener {/*** 消息類型*/private final Class<T> messageType;/*** Redis Channel*/private final String channel;/*** RedisMQTemplate*/@Setterprivate RedisMQTemplate redisMQTemplate;/*** messageType 在 onMessage 方法中用于將 Redis 收到的消息(JSON 字符串)反序列化為 T 類型的對象* 通過 JsonUtils.parseObject(message.getBody(), messageType),明確的消息類型確保了正確的反序列化* channel 字段用于指定監聽器訂閱的 Redis Pub/Sub 通道* getChannel() 方法允許每個消息類型定義自己的通道,使監聽器能夠靈活適應不同的消息類型和通道* messageType 確定了后續消息反序列化的目標類型,確保消息被解析為 StringTextQueryChannelMessage。* channel 確定了監聽器訂閱的 Redis 通道,Spring Data Redis 會使用 getChannel() 的返回值("string.text.query.channel")訂閱該通道。* @SneakyThrows 隱藏了反射相關的異常(如 NoSuchMethodException),假設 StringTextQueryChannelMessage 有無參構造函數且 getChannel() 可訪問。*/@SneakyThrowsprotected AbstractRedisChannelMessageListener() {//通過反射獲取子類的泛型參數this.messageType = getMessageClass();/*** 具體案例:* StringTextQueryChannelMessage 的無參構造函數* 調用 newInstance() 創建一個 StringTextQueryChannelMessage 實例。* 調用實例的 getChannel() 方法,獲取通道名稱(如 "string.text.query.channel")*/this.channel = messageType.getDeclaredConstructor().newInstance().getChannel();}/*** 獲得 Sub 訂閱的 Redis Channel 通道** @return channel*/public final String getChannel() {return channel;}/*** Message message:Redis 傳遞的原始消息對象,包含消息體(message.getBody())和通道信息* T messageObj = JsonUtils.parseObject(message.getBody(), messageType)* 使用 JsonUtils.parseObject 將消息體(message.getBody(),字節數組)反序列化為 T 類型的對象* messageType 是構造函數中初始化的消息類(Class<T>),確保消息被正確解析為指定類型(如 OrderMessage)* 調用 this.onMessage(messageObj),這是抽象方法,子類必須實現以定義具體的消息處理* messageType 是 StringTextQueryChannelMessage.class(由構造函數設置)。* 具體案例:* JsonUtils.parseObject(message.getBody(), messageType)* 將 JSON 字節數組解析為 StringTextQueryChannelMessage 對象:* @param message* @param bytes*/@Overridepublic final void onMessage(Message message, byte[] bytes) {T messageObj = JsonUtils.parseObject(message.getBody(), messageType);try {consumeMessageBefore(messageObj);// 消費消息this.onMessage(messageObj);} finally {consumeMessageAfter(messageObj);}}/*** 處理消息** @param message 消息*/public abstract void onMessage(T message);/*** 通過解析類上的泛型,獲得消息類型** @return 消息類型*/@SuppressWarnings("unchecked")private Class<T> getMessageClass() {Type type = TypeUtil.getTypeArgument(getClass(), 0);if (type == null) {throw new IllegalStateException(String.format("類型(%s) 需要設置消息類型", getClass().getName()));}return (Class<T>) type;}private void consumeMessageBefore(AbstractRedisMessage message) {assert redisMQTemplate != null;List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();// 正序interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));}private void consumeMessageAfter(AbstractRedisMessage message) {assert redisMQTemplate != null;List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();// 倒序for (int i = interceptors.size() - 1; i >= 0; i--) {interceptors.get(i).consumeMessageAfter(message);}}
StringTextChannelConsumerListener自定義監聽器實現類
@Slf4j
@Component
public class StringTextChannelConsumerListener extends AbstractRedisChannelMessageListener<StringTextQueryChannelMessage> {@Resourceprivate StringTextService stringTextService;/*** 在 StringTextChannelConsumerListener 實例化時(由 Spring 容器管理)* 會調用父類 AbstractRedisChannelMessageListener 的構造函數* 當 Redis 通道 "string.text.query.channel" 收到消息時* Spring Data Redis 調用 StringTextChannelConsumerListener 的 onMessage(Message, byte[]) 方法(繼承自父類)* @param message 消息*/@Overridepublic void onMessage(StringTextQueryChannelMessage message) {log.info("[onMessage][Channel 消息: {},線程名: {}]", message, Thread.currentThread().getName());StringTextPageReqVO reqVO = new StringTextPageReqVO();reqVO.setText(message.getText());reqVO.setPageNo(message.getPageNo());reqVO.setPageSize(message.getPageSize());PageResult<StringTextDO> pageResult = stringTextService.getUserPage(reqVO);log.info("[onMessage][Channel 處理結果: {}]", pageResult);}
}

監聽器執行邏輯:

  • Spring Data Redis 的調用
    • 當通道有消息時,Spring Data Redis 調用 MessageListener 的 onMessage(Message, byte[]),即 AbstractRedisChannelMessageListener 的實現。
  • 父類到子類的調用
    • 父類的 onMessage(Message, byte[]) 處理消息反序列化和攔截器邏輯,然后通過 this.onMessage(T) 調用子類的 onMessage(StringTextQueryChannelMessage)。
  • 子類沒有直接調用父類
    • 子類的 onMessage 是父類調用的結果,不是子類主動調用父類。
    • 子類的 onMessage 只處理業務邏輯,依賴父類的預處理。
  • 構造函數的作用
    • 初始化 messageType 和 channel,確保消息反序列化和通道訂閱正確,直接影響子類 onMessage 的執行。

這種設計通過模板方法模式實現了通用邏輯和業務邏輯的分離,父類控制流程,子類提供具體實現。

2.4 配置類
YudaoRabbitMQAutoConfiguration
/*** RabbitMQ 消息隊列配置類* 配置RabbitMQ的消息轉換器,使用Jackson進行JSON序列化* 當生產者發送消息時,將Java對象轉換為JSON格式的字節數組* 當消費者接收消息時,將JSON格式的消息轉換回Java對象*/
@AutoConfiguration
@Slf4j
@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
public class YudaoRabbitMQAutoConfiguration {/*** Jackson2JsonMessageConverter Bean:使用 jackson 序列化消息*/@Beanpublic MessageConverter createMessageConverter() {return new Jackson2JsonMessageConverter();}
}

YudaoRabbitMQAutoConfiguration:配置 RabbitMQ 消息轉換器,使用 Jackson 進行 JSON 序列化。

YudaoRedisMQConsumerAutoConfiguration

/*** Redis 消息隊列 Consumer 配置類* redisMessageListenerContainer(): 創建Pub/Sub消息監聽容器* redisStreamMessageListenerContainer(): 創建Stream消息監聽容器* redisPendingMessageResendJob(): 創建待處理消息重發任務*/
@Slf4j
@EnableScheduling // 啟用定時任務,用于 RedisPendingMessageResendJob 重發消息
@AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
public class YudaoRedisMQConsumerAutoConfiguration {/*** 創建 Redis Pub/Sub 廣播消費的容器*/@Bean@ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的時候,才需要注冊 Redis pubsub 監聽public RedisMessageListenerContainer redisMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners) {// 創建 RedisMessageListenerContainer 對象RedisMessageListenerContainer container = new RedisMessageListenerContainer();// 設置 RedisConnection 工廠。container.setConnectionFactory(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory());// 添加監聽器listeners.forEach(listener -> {listener.setRedisMQTemplate(redisMQTemplate);container.addMessageListener(listener, new ChannelTopic(listener.getChannel()));log.info("[redisMessageListenerContainer][注冊 Channel({}) 對應的監聽器({})]",listener.getChannel(), listener.getClass().getName());});return container;}/*** 創建 Redis Stream 重新消費的任務*/@Bean@ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的時候,才需要注冊 Redis pubsub 監聽public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> listeners,RedisMQTemplate redisTemplate,@Value("${spring.application.name}") String groupName,RedissonClient redissonClient) {return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient);}/*** 創建 Redis Stream 集群消費的容器** 基礎知識:<a href="https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html">Redis Stream 的 xreadgroup 命令</a>*/@Bean(initMethod = "start", destroyMethod = "stop")@ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的時候,才需要注冊 Redis pubsub 監聽public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners) {RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate();checkRedisVersion(redisTemplate);// 第一步,創建 StreamMessageListenerContainer 容器// 創建 options 配置StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions =StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().batchSize(10) // 一次性最多拉取多少條消息.targetType(String.class) // 目標類型。統一使用 String,通過自己封裝的 AbstractStreamMessageListener 去反序列化.build();// 創建 container 對象StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =StreamMessageListenerContainer.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions);// 第二步,注冊監聽器,消費對應的 Stream 主題String consumerName = buildConsumerName();listeners.parallelStream().forEach(listener -> {log.info("[redisStreamMessageListenerContainer][開始注冊 StreamKey({}) 對應的監聽器({})]",listener.getStreamKey(), listener.getClass().getName());// 創建 listener 對應的消費者分組try {redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());} catch (Exception ignore) {}// 設置 listener 對應的 redisTemplatelistener.setRedisMQTemplate(redisMQTemplate);// 創建 Consumer 對象Consumer consumer = Consumer.from(listener.getGroup(), consumerName);// 設置 Consumer 消費進度,以最小消費進度為準StreamOffset<String> streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed());// 設置 Consumer 監聽StreamMessageListenerContainer.StreamReadRequestBuilder<String> builder = StreamMessageListenerContainer.StreamReadRequest.builder(streamOffset).consumer(consumer).autoAcknowledge(false) // 不自動 ack.cancelOnError(throwable -> false); // 默認配置,發生異常就取消消費,顯然不符合預期;因此,我們設置為 falsecontainer.register(builder.build(), listener);log.info("[redisStreamMessageListenerContainer][完成注冊 StreamKey({}) 對應的監聽器({})]",listener.getStreamKey(), listener.getClass().getName());});return container;}/*** 構建消費者名字,使用本地 IP + 進程編號的方式。* 參考自 RocketMQ clientId 的實現** @return 消費者名字*/private static String buildConsumerName() {return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());}/*** 校驗 Redis 版本號,是否滿足最低的版本號要求!*/private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {// 獲得 Redis 版本Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);String version = MapUtil.getStr(info, "redis_version");// 校驗最低版本必須大于等于 5.0.0int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false));if (majorVersion < 5) {throw new IllegalStateException(StrUtil.format("您當前的 Redis 版本為 {},小于最低要求的 5.0.0 版本!" +"請參考 {} 文檔進行安裝。", version, DocumentEnum.REDIS_INSTALL.getUrl()));}}}

方法 1: redisMessageListenerContainer

  • 作用
    • 創建并配置 RedisMessageListenerContainer,用于監聽 Redis Pub/Sub 通道消息。
    • 注冊所有 AbstractRedisChannelMessageListener 子類的監聽器(如 StringTextChannelConsumerListener)。

方法 2: redisPendingMessageResendJob

作用

  • 創建 RedisPendingMessageResendJob Bean,定時重發 Redis Stream 的待處理(pending)消息。

方法 3: redisStreamMessageListenerContainer

  • 作用
    • 創建并配置 StreamMessageListenerContainer,用于監聽 Redis Stream 消息。
    • 注冊所有 AbstractRedisStreamMessageListener 子類的監聽器。
  • 輸入參數
    • RedisMQTemplate redisMQTemplate:提供 Redis 連接和攔截器。
    • List<AbstractRedisStreamMessageListener<?>> listeners:所有 Stream 監聽器。
  • 輸出:StreamMessageListenerContainer<String, ObjectRecord<String, String>>,用于 Stream 消息消費。
  • 實現邏輯
    1. 檢查 Redis 版本
      • checkRedisVersion(redisTemplate); 確保 Redis 版本 ≥ 5.0.0(支持 Stream)。
    2. 創建容器
      • 配置 containerOptions:
        • batchSize(10):每次拉取最多 10 條消息。
        • targetType(String.class):消息體為 String 類型(JSON 字符串),由監聽器反序列化。
      • 創建 StreamMessageListenerContainer。
    3. 注冊監聽器
      • 使用 parallelStream 并行處理監聽器,提高效率。
      • 對每個監聽器:
        • 創建消費者組(createGroup)。
        • 注入 redisMQTemplate。
        • 創建 Consumer(組名 + 消費者名)。
        • 設置 StreamOffset(從最后消費位置讀取)。
        • 注冊監聽器,配置手動 ACK 和錯誤處理。
      • 日志記錄注冊過程。
2.5 RedisPendingMessageResendJob(定時任務類)
/*** 這個任務用于處理,crash 之后的消費者未消費完的消息* 重新分發信息(清理員)*/
@Slf4j
@AllArgsConstructor
public class RedisPendingMessageResendJob {private static final String LOCK_KEY = "redis:pending:msg:lock";/*** 消息超時時間,默認 5 分鐘** 1. 超時的消息才會被重新投遞* 2. 由于定時任務 1 分鐘一次,消息超時后不會被立即重投,極端情況下消息5分鐘過期后,再等 1 分鐘才會被掃瞄到*/private static final int EXPIRE_TIME = 5 * 60;private final List<AbstractRedisStreamMessageListener<?>> listeners;private final RedisMQTemplate redisTemplate;private final String groupName;private final RedissonClient redissonClient;/*** 一分鐘執行一次,這里選擇每分鐘的35秒執行,是為了避免整點任務過多的問題*/@Scheduled(cron = "35 * * * * ?")public void messageResend() {RLock lock = redissonClient.getLock(LOCK_KEY);// 嘗試加鎖if (lock.tryLock()) {try {execute();} catch (Exception ex) {log.error("[messageResend][執行異常]", ex);} finally {lock.unlock();}}}/*** 執行清理邏輯** @see <a href="https://gitee.com/zhijiantianya/ruoyi-vue-pro/pulls/480/files">討論</a>*/private void execute() {//redisTemplate.getRedisTemplate().opsForStream() 獲取 Redis Stream 的操作接口,用于執行 pending、range、add、acknowledge 等操作。StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream();//檢查每條傳送帶listeners.forEach(listener -> {//ops.pending(listener.getStreamKey(), groupName):查詢 Stream 和消費者組的 pending 消息概況。PendingMessagesSummary pendingMessagesSummary = Objects.requireNonNull(ops.pending(listener.getStreamKey(), groupName));// 每個消費者的 pending 隊列消息數量(檢查每個傳送帶(Stream)上有哪些卡住的包裹(pending 消息))Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> {log.info("[processPendingMessage][消費者({}) 消息數量({})]", consumerName, pendingMessageCount);// 每個消費者的 pending消息的詳情信息,getStreamKey():返回 Stream 名稱(如 "string.text.query.stream"),pendingMessagesPerConsumer:Map,鍵是消費者名稱,值是 pending 消息數量PendingMessages pendingMessages = ops.pending(listener.getStreamKey(), Consumer.from(groupName, consumerName), Range.unbounded(), pendingMessageCount);if (pendingMessages.isEmpty()) {return;}pendingMessages.forEach(pendingMessage -> {// 獲取消息上一次傳遞到 consumer 的時間,long lastDelivery = pendingMessage.getElapsedTimeSinceLastDelivery().getSeconds();if (lastDelivery < EXPIRE_TIME){return;}// 獲取指定 id 的消息體(對于超時的包裹,清理員去傳送帶上找到它的具體內容(records),比如包裹里裝了啥(JSON 數據)。如果沒找到(比如包裹被刪了),就跳過)List<MapRecord<String, Object, Object>> records = ops.range(listener.getStreamKey(),Range.of(Range.Bound.inclusive(pendingMessage.getIdAsString()), Range.Bound.inclusive(pendingMessage.getIdAsString())));if (CollUtil.isEmpty(records)) {return;}// 重新投遞消息redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord().ofObject(records.get(0).getValue()) // 設置內容.withStreamKey(listener.getStreamKey()));// ack 消息消費完成redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, records.get(0));log.info("[processPendingMessage][消息({})重新投遞成功]", records.get(0).getId());});});});}
}
  • RedisPendingMessageResendJob 是一個定時任務類,專門用于處理 Redis Stream 消息隊列中因消費者異常(如崩潰)而未確認(ACK)的待處理(pending)消息。
  • 它通過定期掃描消費者組的 pending 消息,檢查消息是否超時(默認 5 分鐘),并將超時的消息重新投遞到 Stream,同時確認(ACK)原消息,以避免消息丟失或重復處理。

3. 總結

3.1?整體背景

想象你在一家快遞公司(你的系統)工作,Redis 消息隊列就像公司的兩條傳送帶:

  • Pub/Sub 傳送帶(廣播模式):像廣播電臺,包裹(消息)發出去,所有聽眾(消費者)都能收到,但包裹不會保存。
  • Stream 傳送帶(集群模式):像物流流水線,包裹有編號(消息 ID),可以保存、追蹤,多個快遞員(消費者)分工處理。

實現代碼CurdTestController 負責把包裹放上傳送帶,框架類代碼負責管理傳送帶、派送包裹、處理異常(比如包裹卡住)。下面,我會先介紹每個角色的作用(類分析),然后講一個包裹從發出到送達的故事(全過程)。


3.2?類結構與作用分析

3.2.1 框架的 Redis MQ 相關類
  1. YudaoRedisMQProducerAutoConfiguration
    • 作用:像快遞公司的“生產設備管理員”,負責初始化消息發送工具(RedisMQTemplate)。
    • 職責
      • 創建 RedisMQTemplate Bean,注入 StringRedisTemplate 和攔截器(RedisMessageInterceptor)。
      • 配置攔截器,增強消息發送的擴展性(如日志、租戶處理)。
    • 關鍵方法
      • redisMQTemplate:構造 RedisMQTemplate,添加攔截器。
    • 使用場景:Spring 啟動時,自動配置 RedisMQTemplate,供生產者(如 StringTextRedisProducer)使用。
  2. YudaoRedisMQConsumerAutoConfiguration
    • 作用:像“派送中心管理員”,負責配置 Pub/Sub 和 Stream 的消息監聽容器,以及異常包裹清理任務。
    • 職責
      • 配置 RedisMessageListenerContainer:監聽 Pub/Sub 通道(如 string-text-query-channel)。
      • 配置 StreamMessageListenerContainer:監聽 Stream 隊列(如 string-text-query-stream)。
      • 創建 RedisPendingMessageResendJob:定時清理 Stream 的待處理(pending)包裹。
      • 檢查 Redis 版本,確保支持 Stream(≥5.0.0)。
      • 生成消費者名稱(如 192.168.1.1@1234)。
    • 關鍵方法
      • redisMessageListenerContainer:注冊 Pub/Sub 監聽器。
      • redisStreamMessageListenerContainer:注冊 Stream 監聽器,設置批處理(10 條/次)、手動 ACK。
      • redisPendingMessageResendJob:創建定時任務。
      • buildConsumerName:生成唯一消費者名稱。
      • checkRedisVersion:驗證 Redis 版本。
    • 使用場景:Spring 啟動時,自動配置消費者容器,監聽消息并處理異常。
  3. RedisMQTemplate
    • 作用:像“傳送帶操作員”,負責把包裹放上 Pub/Sub 或 Stream 傳送帶,并支持攔截器擴展。
    • 職責
      • 發送 Pub/Sub 消息:convertAndSend 到指定通道。
      • 發送 Stream 消息:opsForStream().add 到指定 Stream,返回消息 ID。
      • 執行攔截器:發送前后調用 sendMessageBefore/sendMessageAfter。
      • 提供 opsForStream() 等接口,供定時任務(如 RedisPendingMessageResendJob)操作 Stream。
    • 關鍵方法
      • send(T extends AbstractRedisChannelMessage):發送 Pub/Sub 消息。
      • send(T extends AbstractRedisStreamMessage):發送 Stream 消息。
      • addInterceptor:添加攔截器。
    • 使用場景:被生產者(如 StringTextRedisProducer)調用發送消息,被定時任務調用操作 Stream。
  4. RedisMessageInterceptor
    • 作用:像“包裹檢查員”,在消息發送或消費前后進行額外處理(如日志、租戶隔離)。
    • 職責
      • 定義接口:sendMessageBefore/After、consumeMessageBefore/After。
      • 默認空實現,允許自定義擴展。
    • 使用場景:由 RedisMQTemplate 在消息發送/消費時調用,典型用于多租戶場景或日志記錄。
  5. RedisPendingMessageResendJob
    • 作用:像“包裹清理員”,定時(每分鐘 35 秒)檢查 Stream 傳送帶上卡住的包裹(pending 消息),重新投遞超時的包裹。
    • 職責
      • 使用分布式鎖(RedissonClient)確保單一實例執行。
      • 掃描每個 Stream 的 pending 消息,檢查超時(5 分鐘)。
      • 重新投遞超時消息,確認(ACK)原消息。
    • 關鍵方法
      • messageResend:定時任務入口,加鎖調用 execute。
      • execute:掃描 pending 消息,重新投遞并 ACK。
    • 使用場景:消費者崩潰未確認消息時,定時重發確保消息不丟失。
  6. AbstractRedisMessage
    • 作用:像“包裹模板”,定義消息的基本結構。
    • 職責
      • 提供 headers(鍵值對),存儲元數據(如租戶 ID)。
      • 抽象基類,供具體消息類繼承。
    • 使用場景:被 AbstractRedisChannelMessage 和 AbstractRedisStreamMessage 繼承。
  7. AbstractRedisChannelMessage
    • 作用:像“Pub/Sub 包裹模板”,定義 Pub/Sub 消息的結構。
    • 職責
      • 提供 getChannel(),默認返回類名,子類可自定義通道(如 string-text-query-channel)。
      • 忽略序列化通道名(@JsonIgnore)。
    • 使用場景:被你的 StringTextQueryChannelMessage 繼承。
  8. AbstractRedisStreamMessage
    • 作用:像“Stream 包裹模板”,定義 Stream 消息的結構。
    • 職責
      • 提供 getStreamKey(),默認返回類名,子類可自定義 Stream(如 string-text-query-stream)。
      • 忽略序列化 Stream 鍵。
    • 使用場景:被你的 StringTextQueryStreamMessage 繼承。
  9. AbstractRedisChannelMessageListener<T>
    • 作用:像“Pub/Sub 快遞員”,監聽 Pub/Sub 通道,處理消息。
    • 職責
      • 自動獲取消息類型(messageType)和通道(channel)。
      • 反序列化消息(JsonUtils.parseObject)。
      • 執行攔截器(consumeMessageBefore/After)。
      • 調用子類的 onMessage 處理消息。
    • 關鍵方法
      • onMessage(Message, byte[]):處理原始 Redis 消息。
      • onMessage(T):抽象方法,子類實現具體邏輯。
    • 使用場景:被你的 StringTextChannelConsumerListener 繼承。
  10. AbstractRedisStreamMessageListener<T>
    • 作用:像“Stream 快遞員”,監聽 Stream 隊列,處理消息。
    • 職責
      • 自動獲取消息類型(messageType)、Stream 鍵(streamKey)、消費者組(group)。
      • 反序列化消息,執行攔截器,調用子類 onMessage。
      • 手動 ACK 消息(opsForStream().acknowledge)。
    • 關鍵方法
      • onMessage(ObjectRecord):處理 Stream 消息。
      • onMessage(T):抽象方法,子類實現。
    • 使用場景:被你的 StringTextStreamConsumerListener 繼承。

3.2.2 實現定義類
  1. CurdTestController
    • 作用:像“客戶服務中心”,接收用戶請求,觸發消息發送或異步查詢。
    • 職責
      • 提供 REST 接口:
        • /getStringText:異步查詢(StringTextService.getUserPageAsync)。
        • /getStringTextByMQ:觸發 MQ 消息(未實現具體生產者)。
        • /getStringTextByStream:發送 Stream 消息(StringTextRedisProducer.sendStreamMessage)。
        • /getStringTextByChannel:發送 Pub/Sub 消息(StringTextRedisProducer.sendChannelMessage)。
      • 記錄日志,處理異常。
    • 使用場景:用戶通過 HTTP 請求觸發消息隊列或異步處理。
  2. StringTextRedisProducer
    • 作用:像“包裹打包員”,創建并發送 Pub/Sub 和 Stream 消息。
    • 職責
      • sendStreamMessage:發送 StringTextQueryStreamMessage 到 string-text-query-stream。
      • sendChannelMessage:發送 StringTextQueryChannelMessage 到 string-text-query-channel。
      • 使用 RedisMQTemplate 發送消息,記錄日志。
    • 使用場景:被 CurdTestController 調用,發送查詢請求到消息隊列。
  3. StringTextChannelConsumerListener
    • 作用:像“Pub/Sub 派送員”,監聽 string-text-query-channel,處理消息。
    • 職責
      • 接收 StringTextQueryChannelMessage,轉換為 StringTextPageReqVO。
      • 調用 StringTextService.getUserPage 查詢數據。
      • 記錄處理日志。
    • 使用場景:處理 Pub/Sub 廣播消息,適合實時通知場景。
  4. StringTextStreamConsumerListener
    • 作用:像“Stream 派送員”,監聽 string-text-query-stream,處理消息。
    • 職責
      • 接收 StringTextQueryStreamMessage,轉換為 StringTextPageReqVO。
      • 調用 StringTextService.getUserPage 查詢數據。
      • 手動 ACK 消息,記錄日志。
    • 使用場景:處理 Stream 集群消息,適合持久化、可靠投遞場景。
  5. StringTextQueryChannelMessage
    • 作用:像“Pub/Sub 包裹”,定義 Pub/Sub 消息結構。
    • 職責
      • 包含 text、pageNo、pageSize,校驗非空。
      • 指定通道 string-text-query-channel。
    • 使用場景:由 StringTextRedisProducer 發送,StringTextChannelConsumerListener 消費。
  6. StringTextQueryStreamMessage
    • 作用:像“Stream 包裹”,定義 Stream 消息結構。
    • 職責
      • 包含 text、pageNo、pageSize,校驗非空。
      • 指定 Stream string-text-query-stream。
    • 使用場景:由 StringTextRedisProducer 發送,StringTextStreamConsumerListener 消費。
  7. StringTextServiceImpl
    • 作用:像“數據倉庫”,提供數據查詢服務。
    • 職責
      • getUserPage:同步查詢分頁數據。
      • getUserPageAsync:異步查詢,使用線程池(curdAsyncExecutor)。
      • 調用 StringTextMapper 訪問數據庫。
    • 使用場景:被控制器和消費者調用,處理查詢邏輯。

3.3 文字圖總結

3.3.1 圖示總結
YudaoRedisMQProducerAutoConfiguration
└─ redisMQTemplate(StringRedisTemplate redisTemplate, List<RedisMessageInterceptor> interceptors)│  │  // 初始化消息發送工具,注入 Redis 模板和攔截器(默認空)│  ├─ Parameters:│  ├─ redisTemplate: StringRedisTemplate // Redis 操作模板│  └─ interceptors: List<RedisMessageInterceptor> // 攔截器列表(默認空)│  └─ Returns: RedisMQTemplate│  └─ RedisMQTemplate(redisTemplate)│  └─ addInterceptor(interceptor) // 添加攔截器(默認空,支持鏈式調用)
YudaoRedisMQConsumerAutoConfiguration
├─ checkRedisVersion(StringRedisTemplate redisTemplate)
│  │  
│  │  // 校驗 Redis 版本 ≥ 5.0.0
│  │  
│  ├─ Parameters: redisTemplate: StringRedisTemplate
│  └─ Returns: void
│
├─ redisMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners)
│  │  
│  │  // 配置 Pub/Sub 監聽容器(基于 Redis 發布訂閱模式)
│  │  
│  ├─ Parameters:
│  │  ├─ redisMQTemplate: RedisMQTemplate
│  │  └─ listeners: List<AbstractRedisChannelMessageListener<?>> 
│  │       // 包含 StringTextChannelConsumerListener 實現類
│  │  
│  └─ Returns: RedisMessageListenerContainer
│       │  
│       └─ addMessageListener(StringTextChannelConsumerListener listener, ChannelTopic("string-text-query-channel"))
│           │  
│           │  // 綁定監聽器到指定通道(channel)
│           │  
│           ├─ Parameters:
│           │  ├─ listener: StringTextChannelConsumerListener
│           │  └─ topic: ChannelTopic("string-text-query-channel") 
│           │       // 通道名稱固定為 string-text-query-channel
│           │  
│           └─ StringTextChannelConsumerListener.setRedisMQTemplate(redisMQTemplate)
│               // 注入消息發送模板,用于消費時的響應邏輯
│
├─ redisStreamMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners)
│  │  
│  │  // 配置 Stream 監聽容器(基于 Redis Stream 數據結構)
│  │  
│  ├─ Parameters:
│  │  ├─ redisMQTemplate: RedisMQTemplate
│  │  └─ listeners: List<AbstractRedisStreamMessageListener<?>> 
│  │       // 包含 StringTextStreamConsumerListener 實現類
│  │  
│  └─ Returns: StreamMessageListenerContainer
│       │  
│       └─ register(StreamReadRequest("string-text-query-stream", Consumer("my-app", "192.168.1.1@1234"), StringTextStreamConsumerListener listener))
│           │  
│           │  // 綁定監聽器到指定 Stream(支持消費者組模式)
│           │  
│           ├─ Parameters:
│           │  ├─ request: StreamReadRequest 
│           │  │  // StreamKey: string-text-query-stream 
│           │  │  // Consumer: my-app@192.168.1.1@1234(消費者組名+實例標識)
│           │  └─ listener: StringTextStreamConsumerListener
│           │  
│           ├─ redisTemplate.opsForStream().createGroup("string-text-query-stream", "my-app")
│           │  // 自動創建消費者組(若不存在),組名與應用名一致
│           │  
│           └─ StringTextStreamConsumerListener.setRedisMQTemplate(redisMQTemplate)
│               // 注入消息發送模板,用于消費時的響應邏輯
│
└─ redisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> listeners, RedisMQTemplate redisTemplate, String groupName, RedissonClient redissonClient)│  │  // 創建定時任務,處理 Stream 中未確認的 pending 消息│  // 防止消息堆積,保證至少一次投遞語義│  ├─ Parameters:│  ├─ listeners: List<AbstractRedisStreamMessageListener<?>> │  │       // 包含 StringTextStreamConsumerListener 實現類│  ├─ redisTemplate: RedisMQTemplate│  ├─ groupName: String // 默認使用 spring.application.name("my-app")│  └─ redissonClient: RedissonClient // 分布式鎖,保證任務單點執行│  └─ Returns: RedisPendingMessageResendJob// 定時任務會定期掃描 pending 消息并重新投遞
通過 Stream 發送消息的流程
CurdTestController
└─ getStringTextByStream(StringTextPageReqVO pageReqVO)│  │  // 處理 Stream 消息請求│  Parameters: pageReqVO: {text="example", pageNo=1, pageSize=10}│  Returns: "Stream 消息已發送,異步處理中"│  └─ StringTextRedisProducer.sendStreamMessage(text, pageNo, pageSize)│  │  // 發送 Stream 消息(參數:example, 1, 10)│  Returns: void│  └─ RedisMQTemplate.send(StringTextQueryStreamMessage message)│  │  // 發送到 Stream(消息內容包含 text/pageNo/pageSize)│  Returns: RecordId(如 "1234567890-0")│  ├─ sendMessageBefore(message)│  // 調用消息發送前的攔截器(默認無操作)│  ├─ JsonUtils.toJsonString(message)│  // 序列化消息為 JSON│  Returns: "{\"text\":\"example\",\"pageNo\":1,\"pageSize\":10}"│  ├─ redisTemplate.opsForStream().add(StreamRecords)│  // 添加到 Stream(StreamKey: string-text-query-stream)│  Parameters: JSON 字符串 + StreamKey│  Returns: RecordId(消息唯一標識)│  └─ sendMessageAfter(message)// 調用消息發送后的攔截器(默認無操作)
StreamMessageListenerContainer
└─ StringTextStreamConsumerListener.onMessage(ObjectRecord<String, String> message)│  │  // 處理 Stream 消息(參數:包含 StreamKey 和 JSON 內容的記錄)│  Returns: void│  ├─ JsonUtils.parseObject(message.getValue(), StringTextQueryStreamMessage.class)│  // 反序列化 JSON 為消息對象│  Returns: {text="example", pageNo=1, pageSize=10}│  ├─ consumeMessageBefore(messageObj)│  // 調用消息消費前的攔截器(默認無操作)│  ├─ onMessage(StringTextQueryStreamMessage message)│  // 子類自定義處理邏輯(由業務實現)│  Parameters: 反序列化后的消息對象│  Returns: void│  │  └─ StringTextService.getUserPage(StringTextPageReqVO reqVO)│      │  │      │  // 業務邏輯:查詢數據│      │  Parameters: {text="example", pageNo=1, pageSize=10}│      │  Returns: 分頁結果(包含查詢到的數據)│      │  │      └─ StringTextMapper.selectPage(reqVO)│          // 數據庫層查詢│          Returns: 從數據庫獲取的分頁結果│  ├─ redisMQTemplate.getRedisTemplate().opsForStream().acknowledge("my-app", message)│  // 關鍵步驟:確認消息已消費│  Parameters: 消費者組名 + 消息記錄│  Returns: void│  (若不確認,消息會被視為 pending 并由定時任務重新投遞)│  └─ consumeMessageAfter(messageObj)// 調用消息消費后的攔截器(默認無操作)
通過 Pub/Sub 發送消息的流程
CurdTestController
└─ getStringTextByChannel(StringTextPageReqVO pageReqVO)│  │  // 處理 Pub/Sub 消息請求│  Parameters: pageReqVO: {text="example", pageNo=1, pageSize=10}│  Returns: "Channel 消息已發送,異步處理中"│  └─ StringTextRedisProducer.sendChannelMessage(text, pageNo, pageSize)│  │  // 發送 Pub/Sub 消息(參數:example, 1, 10)│  Returns: void│  └─ RedisMQTemplate.send(StringTextQueryChannelMessage message)│  │  // 發送到通道(channel: string-text-query-channel)│  Returns: void│  ├─ sendMessageBefore(message)│  // 調用消息發送前的攔截器(默認無操作)│  ├─ JsonUtils.toJsonString(message)│  // 序列化消息為 JSON│  Returns: "{\"text\":\"example\",\"pageNo\":1,\"pageSize\":10}"│  ├─ redisTemplate.convertAndSend("string-text-query-channel", json)│  // 發布到 Redis 通道│  Parameters: 通道名 + JSON 字符串│  Returns: void(無返回值,Fire-and-Forget 模式)│  └─ sendMessageAfter(message)// 調用消息發送后的攔截器(默認無操作)
RedisMessageListenerContainer
└─ StringTextChannelConsumerListener.onMessage(Message message, byte[] bytes)│  │  // 處理 Pub/Sub 消息(參數:消息對象+通道字節數組)│  Returns: void(無返回值,Fire-and-Forget 模式)│  ├─ JsonUtils.parseObject(message.getBody(), StringTextQueryChannelMessage.class)│  // 反序列化 JSON 為消息對象│  Returns: {text="example", pageNo=1, pageSize=10}│  ├─ consumeMessageBefore(messageObj)│  // 調用消息消費前的攔截器(默認無操作)│  ├─ onMessage(StringTextQueryChannelMessage message)│  // 子類自定義處理邏輯(由業務實現)│  Parameters: 反序列化后的消息對象│  Returns: void│  │  └─ StringTextService.getUserPage(StringTextPageReqVO reqVO)│      │  │      │  // 業務邏輯:查詢數據│      │  Parameters: {text="example", pageNo=1, pageSize=10}│      │  Returns: 分頁結果(包含查詢到的數據)│      │  │      └─ StringTextMapper.selectPage(reqVO)│          // 數據庫層查詢│          Returns: 從數據庫獲取的分頁結果│  └─ consumeMessageAfter(messageObj)// 調用消息消費后的攔截器(默認無操作)

RedisPendingMessageResendJob 定時任務流程
3.3.2?類作用總結
  • 框架類
    • YudaoRedisMQProducerAutoConfiguration:初始化生產工具。
    • YudaoRedisMQConsumerAutoConfiguration:配置消費容器和定時任務。
    • RedisMQTemplate:發送消息,操作 Stream.
    • RedisMessageInterceptor:擴展點(未使用)。
    • RedisPendingMessageResendJob:重發 pending 消息。
    • AbstractRedisMessage:消息基類。
    • AbstractRedisChannelMessage:Pub/Sub 消息模板。
    • AbstractRedisStreamMessage:Stream 消息模板。
    • AbstractRedisChannelMessageListener:Pub/Sub 消費者基類。
    • AbstractRedisStreamMessageListener:Stream 消費者基類。
  • 具體業務實現類
    • CurdTestController:觸發消息發送。
    • StringTextRedisProducer:發送消息。
    • StringTextChannelConsumerListener:處理 Pub/Sub 消息。
    • StringTextStreamConsumerListener:處理 Stream 消息。
    • StringTextQueryChannelMessage:Pub/Sub 消息。
    • StringTextQueryStreamMessage:Stream 消息。
    • StringTextServiceImpl:查詢數據。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/86703.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/86703.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/86703.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【凌智視覺模塊】rv1106 部署 ppocrv4 檢測模型 rknn 推理

PP-OCRv4 文本框檢測 1. 模型介紹 如有需要可以前往我們的倉庫進行查看 凌智視覺模塊 PP-OCRv4在PP-OCRv3的基礎上進一步升級。整體的框架圖保持了與PP-OCRv3相同的pipeline&#xff0c;針對檢測模型和識別模型進行了數據、網絡結構、訓練策略等多個模塊的優化。 從算法改…

uniapp Vue2 獲取電量的獨家方法:繞過官方插件限制

在使用 uniapp 進行跨平臺應用開發時&#xff0c;獲取設備電量信息是一個常見的需求。然而&#xff0c;uniapp 官方提供的uni.getBatteryInfo方法存在一定的局限性&#xff0c;它不僅需要下載插件&#xff0c;而且目前僅支持 Vue3&#xff0c;這讓使用 Vue2 進行開發的開發者陷…

Go語言中的if else控制語句

if else是Go語言中最基礎也最常用的條件控制語句&#xff0c;用于根據條件執行不同的代碼塊。下面我將詳細介紹Go語言中if else的各種用法和特性。 1. 基本語法 1.1. 最簡單的if語句 if 條件表達式 {// 條件為true時執行的代碼 } 示例&#xff1a; if x > 10 {fmt.Prin…

[Spring]-AOP

AOP場景 AOP: Aspect Oriented Programming (面向切面編程) OOP: Object Oriented Programming (面向對象編程) 場景設計 設計: 編寫一個計算器接口和實現類&#xff0c;提供加減乘除四則運算 需求: 在加減乘除運算的時候需要記錄操作日志(運算前參數、運算后結果)實現方案:…

Web3 借貸與清算機制全解析:鏈上金融的運行邏輯

Web3 借貸與清算機制全解析&#xff1a;鏈上金融的運行邏輯 超額抵押借款 例如&#xff0c;借款人用ETH為抵押借入DAI&#xff1b;借款人的ETH的價值一定是要超過DAI的價值&#xff1b;借款人可以任意自由的使用自己借出的DAI 穩定幣 第一步&#xff1a;借款人需要去提供一定…

RK3588開發筆記-GNSS-RTK模塊調試

目錄 前言 一、什么是GNSS/RTK 二、硬件連接 三、內核配置 四、模塊調試 五、ntripclient使用 總結 前言 在RK3588平臺上集成高精度定位功能是許多工業級應用的需求。本文記錄了我調試GNSS-RTK模塊的全過程,包含硬件連接、驅動移植、數據解析和精度優化等關鍵環節,希望對…

Vue.js $emit的介紹和簡單使用

前言 在 Vue.js 開發中&#xff0c;組件化是核心思想之一。但組件間的通信是一個重要課題&#xff0c;特別是子組件向父組件傳遞數據的場景。Vue 提供了多種通信方式&#xff0c;而$emit正是實現子→父通信的關鍵方法。本文將深入解析$emit的原理、使用場景及最佳實踐。 一、$e…

【Linux 學習計劃】-- 簡易版shell編寫

目錄 思路 創建自己的命令行 獲取用戶命令 分割命令 檢查是否是內建命令 cd命令實現 進程程序替換執行程序 總代碼 結語 思路 int main() {while (1){// 1. 自己的命令行PrintCommandLine();// 2. 獲取用戶命令char command[SIZE];int n GetUserCommand(command, si…

一個完整的日志收集方案:Elasticsearch + Logstash + Kibana+Filebeat (二)

&#x1f4c4; 本地 Windows 部署 Logstash 連接本地 Elasticsearch 指南 ? 目標 在本地 Windows 上安裝并運行 Logstash配置 Logstash 將數據發送至本地 Elasticsearch測試數據采集與 ES 存儲流程 &#x1f9f0; 前提條件 軟件版本要求安裝說明Java17Oracle JDK 下載 或 O…

Java使用Selenium反爬蟲優化方案

當我們爬取大站的時候&#xff0c;就得需要對抗反爬蟲機制的場景&#xff0c;因為項目要求使用Java和Selenium。Selenium通常用于模擬用戶操作&#xff0c;但效率較低&#xff0c;所以需要我們結合其他技術來實現高效。 在 Java 中使用 Selenium 進行高效反爬蟲對抗時&#xff…

狀態管理方案對比與決策

1. 狀態管理的基本概念 現代前端應用隨著功能復雜度提升&#xff0c;狀態管理已成為架構設計的核心挑戰。狀態管理本質上解決的是數據的存儲、變更追蹤和響應式更新問題&#xff0c;以確保UI與底層數據保持同步。 核心挑戰: 狀態共享與組件通信可預測的狀態變更性能優化與重…

Fetch與Axios:區別、聯系、優缺點及使用差異

Fetch與Axios&#xff1a;區別、聯系、優缺點及使用差異 文章目錄 Fetch與Axios&#xff1a;區別、聯系、優缺點及使用差異一、聯系二、區別1. 瀏覽器支持與兼容性2. 響應處理3. 請求攔截和響應攔截4. 錯誤處理 三、優缺點1. Fetch API優點缺點 2. Axios優點缺點 四、使用上的差…

【Docker】快速入門與項目部署實戰

我們在部署一個項目時&#xff0c;會出現一系列問題比如&#xff1a; 命令太多了&#xff0c;記不住軟件安裝包名字復雜&#xff0c;不知道去哪里找安裝和部署步驟復雜&#xff0c;容易出錯 其實上述問題不僅僅是新手&#xff0c;即便是運維在安裝、部署的時候一樣會覺得麻煩…

Java面試題尚硅谷版第1季

1、寫出如下代碼運行結果 1.1、 使用局部變量表和操作數棧解題 1.2、使用前置和后置遞增解題 2、寫一個單例模式 2.1、考察知識點 2.2、單例模式實現 3、類加載和初始化順序 package classload;public class Father {private int i test();private static int j method();st…

關于Qt阻斷樣式繼承的解決辦法

引言 在使用 Qt 開發桌面應用時&#xff0c;借助樣式表&#xff08;StyleSheet&#xff09;來統一定義界面風格是非常常見的做法。通常&#xff0c;你會在主程序中通過 qApp->setStyleSheet(...) 或者直接給某個父控件設置樣式表&#xff0c;讓所有的子控件都采用相同的配色…

鼠標右鍵添加新建某種文件的方法

場景 我經常用到.emmx&#xff0c;.eddx文件&#xff0c;電腦上裝的是wpsX億圖&#xff08;因為有wps會員&#xff09;&#xff0c;沒有開億圖會員。 然后問題就是&#xff0c;思維導圖和流程圖我都能正常開&#xff0c;正常編輯&#xff0c;但鼠標右鍵沒有新建這兩個文件的按…

Inxpect安全雷達傳感器與控制器:動態檢測 + 抗干擾技術重構工業安全防護體系

Inxpect 推出工業安全領域新型智能傳感器與控制器&#xff0c;其核心產品為雷達掃描儀&#xff0c;具備動態調整檢測區域、抗干擾能力強等特點&#xff0c;可精準檢測危險區域人員進入或存在情況&#xff0c;適用于移動機器人等場景。 Inxpect安全雷達傳感器核心功能 動態檢測…

【AI學習】李廣密與階躍星辰首席科學家張祥雨對談:多模態發展的歷史和未來

仔細閱讀了文章《專訪張祥雨&#xff1a;多模態推理和自主學習是未來的 2 個 「GPT-4」 時刻》 https://mp.weixin.qq.com/s/892QuRPH9uP6zN6dS-HZMw 非常贊嘆的一篇文章&#xff0c;說清楚了NLP、CV發展中的許多重大問題&#xff0c;讀來醍醐灌頂&#xff01;這樣的文章&…

C++中std::deque詳解和實戰工程代碼示例

C中std::deque詳解和實戰工程代碼示例 std::deque&#xff08;雙端隊列&#xff09;是 C 標準庫中的一個序列容器&#xff0c;與 std::vector 類似&#xff0c;但它支持從頭部和尾部高效地插入和刪除元素。它底層采用分段連續空間實現&#xff0c;兼具靈活性與性能。 一、基本…

【AI大模型入門指南】概念與專有名詞詳解 (二)

【AI大模型入門指南】概念與專有名詞詳解 &#xff08;二&#xff09; 一 、前言 當你和聊天機器人聊得天花亂墜時&#xff0c;當你用文字讓AI生成精美圖片時&#xff0c;當手機相冊自動幫你分類照片時 —— 這些看似智能的操作背后&#xff0c;都藏著 AI 大模型的身影。 本…