Redis Stream消息隊列

什么是Stream?

Stream 實際上是一個具有消息發布/訂閱功能的組件,也就常說的消息隊列。其實這種類似于 broker/consumer(生產者/消費者)的數據結構很常見,比如 RabbitMQ 消息中間件、Celery 消息中間件,以及 Kafka 分布式消息系統等,而 Redis Stream 正是借鑒了 Kafaka 系統。

1) 優點

Strean 除了擁有很高的性能和內存利用率外, 它最大的特點就是提供了消息的持久化存儲,以及主從復制功能,從而解決了網絡斷開、Redis 宕機情況下,消息丟失的問題,即便是重啟 Redis,存儲的內容也會存在。

2) 流程

Stream 消息隊列主要由四部分組成,分別是:消息本身、生產者、消費者和消費組,對于前述三者很好理解,下面了解什么是消費組。

一個 Stream 隊列可以擁有多個消費組,每個消費組中又包含了多個消費者,組內消費者之間存在競爭關系。當某個消費者消費了一條消息時,同組消費者,都不會再次消費這條消息。被消費的消息 ID 會被放入等待處理的 Pending_ids 中。每消費完一條信息,消費組的游標就會向前移動一位,組內消費者就繼續去爭搶下消息。

?Redis Stream 消息隊列結構程如下圖所示:

Redis Stream結構圖

?

下面對上圖涉及的專有名詞做簡單解釋:
  • Stream direction:表示數據流,它是一個消息鏈,將所有的消息都串起來,每個消息都有一個唯一標識 ID 和對應的消息內容(Message content)。
  • Consumer Group :表示消費組,擁有唯一的組名,使用 XGROUP CREATE 命令創建。一個 Stream 消息鏈上可以有多個消費組,一個消費組內擁有多個消費者,每一個消費者也有一個唯一的 ID 標識。
  • last_delivered_id :表示消費組游標,每個消費組都會有一個游標 last_delivered_id,任意一個消費者讀取了消息都會使游標 last_delivered_id 往前移動。
  • pending_ids :Redis 官方稱為 PEL,表示消費者的狀態變量,它記錄了當前已經被客戶端讀取的消息?ID,但是這些消息沒有被 ACK(確認字符)。如果客戶端沒有 ACK,那么這個變量中的消息 ID 會越來越多,一旦被某個消息被 ACK,它就開始減少。
3) ACK?

ACK(Acknowledge character)即確認字符,在數據通信中,接收方傳遞給發送方的一種傳輸類控制字符。表示發來的數據已確認接收無誤。在 TCP/IP 協議中,如果接收方成功的接收到數據,那么會回復一個 ACK 數據。通常 ACK 信號有自己固定的格式,長度大小,由接收方回復給發送方。

常用命令匯總

Redis Stream命令
命令說明
XADD?添加消息到末尾。
XTRIM對 Stream 流進行修剪,限制長度。
XDEL刪除指定的消息。
XLEN獲取流包含的元素數量,即消息長度。
XRANGE獲取消息列表,會自動過濾已經刪除的消息。
XREVRANGE?反向獲取消息列表,ID 從大到小。
XREAD以阻塞或非阻塞方式獲取消息列表。
XGROUP CREATE創建消費者組。
XREADGROUP GROUP讀取消費者組中的消息。
XACK將消息標記為"已處理"。
XGROUP SETID為消費者組設置新的最后遞送消息ID。
XGROUP DELCONSUMER刪除消費者。
XGROUP DESTROY刪除消費者組。
XPENDING顯示待處理消息的相關信息。
XCLAIM?轉移消息的歸屬權。
XINFO查看 Stream 流、消費者和消費者組的相關信息。
XINFO GROUPS查看消費者組的信息。
XINFO STREAM?查看 Stream 流信息。
XINFO?CONSUMERS key group查看組內消費者流信息。

創建消息ID

當創建一個 Srteam 時, 需要創建消息 ID,該 ID 是唯一、不可重復的,并且只增不減。消息 ID 有兩種創建方式,一是系統自動生成,二是自定義創建。

1) 系統自動創建

語法格式如下:

XADD key ID field value [field value ...]

參數說明如下:

  • key :指定隊列名稱,如果不存就創建;
  • ID :消息 id,我們使用*表示由 redis 生成,可以自定義,但是要自己保證遞增性;
  • field value :消息記錄。


返回值是毫秒時間戳格式的字符串。比如?1610619132674-2,它表示在該毫秒內產生的第 2 條消息。使用示例:

XADD mystream * username cc 10

2) 自定義ID

自定義 ID 比較簡單,但是需要注意的是 ID 的形式必須是 “整數”,并且后面加入消息的 ID 必須大于前面消息的 ID,也就是自定義 ID 也必須遵守遞增的規則。示例如下:

XADD mystream1 001 name zhangsan addr hebei

創建消費組

