【Redis筆記】Redis消息隊列方案

Reids消息隊列(Message Queue)

消息隊列 是指利用 高效可靠 的 消息傳遞機制 進行與平臺無關的 數據交流,并基于數據通信來進行分布式系統的集成。
消息隊列具有 低耦合、可靠投遞、廣播、流量控制、最終一致性 等功能。
常見的消息隊列 有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ 等。
通過提供 消息傳遞 和 消息排隊 模型,它可以在 分布式環境 下提供 應用解耦、彈性伸縮、冗余存儲、流量削峰、異步通信、數據同步 等功能。

基于List結構的模擬消息隊列

Redis的list數據結構是一個雙向鏈表,使用出隊入隊即可實現消息隊列。

LPUSH 、RPOP 或 RPUSH 、 LPOP

LPUSH:將一個或多個值value插入到列表key的表頭如果有多個value值,那么各個value值按從左到右的順序依次插入到表頭。
RPOP:移除并返回列表key的尾元素。
RPUSH與LPOP同理。
通過 LPUSH,RPOP 這樣的方式,會存在一個性能風險點,就是消費者如果想要及時的處理數據,就要在程序中寫個類似 while(true) 這樣的邏輯,不停地去調用 RPOP 或 LPOP 命令,這就會給消費者程序帶來些不必要的性能損失。

LPUSH、BRPOP 或 RPUSH、BLPOP

Redis 還提供了 BLPOP、BRPOP 這種阻塞式讀取的命令(帶 B-Bloking的都是阻塞式),客戶端在沒有讀到隊列數據時,自動阻塞,直到有新的數據寫入隊列,再開始讀取新數據。這種方式就節省了不必要的 CPU 開銷。

數據存入格式:lpush listname v1 v2 v3

v 表示存入鏈表的值

阻塞等待指令格式:blpop list_name timeout

listname 為 取出內容的列表名
timeout為等待超時時間,如果為0,則可無限等待

優點

  • 利用Redis存儲,不受限于JVM內存
  • 基于Redis的持久化機制,數據安全性有保證
  • 可以滿足消息有序性

缺點

  • 無法避免消息丟失:從redis中取出消息后,如果尚未處理完出現異常,取出的消息就丟失了
  • 只支持單消費者,一條消息被取走后,其他消費者無法再獲取。

基于PubSub的消息隊列

"發布/訂閱"模式包含兩種角色,分別是發布者和訂閱者。訂閱者可以訂閱一個或者多個頻道(channel),而發布者可以向指定的頻道(channel)發送消息,所有訂閱此頻道的訂閱者都會收到此消息。

常用的指令:
subscribe channel1 [channel...]: 訂閱一個或多個頻道
publish channel1 msg:向一個頻道發送消息
psubscribe pattern [pattern]:訂閱與pattern格式匹配的所有頻道

pattern通配符:
h?llo:?可以替換為任意一個其他字母,比如hello;而hllo和hkkllo不行
hllo: 可以替換為0個或多個其他字母,比如hllo、hello、heeeello
h[ae]llo:可以替換為任意一個中括號中的字母,比如hallo、hello;而hillo不行

優點

  • 采用發布訂閱模型,支持多生產、多消費

缺點

  • 不支持數據持久化;如果出現網絡斷開、Redis 宕機等,消息就會被丟棄。假設一個消費者都沒有,那消息就直接被丟棄了。
  • 無法避免消息丟失。
  • 消息堆積有上限、超出時數據丟失。

基于Stream的消息隊列——單消費方式

Redis 5.0 版本新增了一個更強大的數據結構——Stream。它提供了消息的持久化和主備復制功能,可以讓任何客戶端訪問任何時刻的數據,并且能記住每一個客戶端的訪問位置,還能保證消息不丟失。

存入消息的方式之一_XADD

命令格式:XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...]

NOMKSTREAM:如果隊列不存在,是否自動創建隊列,默認自動創建
<MAXLEN | MINID> [= | ~] threshold [LIMIT count] :設置消息隊列的最大消息數量
<* | id> 消息的唯一id,* 代表由redis自動生成。* 格式是"時間戳-遞增數字",例如"1644804662707-0"
field value [field value …]:發送到隊列中的消息隊列,稱為Entry。格式就是多個key-value鍵值對。

示例:

# 創建名為 users 的隊列,并向其中發送一個消息,內容是:{name=jack,age=21},并且使用Redis自動生成id
XADD users * name jack age 21

讀取消息的方式之一——XREAD

命令格式:XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

[COUNT count]:每次讀取消息的最大數量
[BLOCK milliseconds]:當沒有消息時,是否阻塞、阻塞時長
STREAMS key [key …]:要從哪個隊列讀取消息,key就是隊列名
起始id,只返回大于該id的消息;0代表從第一個消息開始;$代表從最新的消息開始

讀取最新的消息示例:

XREAD COUNT 1 BLOCK 1000 STREAMS users $

優點

  • 消息可回溯
  • 一個消息可被多個消費者讀取
  • 可以阻塞讀取

缺點

  • 有消息漏讀風險

基于Stream的消息隊列——消費者組

