Python 操作 Redis 的客戶端 - Redis Stream

Python 操作 Redis 的客戶端 - Redis Stream

  • 1. Redis Stream
  • 2. Redis Commands
    • 2.1. `CoreCommands.xadd()` (生產端)
    • 2.2. `CoreCommands.xlen()` (生產端)
    • 2.3. `CoreCommands.xdel()` (生產端)
    • 2.4. `CoreCommands.xrange()` (生產端)
    • 2.5. `RedisClusterCommands.delete()`
  • 3. Redis Stream Examples
  • References

redis-py - Python Redis 客戶端
https://redis.io/docs/latest/develop/clients/redis-py/
https://redis-py.pythonlang.cn/en/stable/index.html

redis-py (Redis Python client)
https://github.com/redis/redis-py

The Python interface to the Redis key-value store.

redis-py is the Python client for Redis. redis-py requires a running Redis server.

The sections below explain how to install redis-py and connect your Python application to a Redis database.

Redis Commands
https://redis.readthedocs.io/en/stable/commands.html

Redis 是一個開源的,內存中的數據結構存儲系統,它可以用作數據庫、緩存和消息中間件。

1. Redis Stream

https://redis.com.cn/redis-stream.html

Redis Stream 主要用于消息隊列 (Message Queue, MQ),Redis 本身是有一個 Redis 發布訂閱 (pub/sub) 來實現消息隊列的功能,但它有個缺點就是消息無法持久化,如果出現網絡斷開、Redis 宕機等,消息就會被丟棄。

發布訂閱 (pub/sub) 可以分發消息,但無法記錄歷史消息。

Redis Stream 提供了消息的持久化和主備復制功能,可以讓任何客戶端訪問任何時刻的數據,并且能記住每一個客戶端的訪問位置,還能保證消息不丟失。

Redis Stream 是一種數據結構,用于處理大規模的時間序列數據。Redis Stream 的每個條目都有一個唯一的 id 和關聯的數據字段,適合用于日志、事件收集或實時數據流等場景。

2. Redis Commands

2.1. CoreCommands.xadd() (生產端)

https://redis.readthedocs.io/en/stable/commands.html
https://redis.io/docs/latest/commands/xadd/

xadd(name, fields, id='*', maxlen=None, approximate=True, nomkstream=False, minid=None, limit=None, ref_policy=None)

Add to a stream.

name: name of the stream (name 是隊列名稱,如果隊列 name 不存在就創建。)
fields: dict of field/value pairs to insert into the stream (添加到流的字段和值。)
id: location to insert this record. By default it is appended. (消息 id,使用 * 表示由 Redis 自動生成。可以自定義,但是要自己保證遞增性。)
maxlen: truncate old stream members beyond this size. Can’t be specified with minid. (maxlen 是可選參數,設置流最大長度。)
approximate: actual stream length may be slightly more than maxlen
nomkstream: When set to true, do not make a stream
minid: the minimum id in the stream to query. Can’t be specified with maxlen. (可選參數,只有 id 大于給定 minid 的條目才會被加入流。)
limit: specifies the maximum number of entries to retrieve
ref_policy: optional reference policy for consumer groups when trimming:

  • KEEPREF (default): When trimming, preserves references in consumer groups’ PEL
  • DELREF: When trimming, removes all references from consumer groups’ PEL
  • ACKED: When trimming, only removes entries acknowledged by all consumer groups

Parameters

  • name (Union[bytes, str, memoryview])
  • fields (Dict[Union[bytes, bytearray, memoryview, str, int, float], Union[bytes, bytearray, memoryview, str, int, float]])
  • id (Union[int, bytes, str, memoryview])
  • maxlen (Optional[int])
  • approximate (bool)
  • nomkstream (bool)
  • minid (Optional[Union[int, bytes, str, memoryview]])
  • limit (Optional[int])
  • ref_policy (Optional[Literal[‘KEEPREF’, ‘DELREF’, ‘ACKED’]])

Return type

  • Union[Awaitable[Any], Any]

2.2. CoreCommands.xlen() (生產端)

https://redis.io/docs/latest/commands/xlen/

xlen(name)

Returns the number of elements in a given stream.

Parameters

  • name (Union[bytes, str, memoryview])

Return type

  • Union[Awaitable[Any], Any]