Redis Stream通過XGROUP CREATE指令創建消費組(Consumer Group),在創建時,需要傳遞起始消息的 ID 用來初始化 last_delivered_id 變量。語法格式如下:

<span style="color:#444444">XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]</span>

參數說明如下:

  • key :指定 Stream 隊列名稱,若不存在則自動創建。
  • groupname :自定義消費組的名稱,不可重復。
  • $ :表示從尾部開始消費,只接受新消息,而當前 Stream 的消息則被忽略。

消費消息

Redis Stream 通過XREADGROUP命令使消費組消費信息,它和XREAD命令一樣,都可以阻塞等待新消息。讀到新消息后,對應的消息 ID 就會進入消費者的 PLE(正在處理的消息)結構里,客戶端處理完畢后使用 XACK 命令通知 Redis 服務器,本條消息已經處理完畢,該消息的 ID 就會從 PEL 中移除。示意圖如下

redis stream

XREADGROUP命令的語法格式如下所示:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]??

參數說明如下:

  • group :消費組名稱。
  • consumer :消費者名稱。
  • count : 要讀取的數量。
  • milliseconds : 阻塞時間,以毫秒為單位。
  • key :? 鍵指定的隊列名稱。
  • ID : 表示消息 ID。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/164102.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/164102.shtml
英文地址,請注明出處:http://en.pswp.cn/news/164102.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

字符串匹配算法——KMP

有文本串aabaabaaf&#xff0c;模式串aabaaf問文本串中是否出現過模式串 暴力解法 最不用動腦子的&#xff0c;直接兩層for循環&#xff0c;逐個匹配&#xff0c;匹配到不相等的值時把文本串后移一位&#xff0c;再重新比較。這種方法的復雜度是O(mn)&#xff0c;該方法低效的…

關鍵字const的修飾(指針)

A.const修飾變量 變量是可以修改的&#xff0c;如果把變量的地址交給?個指針變量&#xff0c;通過指針變量的也可以修改這個變量。 但是如果我們希望?個變量加上?些限制&#xff0c;不能被修改&#xff0c;怎么做呢&#xff1f;這就是const的作?。 #include <stdio.h&…

postpresql 查詢某張表的字段名和字段類型

postpresql 查詢某張表的字段名和字段類型 工作中第一次接觸postpresql&#xff0c;接觸到這么個需求&#xff0c;只是對sql有點了解&#xff0c;于是就網上查閱資料。得知通過系統表可以查詢&#xff0c;設計到幾張系統表&#xff1a;pg_class、pg_attrubute、information_sc…

axios二次封裝配置請求攔截器和響應攔截器

我們為什么要對axios進行二次封裝&#xff1f; 因為我們可以使用請求攔截器在發送請求之前處理一些業務&#xff0c;使用響應攔截器在服務器數據返回后處理一些業務。 我們通常創建一個api文件夾&#xff0c;再創建一個request.js文件&#xff0c;用于存放重寫后的axios。 /…

SiP系統級封裝、SOC芯片和合封芯片主要區別!合封和sip一樣嗎?

SiP系統級封裝、SOC芯片和合封芯片技術是三種備受關注的技術。它們在提高系統性能、穩定性和功耗效率方面都發揮著重要作用 但在集成方式、應用領域和技術特點等方面存在一些區別。本文將從多個角度對這三種技術進行深入解讀。 一、集成方式 合封芯片則是一種將多個芯片或不…

Vue彈窗的使用與傳值

使用element-UI中的Dialog 對話框 vue組件結合實現~~~~ 定義html <div click"MyAnalyze()">我的區劃</div><el-dialog title"" :visible.sync"dialogBiomeVisible"><NationalBiome :closeValue"TypeBiome" cl…

輕松入門Axios:前端開發中的HTTP利器

輕松入門Axios&#xff1a;前端開發中的HTTP利器 前言為什么選擇Axios1. **簡單易用:**2. **功能豐富:**3. **廣泛支持的瀏覽器和環境:**4. **跨域支持:**5. **社區活躍:**6. **對于處理錯誤的友好性:**7. **對于并發請求的支持:** 安裝與引用1. 使用 npm 安裝 Axios&#xff1…

基于51單片機車載空調系統設計proteus仿真+源程序)

一、系統方案 1、本設計采用這51單片機作為主控器。 2、DS18B20采集溫度值送到液晶1602顯示。 3、按鍵設置報警值。 4、溫度控制風扇檔位。 二、硬件設計 原理圖如下&#xff1a; 三、單片機軟件設計 1、首先是系統初始化 /T0初始化*/ void init_t0() { //TMOD0x01;//定時器…

數據庫實驗三 Sql多表查詢和視圖

數據庫實驗三 Sql多表查詢和視圖 一、Sql表二、在線練習 一、Sql表 www.db-book.com 二、在線練習 對所有表執行查詢語句&#xff0c;查看有哪些數據。 select * from tableName; 一、執行以下查詢語句&#xff0c;寫出查詢意圖。 (1) select * from student,takes whe…

