Reids消息隊列(Message Queue)
消息隊列 是指利用 高效可靠 的 消息傳遞機制 進行與平臺無關的 數據交流,并基于數據通信來進行分布式系統的集成。
消息隊列具有 低耦合、可靠投遞、廣播、流量控制、最終一致性 等功能。
常見的消息隊列 有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ 等。
通過提供 消息傳遞 和 消息排隊 模型,它可以在 分布式環境 下提供 應用解耦、彈性伸縮、冗余存儲、流量削峰、異步通信、數據同步 等功能。
基于List結構的模擬消息隊列
Redis的list數據結構是一個雙向鏈表,使用出隊入隊即可實現消息隊列。
LPUSH 、RPOP 或 RPUSH 、 LPOP
LPUSH:將一個或多個值value插入到列表key的表頭如果有多個value值,那么各個value值按從左到右的順序依次插入到表頭。
RPOP:移除并返回列表key的尾元素。
RPUSH與LPOP同理。
通過 LPUSH,RPOP 這樣的方式,會存在一個性能風險點,就是消費者如果想要及時的處理數據,就要在程序中寫個類似 while(true) 這樣的邏輯,不停地去調用 RPOP 或 LPOP 命令,這就會給消費者程序帶來些不必要的性能損失。
LPUSH、BRPOP 或 RPUSH、BLPOP
Redis 還提供了 BLPOP、BRPOP 這種阻塞式讀取的命令(帶 B-Bloking的都是阻塞式),客戶端在沒有讀到隊列數據時,自動阻塞,直到有新的數據寫入隊列,再開始讀取新數據。這種方式就節省了不必要的 CPU 開銷。
數據存入格式:lpush listname v1 v2 v3
v 表示存入鏈表的值
阻塞等待指令格式:blpop list_name timeout
listname 為 取出內容的列表名
timeout為等待超時時間,如果為0,則可無限等待
優點
- 利用Redis存儲,不受限于JVM內存
- 基于Redis的持久化機制,數據安全性有保證
- 可以滿足消息有序性
缺點
- 無法避免消息丟失:從redis中取出消息后,如果尚未處理完出現異常,取出的消息就丟失了
- 只支持單消費者,一條消息被取走后,其他消費者無法再獲取。
基于PubSub的消息隊列
"發布/訂閱"模式包含兩種角色,分別是發布者和訂閱者。訂閱者可以訂閱一個或者多個頻道(channel),而發布者可以向指定的頻道(channel)發送消息,所有訂閱此頻道的訂閱者都會收到此消息。
常用的指令:
subscribe channel1 [channel...]
: 訂閱一個或多個頻道
publish channel1 msg
:向一個頻道發送消息
psubscribe pattern [pattern]
:訂閱與pattern格式匹配的所有頻道
pattern通配符:
h?llo:?可以替換為任意一個其他字母,比如hello;而hllo和hkkllo不行
hllo: 可以替換為0個或多個其他字母,比如hllo、hello、heeeello
h[ae]llo:可以替換為任意一個中括號中的字母,比如hallo、hello;而hillo不行
優點
- 采用發布訂閱模型,支持多生產、多消費
缺點
- 不支持數據持久化;如果出現網絡斷開、Redis 宕機等,消息就會被丟棄。假設一個消費者都沒有,那消息就直接被丟棄了。
- 無法避免消息丟失。
- 消息堆積有上限、超出時數據丟失。
基于Stream的消息隊列——單消費方式
Redis 5.0 版本新增了一個更強大的數據結構——Stream。它提供了消息的持久化和主備復制功能,可以讓任何客戶端訪問任何時刻的數據,并且能記住每一個客戶端的訪問位置,還能保證消息不丟失。
存入消息的方式之一_XADD
命令格式:XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...]
NOMKSTREAM:如果隊列不存在,是否自動創建隊列,默認自動創建
<MAXLEN | MINID> [= | ~] threshold [LIMIT count] :設置消息隊列的最大消息數量
<* | id> 消息的唯一id,* 代表由redis自動生成。* 格式是"時間戳-遞增數字",例如"1644804662707-0"
field value [field value …]:發送到隊列中的消息隊列,稱為Entry。格式就是多個key-value鍵值對。
示例:
# 創建名為 users 的隊列,并向其中發送一個消息,內容是:{name=jack,age=21},并且使用Redis自動生成id
XADD users * name jack age 21
讀取消息的方式之一——XREAD
命令格式:XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
[COUNT count]:每次讀取消息的最大數量
[BLOCK milliseconds]:當沒有消息時,是否阻塞、阻塞時長
STREAMS key [key …]:要從哪個隊列讀取消息,key就是隊列名
起始id,只返回大于該id的消息;0代表從第一個消息開始;$代表從最新的消息開始
讀取最新的消息示例:
XREAD COUNT 1 BLOCK 1000 STREAMS users $
優點
- 消息可回溯
- 一個消息可被多個消費者讀取
- 可以阻塞讀取
缺點
- 有消息漏讀風險
基于Stream的消息隊列——消費者組
消費者組:將多個消費者劃分到一個組里,監聽同一個隊列。
特點
- 消費分流:隊列中的消息會分流給組內的不同消費者,而不是重復消費,加快消息的處理速度。
- 消息標識:消費者組會維護一個標示,記錄組內最后一個被處理的消息,哪怕消費者宕機重啟,還是能夠從標示之后讀取消息,確保每一個消息都被消費。
- 消息確認:消費者獲取消息之后,消息處于pending狀態,并存入一個pending-list。當處理完后需要XACK來確認消息,標記消息已處理,之后才會從pending-list移除。
創建消費者組
XGROUP CREATE key groupName ID [MKSTREAM]
key:隊列名稱
groupName:消費者組名稱
ID:起始ID標示,$代表隊列中最后一個消息,0代表隊列中第一個消息
MKSTREAM:隊列不存在時自動創建隊列
其他對應指令:
# 刪除指定的消費者組
XGROUP DESTROY key group# 給指定的消費者組添加消費者
XGROUP CREATECONSUMER key group consumer# 刪除消費者組中的指定消費者
XGROUP DELCONSUMER key group consumer
從消費者組讀取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds][NOACK] STREAMS key [key ...] id [id ...]
group:消費者組名稱
consumer:消費者名稱,如果消費者不存在,會自動創建一個消費者
count:本次查詢的最大數量
BLOCK milliseconds:當前消息等待最大時長
NOACK:無需手動ACK,獲取消息后自動確認
STREAMS key:指定監聽一個或多個隊列名稱
ID:獲取消息的起始ID:
- “ >” 從下一個未消費的消息開始
- 其他:根據指定ID從pending-list中獲取一個消費但未確認的消息,例如0,是從pending-list中第一個消息開始
確認消息
XACK key group id [id ...]
key:消息隊列名稱
group:組名
id:確認的消息ID
查看未確認的消息
XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
總結
如果業務要求較高,可以考慮使用更加專業的 Kafka、RocketMQ、RabbitMQ。