Redis Stream:輕量級消息隊列深度解析

📨 Redis Stream:輕量級消息隊列深度解析

文章目錄

  • 📨 Redis Stream:輕量級消息隊列深度解析
  • 🧠 一、Stream 數據結構解析
    • 💡 Stream 核心概念
    • 📋 Stream 底層結構
  • ? 二、消息生產與消費
    • 🚀 消息生產(XADD)
    • 📥 消息消費(XREAD)
    • 🆔 消息ID機制
  • 🔄 三、消費組機制詳解
    • 💡 消費組核心概念
    • 🛠? 消費組管理命令
    • ? 消費組實戰示例
  • 📊 四、Redis Stream vs Kafka
    • 📋 特性對比表
    • 🎯 適用場景對比
    • 🔧 技術選型建議
  • 🚀 五、實戰應用案例
    • 🛒 案例1:訂單處理隊列
    • ? 案例2:延遲消息實現
    • 🔄 案例3:消息回溯與重放
  • 💡 六、總結與最佳實踐
    • 🎯 適用場景總結
    • 🔧 生產環境建議
    • 🚀 性能優化技巧

🧠 一、Stream 數據結構解析

💡 Stream 核心概念

Redis Stream 是 Redis 5.0 引入的??持久化消息數據結構????,它提供了完整的消息隊列功能,包括消息持久化、消費組、消息確認等特性。

Stream
Entry 1
Entry 2
Entry 3
...
ID: 1640995200000-0
Field1: Value1
Field2: Value2

Stream 核心特性??:

  • 📝 ??消息持久化??:所有消息持久化存儲
  • 🔄 ??消費組支持??:多個消費組獨立消費
  • ? ??消息確認??:確保消息至少消費一次
  • ? ??消息回溯??:支持歷史消息重新消費
  • 🚀 ??高性能??:基于內存的高吞吐量

📋 Stream 底層結構

??Stream 內部實現??:

// Redis Stream 底層結構
typedef struct stream {rax *rax;               // 基數樹存儲消息uint64_t length;        // 消息數量streamID last_id;       // 最后消息IDrax *cgroups;           // 消費組
} stream;// 消息ID結構
typedef struct streamID {uint64_t ms;            // 時間戳uint64_t seq;           // 序列號
} streamID;

??消息存儲格式??:

+----------+----------+----------+----------+
| 消息ID    | 字段1     | 字段2     | 字段3     |
+----------+----------+----------+----------+
| 1640995200000-0 | name:張三 | age:25 | city:北京 |
+----------+----------+----------+----------+

? 二、消息生產與消費

🚀 消息生產(XADD)

??基本消息生產??:

# 添加消息到流(自動生成ID)
XADD orders * user_id 1001 product_id 2001 quantity 2# 輸出:1640995200000-0(生成的消息ID)# 指定消息ID
XADD orders 1640995200000-1 user_id 1002 product_id 2002 quantity 1# 限制Stream長度(近似修剪)
XADD orders MAXLEN ~ 1000 * user_id 1003 product_id 2003 quantity 3

??Java 生產者示例??:

