Spring Boot 整合 Redis 實現發布/訂閱(含ACK機制 - 事件驅動方案)

Spring Boot整合Redis實現發布/訂閱(含ACK機制)全流程

一、整體架構

二、實現步驟

步驟1:添加Maven依賴

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
</dependencies>

步驟2:配置Redis連接

# application.yml
spring:redis:host: localhostport: 6379lettuce:pool:max-active: 16max-idle: 8
# redisStream配置信息
app:redis:stream: app-eventsgroup: app-groupconsumer: consumer-${random.int(1000)}

步驟3:創建消費者組

@Configuration
public class RedisConfig {@Value("${app.redis.stream}")private String streamKey;@Value("${app.redis.group}")private String groupName;@Beanpublic void createConsumerGroup(StringRedisTemplate redisTemplate) {try {redisTemplate.opsForStream().createGroup(streamKey, groupName);} catch (Exception e) {System.out.println("消費者組已存在: " + groupName);}}
}

步驟4:配置消息監聽容器

@Configuration
public class RedisConfig {// 配置消息監聽線程池@Bean(name = "redisStreamTaskExecutor")public ThreadPoolTaskExecutor redisStreamTaskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(4);executor.setThreadNamePrefix("redis-stream-");return executor;}// 創建消息監聽容器@Bean(initMethod = "start", destroyMethod = "stop")public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamContainer(RedisConnectionFactory factory,@Qualifier("redisStreamTaskExecutor") ThreadPoolTaskExecutor executor) {StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =StreamMessageListenerContainerOptions.builder().pollTimeout(Duration.ofSeconds(1)).executor(executor).batchSize(10).build();return StreamMessageListenerContainer.create(factory, options);}
}

步驟5:注冊消息監聽器

