目錄
- 一、Redis Stream
- 1.1 場景1:多個客戶端可以同時接收到消息
- 1.1.1 XADD - 向stream添加Entry(發消息 )
- 1.1.2 XREAD - 從stream中讀取Entry(收消息)
- 1.1.3 XRANGE - 從stream指定區間讀取Entry(收消息)
- 1.2 場景2:多個客戶端僅收到一部分消息(分片sharded、消費組group)
- 1.2.1 XGROUP CREATE - 創建消費組
- 1.2.2 XREADGROUP - 從消費組中讀取消息
- 1.2.3 XACK - 確認消息
- 1.2.4 XPENDING - 讀取PEL消息
- 1.2.5 XCLAIM & XAUTOCLAIM - 轉移PEL中消息的所有權給其他消費者
- 1.2.6 統計命令
- 1.3 其他
- 二、Redisson Stream
一、Redis Stream
之前介紹過Redis Pub/Sub相關內容,通過Redis Pub/Sub可以實現發布/訂閱消息傳遞范式,但是存在丟消息的可能,而本文介紹的Redis Stream是一種可用來實現 可靠消息隊列、支持消息分組(類似Kafka Group) 的數據結構。
關于Redis Stream的使用存在如下2個場景
- 場景1: 多個客戶端可以同時接收到消息
- 場景2: 多個客戶端僅收到一部分消息(分片sharded),例如發送消息A,B,C,客戶端1收到A,C,客戶端2收到B(參考Kafka group概念)。
關于場景1,則可參考XADD、XREAD、XRANGE
等相關命令的使用,
關于場景2,則需要了解XGROUP CREATE、XREADGROUP、XACK
等相關命令的使用。
1.1 場景1:多個客戶端可以同時接收到消息
場景1中相關命令XADD、XREAD、XRANGE
的使用匯總如下圖:
1.1.1 XADD - 向stream添加Entry(發消息 )
向stream添加Entry(多個key/value對),XADD命令格式:
XADD
stream名稱 id key1 value1 key2 value2 …
其中id為此次entry的唯一ID,而key1 value1 key2 value2 …即為entry的具體內容,
id為*
則表示由Redis自動生成ID:<millisecondsTime>-<sequenceNumber>
,
亦可明確指定id。
示例:
XADD mystream * name 羅 age 18
XADD mystream 1692632086370-0 name 劉 age 18
1.1.2 XREAD - 從stream中讀取Entry(收消息)
從stream中讀取entry,XREAD命令格式:
XREAD
COUNT
最多讀取數量BLOCK
阻塞等待毫秒數STREAMS
stream名稱 上次接收的id
通過XADD添加一條消息,多個執行XREAD的客戶端都會讀取到該消息,
XREAD會從參數中指定的 上次接收的id 之后開始讀取后續的消息,
上次接受的id 可設置為$
,需配合BLOCK使用,表示僅讀取從阻塞開始后新添加的消息(即不關心歷史消息),
上次接受的id 可設置為+
,需要Redis版本>=7.4 RC1,表示僅讀取最后一條消息。
阻塞等待的毫秒數 如果為0,則表示一直阻塞,直到讀取到一條消息。
示例:
# 從頭開始讀取1條消息
XREAD STREAMS mystream 0# 從頭開始讀取2條消息
XREAD COUNT 2 STREAMS mystream 0-0
# 從指定消息ID之后開始讀取2條消息
XREAD COUNT 2 STREAMS mystream 1692632086370-0# 最長阻塞5秒,最多讀取100條消息,僅讀取從阻塞開始后新添加的消息
XREAD BLOCK 5000 COUNT 100 STREAMS mystream $
# 繼續從上次接受的id之后繼續讀取
XREAD BLOCK 5000 COUNT 100 STREAMS mystream 1526999644174-3# 讀取最后一條消息(需要Redis版本>=7.4 RC1)
XREAD STREAM mystream +
1.1.3 XRANGE - 從stream指定區間讀取Entry(收消息)
從stream指定區間(起始ID范圍)正向讀取Entry,XRANGE命令格式:
XRANGE
stream名稱 起始id 結束idCOUNT
最多讀取數量
按起始到結束正向返回消息,
-
表示最小ID,+
表示最大ID
示例:
# 返回全部消息(從前到后依次返回)
XRANGE mystream - +
# 返回5條消息(從前到后依次返回)
XRANGE mystream - + COUNT 5# 返回指定id(包括指定id)之后5條消息(從前到后依次返回)
XRANGE mystream 1718951980910-0 + COUNT 5# 返回指定id(不包括指定id)之后5條消息(從前到后依次返回)
XRANGE mystream (1718951980910-0 + COUNT 5
從stream指定區間(起始ID范圍)逆向讀取Entry,XREVRANGE命令格式:
XREVRANGE
stream名稱 結束id 起始idCOUNT
最多讀取數量
按結束到起始逆向返回消息。
示例:
返回全部消息(從后到前逆向依次返回)
XREVRANGE mystream + -
# 返回2條消息(從后到前逆向依次返回)
XREVRANGE mystream + - COUNT 2
1.2 場景2:多個客戶端僅收到一部分消息(分片sharded、消費組group)
場景2中相關命令XGROUP CREATE、XREADGROUP、XACK、XPENDING、XCLAIM
等使用匯總如下圖:
1.2.1 XGROUP CREATE - 創建消費組
給stream創建消費分組,分組間彼此隔離,分組內多個consumer會輪流消費消息(分片),XGROUP CREATE命令格式:
XGROUP CREATE
stream名稱 group名稱 起始讀取id [MKSTREAM
]
起始讀取id 為0
,表示從頭開始讀取,
起始讀取id 為$
,表示從最后一條消息之后開始讀取,
MKSTREAM
子命令是可選的,表示自動創建stream。
示例:
# 為mystream創建分組mygroup1,且從最新消息開始消費XGROUP CREATE mystream mygroup1 $
1.2.2 XREADGROUP - 從消費組中讀取消息
以分組group讀取stream中的消息,group中每個客戶端需要指定consumer名稱,多個consumer分攤group中的消息,而多個group間彼此隔離,XREADGROUP
命令格式:
XREADGROUP GROUP
group名稱 consumer名稱COUNT
最多讀取數量BLOCK
阻塞等待毫秒數 [NOACK
]STREAMS
stream名稱 上次接收的id
PEL(Pending Entries List): 當使用XREADGROUP
讀取分組下消息時,服務器會記住哪條消息發給了分組下的哪個消費者,該記錄存儲在消費者組中,稱為PEL,即已發送但尚未確認的消息ID列表。后續在消費者處理完消息后,消費者必須手動調用XACK命令對消息ID進行確認,以便從PEL中刪除掛起的消息,關于PEL的結構可參見下圖(截取自RedisInsight工具):
上次接收的id 為>
,表示消費者只希望接收從未傳遞給任何其他消費者的消息,即給我新的信息,>
號表示從當前消費組的last_delivered_id后面開始讀。
上次接收的id 設為0
或其他有效的id,則表示僅讀取 PEL(當前consumer沒有確認的消息) 中指定id之后的消息。
NOACK
子命令式可選的,表示無需確認消息,NOACK子命令適用于對可靠性要求不高、偶爾的消息丟失是可以接受的情況,使用NOACK子命令可以避免將消息添加到PEL( Pending Entries List),相當于在讀取消息后自動確認消息,后續無需再調用XACK命令進行確認,
示例:
# 消費者c1阻塞讀取mystream下分組mygroup1的最新消息(直到讀取到1條消息后解除阻塞)
XREADGROUP GROUP mygroup1 c1 BLOCK 0 STREAMS mystream ># 消費者c1讀取mystream下分組mygroup1的PEL消息(即已投遞給c1但c1未進行確認的消息列表)
XREADGROUP GROUP mygroup1 c1 STREAMS mystream 0
1.2.3 XACK - 確認消息
確認stream下指定分組group的某條消息已被成功消費,XACK
命令格式:
XACK
stream名稱 group名稱 消息id
示例:
# 確認1條消息
XACK mystream mygroup1 1719206857966-0 # 同時確認3條消息
XACK mystream mygroup1 1719206857966-0 1719206909894-0 1719207195666-0
1.2.4 XPENDING - 讀取PEL消息
讀取stream中指定分組group的PEL掛起消息列表,XPENDING
命令格式:
XPENDING
stream名稱 group名稱IDEL
空閑毫秒數 起始消息id 結束消息id 查詢數量 consumer名稱
示例:
# 查詢mystream下mygroup1分組的PEL列表
XPENDING mystream mygroup1# 查詢mystream下mygroup1分組下的消費者c1的空閑9秒的最多10條PEL消息
XPENDING mystream mygroup1 IDLE 9000 - + 10 c1
1.2.5 XCLAIM & XAUTOCLAIM - 轉移PEL中消息的所有權給其他消費者
通過XPENDING查詢出PEL消息(已投遞未確認)后,若原先消息對應的consumer已經掛掉,沒有能力繼續處理消息,則可通過XCLIAM將對應的消息轉移給同分組下的其他consumer進行處理,XCLAIM
命令格式如下:
XCLAIM
stream名稱 group名稱 consumer名稱 空閑時長毫秒 消息id1 消息id2
轉移后消息上次投遞時間會重置為當前時間(即消息空閑idle時間為0),
默認會返回已經轉移成功的消息內容,且消息投遞計數會加1,
也可添加JUSTID
子命令,則只返回消息ID不返回消息內容,且消息投遞計數不變,
若多個客戶端同時通過XCLAIM轉移同一條消息的所有權,則只會有一個客戶端轉移成功。
Redis官方原文如下:
Note that the message is claimed only if its idle time is greater than the minimum idle time we specify when calling XCLAIM. Because as a side effect XCLAIM will also
- reset the idle time (since this is a new attempt at processing the message),
- two consumers trying to claim a message at the same time will never both succeed: only one will successfully claim the message. This avoids that we process a given message multiple times in a trivial way (yet multiple processing is possible and unavoidable in the general case).
示例:
# mystream下mygroup1分組下的PEL消息1526569498055-0且空閑時長超過1小時,則將其轉移給消費者c2
XCLAIM mystream mygroup1 c2 3600000 1526569498055-0
亦可通過XAUTOCLAIM
將PEL中指定起始消息ID后的消息批量進行轉移,XAUTOCLIAM
命令格式如下:
XAUTOCLAIM
stream名稱 group名稱 consumer名稱 空閑時長毫秒 起始消息idCOUNT
消息數量
示例:
# 掃描mystream下mygroup1分組下的所有PEL消息,空閑時長超過1小時,則最多轉移25條消息給消費者c2
XAUTOCLAIM mystream mygroup1 c2 3600000 0-0 COUNT 25
1.2.6 統計命令
# 查詢stream下的分組信息
XINFO GROUPS stream名稱# 查詢stream信息
XINFO STREAM stream名稱# 查詢stream下指定分組的消費者信息
XINFO CONSUMERS stream名稱 group名稱
1.3 其他
刪除stream中的消息:
XDEL
stream名稱 id1 id2 …
查詢stream中的消息(entry)數量:
XLEN
stream名稱
壓縮stream中的消息數據量:
XTRIM
stream名稱MAXLEN
保留的最近消息數量
XTRIM
stream名稱MINID
消息ID(小于此ID的消息均會被刪除)
二、Redisson Stream
在Redisson中可通過Stream實現Redis Stream,
場景1 相關示例代碼如下:
@Test
void testStream() throws InterruptedException {String streamName = "mystream";MyMessage2 myMessage = this.buildMyMessageWithTimestampId();//獲取StreamRStream<String, Object> stream = this.redisson.getStream(streamName);//發消息 - XADD mystream * name 我的消息 age 18StreamMessageId entryId = stream.add(StreamAddArgs.entries(myMessage.toMap()));log.info("stream[{}] add success, id: {}", streamName, entryId);//讀消息 - XREAD COUNT 5 BLOCK 5000 STREAMS mystream 0Map<StreamMessageId, Map<String, Object>> entries = stream.read(StreamReadArgs.greaterThan(StreamMessageId.ALL).count(5).timeout(Duration.ofSeconds(5)));entries.forEach((id, entryMap) -> {log.info("stream[{}] read message: id={}, entry: {}", streamName, id, entryMap);});//讀取區間內消息 - XRANGE mystream 0 entryId COUNT 10entries = stream.range(10, StreamMessageId.ALL, entryId);entries.forEach((id, entryMap) -> {log.info("stream[{}] range message: id={}, entry: {}", streamName, id, entryMap);});
}
場景2 相關示例代碼如下:
@Resource
private RedissonClient redisson;@Test
void testStreamGroup() throws InterruptedException {String streamName = "mystream";String groupName = "mygroup1";String consumerName = "c1";MyMessage2 myMessage = this.buildMyMessageWithTimestampId();//獲取StreamRStream<String, Object> stream = this.redisson.getStream(streamName);//發消息 - XADD mystream * name 我的消息 age 18StreamMessageId entryId = stream.add(StreamAddArgs.entries(myMessage.toMap()));log.info("stream[{}] add success, id: {}", streamName, entryId);//查詢已存在的分組 - XINFO GROUPS mystreamList<StreamGroup> streamGroups = stream.listGroups();streamGroups.forEach(streamGroup -> {log.info("stream[{}] listGroups groupName: {}", streamName, streamGroup.getName());});Boolean existGroup = streamGroups.stream().anyMatch(group -> groupName.equals(group.getName()));if (!existGroup) {//創建分組 - XGROUP CREATE mygroup1 $stream.createGroup(StreamCreateGroupArgs.name(groupName)//此處id支持:NEWEST即$,ALL即0.id(StreamMessageId.ALL));log.info("stream[{}] createGroup success, groupName: {}", streamName, groupName);}//讀分組消息 - XREADGROUP GROUP mygroup1 c1 COUNT 5 BLOCK 5000 STREAMS mystream >Map<StreamMessageId, Map<String, Object>> entries = stream.readGroup(groupName, consumerName,//greaterThan即設置從哪個消息ID之后開始讀取,支持:NEVER_DELIVERED即>、ALL即0StreamReadGroupArgs.greaterThan(StreamMessageId.NEVER_DELIVERED).count(5).timeout(Duration.ofSeconds(5)));entries.forEach((id, entryMap) -> {log.info("stream[{}] readGroup groupName: {}, consumerName: {}, message: id={}, entry: {}",streamName, groupName, consumerName, id, entryMap);});//讀取PEL中未確認的消息 - XPENDING mystream mygroup1 - + 100 c1Map<StreamMessageId, Map<String, Object>> streamMessageIdMapMap = stream.pendingRange(groupName, consumerName, StreamMessageId.MIN, StreamMessageId.MAX, 100);streamMessageIdMapMap.forEach((id, entryMap) -> {log.info("stream[{}] pendingRange groupName: {}, consumerName: {}, message: id={}, entry: {}",streamName, groupName, consumerName, id, entryMap);//確認消息(從PEL中移除) - XACK mystream mygroup1 1600000000000-0stream.ack(groupName, id);log.info("stream[{}] ack groupName: {}, consumerName: {}, message: id={}",streamName, groupName, consumerName, id);});}
參考:
Redis Stream
https://redis.io/docs/latest/develop/data-types/streams/
https://redis.io/docs/latest/commands/xreadgroup/
Redisson Stream
https://github.com/redisson/redisson/wiki/7.-Distributed-collections#720-stream