public class StreamProducer {private Jedis jedis;public String produceMessage(String streamKey, Map<String, String> message) {// 自動生成消息IDreturn jedis.xadd(streamKey, StreamEntryID.NEW_ENTRY, message);}public String produceMessageWithId(String streamKey, String id, Map<String, String> message) {// 指定消息IDreturn jedis.xadd(streamKey, new StreamEntryID(id), message);}
}

📥 消息消費(XREAD)

??獨立消費者模式??:

# 讀取所有可用消息
XREAD STREAMS orders 0# 從指定ID開始讀取
XREAD STREAMS orders 1640995200000-0# 阻塞讀取新消息(最多等待5000ms)
XREAD BLOCK 5000 STREAMS orders $# 批量讀取多條消息
XREAD COUNT 10 STREAMS orders 0

??消費者組示例??:

public class StreamConsumer {private Jedis jedis;public void consumeMessages(String streamKey, String consumerGroup, String consumerName) {while (true) {// 阻塞讀取消息List<Entry<String, List<StreamEntry>>> messages = jedis.xreadGroup(consumerGroup, consumerName, XReadGroupParams.xReadGroupParams().block(5000),Collections.singletonMap(streamKey, StreamEntryID.UNRECEIVED_ENTRY));for (StreamEntry entry : messages.get(0).getValue()) {processMessage(entry);// 確認消息處理完成jedis.xack(streamKey, consumerGroup, entry.getID());}}}private void processMessage(StreamEntry entry) {Map<String, String> fields = entry.getFields();System.out.println("處理消息: " + fields);}
}

🆔 消息ID機制

??ID生成策略??:

# 時間戳-序列號格式
1640995200000-0  # 2022年1月1日 00:00:00 的第0條消息
1640995201000-0  # 1秒后的第0條消息
1640995201000-1  # 同一毫秒的第1條消息# 特殊ID含義
0-0              # 從開始讀取
$                # 只讀取新消息

??ID操作示例??:

# 查詢消息范圍
XRANGE orders 1640995200000-0 1640995201000-0# 反向查詢
XREVRANGE orders + - COUNT 10# 刪除特定消息
XDEL orders 1640995200000-0

🔄 三、消費組機制詳解

💡 消費組核心概念

Stream
Consumer Group
Consumer 1
Consumer 2
Consumer 3
Message 1
Message 4
Message 2
Message 5
Message 3
Message 6

🛠? 消費組管理命令

??創建消費組??:

# 創建消費組,從Stream開頭消費
XGROUP CREATE orders order-group 0# 創建消費組,只消費新消息
XGROUP CREATE orders order-group $# 刪除消費組
XGROUP DESTROY orders order-group# 查看Stream信息
XINFO STREAM orders# 查看消費組信息
XINFO GROUPS orders# 查看消費者信息
XINFO CONSUMERS orders order-group

??消費者操作??:

# 從消費組讀取消息
XREADGROUP GROUP order-group consumer1 COUNT 10 STREAMS orders ># 確認消息處理
XACK orders order-group 1640995200000-0# 查看待處理消息
XPENDING orders order-group# 認領超時消息
XCLAIM orders order-group consumer2 3600000 1640995200000-0

? 消費組實戰示例

??Java 消費組實現??:

public class ConsumerGroupExample {public void startConsumer(String stream, String group, String consumer) {// 初始化消費組initConsumerGroup(stream, group);while (true) {try {// 讀取消息List<Map.Entry<String, List<StreamEntry>>> messages = jedis.xreadGroup(group, consumer, XReadGroupParams.xReadGroupParams().block(1000),Collections.singletonMap(stream, ">"));for (StreamEntry entry : messages.get(0).getValue()) {processMessage(entry);// 確認消息jedis.xack(stream, group, entry.getID());}} catch (Exception e) {handleError(e);}}}private void initConsumerGroup(String stream, String group) {try {jedis.xgroupCreate(stream, group, null, false);} catch (Exception e) {// 消費組可能已存在System.out.println("消費組已存在: " + e.getMessage());}}
}

📊 四、Redis Stream vs Kafka

📋 特性對比表

特性Redis StreamApache Kafka優勢方
部署復雜度極簡(內置Redis)復雜(需要ZooKeeper)Redis
消息持久化支持(但受內存限制)支持(磁盤持久化)Kafka
吞吐量極高(10萬+/秒)高(10萬+/秒)相當
延遲極低(亞毫秒級)低(毫秒級)Redis
消息保留基于內存限制基于時間和大小Kafka
消費組支持支持相當
分區支持有限(單個Stream)完整支持Kafka
生態工具較少豐富Kafka
適用規模中小規模(GB級)大規模(TB級)Kafka

🎯 適用場景對比

低延遲/簡單部署
大規模/高持久化
中等規模/平衡需求
消息場景需求
需求分析
選擇Redis Stream
選擇Kafka
根據團隊熟悉度選擇
實時通知
聊天應用
任務隊列
日志收集
事件溯源
流處理

🔧 技術選型建議

??選擇 Redis Stream 當??:

  • ? 需要極低的消息延遲
  • ? 希望簡單部署和維護
  • ? 數據量在內存可容納范圍內
  • ? 已經使用 Redis 基礎設施

??選擇 Kafka 當??:

  • ? 需要處理海量消息(TB級別)
  • ? 需要長時間消息保留
  • ? 需要強大的流處理生態
  • ? 有專業的運維團隊

🚀 五、實戰應用案例

🛒 案例1:訂單處理隊列

??訂單隊列架構??:

XADD
減庫存
處理支付
發送通知
訂單服務
orders_stream
訂單消費組
庫存服務
支付服務
通知服務
庫存DB
支付DB
消息服務

??訂單生產者??:

public class OrderProducer {public void createOrder(Order order) {Map<String, String> fields = new HashMap<>();fields.put("order_id", order.getId());fields.put("user_id", order.getUserId());fields.put("amount", order.getAmount().toString());fields.put("status", "created");String messageId = jedis.xadd("orders_stream", StreamEntryID.NEW_ENTRY, fields);log.info("訂單消息已發送: {}", messageId);}
}

??庫存消費者??:

public class InventoryConsumer {public void processOrders() {while (true) {List<Map.Entry<String, List<StreamEntry>>> messages = jedis.xreadGroup("order_group", "inventory_consumer", XReadGroupParams.xReadGroupParams().block(1000),Collections.singletonMap("orders_stream", ">"));for (StreamEntry entry : messages.get(0).getValue()) {try {reduceInventory(entry.getFields());jedis.xack("orders_stream", "order_group", entry.getID());} catch (Exception e) {log.error("處理庫存失敗: {}", entry.getID(), e);}}}}
}

? 案例2:延遲消息實現

??延遲消息方案??:

XADD
定時檢查
XADD
生產者
延遲隊列
等待
消費者
就緒隊列
真實消費者

??延遲消息實現??:

public class DelayedMessageService {private ScheduledExecutorService scheduler;public void sendDelayedMessage(String stream, Map<String, String> message, long delayMs) {// 存儲到延遲隊列String id = jedis.xadd("delayed:" + stream, StreamEntryID.NEW_ENTRY, message);// 定時任務處理延遲scheduler.schedule(() -> {// 從延遲隊列移動到就緒隊列moveToReadyQueue(stream, id);}, delayMs, TimeUnit.MILLISECONDS);}private void moveToReadyQueue(String stream, String messageId) {// 讀取延遲消息List<StreamEntry> entries = jedis.xrange("delayed:" + stream, messageId, messageId);if (!entries.isEmpty()) {StreamEntry entry = entries.get(0);// 添加到就緒隊列jedis.xadd(stream, StreamEntryID.NEW_ENTRY, entry.getFields());// 刪除延遲消息jedis.xdel("delayed:" + stream, messageId);}}
}

🔄 案例3:消息回溯與重放

??消息重放服務??:

public MessageReplayService {public void replayMessages(String stream, String startId, String endId) {// 創建臨時消費組用于重放String replayGroup = "replay_" + System.currentTimeMillis();jedis.xgroupCreate(stream, replayGroup, new StreamEntryID(startId), false);// 重放消息List<StreamEntry> messages = jedis.xrange(stream, startId, endId);for (StreamEntry message : messages) {processReplayMessage(message.getFields());}// 清理臨時消費組jedis.xgroupDestroy(stream, replayGroup);}
}

💡 六、總結與最佳實踐

🎯 適用場景總結

??適合使用 Redis Stream??:

場景推薦度理由
實時通知系統???低延遲,簡單易用
任務隊列???持久化,消費組支持
聊天消息???時序消息,快速存取
事件溯源??消息回溯能力
日志收集?中小規模日志
金融交易?需要更強持久化保證

🔧 生產環境建議

??配置優化??:

# redis.conf 優化配置
maxmemory 4gb
maxmemory-policy allkeys-lru
stream-node-max-bytes 4096
stream-node-max-entries 100# 監控配置
slowlog-log-slower-than 10000
latency-monitor-threshold 100

??監控指標??:

# 監控Stream狀態
redis-cli xinfo stream orders_stream# 監控消費組
redis-cli xinfo groups orders_stream# 監控內存使用
redis-cli info memory | grep used_memory_stream# 監控消息積壓
redis-cli xlen orders_stream

??故障處理??:

# 處理消息積壓
# 1. 增加消費者數量
# 2. 調整消費組參數
# 3. 清理過期消息# 處理內存不足
# 1. 設置Stream最大長度
# 2. 啟用Stream修剪
# 3. 監控內存使用

🚀 性能優化技巧

??1. 批量處理??:

// 批量讀取消息
List<Map.Entry<String, List<StreamEntry>>> messages = jedis.xreadGroup(group, consumer, XReadGroupParams.xReadGroupParams().count(100).block(1000),Collections.singletonMap(stream, ">"));// 批量確認消息
for (StreamEntry entry : messages) {pendingAck.add(entry.getID());
}
jedis.xack(stream, group, pendingAck.toArray(new StreamEntryID[0]));

??2. 內存優化??:

# 定期修剪Stream
XADD orders MAXLEN ~ 10000 * field1 value1# 監控大Stream
redis-cli memory usage orders_stream

??3. 消費者優化??:

// 消費者負載均衡
public class ConsumerBalancer {public static String getConsumerName(String serviceId) {return "consumer_" + serviceId + "_" + ThreadLocalRandom.current().nextInt(1000);}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/96177.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/96177.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/96177.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Android studio的adb和終端的adb互相搶占端口

在Android Studio調試時&#xff0c;有時候也需要借助終端的adb命令&#xff0c;他們互相搶占端 口&#xff0c;導致調試麻煩解決如下&#xff1a;① 終端adb的版本是&#xff1a;1.0.39路徑是:/usr/lib/android-sdk/platform-tools/adb② Android Studio使用的adb來源于Androi…

GEO服務商推薦:移山科技以劃時代高精尖技術引領AI搜索優化新紀元

引言&#xff1a;AI搜索生態重塑與GEO優化戰略地位躍升AI技術對信息檢索范式的顛覆GEO優化在企業增長中的核心作用第一章&#xff1a;AI搜索新紀元的企業營銷挑戰與機遇生成式AI成為用戶主要信息入口的行業趨勢企業在AI搜索中的“答案主權”爭奪戰GEO優化服務商的核心能力模型&…

Android SystemServer 系列專題【AttentionManagerService】

AttentionManagerService是framework中用來實現屏幕感知的一個系統級服務&#xff0c;他繼承于systemserver。我們可以通過dumpsys attention來獲取他的一些信息。如下針對屏幕感知的功能的引入來針對這個服務進行一個介紹。1、屏幕感知Settings UI實現屏幕感知的功能在A14上面…

nginx 反向代理使用變量的坑

nginx采用反向代理的時候使用變量的坑 正常情況&#xff1a; location ~ ^/prod-api(?<rest>/.*)?$ {# 假設 $mes_backend 形如: http://127.0.0.1:16889proxy_pass $mes_backend$rest$is_args$args;proxy_http_version 1.1;proxy_set_header Host $host;…

Origin繪制徑向條形圖|科研論文圖表教程

數據排列格式截圖&#xff0c;請查看每張圖↘右下角水印 目錄 數據排列格式截圖&#xff0c;請查看每張圖↘右下角水印 本 期 導 讀 No.1 理解圖形 1 定義 2 特點 3 適用場景 No.2 畫圖教程 1 導入數據&#xff0c;繪制圖形 2 設置繪圖細節 本 期 導 讀 徑…

MySQL InnoDB 的 MVCC 機制

前言 多版本并發控制&#xff08;MVCC&#xff09;是 MySQL InnoDB 存儲引擎實現高性能事務的核心機制。它通過創建數據快照&#xff0c;使得讀寫操作可以無鎖并發&#xff0c;極大地提升了數據庫的并發性能。本文將深入探討 MVCC 的工作原理、實現細節以及它與事務隔離級別的緊…

景區負氧離子氣象站:引領綠色旅游,暢吸清新每一刻

在綠色旅游成為消費主流的今天&#xff0c;游客對 “清新空氣” 的需求不再是模糊的期待&#xff0c;而是可感知、可選擇的具體體驗。景區負氧離子氣象站的出現&#xff0c;正以科技之力重塑綠色旅游格局&#xff0c;讓 “暢吸清新每一刻” 從口號變為觸手可及的現實&#xff0…

Pytorch筆記一之 cpu模型保存、加載與推理

Pytorch筆記一之 cpu模型保存、加載與推理 1.保存模型 首先&#xff0c;在加載模型之前&#xff0c;我們需要了解如何保存模型。PyTorch 提供了兩種保存模型的方法&#xff1a;保存整個模型和僅保存模型的狀態字典&#xff08;state dict&#xff09;。推薦使用第二種方式&…

當AI在代碼車間組裝模塊:初級開發者的創意反成「核心算法」

前言&#xff1a;哈嘍&#xff0c;大家好&#xff0c;今天給大家分享一篇文章&#xff01;并提供具體代碼幫助大家深入理解&#xff0c;徹底掌握&#xff01;創作不易&#xff0c;如果能幫助到大家或者給大家一些靈感和啟發&#xff0c;歡迎收藏關注哦 &#x1f495; 目錄當AI在…

技術視界 | 跨域機器人通信與智能系統:打破壁壘的開源探索

8 月 16 日&#xff0c;在 OpenLoong 社區舉辦的第九期線下分享會上&#xff0c;國家地方共建人形機器人創新中心的軟件開發負責人 Amadeus 博士帶來了一場主題為“跨域機器人通信與智能系統&#xff1a;打破行業壁壘的創新方案”的演講。深入探討了當前機器人領域的一個關鍵痛…

Android入門到實戰(八):從發現頁到詳情頁——跳轉、傳值與RecyclerView多類型布局

一. 引言在上一篇文章里&#xff0c;我們從零開始實現了 App 的 發現頁面&#xff0c;通過網絡請求獲取數據&#xff0c;并使用 RecyclerView 展示了劇集列表。但光有發現頁還不夠&#xff0c;用戶在點擊一部劇時&#xff0c;自然希望進入到一個更詳細的頁面&#xff0c;去查看…

【工具】41K star!網頁一鍵變桌面應用

項目中遇到了一個需要將現有的 web 頁面打包成一個 桌面應用 的需求。 最一開始想到的是 Electron&#xff0c;但是它還需要一些開發工作并且打包后的應用體積比較大&#xff0c;調研后發現了開源工具 Pake。 它能讓你用最輕量的方式&#xff0c;把任何網頁一鍵打包成跨平臺桌…

浪潮CD1000-移動云電腦-RK3528芯片-2+32G-安卓9-2種開啟ADB ROOT刷機教程方法

浪潮CD1000-移動云電腦-RK3528芯片-232G-安卓9-2種開啟ADB ROOT刷機教程方法 往期文章&#xff1a; 浪潮CD1000-移動云電腦-RK3528芯片-232G-安卓9-開啟ADB ROOT破解教程 地址1&#xff1a;浪潮CD1000-移動云電腦-RK3528芯片-232G-開啟ADB ROOT破解教程-CSDN博客 中國移動浪潮…

Day23_【機器學習—聚類算法—K-Means聚類 及評估指標SSE、SC、CH】

一、聚類算法概念屬于無監督學習算法&#xff0c;即有特征無標簽&#xff0c;根據樣本之間的相似性&#xff0c;將樣本劃分到不同的類別中。所謂相似性可以理解為歐氏距離、曼哈頓距離、切比雪夫距離... 。分類按顆粒度分為&#xff1a;粗聚類、細聚類。按實現方法分為&#xf…

android seekbar顯示刻度

SeekBar簡介 SeekBar是Android中的一個可交互UI組件&#xff0c;允許用戶通過拖動滑塊在特定范圍內選擇數值。繼承自ProgressBar&#xff0c;但增加了用戶手動調節功能&#xff0c;常用于音量控制、亮度調節等場景。 核心屬性 android:maxHeight // 背景高度 android:progres…

【高并發內存池】五、頁緩存的設計

文章目錄Ⅰ. page cache頁緩存的結構設計Ⅱ. 完善central cache中的 get_span() 函數Ⅲ. 實現頁緩存獲取span對象的接口Ⅰ. page cache頁緩存的結構設計 ? 首先頁緩存還是一個哈希桶的結構&#xff0c;但是和前兩者不同的是&#xff0c;頁緩存的哈希桶中存放的是一個或者多個…

Elasticsearch(text和keyword)區別分析

text:全文檢索類型,經過分詞處理,支持模糊匹配? keyword:精確匹配類型,適用于聚合、排序和過濾? text 1. 核心屬性 ?analyzer屬性?: 指定用于索引和搜索的分詞器 默認使用標準分析器(Standard Analyzer) 示例:"analyzer": "ik_max_word"(中文…

通過tailscale實現一臺電腦上vscode通過ssh連接另一臺電腦上的VMware Linux 虛擬機

當需要通過一臺windows電腦上的vscode來ssh連接另一臺電腦上的linux虛擬機進行遠程操作&#xff0c;可以通過tailscale來實現。 Linux虛擬機上安裝tailscale 由于掛代理下載仍然很慢&#xff0c;而清華鏡像源又沒有tailscale的軟件包&#xff0c;所以可以通過下載 DEB 包安裝…

[Upscayl圖像增強] docs | 前端 | Electron工具(web->app)

鏈接&#xff1a;https://upscayl.org/docs&#xff1a;Upscayl Upscayl是一款桌面應用程序&#xff0c;允許用戶使用人工智能放大和增強圖像。 提供了一個用戶友好的圖形界面&#xff08;渲染器用戶界面&#xff09;&#xff0c;用戶可以選擇圖像或文件夾&#xff0c;從多種AI…

阿里云通義MoE全局均衡技術:突破專家負載失衡的革新之道

MoE模型的基本原理與核心價值 混合專家模型&#xff08;Mixture of Experts&#xff0c;MoE&#xff09;是當前AI大模型領域最重要的架構創新之一&#xff0c;其核心思想是通過多個“專家”網絡協同處理輸入數據&#xff0c;并由門控網絡動態選擇或組合各個專家的輸出&#xf…