Redis Stream Redisson Stream

目錄

    • 一、Redis Stream
      • 1.1 場景1:多個客戶端可以同時接收到消息
        • 1.1.1 XADD - 向stream添加Entry(發消息 )
        • 1.1.2 XREAD - 從stream中讀取Entry(收消息)
        • 1.1.3 XRANGE - 從stream指定區間讀取Entry(收消息)
      • 1.2 場景2:多個客戶端僅收到一部分消息(分片sharded、消費組group)
        • 1.2.1 XGROUP CREATE - 創建消費組
        • 1.2.2 XREADGROUP - 從消費組中讀取消息
        • 1.2.3 XACK - 確認消息
        • 1.2.4 XPENDING - 讀取PEL消息
        • 1.2.5 XCLAIM & XAUTOCLAIM - 轉移PEL中消息的所有權給其他消費者
        • 1.2.6 統計命令
      • 1.3 其他
    • 二、Redisson Stream

一、Redis Stream

之前介紹過Redis Pub/Sub相關內容,通過Redis Pub/Sub可以實現發布/訂閱消息傳遞范式,但是存在丟消息的可能,而本文介紹的Redis Stream是一種可用來實現 可靠消息隊列、支持消息分組(類似Kafka Group) 的數據結構。

關于Redis Stream的使用存在如下2個場景

  • 場景1: 多個客戶端可以同時接收到消息
  • 場景2: 多個客戶端僅收到一部分消息(分片sharded),例如發送消息A,B,C,客戶端1收到A,C,客戶端2收到B(參考Kafka group概念)。

關于場景1,則可參考XADD、XREAD、XRANGE等相關命令的使用,
關于場景2,則需要了解XGROUP CREATE、XREADGROUP、XACK等相關命令的使用。

1.1 場景1:多個客戶端可以同時接收到消息

場景1中相關命令XADD、XREAD、XRANGE的使用匯總如下圖:
在這里插入圖片描述

1.1.1 XADD - 向stream添加Entry(發消息 )

向stream添加Entry(多個key/value對),XADD命令格式:

XADD stream名稱 id key1 value1 key2 value2 …

其中id為此次entry的唯一ID,而key1 value1 key2 value2 …即為entry的具體內容,
id為*則表示由Redis自動生成ID:<millisecondsTime>-<sequenceNumber>
亦可明確指定id。

示例:

XADD mystream * name 羅 age 18
XADD mystream 1692632086370-0 name 劉 age 18
1.1.2 XREAD - 從stream中讀取Entry(收消息)

從stream中讀取entry,XREAD命令格式:

XREAD COUNT 最多讀取數量 BLOCK 阻塞等待毫秒數 STREAMS stream名稱 上次接收的id

通過XADD添加一條消息,多個執行XREAD的客戶端都會讀取到該消息,
XREAD會從參數中指定的 上次接收的id 之后開始讀取后續的消息,
上次接受的id 可設置為$,需配合BLOCK使用,表示僅讀取從阻塞開始后新添加的消息(即不關心歷史消息),
上次接受的id 可設置為+,需要Redis版本>=7.4 RC1,表示僅讀取最后一條消息。
阻塞等待的毫秒數 如果為0,則表示一直阻塞,直到讀取到一條消息。

示例:

# 從頭開始讀取1條消息
XREAD STREAMS mystream 0# 從頭開始讀取2條消息
XREAD COUNT 2 STREAMS mystream 0-0
# 從指定消息ID之后開始讀取2條消息
XREAD COUNT 2 STREAMS mystream 1692632086370-0# 最長阻塞5秒,最多讀取100條消息,僅讀取從阻塞開始后新添加的消息
XREAD BLOCK 5000 COUNT 100 STREAMS mystream $
# 繼續從上次接受的id之后繼續讀取
XREAD BLOCK 5000 COUNT 100 STREAMS mystream 1526999644174-3# 讀取最后一條消息(需要Redis版本>=7.4 RC1)
XREAD STREAM mystream +
1.1.3 XRANGE - 從stream指定區間讀取Entry(收消息)