2.3. CoreCommands.xdel() (生產端)

https://redis.io/docs/latest/commands/xdel/

xdel(name, *ids)

Deletes one or more messages from a stream.

Parameters

  • name (Union[bytes, str, memoryview]) - name of the stream.
  • *ids (Union[int, bytes, str, memoryview]) - message ids to delete.

Return type
Union[Awaitable[Any], Any]

2.4. CoreCommands.xrange() (生產端)

https://redis.io/docs/latest/commands/xrange/

xrange(name, min='-', max='+', count=None)

Read stream values within an interval.

name: name of the stream.
start: first stream ID. defaults to ‘-‘, meaning the earliest available.
finish: last stream ID. defaults to ‘+’, meaning the latest available.
count: if set, only return this many items, beginning with the
earliest available.

Parameters

  • name (Union[bytes, str, memoryview])
  • min (Union[int, bytes, str, memoryview])
  • max (Union[int, bytes, str, memoryview])
  • count (Optional[int])

Return type
Union[Awaitable[Any], Any]

(base) yongqiang@yongqiang:~$ sudo service redis-server start
[sudo] password for yongqiang:
Starting redis-server: redis-server.
(base) yongqiang@yongqiang:~$
(base) yongqiang@yongqiang:~$ sudo service redis-server status* redis-server is running
(base) yongqiang@yongqiang:~$
#!/usr/bin/env python
# coding=utf-8import redis# 連接到 Redis 服務器
r = redis.Redis(host='localhost', port=6379, db=0)# 定義流的名稱
stream_key = "forever_stream"# 使用 xadd 將事件添加到流
r.xadd(stream_key, {"event": "signup", "user_id": 123})
r.xadd(stream_key, {"event": "login", "user_id": 456})
r.xadd(stream_key, {"event": "logout", "user_id": 789})# 打印當前流中的所有條目
entries = r.xrange(stream_key)
for entry in entries:print(entry)
/home/yongqiang/miniconda3/bin/python /home/yongqiang/stable_diffusion_work/stable_diffusion_diffusers/yongqiang.py 
(b'1756565556646-0', {b'event': b'signup', b'user_id': b'123'})
(b'1756565556646-1', {b'event': b'login', b'user_id': b'456'})
(b'1756565556646-2', {b'event': b'logout', b'user_id': b'789'})Process finished with exit code 0

2.5. RedisClusterCommands.delete()

  • delete(*keys)

Deletes the given keys in the cluster. The keys are first split up into slots and then an DEL command is sent for every slot

Non-existent keys are ignored. Returns the number of keys that were deleted.

Parameters
keys (Union[bytes, str, memoryview]) –

Return type
Union[Awaitable[Any], Any]

#!/usr/bin/env python
# coding=utf-8import redis# 連接到 Redis 服務器
redis_client = redis.Redis(host='localhost', port=6379, db=0)# 定義流的名稱
stream_key = "forever_stream"# 使用 xadd 將事件添加到流
redis_client.xadd(stream_key, {"event": "signup", "user_id": 123})
redis_client.xadd(stream_key, {"event": "login", "user_id": 456})
redis_client.xadd(stream_key, {"event": "logout", "user_id": 789})# 打印當前流中的所有條目
entries = redis_client.xrange(stream_key)
print(f"length: {len(entries)}")
for entry in entries:print(entry)redis_client.delete(stream_key)entries = redis_client.xrange(stream_key)
print(f"length: {len(entries)}")
/home/yongqiang/miniconda3/bin/python /home/yongqiang/stable_diffusion_work/stable_diffusion_diffusers/yongqiang.py 
length: 3
(b'1756567113334-0', {b'event': b'signup', b'user_id': b'123'})
(b'1756567113334-1', {b'event': b'login', b'user_id': b'456'})
(b'1756567113334-2', {b'event': b'logout', b'user_id': b'789'})
length: 0Process finished with exit code 0
  • DEL

https://redis.io/docs/latest/commands/del/

Syntax

DEL key [key ...]

Removes the specified keys. A key is ignored if it does not exist.
刪除整個 Redis Stream。

(base) yongqiang@yongqiang:~$ redis-cli
127.0.0.1:6379> del "forever_stream"
(integer) 1
127.0.0.1:6379>
127.0.0.1:6379> del "forever_stream"
(integer) 0
127.0.0.1:6379> exit
(base) yongqiang@yongqiang:~$