消費者組:將多個消費者劃分到一個組里,監聽同一個隊列。

特點

  • 消費分流:隊列中的消息會分流給組內的不同消費者,而不是重復消費,加快消息的處理速度。
  • 消息標識:消費者組會維護一個標示,記錄組內最后一個被處理的消息,哪怕消費者宕機重啟,還是能夠從標示之后讀取消息,確保每一個消息都被消費。
  • 消息確認:消費者獲取消息之后,消息處于pending狀態,并存入一個pending-list。當處理完后需要XACK來確認消息,標記消息已處理,之后才會從pending-list移除。

創建消費者組

XGROUP CREATE key groupName ID [MKSTREAM]

key:隊列名稱
groupName:消費者組名稱
ID:起始ID標示,$代表隊列中最后一個消息,0代表隊列中第一個消息
MKSTREAM:隊列不存在時自動創建隊列

其他對應指令:

# 刪除指定的消費者組
XGROUP DESTROY key group# 給指定的消費者組添加消費者
XGROUP CREATECONSUMER key group consumer# 刪除消費者組中的指定消費者
XGROUP DELCONSUMER key group consumer

從消費者組讀取消息

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds][NOACK] STREAMS key [key ...] id [id ...]

group:消費者組名稱
consumer:消費者名稱,如果消費者不存在,會自動創建一個消費者
count:本次查詢的最大數量
BLOCK milliseconds:當前消息等待最大時長
NOACK:無需手動ACK,獲取消息后自動確認
STREAMS key:指定監聽一個或多個隊列名稱
ID:獲取消息的起始ID:

  • “ >” 從下一個未消費的消息開始
  • 其他:根據指定ID從pending-list中獲取一個消費但未確認的消息,例如0,是從pending-list中第一個消息開始

確認消息

XACK key group id [id ...]

key:消息隊列名稱
group:組名
id:確認的消息ID

查看未確認的消息

XPENDING key group [[IDLE min-idle-time] start end count [consumer]]

總結

如果業務要求較高,可以考慮使用更加專業的 Kafka、RocketMQ、RabbitMQ。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/711071.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/711071.shtml
英文地址,請注明出處:http://en.pswp.cn/news/711071.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

ensp路由器將不同網絡連通在一起

1.拓撲結構信息如下 二層交換機&#xff1a;lsw2,lsw3,lsw5,lsw6 不進行ip配置&#xff0c;只是定義vlan&#xff0c;和主機標注的保持一致&#xff0c;向下連接pc用access&#xff0c;向上連接路由交換機用trunk lsw2配置信息如下圖 定義vlan&#xff0c;設置各個連接口的方式…

tcpdump 常用用法

簡要記錄下tcpdump用法 監控某個ip上的某個端口的流量 tcpdump -i enp0s25 tcp port 5432 -nn -S 各個參數作用 -i enp0s25 指定抓包的網卡是enp0s25 -nn 顯示ip地址和數字端口 &#xff0c;如果只 -n 則顯示ip&#xff0c;但是端口為services文件中的服務名 如果一個…

用python寫一個自動化部署工具

效果 起因 現在springboot項目的自動化部署已經非常普遍&#xff0c;有用Jenkins的&#xff0c;有用git鉤子函數的&#xff0c;有用docker的…等等。這段時間在玩python&#xff0c;想著用python實現自動化部署&#xff0c;即能鍛煉下編碼能力&#xff0c;又方便運維。于是開始…

每日學習總結20240228

每日總結 20240228 1.獲取系統命令執行結果 #include <stdio.h>#define TRUE 1 #define FALSE 0int get_system_cmd_result(const char *command, char *buffer, int bufferLen) {FILE *pipe popen(command, "r");if (pipe NULL) {return FALSE;}while (f…

HTML-表格、表單和CSS初識,選擇器,書寫規范

&#xff11;. 表格標簽 &#xff11;.&#xff11;創建表格 表格標簽是一種用來處理&#xff0c;顯示表格式數據的常用標簽。 注意&#xff1a; &#xff11;. tr 用于定義表格中的一行&#xff0c;必須嵌套在 table標簽中&#xff0c;在 table中包含幾對 tr&#xff0c;就有…

實用指南:SOLIDWORKS數據失真問題的解決之道

在數據處理和模擬計算的過程中&#xff0c;數據失真是一個常見的挑戰。數據失真指的是由于計算機或人為操作導致的原始數據與計算結果或實際情況之間的偏差。特別是在使用SOLIDWORKS這類工程設計軟件時&#xff0c;數據失真可能由多種因素引起&#xff0c;如軟件版本老舊、設置…

AI大模型-啟航

文章目錄 什么是大模型&#xff1f;&#xff08;大體現在參數量巨大&#xff09;大模型將會改變那些行業&#xff08;大模型有哪些作用&#xff1f;&#xff09;如何搞數據訓練模型&#xff1f;LangChain帶來的技術變革LangChain架構 什么是大模型&#xff1f;&#xff08;大體…

九、GG bond的邏輯運算