從stream指定區間(起始ID范圍)正向讀取Entry,XRANGE命令格式:

XRANGE stream名稱 起始id 結束id COUNT 最多讀取數量

按起始到結束正向返回消息,
-表示最小ID,+表示最大ID

示例:

# 返回全部消息(從前到后依次返回)
XRANGE mystream - + 
# 返回5條消息(從前到后依次返回)
XRANGE mystream - + COUNT 5# 返回指定id(包括指定id)之后5條消息(從前到后依次返回)
XRANGE mystream 1718951980910-0 + COUNT 5# 返回指定id(不包括指定id)之后5條消息(從前到后依次返回)
XRANGE mystream (1718951980910-0 + COUNT 5

從stream指定區間(起始ID范圍)逆向讀取Entry,XREVRANGE命令格式:

XREVRANGE stream名稱 結束id 起始id COUNT 最多讀取數量

按結束到起始逆向返回消息。

示例:

返回全部消息(從后到前逆向依次返回)
XREVRANGE mystream + -
# 返回2條消息(從后到前逆向依次返回)
XREVRANGE mystream + - COUNT 2

1.2 場景2:多個客戶端僅收到一部分消息(分片sharded、消費組group)

場景2中相關命令XGROUP CREATE、XREADGROUP、XACK、XPENDING、XCLAIM等使用匯總如下圖:

在這里插入圖片描述

1.2.1 XGROUP CREATE - 創建消費組

給stream創建消費分組,分組間彼此隔離,分組內多個consumer會輪流消費消息(分片),XGROUP CREATE命令格式:

XGROUP CREATE stream名稱 group名稱 起始讀取id [MKSTREAM]

起始讀取id0,表示從頭開始讀取,
起始讀取id$,表示從最后一條消息之后開始讀取,
MKSTREAM子命令是可選的,表示自動創建stream。

示例:

# 為mystream創建分組mygroup1,且從最新消息開始消費XGROUP CREATE mystream mygroup1 $
1.2.2 XREADGROUP - 從消費組中讀取消息

以分組group讀取stream中的消息,group中每個客戶端需要指定consumer名稱,多個consumer分攤group中的消息,而多個group間彼此隔離,XREADGROUP命令格式:

XREADGROUP GROUP group名稱 consumer名稱 COUNT 最多讀取數量 BLOCK 阻塞等待毫秒數 [NOACK] STREAMS stream名稱 上次接收的id

PEL(Pending Entries List): 當使用XREADGROUP讀取分組下消息時,服務器會記住哪條消息發給了分組下的哪個消費者,該記錄存儲在消費者組中,稱為PEL,即已發送但尚未確認的消息ID列表。后續在消費者處理完消息后,消費者必須手動調用XACK命令對消息ID進行確認,以便從PEL中刪除掛起的消息,關于PEL的結構可參見下圖(截取自RedisInsight工具):
在這里插入圖片描述

上次接收的id>,表示消費者只希望接收從未傳遞給任何其他消費者的消息,即給我新的信息>號表示從當前消費組的last_delivered_id后面開始讀。
上次接收的id 設為0或其他有效的id,則表示僅讀取 PEL(當前consumer沒有確認的消息) 中指定id之后的消息。

NOACK子命令式可選的,表示無需確認消息,NOACK子命令適用于對可靠性要求不高、偶爾的消息丟失是可以接受的情況,使用NOACK子命令可以避免將消息添加到PEL( Pending Entries List),相當于在讀取消息后自動確認消息,后續無需再調用XACK命令進行確認,

示例:

# 消費者c1阻塞讀取mystream下分組mygroup1的最新消息(直到讀取到1條消息后解除阻塞)
XREADGROUP GROUP mygroup1 c1 BLOCK 0 STREAMS mystream ># 消費者c1讀取mystream下分組mygroup1的PEL消息(即已投遞給c1但c1未進行確認的消息列表)
XREADGROUP GROUP mygroup1 c1 STREAMS mystream 0
1.2.3 XACK - 確認消息

確認stream下指定分組group的某條消息已被成功消費,XACK命令格式:

XACK stream名稱 group名稱 消息id

示例:

# 確認1條消息 
XACK mystream mygroup1 1719206857966-0 # 同時確認3條消息
XACK mystream mygroup1 1719206857966-0 1719206909894-0 1719207195666-0
1.2.4 XPENDING - 讀取PEL消息

讀取stream中指定分組group的PEL掛起消息列表,XPENDING命令格式:

XPENDING stream名稱 group名稱 IDEL 空閑毫秒數 起始消息id 結束消息id 查詢數量 consumer名稱

示例:

# 查詢mystream下mygroup1分組的PEL列表
XPENDING mystream mygroup1# 查詢mystream下mygroup1分組下的消費者c1的空閑9秒的最多10條PEL消息
XPENDING mystream mygroup1 IDLE 9000 - + 10 c1
1.2.5 XCLAIM & XAUTOCLAIM - 轉移PEL中消息的所有權給其他消費者

通過XPENDING查詢出PEL消息(已投遞未確認)后,若原先消息對應的consumer已經掛掉,沒有能力繼續處理消息,則可通過XCLIAM將對應的消息轉移給同分組下的其他consumer進行處理,XCLAIM命令格式如下:

XCLAIM stream名稱 group名稱 consumer名稱 空閑時長毫秒 消息id1 消息id2

轉移后消息上次投遞時間會重置為當前時間(即消息空閑idle時間為0),
默認會返回已經轉移成功的消息內容,且消息投遞計數會加1,
也可添加JUSTID子命令,則只返回消息ID不返回消息內容,且消息投遞計數不變,
若多個客戶端同時通過XCLAIM轉移同一條消息的所有權,則只會有一個客戶端轉移成功。
Redis官方原文如下:

Note that the message is claimed only if its idle time is greater than the minimum idle time we specify when calling XCLAIM. Because as a side effect XCLAIM will also

  • reset the idle time (since this is a new attempt at processing the message),
  • two consumers trying to claim a message at the same time will never both succeed: only one will successfully claim the message. This avoids that we process a given message multiple times in a trivial way (yet multiple processing is possible and unavoidable in the general case).

示例:

# mystream下mygroup1分組下的PEL消息1526569498055-0且空閑時長超過1小時,則將其轉移給消費者c2
XCLAIM mystream mygroup1 c2 3600000 1526569498055-0

亦可通過XAUTOCLAIM將PEL中指定起始消息ID后的消息批量進行轉移,XAUTOCLIAM命令格式如下:

XAUTOCLAIM stream名稱 group名稱 consumer名稱 空閑時長毫秒 起始消息id COUNT 消息數量

示例:

# 掃描mystream下mygroup1分組下的所有PEL消息,空閑時長超過1小時,則最多轉移25條消息給消費者c2
XAUTOCLAIM mystream mygroup1 c2 3600000 0-0 COUNT 25
1.2.6 統計命令
# 查詢stream下的分組信息
XINFO GROUPS stream名稱# 查詢stream信息
XINFO STREAM stream名稱# 查詢stream下指定分組的消費者信息
XINFO CONSUMERS stream名稱 group名稱

1.3 其他

刪除stream中的消息:

XDEL stream名稱 id1 id2 …

查詢stream中的消息(entry)數量:

XLEN stream名稱

壓縮stream中的消息數據量:

XTRIM stream名稱 MAXLEN 保留的最近消息數量
XTRIM stream名稱 MINID 消息ID(小于此ID的消息均會被刪除)

二、Redisson Stream

在Redisson中可通過Stream實現Redis Stream,

場景1 相關示例代碼如下:

@Test
void testStream() throws InterruptedException {String streamName = "mystream";MyMessage2 myMessage = this.buildMyMessageWithTimestampId();//獲取StreamRStream<String, Object> stream = this.redisson.getStream(streamName);//發消息 - XADD mystream * name 我的消息 age 18StreamMessageId entryId = stream.add(StreamAddArgs.entries(myMessage.toMap()));log.info("stream[{}] add success, id: {}", streamName, entryId);//讀消息 - XREAD COUNT 5 BLOCK 5000 STREAMS mystream 0Map<StreamMessageId, Map<String, Object>> entries = stream.read(StreamReadArgs.greaterThan(StreamMessageId.ALL).count(5).timeout(Duration.ofSeconds(5)));entries.forEach((id, entryMap) -> {log.info("stream[{}] read message: id={}, entry: {}", streamName, id, entryMap);});//讀取區間內消息 - XRANGE mystream 0 entryId COUNT 10entries = stream.range(10, StreamMessageId.ALL, entryId);entries.forEach((id, entryMap) -> {log.info("stream[{}] range message: id={}, entry: {}", streamName, id, entryMap);});
}

場景2 相關示例代碼如下:

@Resource
private RedissonClient redisson;@Test
void testStreamGroup() throws InterruptedException {String streamName = "mystream";String groupName = "mygroup1";String consumerName = "c1";MyMessage2 myMessage = this.buildMyMessageWithTimestampId();//獲取StreamRStream<String, Object> stream = this.redisson.getStream(streamName);//發消息 - XADD mystream * name 我的消息 age 18StreamMessageId entryId = stream.add(StreamAddArgs.entries(myMessage.toMap()));log.info("stream[{}] add success, id: {}", streamName, entryId);//查詢已存在的分組 - XINFO GROUPS mystreamList<StreamGroup> streamGroups = stream.listGroups();streamGroups.forEach(streamGroup -> {log.info("stream[{}] listGroups groupName: {}", streamName, streamGroup.getName());});Boolean existGroup = streamGroups.stream().anyMatch(group -> groupName.equals(group.getName()));if (!existGroup) {//創建分組 - XGROUP CREATE mygroup1 $stream.createGroup(StreamCreateGroupArgs.name(groupName)//此處id支持:NEWEST即$,ALL即0.id(StreamMessageId.ALL));log.info("stream[{}] createGroup success, groupName: {}", streamName, groupName);}//讀分組消息 - XREADGROUP GROUP mygroup1 c1 COUNT 5 BLOCK 5000 STREAMS mystream >Map<StreamMessageId, Map<String, Object>> entries = stream.readGroup(groupName, consumerName,//greaterThan即設置從哪個消息ID之后開始讀取,支持:NEVER_DELIVERED即>、ALL即0StreamReadGroupArgs.greaterThan(StreamMessageId.NEVER_DELIVERED).count(5).timeout(Duration.ofSeconds(5)));entries.forEach((id, entryMap) -> {log.info("stream[{}] readGroup groupName: {}, consumerName: {}, message: id={}, entry: {}",streamName, groupName, consumerName, id, entryMap);});//讀取PEL中未確認的消息 - XPENDING mystream mygroup1 - + 100 c1Map<StreamMessageId, Map<String, Object>> streamMessageIdMapMap = stream.pendingRange(groupName, consumerName, StreamMessageId.MIN, StreamMessageId.MAX, 100);streamMessageIdMapMap.forEach((id, entryMap) -> {log.info("stream[{}] pendingRange groupName: {}, consumerName: {}, message: id={}, entry: {}",streamName, groupName, consumerName, id, entryMap);//確認消息(從PEL中移除) - XACK mystream mygroup1 1600000000000-0stream.ack(groupName, id);log.info("stream[{}] ack groupName: {}, consumerName: {}, message: id={}",streamName, groupName, consumerName, id);});}

參考:

Redis Stream
https://redis.io/docs/latest/develop/data-types/streams/
https://redis.io/docs/latest/commands/xreadgroup/

Redisson Stream
https://github.com/redisson/redisson/wiki/7.-Distributed-collections#720-stream

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/36745.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/36745.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/36745.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【DevExpress】WPF DevExpressMVVM 24.1版本開發指南

DevExpressMVVM WPF 環境安裝 前言重要Bug&#xff08;必看&#xff09;環境安裝控件目錄Theme 主題LoginWindow 登陸窗口INavigationService 導航服務DockLayout Dock類型的畫面布局TreeView 樹狀列表注意引用類型的時候ImageSource是PresentationCore程序集的博主找了好久&am…

[筆記] keytool 導入服務器證書和證書私鑰

背景 我當前手頭已有一個服務器證書和對應的私鑰&#xff0c;現在需要轉換為 Java KeyStore 格式使用&#xff0c;找了一大圈才發現 keytool 無法直接導入服務器證書和私鑰&#xff0c;當然證書可以直接導入&#xff0c;但是私鑰是無法直接導入。找了一大圈發現可以先將服務器…

LeetCode題解:1669. 合并兩個鏈表,JavaScript,詳細注釋

原題鏈接&#xff1a; https://leetcode.cn/problems/merge-in-between-linked-lists/ 解題思路&#xff1a; 注意該題傳入的a和b是鏈表的索引&#xff0c;而不是節點的值先遍歷list1&#xff0c;找到a-1和b1節點將a-1的next指向list2的頭節點在將list2的尾節點的next指向b1節…

Navicat 外網連接 mysql (1、通過SSH方式內網訪問 2、對外開放3306端口)

1、通過SSH方式內網訪問 直接常規方式使用IP、賬號密碼連接&#xff0c;失敗 SSH方式&#xff1a; 常規 選項卡中&#xff1a;localhost錄入數據庫賬號密碼 SSH 選項卡中&#xff1a;勾選使用SSH&#xff0c;輸入服務器IP、賬號、密碼 如果出現該錯誤&#xff0c;可能是服務器…

計算機網絡重點名詞解釋整理

名詞解釋 GPTVersion 一、網絡協議 網絡協議 數據交換的規則 組成&#xff1a;語義、語法、定時 二、DHCP DHCP 動態規劃主機配置協議 作用&#xff1a;讓計算機自動獲取IP地址 特點&#xff1a;即插即用&#xff0c;不需要手動設置 三、信號的基本調制方法以及定義 …

Windows下activemq開啟jmx

1.activemq版本信息 activemq&#xff1a;apache-activemq-5.18.4 2.Windows下activemq開啟jmx 1.進入activemq conf目錄&#xff0c;備份activemq.xml文件 2.編輯activemq.xml文件&#xff0c;在broker節點增加useJmx"true" <broker xmlns"http://active…

C++循環隊列 自定義queue

原理解析 看main部分的注釋&#xff0c;對照著函數&#xff0c;應該能看懂。 #include <iostream> class Queue {public:static constexpr int MAX_SIZE 5;int items[MAX_SIZE];int front, rear;Queue() : front(-1), rear(-1) {}void enqueue(int value) {if ((rear …

理解 Vue.js 中的 immediate: true

理解 Vue.js 中的 immediate: true 在使用 Vue.js 時&#xff0c;監聽器 (watchers) 是一種非常重要的工具&#xff0c;它允許我們觀察和響應數據的變化。在定義監聽器時&#xff0c;我們通常會在組件的 watch 選項中添加相關配置。immediate: true 是其中的一個配置選項。本文…

無線通訊幾種常規天線類別簡介

天線對于無線模塊來說至關重要&#xff0c;合適的天線可以優化通信網絡&#xff0c;增加其通信的范圍和可靠性。天線的選型對最后的模塊通信影響很大&#xff0c;不合適的天線會導致通信質量下降。針對不同的市場應用&#xff0c;天線的材質、安置方式、性能也大不一樣。下面簡…

近期計算機領域的熱點技術

隨著科技的飛速發展&#xff0c;計算機領域的新技術、新趨勢層出不窮。本文將探討近期計算機領域的幾個熱點技術趨勢&#xff0c;并對它們進行簡要的分析和展望。 一、人工智能與機器學習 人工智能&#xff08;AI&#xff09;和機器學習&#xff08;ML&#xff09;是近年來計算…

基于Vue 3.x與TypeScript的PPTIST本地部署與無公網IP遠程演示文稿

文章目錄 前言1. 本地安裝PPTist2. PPTist 使用介紹3. 安裝Cpolar內網穿透4. 配置公網地址5. 配置固定公網地址 前言 本文主要介紹如何在Windows系統環境本地部署開源在線演示文稿應用PPTist&#xff0c;并結合cpolar內網穿透工具實現隨時隨地遠程訪問與使用該項目。 PPTist …

[gpt胡說八道篇] 使用Docker快速啟動Doris

Docker 是一種輕量級的虛擬化技術&#xff0c;我們可以利用 Docker 快速的在本地啟動一個 Doris 的實例&#xff0c;方便進行開發和測試。下面我們來看一下如何操作。 1. 拉取 Docker 鏡像 首先&#xff0c;我們需要從 Docker Hub 上拉取 Doris 的鏡像。打開終端&#xff0c;輸…

Qt Qvariant

QVariant 是 Qt 框架中的一個非常強大的類&#xff0c;它用于存儲各種不同類型的數據&#xff0c;并提供了一種統一的方式來處理這些數據。QVariant 可以存儲大多數基本數據類型&#xff0c;如整數、浮點數、字符串、日期時間等&#xff0c;以及更復雜的數據類型&#xff0c;如…

ChatGPT的原理可以通俗易懂地介紹

ChatGPT的原理可以通俗易懂地介紹如下&#xff1a; 基礎架構&#xff1a; ChatGPT基于OpenAI的GPT&#xff08;Generative Pre-trained Transformer&#xff09;模型&#xff0c;尤其是GPT-3的架構進行構建。GPT模型是一種基于Transformer架構的預訓練語言模型&#xff0c;特別…

基于STM32的智能水質監測系統

目錄 引言環境準備智能水質監測系統基礎代碼實現&#xff1a;實現智能水質監測系統 4.1 數據采集模塊4.2 數據處理與分析4.3 控制系統實現4.4 用戶界面與數據可視化應用場景&#xff1a;水質管理與優化問題解決方案與優化收尾與總結 1. 引言 智能水質監測系統通過使用STM32嵌…

RISC-V知識總結 —— 向量(擴展)指令集

資源1:晏明 - RISC-V向量擴展指令架構及LLVM自動向量化支持 - 202112118 - 第13屆開源開發工具大會&#xff08;OSDTConf2021&#xff09;_嗶哩嗶哩_bilibili資源2:張先軼 - 基于RISC-V向量指令集優化基礎計算軟件生態【第12屆開源開發工具大會&#xff08;OSDT2020&#xff09…

設計模式(實際項目)-狀態機模式

需求背景&#xff1a;存在狀態流轉的預約單 一.數據庫設計 CREATE TABLE appointment (id bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 主鍵id,appoint_type int(11) NOT NULL COMMENT 預約類型(0:線下查房...),appoint_user_id bigint(20) NOT NULL COMMENT 預約人…

研導智能科技——AI輔助科研產品開發

人工智能&#xff08;AI&#xff09;技術的飛速發展為科研領域帶來了革命性的變化。本公司致力于開發基于人工智能的科研輔助產品&#xff0c;旨在通過智能化手段提高科研人員的工作效率和研究質量。目前&#xff0c;我們成功開發了研導學術平臺&#xff08;www.zhiyanxueshu.c…

Linux運維:MySQL數據庫(1)

1.信息與數據&#xff1a; 數據是信息的載體&#xff0c;信息是數據的內涵。數據庫就是存儲數據的倉庫&#xff0c;并長期存儲在計算機磁盤中&#xff0c;可由多個用戶和應用程序共享的數據集合&#xff0c;就是數據庫。 2.數據庫中的數據的特點&#xff1a; 2.1.數據是按照某…

RuleApp1.4.6文章社區客戶端 廣告聯盟支持Docx導入

支持編譯為安卓&#xff0c;蘋果&#xff0c;小程序&#xff0c;H5網頁的社區客戶端代碼&#xff0c;包括文章模塊&#xff0c;用戶模塊&#xff0c;動態模塊&#xff0c;支付模塊&#xff0c;聊天模塊&#xff0c;廣告模塊&#xff0c;商城模塊等基礎功能&#xff0c;包含VIP會…