3. Redis Stream Examples

https://redis.readthedocs.io/en/stable/examples/redis-stream-example.html
https://redis-py.pythonlang.cn/en/stable/examples/redis-stream-example.html

References

[1] Yongqiang Cheng, https://yongqiang.blog.csdn.net/
[2] What is Redis? https://www.ibm.com/think/topics/redis
[3] Redis 教程, https://redis.com.cn/

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/94848.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/94848.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/94848.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【Qt開發】按鈕類控件(一)-> QPushButton

目錄 1 -> 什么是 PushButton? 2 -> 相關屬性 3 -> 代碼示例 3.1 -> 帶有圖標的按鈕 3.2 -> 帶有快捷鍵的按鈕 4 -> 總結 1 -> 什么是 PushButton? 在 Qt 框架中,QPushButton 是最基礎且最常用的按鈕控件之一&am…

Citrix 零日漏洞自五月起遭積極利用

安全研究員 Kevin Beaumont 披露了有關 CVE-2025-6543 的驚人細節,這是一個嚴重的 Citrix NetScaler 漏洞,在該公司發布補丁之前的幾個月里,該漏洞被積極利用作為零日攻擊。 Citrix 最初將其輕描淡寫為簡單的“拒絕服務”漏洞,但…

【系列08】端側AI:構建與部署高效的本地化AI模型 第7章:架構設計與高效算子

第7章:架構設計與高效算子 要將AI模型成功部署到端側,除了對現有模型進行壓縮和優化,更根本的方法是在設計之初就考慮其在資源受限環境下的運行效率。本章將深入探討如何設計高效的網絡架構,以及如何理解并優化常用的核心算子。高…

42-Ansible-Inventory

文章目錄Ansible基本概述手動運維時代(原始社會)自動化運維時代自動化運維工具的優勢Ansible的功能及優點Ansible的架構Ansible的執行流程安裝AnsibleAnsible配置文件生效順序Ansible inventory主機清單Ansible基于免秘鑰方式管理客戶端小結Ansible-Adho…

Go語言runtime/trace工具全面解析

基本概念與功能 Go語言的runtime/trace是Go標準庫中內置的性能分析工具,主要用于追蹤和可視化Go程序的運行時行為。它能夠記錄程序執行期間的各種事件,包括goroutine調度、系統調用、垃圾回收(GC)、網絡I/O、鎖等待等關鍵信息。 trace工具的核心功能包括: goroutine生命周期…

Docker(自寫)

Docker程序是跑在操作系統上的,而操作系統上又裝了各種不同版本的依賴庫和配置程序依賴環境,環境不同,程序就可能跑不起來,如果我們能將環境和程序一起打包docker就是可以將程序和環境一起打包并運行的工具軟件基礎鏡像DockerFile…

深度拆解 OpenHarmony 位置服務子系統:從 GNSS 到分布式協同定位的全鏈路實戰

1. 系統概述 OpenHarmony 的“定位子系統”就是硬件服務子系統集里的 “位置服務子系統”(Location SubSystem)。它向下對接 GNSS/GPS、基站、Wi-Fi 等定位模組,向上以 標準位置 API 形式為應用提供 實時位置、軌跡、地理圍欄 等能力,并可與分布式軟總線聯動,實現 跨設備…

React Native基本用法

1,index調用registerComponent,把appName注入到React Native的根節點。 2,package.json是全局大管家,package-lock.json鎖定版本,不會手動編輯,通過install安裝 3, bebal.config.json bebal.config.json是翻…

LoraConfig target modules加入embed_tokens(64)

LoraConfig target modules加入embed_tokens 更好且成本更低的方法 嵌入層(embedding layer)的 lora_embedding_A 和 lora_embedding_B 頭部(head)是否需加入目標模塊列表 用戶警告 解除權重綁定 解綁以后是隨機權重,怎么辦 更好且成本更低的方法 “有沒有一種更好且成本…

筆記共享平臺|基于Java+vue的讀書筆記共享平臺系統(源碼+數據庫+文檔)