描述 GG bond想要鍛煉自己的邏輯能力&#xff0c;于是輸入了兩個整型變量x和y&#xff0c;分別判斷它們的與、或、非關系&#xff0c;你能幫他輸出x與y&#xff0c;x或y&#xff0c;非x&#xff0c;非y的值嗎&#xff1f; 輸入描述&#xff1a; 輸入兩個整數x和y&#xff0c…

Vue+SpringBoot打造不良郵件過濾系統

目錄 一、摘要1.1 項目介紹1.2 項目錄屏 二、功能模塊2.1 系統用戶模塊2.2 收件箱模塊2.3 發件箱模塊2.4 垃圾箱模塊2.5 回收站模塊2.6 郵箱過濾設置模塊 三、實體類設計3.1 系統用戶3.2 郵件3.3 其他實體 四、系統展示五、核心代碼5.1 查詢收件箱檔案5.2 查詢回收站檔案5.3 新…

Linux學習-etcdctl安裝

etcdctl3.5下載鏈接 1. 先通過上面鏈接下載gz包2. 解壓 [rootk8s-master ~]# tar xf etcd-v3.5.11-linux-amd64.tar.gz [rootk8s-master etcd-v3.5.11-linux-amd64]# ls Documentation etcd etcdctl etcdutl README-etcdctl.md README-etcdutl.md README.md READMEv2-e…

圖像分割 - 查找圖像的輪廓(cv2.findContours函數)

1、前言 輪廓,是指圖像中或者物體的外邊緣線條。在簡單的幾何圖形中,圖形的輪廓是由平滑的線條構成,容易被識別。但不規則的圖形或者生活中常見的物體輪廓復雜,識別起來比較困難 2、findContours函數 這里先介紹函數的參數,具體的含義會在下面實驗中闡述 opencv 提供的輪…

『大模型筆記』自用的“科技文章翻譯 GPT”和它的 Prompt

自用的“科技文章翻譯 GPT”和它的 Prompt 你是一位精通簡體中文的專業翻譯,尤其擅長將專業學術論文翻譯成淺顯易懂的科普文章。請你幫我將以下英文段落翻譯成中文,風格與中文科普讀物相似。規則: - 翻譯時要準確傳達原文的事實和背景。 - 即使上意譯也要保留原始段落格式,…

每天一個數據分析題(一百八十四)

在下列哪種情況下線性回歸模型不適合代替邏輯回歸模型&#xff1f; A. 預測的目標變量是連續型的并且分布范圍不受限制 B. 預測的目標變量是二元的并且服從二項分布 C. 自變量與因變量之間的關系可以假設為線性關系 D. 需要預測客戶的具體購買金額 題目來源于CDA模擬題庫 …

React入門之React_渲染基礎用法和class實例寫法

渲染元素 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>04元素渲染</title><script src&…

什么是RPC?談談你對RPC的理解

RPC&#xff08;Remote Procedure Call&#xff0c;遠程過程調用&#xff09;是一種計算機通信協議。它允許一臺計算機&#xff08;客戶端&#xff09;通過網絡調用另一臺計算機&#xff08;服務器&#xff09;上的程序&#xff0c;并等待該程序的結果返回。RPC抽象了網絡通信的…

go mod中如何解決 xxx/yyy/lib@v1.1.0: unrecognized import path

需要檢查的幾個地方 這個錯誤通常出現在 Go 模塊系統無法找到指定版本的模塊時。有幾種可能的原因和解決方法&#xff1a; 模塊未被發布或標記&#xff1a; 確保 xxx/yyy/lib 模塊的版本 v1.1.0 已經被正確地發布或標記。你可以在對應的 GitLab 倉庫中查看是否存在 v1.1.0 標簽…

2024-2-29-網絡編程作業

1>TCP 源代碼: 服務器端&#xff1a; #include <myhead.h> #define SER_IP "10.168.1.111" #define SER_PORT 8888 #define MAXSIZE 128 int main(int argc, char const *argv[]) {int sfd socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in sin;sin…

代碼隨想錄算法訓練營|day47

第九章 動態規劃 198.打家劫舍213.打家劫舍II337.打家劫舍III代碼隨想錄文章詳解 198.打家劫舍 dp[i]表示偷第i家及之前所能獲取的最大金額 偷第i家&#xff1a;dp[i] dp[i-2]nums[i]&#xff0c;不偷第i家&#xff1a;dp[i] dp[i-1] func rob(nums []int) int {if len(num…

RDD簡介與基礎編程

1. 什么是RDD&#xff1f; RDD&#xff08;Resilient Distributed Dataset&#xff09;叫做彈性分布式數據集&#xff0c;是Spark中最基本的數據處理模型。在代碼中&#xff0c;RDD是一個抽象類&#xff0c;他代表著一個彈性的、不可變的、可分區的、里面的元素可并行計算的集…

android TextView 實現富文本顯示

android TextView 實現富文本顯示&#xff0c;實現抖音直播間公屏消息案例 使用&#xff1a; val tvContent: TextView helper.getView(R.id.tvContent)//自己根據UI業務要求&#xff0c;可以控制 圖標顯示 大小val levelLabel MyImgLabel( bitmap 自己業務上的bitmap )va…