@Component
public class StreamListenerRegistrar {@Value("${app.redis.stream}")private String streamKey;@Value("${app.redis.group}")private String groupName;@Value("${app.redis.consumer}")private String consumerName;@PostConstructpublic void registerListener(StreamMessageListenerContainer container, RedisMessageProcessor processor) {StreamReadRequest<String> readRequest = StreamReadRequest.builder(StreamOffset.create(streamKey, ReadOffset.lastConsumed())).consumer(Consumer.from(groupName, consumerName)).autoAcknowledge(false) // 手動ACK.build();container.register(readRequest, processor);}
}

步驟6:實現消息處理器

@Component
public class RedisMessageProcessor implements StreamListener<String, MapRecord<String, String, String>> {@Overridepublic void onMessage(MapRecord<String, String, String> record) {CompletableFuture.runAsync(() -> {try {// 業務處理邏輯processBusiness(record);// 處理成功發送ACKredisTemplate.opsForStream().acknowledge(streamKey, groupName, record.getId());} catch (Exception e) {// 失敗消息進入Pending List}});}private void processBusiness(MapRecord<String, String, String> record) throws Exception {String eventType = record.getValue().get("eventType");String payload = record.getValue().get("payload");// 根據事件類型處理switch (eventType) {case "ORDER_CREATED": handleOrder(payload); break;case "PAYMENT_PROCESSED": handlePayment(payload); break;}}
}

步驟7:實現Pending消息處理器

@Component
@Slf4j
public class PendingMessageProcessor {@Value("${app.redis.stream}")private String streamKey;@Value("${app.redis.group}")private String groupName;@Value("${app.redis.consumer}")private String consumerName;// 每分鐘處理一次Pending消息@Scheduled(fixedRate = 60000)public void processPendingMessages() {// 1. 查詢Pending消息PendingMessages pending = redisTemplate.opsForStream().pending(streamKey, groupName, Range.unbounded(), 100);pending.forEach(this::handlePendingMessage);}private void handlePendingMessage(PendingMessage pending) {try {// 2. 重新認領消息List<MapRecord<String, String, String>> records = redisTemplate.opsForStream().claim(streamKey, Consumer.from(groupName, consumerName), Duration.ofSeconds(30), pending.getId());if (!records.isEmpty()) {MapRecord<String, String, String> record = records.get(0);// 3. 重試處理messageProcessor.processBusiness(record);// 4. 處理成功發送ACKredisTemplate.opsForStream().acknowledge(streamKey, groupName, record.getId());}} catch (Exception e) {// 5. 超過重試次數移入死信隊列if (pending.getTotalDeliveryCount() > 3) {moveToDeadLetterQueue(pending);}}}private void moveToDeadLetterQueue(PendingMessage pending) {// 獲取消息內容List<MapRecord<String, String, String>> records = redisTemplate.opsForStream().range(streamKey, Range.from(pending.getId()));if (!records.isEmpty()) {// 添加到死信隊列redisTemplate.opsForStream().add("dead-letter:" + streamKey, records.get(0).getValue());// 確認原始消息redisTemplate.opsForStream().acknowledge(streamKey, groupName, pending.getId());}}
}

步驟8:實現消息生產者

@Service
public class RedisMessageProducer {@Value("${app.redis.stream}")private String streamKey;public String sendMessage(String eventType, String payload) {Map<String, String> message = Map.of("eventType", eventType,"payload", payload,"timestamp", String.valueOf(System.currentTimeMillis()));return redisTemplate.opsForStream().add(streamKey, message).getValue();}
}

步驟9:創建REST接口

@RestController
@RequestMapping("/messages")
public class MessageController {private final RedisMessageProducer producer;@PostMappingpublic String sendMessage(@RequestBody MessageRequest request) {return producer.sendMessage(request.getEventType(), request.getPayload());}@Datapublic static class MessageRequest {private String eventType;private String payload;}
}

三、消息生命周期流程圖

1. 正常消息處理流程

2. Pending消息處理流程

?

3. ACK機制工作原理

四、生產環境建議

  1. 消費者命名策略

    @Value("${app.redis.consumer}")
    private String consumerName;// 在應用啟動時設置
    @PostConstruct
    public void initConsumerName() {String hostName = InetAddress.getLocalHost().getHostName();String port = environment.getProperty("server.port");consumerName = "consumer-" + hostName + "-" + port;
    }
  2. 動態配置重試策略

    app:pending:max_retry: 5retry_interval: 30000 # 30秒
  3. 死信隊列監控

    @Scheduled(fixedRate = 3600000) // 每小時檢查一次
    public void checkDeadLetterQueue() {Long size = redisTemplate.opsForStream().size("dead-letter:" + streamKey);if (size > 0) {alertService.sendAlert("死信隊列有 " + size + " 條未處理消息");}
    }
  4. 消息TTL設置

    // 發送消息時設置最大長度
    public String sendMessage(String eventType, String payload) {MapRecord<String, String, String> record = ...;return redisTemplate.opsForStream().add(Record.of(record).withMaxLen(10000).approximate(true));
    }

六、總結

本文詳細介紹了Spring Boot整合Redis實現發布/訂閱功能并添加ACK機制的完整方案:

  1. 事件驅動架構:使用Redis Stream監聽器實現真正的發布/訂閱模式

  2. 可靠ACK機制:通過手動ACK確認確保消息可靠處理

  3. 自動恢復系統:Pending消息處理器自動處理失敗消息

  4. 死信隊列:隔離無法處理的消息,防止系統阻塞

  5. 生產就緒:包含多實例部署、動態配置、監控告警等生產級特性

該方案適用于需要高可靠性消息傳遞的場景,如訂單處理、支付系統、事件溯源等,在保證系統吞吐量的同時提供了消息可靠性保障。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/92555.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/92555.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/92555.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Sklearn 機器學習 線性回歸

??親愛的技術愛好者們,熱烈歡迎來到 Kant2048 的博客!我是 Thomas Kant,很開心能在CSDN上與你們相遇~?? 本博客的精華專欄: 【自動化測試】 【測試經驗】 【人工智能】 【Python】 Sklearn 機器學習線性回歸實戰詳解 線性回歸是機器學習中最基礎也最經典的算法之一,…

AJAX案例合集

案例一&#xff1a;更換網站背景JS核心代碼<script>document.querySelector(.bg-ipt).addEventListener(change, e > {//選擇圖片上傳&#xff0c;設置body背景const fd new FormData()fd.append(img, e.target.files[0])axios({url: http://hmajax.itheima.net/api/…

vscode環境下c++的常用快捷鍵和插件

本文提供一些能夠在vscode的環境下&#xff0c;提高c代碼書寫效率的快捷鍵&#xff0c;插件以及設置等等。 快捷鍵ctrlshiftx&#xff1a; 彈出插件菜單ctrlshiftp&#xff1a;彈出命令面板可以快捷執行一些常見命令插件安裝這個后&#xff0c;可以按住ctrl跳轉到方法的實現&am…

React + ts 中應用 Web Work 中集成 WebSocket

一、Web Work定義useEffect(() > {let webSocketIndex -1const websocketWorker new Worker(new URL(./websocketWorker.worker.ts?worker, import.meta.url),{type: module // 必須聲明模塊類型});//初始化WEBSOCKET&#xff08;多個服務器選擇最快建立連接…

RabbitMQ面試精講 Day 3:Exchange類型與路由策略詳解

【RabbitMQ面試精講 Day 3】Exchange類型與路由策略詳解 文章標簽 RabbitMQ,消息隊列,Exchange,路由策略,AMQP,面試題,分布式系統 文章簡述 本文是"RabbitMQ面試精講"系列第3天內容&#xff0c;深入解析RabbitMQ的核心組件——Exchange及其路由策略。文章詳細剖析…

深入解析Hadoop MapReduce Shuffle過程:從環形緩沖區溢寫到Sort與Merge源碼

MapReduce與Shuffle過程概述在大數據處理的經典范式MapReduce中&#xff0c;Shuffle過程如同人體血液循環系統般連接著計算框架的各個組件。作為Hadoop最核心的分布式計算模型&#xff0c;MapReduce通過"分而治之"的思想將海量數據處理分解為Map和Reduce兩個階段&…

Kafka MQ 消費者

Kafka MQ 消費者 1 創建消費者 在讀取消息之前,需要先創建一個KafkaConsumer對象。創建KafkaConsumer對象與創建KafkaProducer對象非常相似—把想要傳給消費者的屬性放在Properties對象里。本章后續部分將深入介紹所有的配置屬性。為簡單起見,這里只提供3個必要的屬性:boo…

人工智能——Opencv圖像色彩空間轉換、灰度實驗、圖像二值化處理、仿射變化

一、圖像色彩空間轉換&#xff08;一&#xff09;顏色加法1、直接相加1、直接相加2、調用cv.add()函數進行飽和操作 在OpenCV中進行顏色的加法&#xff0c;我們說圖像即數組&#xff0c;所以從數據類型來說我們可以直接用numpy的知識來進行直接相加&#xff0c;但是存在…

【JToken】JToken == null 判斷無效的問題

if (innerNode null) {continue; }Debug.Log($"toNode type: {node["toNode"]?.GetType()}");發現這個JToken 無法正確的判斷 是否為 null&#xff0c;再排除邏輯問題后&#xff0c;我基本能確定的是 這個對象 不返回的不是真正的C# NULL 輸出類型后是 N…

C++基于libmodbus庫實現modbus TCP/RTU通信

今天看到了一個參考項目中用到了modbus庫&#xff0c;看著使用很是方便&#xff0c;于是記錄一下。后面有時間了或者用到了再詳細整理。 參考&#xff1a;基于libmodbus庫實現modbus TCP/RTU通信-CSDN博客 一、介紹 1.1庫文件包含 1.2最簡單的使用 本人在QT6.5下&#xff0…

【原創】微信小程序添加TDesign組件

前言 TDesign 是騰訊公司推出的一款UI界面庫,至于騰訊的實力嘛,也不用多說了。 官網:https://tdesign.tencent.com/ 源碼:https://github.com/Tencent/tdesign 目前處于活躍狀態,發文前5日,該庫仍在更新中… 遇到的問題 雖然騰訊為微信小程序開發提供了一個討論的論壇,…

Vue的路由模式的區別和原理

路由模式 Vue 的路由模式指的是 Vue Router 提供的 URL 處理方式&#xff0c;主要有兩種&#xff1a;Hash 模式和History 模式。 Hash模式 在 Vue Router 中&#xff0c;默認使用的是 hash 模式&#xff0c;即 mode: hash。如果想要使用 history 模式&#xff0c;可以設置 mode…

通過TPLink路由器進行用戶行為審計實戰

用戶行為審計是指對用戶在網絡平臺上的行為進行監控和記錄&#xff0c;以便對其行為進行分析和評估的過程。隨著互聯網的普及和發展&#xff0c;用戶行為審計在網絡安全和數據隱私保護方面起到了重要的作用。 用戶行為審計可以幫助發現和預防網絡安全威助。通過對用戶的行為進行…

MYSQL 第一次作業

新建產品庫mysql> CREATE DATABASE mydb6_product;使用產品庫mysql> USE mydb6_product;創建employess表mysql> CREATE TABLE employees (-> id INT PRIMARY KEY,-> name VARCHAR(50) NOT NULL,-> age INT,-> gender VARCHAR(10) NOT NULL DEFAULT unknow…

暑期前端訓練day7——有關vue-diff算法的思考

前言 今天分享一下我對vue的diff的探究&#xff0c;跟我一起深入底層&#xff0c;看一看vue是怎么進行diff的&#xff0c;它的復雜度如何&#xff0c;為什么性能這么高&#xff0c;diff的目標是盡可能的復用原來的真實dom&#xff0c;減少刪除真實dom和創建真實的dom的開銷&…

【Docker】Docker的初步認識以及Ubuntu下的Docker環境安裝、配置

前言 在當今快速迭代的軟件開發與部署領域&#xff0c;容器化技術已成為不可或缺的核心力量&#xff0c;而 Docker 作為容器化技術的杰出代表&#xff0c;正以其輕量、高效、可移植的特性深刻改變著開發與運維的模式。它有效解決了 “在我機器上能運行&#xff0c;在你那里卻不…

【密碼學】2. 古典密碼

目錄2. 古典密碼2.1 經典加密技術基礎2.2 代換技術2.2.1 算術密碼2.2.2 代換密碼&#xff08;Substitution Cipher&#xff09;2.3 置換技術2.4 乘積密碼2.5 歷史上的教訓2. 古典密碼 2.1 經典加密技術基礎 分類 代換&#xff08;Substitution&#xff09;&#xff1a;明文內…

CSS3文本陰影特效全攻略

CSS3文本陰影效果實現 下面我將創建一個展示各種CSS3文本陰影效果的頁面&#xff0c;包含多種樣式示例和代碼實現。 設計思路 創建具有視覺吸引力的標題區域提供多種文本陰影效果實例顯示對應的CSS代碼以供參考添加交互元素讓用戶自定義效果 實現代碼 <!DOCTYPE html&g…

JavaScript 03 嚴格檢查模式Strict字符串類型詳解

2.4 嚴格檢查模式Strict在 JavaScript 里&#xff0c;也是 有 “作用域” 這個說法的。 所以說&#xff0c;變量 也分 全局變量 和 局部變量。 當我們 直接 把 代碼 寫在 script 雙標簽里面的時候&#xff0c;我們 JS 會認為 這只是 一個 沒有名字的 函數&#xff01;&#xff…

車載診斷ECU架構

我是穿拖鞋的漢子,魔都中堅持長期主義的汽車電子工程師。 老規矩,分享一段喜歡的文字,避免自己成為高知識低文化的工程師: 做到欲望極簡,了解自己的真實欲望,不受外在潮流的影響,不盲從,不跟風。把自己的精力全部用在自己。一是去掉多余,凡事找規律,基礎是誠信;二是…