Redis內存隊列Stream

本文為個人學習筆記整理,僅供交流參考,非專業教學資料,內容請自行甄別

文章目錄

  • 概述
  • 一、生產者端操作
  • 二、消費者端操作
  • 三、消費組操作
  • 四、狀態查詢操作
  • 五、確認消息
  • 六、消息隊列的選擇


概述

??Stream是Redis5.0推出的支持多播的可持久化的消息隊列
在這里插入圖片描述
圖片來源:圖靈學院

??如上圖所示,Stream依舊是key,value的形式,key對應的是隊列的名稱,而value的結構則是上圖的鏈表,其主要的結構:

  • ID:每條消息都有一個唯一的ID,如果沒有指定,則使用Redis自帶的生成策略,格式為當前的毫秒級別時間戳-該毫秒時間點內的消息序號,是單調遞增的。
  • Consumer Group:消費組,一個Stream中可以包含多個消費組,而每個消費組又由多個消費者組成。每個消費組是互相獨立的,共同消費隊列中的消息。
    • last_delivered_id:是一個游標,表示當前消費組已經消費到哪條消息了。同一個消費組中的任何一個消費者讀取了消息都會使last_delivered_id往前移動。
  • Consumer:消費者,同一個消費組中的消費者是競爭關系,并能在組內由唯一的名稱。
    • pending_ids[]:用于記錄當前客戶端已經讀取,但是尚未ACK的消息。如果客戶端沒有 ack,這個變量里面的消息 ID 會越來越多。是Stream的ACK機制的實現,保證消息的可靠投遞。
  • Message Content:消息內容,和其他類型的存儲格式類似,都是key-value的格式。

一、生產者端操作

??向隊列中添加一條消息,會返回消息的ID,隊列不存在則會創建。

  • xadd stream:創建隊列。
  • streamtest1:隊列的名稱,前綴需要加上stream。
  • *:自動生成隊列中消息的ID 一定是單調有序自增的。
127.0.0.1:6381> xadd streamtest1 * name zhangsan age 18 
"1751714750056-0"

??查看隊列的長度:

127.0.0.1:6381> xlen streamtest1

??xrange streamtest1 - + 查看隊列中所有的元素:

  • xrange:關鍵字
  • streamtest1 :隊列名稱
  • -:表示查詢范圍的最小值,可以指定消息ID
  • +:表示查詢范圍的最大值,可以指定消息ID
1) 1) "1751714750056-0"2) 1) "name"2) "zhangsan"3) "age"4) "18"
2) 1) "1751714996108-0"2) 1) "name"2) "lisi"3) "age"4) "20"
3) 1) "1751715003212-0"2) 1) "name"2) "wanger"3) "age"4) "25"

??xdel 刪除指定ID的消息。

127.0.0.1:6381> xdel streamtest1 1751715003212-0
(integer) 1

二、消費者端操作

??從隊列中讀取消息:

  • xread:從隊列讀取消息
  • count:讀取消息的條數
  • streams:關鍵字
  • streamtest1:隊列名稱
  • 0-0:指定讀取消息的范圍,可以指定ID
127.0.0.1:6381> xread count 1 streams streamtest1 0-0
1) 1) "streamtest1"2) 1) 1) "1751714750056-0"2) 1) "name"2) "zhangsan"3) "age"4) "18"

??可以指定讀取的范圍:

127.0.0.1:6381> xread count 1 streams streamtest1 1751714750056-0
1) 1) "streamtest1"2) 1) 1) "1751714996108-0"2) 1) "name"2) "lisi"3) "age"4) "20"

??從隊列的尾部讀取數據,加$,但是默認不返回數據:

127.0.0.1:6381> xread count 1 streams streamtest1 $
(nil)

??需要配合使用block阻塞 + 另一個生產者寫入一個新消息,:
??生產者發送消息:

127.0.0.1:6381> xadd streamtest1 * name zhaoliu age 17
"1751716196234-0"

??消費者阻塞等待生產者發送消息:

127.0.0.1:6381> xread block 0 count 1 streams streamtest1 $
1) 1) "streamtest1"2) 1) 1) "1751716196234-0"2) 1) "name"2) "zhaoliu"3) "age"4) "17"
(44.14s)

??一般來說客戶端如果想要使用 xread 進行順序消費,一定要記住當前消費到哪里了,也就是返回的消息
ID。下次繼續調用 xread 時,將上次返回的最后一個消息 ID 作為參數傳遞進去,就可以繼續消費后續的
消息。

三、消費組操作

