本文為個人學習筆記整理,僅供交流參考,非專業教學資料,內容請自行甄別
文章目錄
- 概述
- 一、生產者端操作
- 二、消費者端操作
- 三、消費組操作
- 四、狀態查詢操作
- 五、確認消息
- 六、消息隊列的選擇
概述
??Stream是Redis5.0推出的支持多播的可持久化的消息隊列
:
圖片來源:圖靈學院
??如上圖所示,Stream依舊是key,value的形式,key對應的是隊列的名稱,而value的結構則是上圖的鏈表,其主要的結構:
- ID:每條消息都有一個唯一的ID,如果沒有指定,則使用Redis自帶的生成策略,格式為
當前的毫秒級別時間戳-該毫秒時間點內的消息序號
,是單調遞增的。 - Consumer Group:消費組,一個Stream中可以包含多個消費組,而每個消費組又由多個消費者組成。每個消費組是互相獨立的,共同消費隊列中的消息。
- last_delivered_id:是一個游標,表示當前消費組已經消費到哪條消息了。同一個消費組中的任何一個消費者讀取了消息都會使last_delivered_id往前移動。
- Consumer:消費者,同一個消費組中的消費者是競爭關系,并能在組內由唯一的名稱。
- pending_ids[]:用于記錄當前客戶端已經讀取,但是尚未ACK的消息。如果客戶端沒有 ack,這個變量里面的消息 ID 會越來越多。是Stream的ACK機制的實現,保證消息的可靠投遞。
- Message Content:消息內容,和其他類型的存儲格式類似,都是key-value的格式。
一、生產者端操作
??向隊列中添加一條消息,會返回消息的ID,隊列不存在則會創建。
- xadd stream:創建隊列。
- streamtest1:隊列的名稱,前綴需要加上stream。
- *:自動生成隊列中消息的ID 一定是單調有序自增的。
127.0.0.1:6381> xadd streamtest1 * name zhangsan age 18
"1751714750056-0"
??查看隊列的長度:
127.0.0.1:6381> xlen streamtest1
??xrange streamtest1 - + 查看隊列中所有的元素:
- xrange:關鍵字
- streamtest1 :隊列名稱
- -:表示查詢范圍的最小值,可以指定消息ID
- +:表示查詢范圍的最大值,可以指定消息ID
1) 1) "1751714750056-0"2) 1) "name"2) "zhangsan"3) "age"4) "18"
2) 1) "1751714996108-0"2) 1) "name"2) "lisi"3) "age"4) "20"
3) 1) "1751715003212-0"2) 1) "name"2) "wanger"3) "age"4) "25"
??xdel 刪除指定ID的消息。
127.0.0.1:6381> xdel streamtest1 1751715003212-0
(integer) 1
二、消費者端操作
??從隊列中讀取消息:
- xread:從隊列讀取消息
- count:讀取消息的條數
- streams:關鍵字
- streamtest1:隊列名稱
- 0-0:指定讀取消息的范圍,可以指定ID
127.0.0.1:6381> xread count 1 streams streamtest1 0-0
1) 1) "streamtest1"2) 1) 1) "1751714750056-0"2) 1) "name"2) "zhangsan"3) "age"4) "18"
??可以指定讀取的范圍:
127.0.0.1:6381> xread count 1 streams streamtest1 1751714750056-0
1) 1) "streamtest1"2) 1) 1) "1751714996108-0"2) 1) "name"2) "lisi"3) "age"4) "20"
??從隊列的尾部讀取數據,加$,但是默認不返回數據:
127.0.0.1:6381> xread count 1 streams streamtest1 $
(nil)
??需要配合使用block阻塞 + 另一個生產者寫入一個新消息,:
??生產者發送消息:
127.0.0.1:6381> xadd streamtest1 * name zhaoliu age 17
"1751716196234-0"
??消費者阻塞等待生產者發送消息:
127.0.0.1:6381> xread block 0 count 1 streams streamtest1 $
1) 1) "streamtest1"2) 1) 1) "1751716196234-0"2) 1) "name"2) "zhaoliu"3) "age"4) "17"
(44.14s)
??一般來說客戶端如果想要使用 xread 進行順序消費,一定要記住當前消費到哪里了,也就是返回的消息
ID。下次繼續調用 xread 時,將上次返回的最后一個消息 ID 作為參數傳遞進去,就可以繼續消費后續的
消息。
三、消費組操作
??創建消費者群組:
- XGROUP create:創建消費者群組
- streamtest1:隊列名稱
- group1:群組名稱
- 0-0:從消息隊列頭部進行消費,如果是$ 從尾部消費
127.0.0.1:6381> XGROUP create streamtest1 group1 0-0
OK
127.0.0.1:6381> XGROUP create streamtest1 group2 $
OK
??消費者群組讀取消息,同樣支持阻塞讀取
- XREADGROUP group:關鍵字,群組從隊列讀取消息
- group1:群組名稱
- g1:具體的消費者名稱,群組內唯一
- count:關鍵字,讀取的數量
- streams:關鍵字
- streamtest1:隊列名稱
127.0.0.1:6381> XREADGROUP group group1 g1 count 1 streams streamtest1 >
1) 1) "streamtest1"2) 1) 1) "1751714750056-0"2) 1) "name"2) "zhangsan"3) "age"4) "18"
??在讀取前,隊列中的狀態,group1中的consumers和pending都是0。
127.0.0.1:6381> XINFO groups streamtest1
1) 1) "name"2) "group1"3) "consumers"4) (integer) 05) "pending"6) (integer) 07) "last-delivered-id"8) "0-0"
??讀取之后,有了一個消費者,并且待確認的數量為1。
127.0.0.1:6381> XINFO groups streamtest1
1) 1) "name"2) "group1"3) "consumers"4) (integer) 15) "pending"6) (integer) 17) "last-delivered-id"8) "1751714750056-0"
四、狀態查詢操作
??查詢隊列的狀態:
- XINFO stream:關鍵字
- streamtest1:隊列的名稱
127.0.0.1:6381> XINFO stream streamtest11) "length" # 長度2) (integer) 33) "radix-tree-keys" 4) (integer) 15) "radix-tree-nodes"6) (integer) 27) "groups" # 隊列中群組個數8) (integer) 29) "last-generated-id" # 最后生成消息的ID
10) "1751716196234-0"
11) "first-entry" # 隊列中第一個元素
12) 1) "1751714750056-0"2) 1) "name"2) "zhangsan"3) "age"4) "18"
13) "last-entry" # 隊列中最后一個元素
14) 1) "1751716196234-0"2) 1) "name"2) "zhaoliu"3) "age"4) "17"
??查詢消費者組的信息:
- XINFO groups:關鍵字
- streamtest1:隊列的名稱
127.0.0.1:6381> XINFO groups streamtest1
1) 1) "name" # 群組名稱2) "group1"3) "consumers" # 消費者4) (integer) 05) "pending" # 待消費數量6) (integer) 07) "last-delivered-id" 8) "0-0"
2) 1) "name"2) "group2"3) "consumers"4) (integer) 05) "pending"6) (integer) 07) "last-delivered-id"8) "1751716196234-0"
??查詢某個消費者組中消費者的信息:
127.0.0.1:6381> XINFO consumers streamtest1 group1
1) 1) "name"2) "g1"3) "pending" #代表查詢了消息,但是沒有確認的數量4) (integer) 15) "idle"6) (integer) 398240
五、確認消息
??當我使用group1消費者組中的g1消費者對streamtest1隊列中的元素進行消費時,會返回一個Id和具體的元素信息:
??此時我還沒有手動ack,查詢消費者的信息,pending為1代表待確認。
??隊列中所有的元素:
??確認消息的命令:
- XACK:消息確認
- streamtest1:隊列名稱
- group1:分組名稱
- 1751714996108-0:待確認的消息的ID
127.0.0.1:6381> XACK streamtest1 group1 1751714996108-0
(integer) 1
??再去查詢消費者的信息,發現pending變成了0:
??再次獲取消息,獲取的是已消費元素的下一個消息
,當隊列中所有的消息都ack之后,再次嘗試獲取消息,獲取到的是nil。
??消息的ack,僅僅是將消息從 消費者組的 Pending Entries List中移除,消息仍保留在 Stream 主體中,直到被主動刪除。
六、消息隊列的選擇
??基于Redis實現消息隊列通常有以下的方式:
- List結構的lpush + brpop。
- PUB/SUB模式。
- Stream消息隊列。
??如果一定需要使用Redis實現消息隊列的功能,推薦使用Stream實現。前兩者都有比較明顯的弊端:
- lpush + brpop的方案,如果線程一直阻塞,超過了一定的時間,客戶端會斷開連接,那么執行POP命令的線程就會拋出異常。并且消息的消費是點到點的,不支持分組消費,以及廣播模式,重復消費。
- PUB/SUB模式的方案,如果發布者發送消息時,訂閱者不在線,那么這條消息就會丟失。并且無法存儲消息。Pub/Sub 模式不適合做消息存儲,消息積壓類的業務,而是擅長處理廣播,即時通訊,即時反饋的業務。
??使用Redis stream的注意點:
- Stream的消息過多怎么辦?限制隊列的長度,xadd 可以指定參數maxLen 防止隊列爆滿。
- 消費者沒有對消息ack怎么辦?消費組中的消費者有一個pending_ids的集合,沒有ack,這個集合會越變越長。盡可能快速消費并ack。
- 出現死信問題怎么辦?某個消息任意消費者消費都會出現異常,無法ack。通過xpending查詢投遞次數,超過一定的次數就認為是死信,執行xdel命令刪除消息。
- 如何保證高可用?集群部署Redis,或者使用主從 + 哨兵迷失
- 如何進行消息分區?自己通過一致性hash算法實現。