什么是Stream?
Stream 實際上是一個具有消息發布/訂閱功能的組件,也就常說的消息隊列。其實這種類似于 broker/consumer(生產者/消費者)的數據結構很常見,比如 RabbitMQ 消息中間件、Celery 消息中間件,以及 Kafka 分布式消息系統等,而 Redis Stream 正是借鑒了 Kafaka 系統。
1) 優點
Strean 除了擁有很高的性能和內存利用率外, 它最大的特點就是提供了消息的持久化存儲,以及主從復制功能,從而解決了網絡斷開、Redis 宕機情況下,消息丟失的問題,即便是重啟 Redis,存儲的內容也會存在。
2) 流程
Stream 消息隊列主要由四部分組成,分別是:消息本身、生產者、消費者和消費組,對于前述三者很好理解,下面了解什么是消費組。
一個 Stream 隊列可以擁有多個消費組,每個消費組中又包含了多個消費者,組內消費者之間存在競爭關系。當某個消費者消費了一條消息時,同組消費者,都不會再次消費這條消息。被消費的消息 ID 會被放入等待處理的 Pending_ids 中。每消費完一條信息,消費組的游標就會向前移動一位,組內消費者就繼續去爭搶下消息。
?Redis Stream 消息隊列結構程如下圖所示:
?
下面對上圖涉及的專有名詞做簡單解釋:
- Stream direction:表示數據流,它是一個消息鏈,將所有的消息都串起來,每個消息都有一個唯一標識 ID 和對應的消息內容(Message content)。
- Consumer Group :表示消費組,擁有唯一的組名,使用 XGROUP CREATE 命令創建。一個 Stream 消息鏈上可以有多個消費組,一個消費組內擁有多個消費者,每一個消費者也有一個唯一的 ID 標識。
- last_delivered_id :表示消費組游標,每個消費組都會有一個游標 last_delivered_id,任意一個消費者讀取了消息都會使游標 last_delivered_id 往前移動。
- pending_ids :Redis 官方稱為 PEL,表示消費者的狀態變量,它記錄了當前已經被客戶端讀取的消息?ID,但是這些消息沒有被 ACK(確認字符)。如果客戶端沒有 ACK,那么這個變量中的消息 ID 會越來越多,一旦被某個消息被 ACK,它就開始減少。
3) ACK?
ACK(Acknowledge character)即確認字符,在數據通信中,接收方傳遞給發送方的一種傳輸類控制字符。表示發來的數據已確認接收無誤。在 TCP/IP 協議中,如果接收方成功的接收到數據,那么會回復一個 ACK 數據。通常 ACK 信號有自己固定的格式,長度大小,由接收方回復給發送方。
常用命令匯總
命令 | 說明 |
---|---|
XADD? | 添加消息到末尾。 |
XTRIM | 對 Stream 流進行修剪,限制長度。 |
XDEL | 刪除指定的消息。 |
XLEN | 獲取流包含的元素數量,即消息長度。 |
XRANGE | 獲取消息列表,會自動過濾已經刪除的消息。 |
XREVRANGE? | 反向獲取消息列表,ID 從大到小。 |
XREAD | 以阻塞或非阻塞方式獲取消息列表。 |
XGROUP CREATE | 創建消費者組。 |
XREADGROUP GROUP | 讀取消費者組中的消息。 |
XACK | 將消息標記為"已處理"。 |
XGROUP SETID | 為消費者組設置新的最后遞送消息ID。 |
XGROUP DELCONSUMER | 刪除消費者。 |
XGROUP DESTROY | 刪除消費者組。 |
XPENDING | 顯示待處理消息的相關信息。 |
XCLAIM? | 轉移消息的歸屬權。 |
XINFO | 查看 Stream 流、消費者和消費者組的相關信息。 |
XINFO GROUPS | 查看消費者組的信息。 |
XINFO STREAM? | 查看 Stream 流信息。 |
XINFO?CONSUMERS key group | 查看組內消費者流信息。 |
創建消息ID
當創建一個 Srteam 時, 需要創建消息 ID,該 ID 是唯一、不可重復的,并且只增不減。消息 ID 有兩種創建方式,一是系統自動生成,二是自定義創建。
1) 系統自動創建
語法格式如下:
XADD key ID field value [field value ...]
參數說明如下:
- key :指定隊列名稱,如果不存就創建;
- ID :消息 id,我們使用
*
表示由 redis 生成,可以自定義,但是要自己保證遞增性; - field value :消息記錄。
返回值是毫秒時間戳格式的字符串。比如?1610619132674-2,它表示在該毫秒內產生的第 2 條消息。使用示例:
XADD mystream * username cc 10
2) 自定義ID
自定義 ID 比較簡單,但是需要注意的是 ID 的形式必須是 “整數”,并且后面加入消息的 ID 必須大于前面消息的 ID,也就是自定義 ID 也必須遵守遞增的規則。示例如下:
XADD mystream1 001 name zhangsan addr hebei
創建消費組
Redis Stream通過XGROUP CREATE
指令創建消費組(Consumer Group),在創建時,需要傳遞起始消息的 ID 用來初始化 last_delivered_id 變量。語法格式如下:
<span style="color:#444444">XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]</span>
參數說明如下:
- key :指定 Stream 隊列名稱,若不存在則自動創建。
- groupname :自定義消費組的名稱,不可重復。
- $ :表示從尾部開始消費,只接受新消息,而當前 Stream 的消息則被忽略。
消費消息
Redis Stream 通過XREADGROUP
命令使消費組消費信息,它和XREAD
命令一樣,都可以阻塞等待新消息。讀到新消息后,對應的消息 ID 就會進入消費者的 PLE(正在處理的消息)結構里,客戶端處理完畢后使用 XACK 命令通知 Redis 服務器,本條消息已經處理完畢,該消息的 ID 就會從 PEL 中移除。示意圖如下
XREADGROUP
命令的語法格式如下所示:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]??
參數說明如下:
- group :消費組名稱。
- consumer :消費者名稱。
- count : 要讀取的數量。
- milliseconds : 阻塞時間,以毫秒為單位。
- key :? 鍵指定的隊列名稱。
- ID : 表示消息 ID。