??創建消費者群組:

  • XGROUP create:創建消費者群組
  • streamtest1:隊列名稱
  • group1:群組名稱
  • 0-0:從消息隊列頭部進行消費,如果是$ 從尾部消費
127.0.0.1:6381> XGROUP create streamtest1 group1 0-0
OK
127.0.0.1:6381> XGROUP create streamtest1 group2 $
OK

??消費者群組讀取消息,同樣支持阻塞讀取

  • XREADGROUP group:關鍵字,群組從隊列讀取消息
  • group1:群組名稱
  • g1:具體的消費者名稱,群組內唯一
  • count:關鍵字,讀取的數量
  • streams:關鍵字
  • streamtest1:隊列名稱
127.0.0.1:6381> XREADGROUP group group1 g1 count 1 streams streamtest1 >
1) 1) "streamtest1"2) 1) 1) "1751714750056-0"2) 1) "name"2) "zhangsan"3) "age"4) "18"

??在讀取前,隊列中的狀態,group1中的consumers和pending都是0。

127.0.0.1:6381> XINFO groups streamtest1
1) 1) "name"2) "group1"3) "consumers"4) (integer) 05) "pending"6) (integer) 07) "last-delivered-id"8) "0-0"

??讀取之后,有了一個消費者,并且待確認的數量為1。

127.0.0.1:6381> XINFO groups streamtest1
1) 1) "name"2) "group1"3) "consumers"4) (integer) 15) "pending"6) (integer) 17) "last-delivered-id"8) "1751714750056-0"

四、狀態查詢操作

??查詢隊列的狀態:

  • XINFO stream:關鍵字
  • streamtest1:隊列的名稱
127.0.0.1:6381> XINFO stream streamtest11) "length" 		# 長度2) (integer) 33) "radix-tree-keys" 4) (integer) 15) "radix-tree-nodes"6) (integer) 27) "groups"	# 隊列中群組個數8) (integer) 29) "last-generated-id" # 最后生成消息的ID
10) "1751716196234-0"
11) "first-entry"	# 隊列中第一個元素
12) 1) "1751714750056-0"2) 1) "name"2) "zhangsan"3) "age"4) "18"
13) "last-entry" # 隊列中最后一個元素
14) 1) "1751716196234-0"2) 1) "name"2) "zhaoliu"3) "age"4) "17"

??查詢消費者組的信息:

  • XINFO groups:關鍵字
  • streamtest1:隊列的名稱
127.0.0.1:6381> XINFO groups streamtest1
1) 1) "name"	# 群組名稱2) "group1"3) "consumers" # 消費者4) (integer) 05) "pending" # 待消費數量6) (integer) 07) "last-delivered-id" 8) "0-0"
2) 1) "name"2) "group2"3) "consumers"4) (integer) 05) "pending"6) (integer) 07) "last-delivered-id"8) "1751716196234-0"

??查詢某個消費者組中消費者的信息:

127.0.0.1:6381> XINFO consumers streamtest1 group1
1) 1) "name"2) "g1"3) "pending"	#代表查詢了消息,但是沒有確認的數量4) (integer) 15) "idle"6) (integer) 398240

五、確認消息

??當我使用group1消費者組中的g1消費者對streamtest1隊列中的元素進行消費時,會返回一個Id和具體的元素信息:
在這里插入圖片描述
??此時我還沒有手動ack,查詢消費者的信息,pending為1代表待確認。
在這里插入圖片描述
??隊列中所有的元素:
在這里插入圖片描述
??確認消息的命令:

  • XACK:消息確認
  • streamtest1:隊列名稱
  • group1:分組名稱
  • 1751714996108-0:待確認的消息的ID
127.0.0.1:6381> XACK streamtest1 group1 1751714996108-0
(integer) 1

??再去查詢消費者的信息,發現pending變成了0:
在這里插入圖片描述
??再次獲取消息,獲取的是已消費元素的下一個消息,當隊列中所有的消息都ack之后,再次嘗試獲取消息,獲取到的是nil。在這里插入圖片描述
??消息的ack,僅僅是將消息從 消費者組的 Pending Entries List中移除,消息仍保留在 Stream 主體中,直到被主動刪除。

六、消息隊列的選擇

??基于Redis實現消息隊列通常有以下的方式:

  • List結構的lpush + brpop。
  • PUB/SUB模式。
  • Stream消息隊列。