經典滑動窗口試題(一)

&#x1f4d8;北塵_&#xff1a;個人主頁 &#x1f30e;個人專欄:《Linux操作系統》《經典算法試題 》《C》 《數據結構與算法》 ??走在路上&#xff0c;不忘來時的初心 文章目錄 一、將x減到0的最小操作數1、題目講解2、講解算法原理3、代碼實現 二、無重復的最長子串1、題…

OpenCV數據類型及CV_16UC1深度圖ros訂閱

最近用到深度圖,對其數據類型及顯示有些迷惑,記筆記于此: 目錄 一、cv::Mat 的數據類型及轉換方式1. cv::Mat 數據類型2. cv::Mat 數據類型互轉2.1 OpenCV數據類型轉換的函數2.2 可視化深度圖像(CV_16UC1)二、cv::Mat 與 sensor_msgs::msg::Image 互轉(基于cv_bridge)1.…

黑臭水體的“黑”和“臭”形成的機理

水體“黑”和“臭”即呈現令人不悅的顏色和(或)散發令人不適氣味的水體。由于水環境遭受超過其自凈能力的有機污染&#xff0c;有機物的好氧分解使水體中耗氧速率大于復氧速率&#xff0c;造成水體缺氧&#xff0c;致使有機物降解不完全、速度減緩&#xff0c;厭氧生物降解過程…

mybatis 語法使用各種踩坑(持續更新中。。。)

1、大小寫命名&#xff1a;這個別說了&#xff0c;都是淚。 2、聯表查詢查詢&#xff0c;多條合成一條&#xff0c;不生效的原因 博主各種檢查關聯關系和字段大小寫&#xff0c;本來是4條數據最后合成一條數據&#xff0c;死活給你直接返回了4條數據&#xff0c;而且每個類似p…

leetcode刷題之用棧實現隊列(C語言版)

leetcode刷題之用棧實現隊列&#xff08;C語言版&#xff09; 一、題目描述二、題目要求三、題目解析Ⅰ、typedef structⅡ、MyQueue* myQueueCreateⅢ、void myQueuePush(MyQueue* obj, int x)Ⅳ、int myQueuePeek(MyQueue* obj)Ⅴ、int myQueuePop(MyQueue* obj)Ⅶ、bool myQ…

邦芒忠告:求職者面試時絕不能說的8件事

求職者在面試時應該注意言行舉止&#xff0c;避免提及一些敏感或不合適的話題&#xff0c;以下是一些絕不能說的事情&#xff1a; 1、攻擊性言辭&#xff1a;不要使用攻擊性言辭&#xff0c;如貶低、批評或攻擊公司、同事或競爭對手等&#xff0c;這會給人留下不成熟、不尊重他…

新手必看!!附源碼!!STM32通用定時器-比較輸出PWM

一、什么是PWM? PWM&#xff08;脈沖寬度調制&#xff09;是一種用于控制電子設備的技術。它通過調整信號的脈沖寬度來控制電壓的平均值。PWM常用于調節電機速度、控制LED亮度、產生模擬信號等應用。 二、PWM的原理 PWM的基本原理是通過以一定頻率產生的脈沖信號&#xff0…

SPSS多元對應分析

前言&#xff1a; 本專欄參考教材為《SPSS22.0從入門到精通》&#xff0c;由于軟件版本原因&#xff0c;部分內容有所改變&#xff0c;為適應軟件版本的變化&#xff0c;特此創作此專欄便于大家學習。本專欄使用軟件為&#xff1a;SPSS25.0 本專欄所有的數據文件請點擊此鏈接下…

紅隊攻防實戰之釘釘RCE

我這一生如履薄冰&#xff0c;你說我能走到對岸嗎&#xff1f; 本文首發于SecIN社區&#xff0c;原創作者即是本人 前言 網絡安全技術學習&#xff0c;承認??的弱點不是丑事。只有對原理了然于?&#xff0c;才能突破更多的限制。擁有快速學習能力的白帽子&#xff0c;是不…

vue3 教程(中)

偵聽器 用于偵聽指定變量&#xff0c;當其響應式狀態變化時觸發回調函數。 watch() watch() 需明確指定偵聽的數據源&#xff0c;并且僅當數據源變化時&#xff0c;才會執行回調&#xff0c;在創建偵聽器時&#xff0c;不會執行回調&#xff0c;可以獲取到數據源變化前后的值…

Flutter 父子組件通信

在Flutter 中父組件調用子組件的方法可以通過GlobalKey實現&#xff0c;而子組件調用父組件方法可以通過回調函數實現。 父組件 class _MyHomePageState extends State<MyHomePage> {final GlobalKey<LoadPencilState> loadPencilKey GlobalKey<LoadPencilSt…