使用redis 的stream 做消息中間件 多線程消費消息

1.redis stream 特點

1.支持消息持久化
2.消費者組模式
3.消息確認機制
4. 消息重試機制
5. 死信隊列

2. 消息生產者服務

2.1 如下代碼
@Service
@Slf4j
public class StreamMessageProducer {@Autowiredprivate StringRedisTemplate redisTemplate;private static final String STREAM_KEY = "message:stream";/*** 發送消息*/public String sendMessage(String topic, Object message) {try {StringRecord record = StreamRecords.string(Collections.singletonMap("message", JSON.toJSONString(message))).withStreamKey(STREAM_KEY + ":" + topic);RecordId recordId = redisTemplate.opsForStream().add(record);log.info("消息發送成功: topic={}, messageId={}", topic, recordId);return recordId.getValue();} catch (Exception e) {log.error("消息發送失敗: topic={}, message={}", topic, message, e);throw new RuntimeException("消息發送失敗", e);}}/*** 批量發送消息*/public List<String> sendMessages(String topic, List<Object> messages) {try {List<MapRecord<String, String, String>> records = messages.stream().map(msg -> StreamRecords.string(Collections.singletonMap("message", JSON.toJSONString(msg))).withStreamKey(STREAM_KEY + ":" + topic)).collect(Collectors.toList());List<String> messageIds = new ArrayList<>();for (MapRecord<String, String, String> record : records) {RecordId recordId = redisTemplate.opsForStream().add(record);messageIds.add(recordId.getValue());}log.info("批量消息發送成功: topic={}, count={}", topic, messageIds.size());return messageIds;} catch (Exception e) {log.error("批量消息發送失敗: topic={}", topic, e);throw new RuntimeException("批量消息發送失敗", e);}}
}

3.消息消費者服務(多線程消費)