筆記共享平臺|讀書筆記共享平臺系統 目錄 基于Javavue的讀書筆記共享平臺系統 一、前言 二、系統設計 三、系統功能設計 四、數據庫設計 五、核心代碼 六、論文參考 七、最新計算機畢設選題推薦 八、源碼獲取 博主介紹:??大廠碼農|畢設布道師&#xff…

【VSCode】VSCode為Java C/S項目添加圖形用戶界面

為Java C/S項目添加圖形用戶界面 現在我們來為它添加圖形用戶界面(GUI)。我將使用Java Swing庫創建一個簡單的GUI,因為它內置于Java標準庫中,無需額外依賴。 客戶端GUI實現 首先,我們將修改客戶端代碼,添加一個Swing GUI界面&…

【云原生】Docker 搭建Kafka服務兩種方式實戰操作詳解

目錄 一、前言 二、Docker 搭建kafka介紹 2.1 Docker 命令部署 2.2 使用Docker Compose 部署 2.3 使用 Docker Swarm 2.4 使用 Kubernetes 2.5 部署建議 三、Docker 搭建kafka操作方式一 3.1 前置準備 3.2 完整操作過程 3.2.1 創建docker網絡 3.2.2 啟動zookeeper容…

DBeaver中禁用PostgreSQL SSL的配置指南

在DBeaver中為PostgreSQL連接禁用SSL是一個常見的配置,特別是當你的數據庫服務器未啟用SSL或遇到連接問題時。我來為你詳細講解操作步驟和注意事項。 🛠? DBeaver中禁用PostgreSQL SSL的配置指南 詳細步驟 打開驅動設置:在DBeaver中創建新的…

數組去重【JavaScript】

數組去重,并且key和val相同的對象視為相同的,需要去重。主函數:/*** 數組去重* 兩個屬性相同的對象也認為是相同的* param {Array} arr* return {Array} */ function uniqueArray(arr) {const result []// outer: 標簽,標記外層循…

基于單片機設計的智能停車系統_271

文章目錄 一、前言 1.1 項目介紹 【1】項目開發背景 【2】設計實現的功能 【3】項目硬件模塊組成 【4】設計意義 【5】國內外研究現狀 【6】摘要 1.2 設計思路 1.3 系統功能總結 1.4 開發工具的選擇 【1】設備端開發 【2】上位機開發 1.5 參考文獻 1.6 系統框架圖 1.7 系統原理…

for in+邏輯表達式 生成迭代對象,最后轉化為列表 ——注意list是生成器轉化為列表,但[生成器]得到的就是一個列表,其中包含一個生成器元素

(int(digit) ** 2 for digit in str(n))這個不是 數組(list),而是一個 生成器表達式 (generator expression)。它的作用是:str(n) 把數字 n 轉成字符串,例如 n 82 → "82"。for digit in str(n) 遍歷字符串中的每個字符 → "…

通信算法之321:verilog中generate if 用法-綜合掉無用分支

文章目錄 一.示例代碼 二.優缺分析 三. generate - case 一.示例代碼 提示:參考 // 根據添加/補償頻偏的標志,確定使用的頻偏wire signed [WIDTH-1 : 0] freq;generateif(FREQ_FLAG == 1b1) beg

Shell 入門

目錄 一、Shell 是什么 二、 .sh 腳本調用 .py 腳本 Python 核心邏輯腳本(data_processor.py) Shell 腳本(pipeline.sh) 三、常見命令 四、.sh腳本 1. 簡單例子 2. 進階例子 3. 猜數字游戲 一、Shell 是什么 Shell 的本…

UNet改進(36):融合FSATFusion的醫學圖像分割

1. 注意力機制的理論基礎 1.1 空間注意力機制 空間注意力機制模擬人類視覺系統,能夠關注圖像中的顯著區域。其核心思想是根據特征圖的空間位置生成權重圖,突出重要區域并抑制無關信息。常見的實現方式是通過沿通道維度的池化操作獲取空間統計信息,然后通過卷積層生成空間注…

docker安裝kafka、zookeeper詳細步驟

Kafka 簡介 Kafka 是一個分布式流處理平臺,由 LinkedIn 開發并開源,主要用于高吞吐量的實時數據管道和流處理。 核心特性 高吞吐量:支持每秒百萬級消息處理,適合大數據場景。 持久化存儲:消息可持久化到磁盤,并支持多副本備份。 分布式架構:支持水平擴展,通過分區(P…