??如果一定需要使用Redis實現消息隊列的功能,推薦使用Stream實現。前兩者都有比較明顯的弊端:

  • lpush + brpop的方案,如果線程一直阻塞,超過了一定的時間,客戶端會斷開連接,那么執行POP命令的線程就會拋出異常。并且消息的消費是點到點的,不支持分組消費,以及廣播模式,重復消費。
  • PUB/SUB模式的方案,如果發布者發送消息時,訂閱者不在線,那么這條消息就會丟失。并且無法存儲消息。Pub/Sub 模式不適合做消息存儲,消息積壓類的業務,而是擅長處理廣播,即時通訊,即時反饋的業務。

??使用Redis stream的注意點:

  1. Stream的消息過多怎么辦?限制隊列的長度,xadd 可以指定參數maxLen 防止隊列爆滿。
  2. 消費者沒有對消息ack怎么辦?消費組中的消費者有一個pending_ids的集合,沒有ack,這個集合會越變越長。盡可能快速消費并ack。
  3. 出現死信問題怎么辦?某個消息任意消費者消費都會出現異常,無法ack。通過xpending查詢投遞次數,超過一定的次數就認為是死信,執行xdel命令刪除消息。
  4. 如何保證高可用?集群部署Redis,或者使用主從 + 哨兵迷失
  5. 如何進行消息分區?自己通過一致性hash算法實現。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/913382.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/913382.shtml
英文地址,請注明出處:http://en.pswp.cn/news/913382.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Minio安裝配置,桶權限設置,nginx代理 https minio

**起因:因為用到ruoyi-vue-plus框架中遇到生產環境是https,但是http的minio上傳的文件不能在后臺系統中訪問**安裝配置minio1. 下載安裝2. 賦文件執行權限3.創建配置文件4.創建minio.service新版minio創建桶需要配置桶權限1.下載客戶端2.設置訪問權限3.連…

數論基礎知識和模板

質數篩 用于快速處理 1~n 中所有素數的算法 因為依次遍歷判斷每一個數是否質數太慢,所以把一些明顯不能質數的篩出來 普通篩法,對于每個整數,刪除掉其倍數。 bool vis[N];//0表示是質數 int pri[N],o; //質數表 void get(int n…

Ubuntu20.04.6桌面版系統盤制作與安裝

概述 本教程講述Ubuntu20.04.6桌面版的系統U盤制作與安裝,所需工具為一臺電腦、大于4G的U盤、一個需要安裝Ubuntu系統的主機。 步驟1:下載系統鏡像與rufus 在ubuntu官網下載 ubuntu-20.04.6-desktop-amd64.iso,如圖 下載rufus工具&#xf…

【C++復習3】類和對象

1.3.1.簡述一下什么是面向對象回答:1. 面向對象是一種編程思想,把一切東西看成是一個個對象,比如人、耳機、鼠標、水杯等,他們各 自都有屬性,比如:耳機是白色的,鼠標是黑色的,水杯是…

數據結構之二叉平衡樹

系列文章目錄 數據結構之ArrayList_arraylist o(1) o(n)-CSDN博客 數據結構之LinkedList-CSDN博客 數據結構之棧_棧有什么方法-CSDN博客 數據結構之隊列-CSDN博客 數據結構之二叉樹-CSDN博客 數據結構之優先級隊列-CSDN博客 常見的排序方法-CSDN博客 數據結構之Map和Se…

Maven引入第三方JAR包實戰指南

要將第三方提供的 JAR 包引入本地 Maven 倉庫,可通過以下步驟實現(以 Oracle JDBC 驅動為例):🔧 方法 1:使用 install:install-file 命令(推薦)定位 JAR 文件 將第三方 JAR 包&#…

JavaSE -- 泛型詳細介紹