3.1 代碼如下
@Service
@Slf4j
public class StreamMessageConsumer {@Autowiredprivate StringRedisTemplate redisTemplate;private static final String STREAM_KEY = "message:stream";private final Map<String, StreamMessageHandler<?>> handlers = new ConcurrentHashMap<>();/*** 注冊消息處理器*/public <T> void registerHandler(String topic, Class<T> messageType, Consumer<T> handler) {handlers.put(topic, new StreamMessageHandler<>(messageType, handler));}/*** 啟動消費*/@PostConstructpublic void startConsuming() {for (String topic : handlers.keySet()) {String streamKey = STREAM_KEY + ":" + topic;String consumerGroup = "group:" + topic;String consumerName = "consumer:" + UUID.randomUUID().toString();try {// 創建消費者組(如果不存在)createConsumerGroupIfNotExists(streamKey, consumerGroup);// 啟動消費線程Thread consumerThread = new Thread(() -> consumeMessages(streamKey, consumerGroup, consumerName, topic));consumerThread.setName("stream-consumer-" + topic);consumerThread.start();} catch (Exception e) {log.error("啟動消費者失敗: topic={}", topic, e);}}}private void createConsumerGroupIfNotExists(String streamKey, String groupName) {try {redisTemplate.opsForStream().createGroup(streamKey, groupName);} catch (Exception e) {// 組已存在,忽略異常log.debug("Consumer group already exists: {}", groupName);}}private void consumeMessages(String streamKey, String group, String consumer, String topic) {StreamMessageHandler<?> handler = handlers.get(topic);while (!Thread.currentThread().isInterrupted()) {try {// 讀取消息List<MapRecord<String, String, String>> records = redisTemplate.opsForStream().read(Consumer.from(group, consumer),StreamReadOptions.empty().count(10).block(Duration.ofSeconds(1)),StreamOffset.create(streamKey, ReadOffset.lastConsumed()));if (records != null && !records.isEmpty()) {for (MapRecord<String, String, String> record : records) {try {// 處理消息processMessage(record, handler);// 確認消息redisTemplate.opsForStream().acknowledge(streamKey, group, record.getId());} catch (Exception e) {log.error("消息處理失敗: messageId={}", record.getId(), e);}}}} catch (Exception e) {log.error("消息消費異常: topic={}", topic, e);try {Thread.sleep(1000);} catch (InterruptedException ie) {Thread.currentThread().interrupt();break;}}}}private <T> void processMessage(MapRecord<String, String, String> record, StreamMessageHandler<T> handler) {try {String messageJson = record.getValue().get("message");T message = JSON.parseObject(messageJson, handler.getMessageType());handler.getHandler().accept(message);} catch (Exception e) {log.error("消息處理失敗: {}", record, e);throw e;}}
}@Data
@AllArgsConstructor
class StreamMessageHandler<T> {private Class<T> messageType;private Consumer<T> handler;
}

4.消息重試服務

4.1 代碼如下
@Service
@Slf4j
public class StreamMessageRetryService {@Autowiredprivate StringRedisTemplate redisTemplate;private static final String STREAM_KEY = "message:stream";private static final int MAX_RETRY_COUNT = 3;/*** 處理待處理的消息*/@Scheduled(fixedDelay = 60000) // 每分鐘執行一次public void processPendingMessages() {for (String topic : getTopics()) {String streamKey = STREAM_KEY + ":" + topic;String groupName = "group:" + topic;try {// 獲取待處理的消息PendingMessages pending = redisTemplate.opsForStream().pending(streamKey, groupName, Range.unbounded(), 100);if (pending != null) {for (PendingMessage message : pending.getPendingMessages()) {processRetry(streamKey, groupName, message);}}} catch (Exception e) {log.error("處理待處理消息失敗: topic={}", topic, e);}}}private void processRetry(String streamKey, String groupName, PendingMessage message) {try {if (message.getTotalDeliveryCount() > MAX_RETRY_COUNT) {// 超過重試次數,移動到死信隊列moveToDeadLetter(streamKey, groupName, message.getIdAsString());} else {// 重新投遞消息redisTemplate.opsForStream().claim(streamKey, groupName, "retry-consumer", Duration.ofMinutes(1), message.getIdAsString());}} catch (Exception e) {log.error("處理重試消息失敗: messageId={}", message.getIdAsString(), e);}}private void moveToDeadLetter(String streamKey, String groupName, String messageId) {try {// 讀取消息內容List<MapRecord<String, String, String>> messages = redisTemplate.opsForStream().range(streamKey, Range.closed(messageId, messageId));if (messages != null && !messages.isEmpty()) {MapRecord<String, String, String> message = messages.get(0);// 存儲到死信隊列redisTemplate.opsForStream().add(streamKey + ":dead", message.getValue());// 確認原消息redisTemplate.opsForStream().acknowledge(streamKey, groupName, messageId);}} catch (Exception e) {log.error("移動消息到死信隊列失敗: messageId={}", messageId, e);}}
}

5.使用示例

5.1代碼如下
@Service
@Slf4j
public class MessageService {@Autowiredprivate StreamMessageProducer producer;@Autowiredprivate StreamMessageConsumer consumer;@PostConstructpublic void init() {// 注冊訂單消息處理器consumer.registerHandler("order", OrderMessage.class, this::processOrderMessage);// 注冊支付消息處理器consumer.registerHandler("payment", PaymentMessage.class, this::processPaymentMessage);}/*** 發送訂單消息*/public String sendOrderMessage(OrderMessage message) {return producer.sendMessage("order", message);}/*** 處理訂單消息*/private void processOrderMessage(OrderMessage message) {try {log.info("處理訂單消息: {}", message);// 處理訂單邏輯} catch (Exception e) {log.error("訂單消息處理失敗", e);throw e;}}/*** 處理支付消息*/private void processPaymentMessage(PaymentMessage message) {try {log.info("處理支付消息: {}", message);// 處理支付邏輯} catch (Exception e) {log.error("支付消息處理失敗", e);throw e;}}
}@Data
@AllArgsConstructor
class OrderMessage {private String orderId;private String status;private BigDecimal amount;
}@Data
@AllArgsConstructor
class PaymentMessage {private String paymentId;private String orderId;private BigDecimal amount;private String status;
}

6.監控和管理服務

6.1 代碼如下
@Service
@Slf4j
public class StreamMonitorService {@Autowiredprivate StringRedisTemplate redisTemplate;/*** 獲取Stream信息*/public StreamInfo getStreamInfo(String topic) {try {String streamKey = STREAM_KEY + ":" + topic;StreamInfo.StreamInfoBuilder builder = StreamInfo.builder();// 獲取消息總數Long length = redisTemplate.opsForStream().size(streamKey);builder.messageCount(length != null ? length : 0);// 獲取消費者組信息StreamInfo.GroupInfo groupInfo = getGroupInfo(streamKey);builder.groupInfo(groupInfo);// 獲取最新消息IDbuilder.lastMessageId(getLastMessageId(streamKey));return builder.build();} catch (Exception e) {log.error("獲取Stream信息失敗: topic={}", topic, e);throw new RuntimeException("獲取Stream信息失敗", e);}}/*** 清理過期消息*/@Scheduled(cron = "0 0 * * * *") // 每小時執行public void cleanupOldMessages() {try {for (String topic : getTopics()) {String streamKey = STREAM_KEY + ":" + topic;// 保留最近24小時的消息long maxLength = 1000000; // 最大消息數redisTemplate.opsForStream().trim(streamKey, maxLength, true);}} catch (Exception e) {log.error("清理過期消息失敗", e);}}
}@Data
@Builder
class StreamInfo {private long messageCount;private String lastMessageId;private GroupInfo groupInfo;@Data@Builderstatic class GroupInfo {private String name;private int consumerCount;private long pendingMessageCount;}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/62385.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/62385.shtml
英文地址,請注明出處:http://en.pswp.cn/web/62385.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Python100道練習題

Python100道練習題 BIlibili 1、兩數之和 num1 20 num2 22result num1 num2print(result)2、一百以內的偶數 list1 []for i in range(1,100):if i % 2 0:list1.append(i) print(list1)3、一百以內的奇數 # 方法一 list1 [] for i in range(1,100):if i % 2 ! 0:lis…

Java轉C之并發和多線程

提綱&#xff1a; 概念介紹與對比概述 簡述Java與C在并發和多線程方面的核心區別解釋C11標準、POSIX、C11 <threads.h>、Pthread等名詞 Java多線程與并發回顧 線程、Runnable、ExecutorService概念說明同步關鍵字與工具類含義 C并發基礎 沒有Java式的內置線程類&#xf…

Ubuntu系統本地化搭建Maxakb+Ollama

安裝docker 最詳細的ubuntu 安裝 docker教程-騰訊云開發者社區-騰訊云 安裝Ollama Ollama官網 執行命令&#xff1a; curl -fsSL https://ollama.com/install.sh | sh安裝完成后下載模型 執行命令&#xff1a; ollama run llama3.3:70b安裝MaxKb 執行命令&#xff1a; d…

基于JAVA的旅游網站系統設計

摘要 隨著信息技術和網絡技術的迅速發展&#xff0c;人們的生活質量和觀念也在發生著改變&#xff0c;各地爭相發展旅游業&#xff0c;傳統的 旅游社已經無法滿足人們的需求&#xff0c;旅游網站將突破傳統在時間和地域的限制&#xff0c;成為方便、快捷、安全、可靠的旅游 方…

【Flux.jl】 卷積神經網絡

Flux.jl 是包含卷積神經網絡的, 但是官方API文件中沒有給出一個完整的程序框架, 只是對所需神經元給了局部解釋, 此外對 model-zoo 模型動物園中的案例沒有及時跟著 Flux.jl 的版本更新, 也無法運行出來結果。 因此本文搭建了一個完整可訓練的卷積神經網絡。 Conv 卷積算子…

H5游戲出海如何獲得更多增長機會?

海外H5小游戲的崛起給了國內眾多中小廠商出海發展的機會&#xff0c;開發者如何在海外市場獲得更多的增長機會&#xff1f;#APP出海# H5游戲如何在海外獲得核心用戶&#xff1f; HTML5游戲的開發與運營者們首先可以利用量多質高的HTML5游戲&#xff0c;維持海外用戶粘性&…

Next.js系統性教學:深入理解和應用組件組合模式

更多有關Next.js教程&#xff0c;請查閱&#xff1a; 【目錄】Next.js 獨立開發系列教程-CSDN博客 目錄 更多有關Next.js教程&#xff0c;請查閱&#xff1a; 1. 什么是組件組合模式&#xff1f; 1.1 組件組合模式概述 1.2 組件組合模式的優勢 2. Next.js 中的組件組合模式…

國際薦酒師Peter助力第六屆地博會,推動地理標志產品國際化發展

國際薦酒師Peter Lisicky助力第六屆知交會暨地博會&#xff0c;推動地理標志產品國際化發展 第六屆粵港澳大灣區知識產權交易博覽會暨國際地理標志產品交易博覽會于2024年12月9日至11日在中新廣州知識城盛大舉行&#xff0c;吸引了全球眾多行業專家、企業代表及相關機構齊聚一…

Mybatis 延遲加載的實現原理詳細解析

Mybatis 延遲加載的實現原理詳細解析 &#xff08;1&#xff09;代理對象機制的深入探討 代理對象的生成&#xff1a;Mybatis 使用代理對象來實現延遲加載是基于 Java 的代理機制。當開啟延遲加載并且配置正確后&#xff0c;對于需要延遲加載的關聯對象&#xff0c;Mybatis 會…

2024 亞馬遜云科技re:Invent:Werner Vogels架構哲學,大道至簡 六大經驗助力架構優化

在2024亞馬遜云科技re:Invent全球大會第四天的主題演講中&#xff0c;亞馬遜副總裁兼CTO Dr.Werner Vogels分享了 The Way of Simplexity&#xff0c;繁簡之道&#xff0c;濃縮了Werner在亞馬遜20年構建架構的經驗。 Werner表示&#xff0c;復雜性總是會“悄無聲息”地滲透進來…

Java Web 開發學習中:過濾器與 Ajax 異步請求

一、過濾器 Filter&#xff1a; 過濾器的概念與用途 在一個龐大的 Web 應用中&#xff0c;有許多資源需要受到保護或進行特定的預處理。過濾器就像是一位智能的守衛&#xff0c;站在資源的入口處&#xff0c;根據預先設定的規則&#xff0c;決定哪些請求可以順利訪問資源&…

ThinkPHP框架審計--基礎

基礎入門 搭建好thinkphp 查看版本方法&#xff0c;全局搜version 根據開發手冊可以大致了解該框架的路由 例如訪問url http://127.0.0.1:8094/index.php/index/index/index 對應代碼位置 例如在代碼下面添加新方法 那么訪問這個方法的url就是 http://127.0.0.1:8094/index.…

淺談Python庫之?Requests

一、?Requests的介紹 Requests 是一個簡單易用的 HTTP 庫&#xff0c;用于發送各種 HTTP 請求。它由 Kenneth Reitz 創建&#xff0c;并廣泛用于 Python 社區中。 二、?Requests的特點 1、人性化的 API&#xff1a;簡潔的接口使得編寫請求代碼變得簡單直觀。 2、跨平臺&…

如何在vue中使用ECharts

一. 打開ECharts官網,點擊快速入門 下面是ECharts官網的鏈接 https://echarts.apache.org/ 二.在vue中使用 1.首先先引入Echarts js文件 如下圖&#xff0c;下面的第一張圖片是官網的實現&#xff0c;第二章圖片是我根據官網的實現 2.給ECharts 創建一個DOM容器 3. 使用ec…

網絡原理之 IP 協議

目錄 1. IP 協議報文格式 2. 網段劃分 3. 地址管理 1) 動態分配 2) NAT 機制 (網絡地址轉換) 3) IPv6 4. 路由選擇 1. IP 協議報文格式 IP 協議是網絡層的重點協議。 網絡層要做的事情&#xff0c;主要就是兩方面&#xff1a; 1) 地址管理 制定一系列的規則&#xff…

HyperMesh CFD功能詳解:后處理功能Part 2

Clips Clips 按鈕包含兩個工具。Box Clip用于空間上的裁剪&#xff0c;Scalar Clip可以根據物理量的范圍裁剪。 示例&#xff1a;Box Clips 裁剪 示例&#xff1a;Scalar Clips 裁剪 通過裁剪&#xff0c;僅顯示density范圍是10~20的等值面 示例&#xff1a;顯示效果控制 部分透…

Java項目實戰II基于微信小程序的跑腿系統(開發文檔+數據庫+源碼)

目錄 一、前言 二、技術介紹 三、系統實現 四、核心代碼 五、源碼獲取 全棧碼農以及畢業設計實戰開發&#xff0c;CSDN平臺Java領域新星創作者&#xff0c;專注于大學生項目實戰開發、講解和畢業答疑輔導。獲取源碼聯系方式請查看文末 一、前言 在快節奏的現代生活中&…

【機器學習與數據挖掘實戰案例01】基于支持向量回歸的市財政收入分析

【作者主頁】Francek Chen 【專欄介紹】 ? ? ?機器學習與數據挖掘實戰 ? ? ? 機器學習是人工智能的一個分支&#xff0c;專注于讓計算機系統通過數據學習和改進。它利用統計和計算方法&#xff0c;使模型能夠從數據中自動提取特征并做出預測或決策。數據挖掘則是從大型數…

windows下nacos啟動報錯:java.lang.unsatisfiedLinkError: C:\USers\亂碼AppData\xxx.dll

問題 看了許多別的帖子&#xff0c;大家都是因為缺少dll包&#xff0c;下載安裝 Microsoft Visual C 2015 Redistributable 就可以。但我試過了不行。思來想去&#xff0c;之前正常的時候用的JDK版本是17&#xff0c;后面別的項目用1.8給切換回來了。然后嘗試配置環境變量將JD…

JavaEE 【知識改變命運】03 多線程(3)

文章目錄 多線程帶來的風險-線程安全線程不安全的舉例分析產出線程安全的原因&#xff1a;1.線程是搶占式的2. 多線程修改同一個變量&#xff08;程序的要求&#xff09;3. 原子性4. 內存可見性5. 指令重排序 總結線程安全問題產生的原因解決線程安全問題1. synchronized關鍵字…