泛型 簡介 集合存儲數據底層是利用 Object 來接收的,意思是說如果不對類型加以限制,所有數據類型柔和在一起,這時如何保證數據的安全性呢(如果不限制存入的數據類型,任何數據都能存入,當我們取出數據進行強…

使用 Python 實現 ETL 流程:從文本文件提取到數據處理的全面指南

文章大綱: 引言:什么是 ETL 以及其重要性 ETL(提取-轉換-加載)是數據處理領域中的核心概念,代表了從源數據到目標系統的三個關鍵步驟:**提取(Extract)**數據、**轉換(Tra…

selenium基礎知識 和 模擬登錄selenium版本

前言 selenium框架是Python用于控制瀏覽器的技術,在Python爬蟲獲取頁面源代碼的時候,是最重要的技術之一,通過控制瀏覽器,更加靈活便捷的獲取瀏覽器中網頁的源代碼。 還沒有安裝啟動selenium的同志請先看我的上一篇文章進行配置啟動 和 XPath基礎 對selenium進行瀏覽器和驅動…

JS 網頁全自動翻譯v3.17發布,全面接入 GiteeAI 大模型翻譯及自動部署

兩行 js 實現 html 全自動翻譯。 無需改動頁面、無語言配置文件、無 API Key、對 SEO 友好! 升級說明 translate.service 深度綁定 GiteeAI 作為公有云翻譯大模型算力支持translate.service 增加shell一鍵部署后通過訪問自助完成GiteeAI的開通及整個接入流程。增加…

數據結構:數組:插入操作(Insert)與刪除操作(Delete)

目錄 插入操作(Inserting in an Array) 在紙上模擬你會怎么做? 代碼實現 復雜度分析 刪除操作(Deleting from an Array) 在紙上模擬一下怎么做? 代碼實現 復雜度分析 插入操作(Inserti…

Qt之修改純色圖片的顏色

這里以修改QMenu圖標顏色為例,效果如下: MyMenu.h #ifndef MYMENU_H #define MYMENU_H#include <QMenu>class MyMenu : public QMenu { public:explicit MyMenu(QWidget *parent = nullptr);protected:void mouseMoveEvent(QMouseEvent *event) override; };#endif /…

uni-app實現單選,多選也能搜索,勾選,選擇,回顯

前往插件市場安裝插件下拉搜索選擇框 - DCloud 插件市場&#xff0c;該插件示例代碼有vue2和vue3代碼 是支持微信小程序和app的 示例代碼&#xff1a; <template><view><!-- 基礎用法 --><cuihai-select-search:options"options"v-model&quo…

【機器學習深度學習】 微調的十種形式全解析

目錄 一、為什么要微調&#xff1f; 二、微調的 10 種主流方式 ? 1. 全參數微調&#xff08;Full Fine-tuning&#xff09; ? 2. 凍結部分層微調&#xff08;Partial Fine-tuning&#xff09; ? 3. 參數高效微調&#xff08;PEFT&#xff09; &#x1f538; 3.1 LoRA&…

信刻光盤安全隔離與文件單向導入/導出系統

北京英特信網絡科技有限公司成立于2005年&#xff0c;是專業的數據光盤擺渡、刻錄分發及光盤存儲備份領域的科技企業&#xff0c;專注為軍隊、軍工、司法、保密等行業提供數據光盤安全擺渡、跨網交換、檔案歸檔檢測等專業解決方案。 公司立足信創產業&#xff0c;產品國產安全可…

Python-標準庫-os

1 需求 2 接口 3 示例 4 參考資料 在 Python 中&#xff0c;os&#xff08;Operating System&#xff09;模塊是一個非常重要的內置標準庫&#xff0c;提供了許多與操作系統進行交互的函數和方法&#xff0c;允許開發者在 Python 程序中執行常見的操作系統任務&#xff0c;像文…

OpenCV CUDA模塊設備層-----在 GPU 上執行類似于 std::copy 的操作函數warpCopy()

操作系統&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 編程語言&#xff1a;C11 算法描述 OpenCV 的 CUDA 模塊&#xff08;cudev&#xff09; 中的一個設備端內聯模板函數&#xff0c;用于在 GPU 上執行類似于 std::copy 的操作&#xff…

Vue Router 中$route.path與 params 的關系

1. params 參數的本質&#xff1a;路徑的動態片段在 Vue Router 中&#xff0c;params 參數是通過路由配置的動態路徑片段定義的&#xff0c;例如&#xff1a;// 路由配置{ path: /user/:id, component: User }當訪問/user/123時&#xff0c;/user/123是完整的路徑&#xff0c;…

React 極簡響應式滑塊驗證組件實現,隨機滑塊位置

&#x1f3af; 滑塊驗證組件 (Slider Captcha) 一個現代化、響應式的滑塊驗證組件&#xff0c;專為 React 應用設計&#xff0c;提供流暢的用戶體驗和強大的安全驗證功能。 ? 功能特性 &#x1f3ae; 核心功能 智能滑塊拖拽 – 支持鼠標和觸摸屏操作&#xff0c;響應靈敏隨…

STM32第十六天藍牙模塊

一&#xff1a;藍牙模塊HC-05 1&#xff1a;硬件引腳配置&#xff1a; | 標號 | PIN | 說明 | |------|-------|---------------------------------------| | 1 | START | 狀態引出引腳&#xff08;未連接/連接輸出信號時&#xff09; |…