The Python Client for Redis - Redis Stream
- 1. Redis Stream
- 2. Redis Commands
- 2.1. `CoreCommands.xadd()` (producer side)
- 2.2. `CoreCommands.xlen()` (producer side)
- 2.3. `CoreCommands.xdel()` (producer side)
- 2.4. `CoreCommands.xrange()` (producer side)
- 2.5. `RedisClusterCommands.delete()`
- 3. Redis Stream Examples
- References
redis-py - the Python Redis client
https://redis.io/docs/latest/develop/clients/redis-py/
https://redis-py.pythonlang.cn/en/stable/index.html
redis-py (Redis Python client)
https://github.com/redis/redis-py
The Python interface to the Redis key-value store.
redis-py is the Python client for Redis. redis-py requires a running Redis server.
The sections below explain how to install redis-py and connect your Python application to a Redis database.
Redis Commands
https://redis.readthedocs.io/en/stable/commands.html
Redis is an open-source, in-memory data structure store that can be used as a database, a cache, and a message broker.
1. Redis Stream
https://redis.com.cn/redis-stream.html
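Redis Stream is mainly used for message queues (Message Queue, MQ). Redis already offers publish/subscribe (pub/sub) for building message queues, but pub/sub has one drawback: messages are not persisted. If the network connection drops or the Redis server goes down, the messages are lost.
Pub/sub can distribute messages, but it cannot keep a history of them.
Redis Stream adds message persistence and primary/replica replication. Any client can read the data from any point in time, the read position of each consumer is tracked (through consumer groups), and messages are not lost when a consumer disconnects.
Redis Stream is a data structure for handling large volumes of time-ordered data. Every entry in a stream has a unique id and a set of associated data fields, which makes streams a good fit for logs, event collection, and real-time data feeds.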
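To make the entry id concrete, here is a minimal sketch that takes apart an id of the form Redis generates automatically. It assumes the `<millisecondsTime>-<sequenceNumber>` layout of auto-generated ids; `parse_stream_id` and `stream_id_to_datetime` are hypothetical helper names, not part of redis-py.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple, Union

# Assumption: the id follows the "<millisecondsTime>-<sequenceNumber>" layout
# used for ids that Redis generates itself (id='*').
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_stream_id(entry_id: Union[bytes, str]) -> Tuple[int, int]:
    """Split a stream entry id into (milliseconds, sequence number)."""
    if isinstance(entry_id, bytes):
        entry_id = entry_id.decode("ascii")
    ms_part, _, seq_part = entry_id.partition("-")
    return int(ms_part), int(seq_part)


def stream_id_to_datetime(entry_id: Union[bytes, str]) -> datetime:
    """Convert the millisecond part of an id to a UTC datetime."""
    ms, _ = parse_stream_id(entry_id)
    return EPOCH + timedelta(milliseconds=ms)


ms, seq = parse_stream_id(b"1756565556646-0")
print(ms, seq)
print(stream_id_to_datetime(b"1756565556646-0").isoformat())
```

Because the first part is a timestamp and the second part is a counter within the same millisecond, comparing the two integers as a tuple gives the order of entries in the stream.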
2. Redis Commands
2.1. CoreCommands.xadd() (producer side)
https://redis.readthedocs.io/en/stable/commands.html
https://redis.io/docs/latest/commands/xadd/
xadd(name, fields, id='*', maxlen=None, approximate=True, nomkstream=False, minid=None, limit=None, ref_policy=None)
Add to a stream.
name: name of the stream. If the stream does not exist, it is created.
fields: dict of field/value pairs to insert into the stream.
id: location to insert this record. By default it is appended. Use * to let Redis generate the id automatically. A custom id is allowed, but the caller must make sure that ids keep increasing.
maxlen: truncate old stream members beyond this size. Can't be specified with minid. Optional; sets the maximum length of the stream.
approximate: actual stream length may be slightly more than maxlen.
nomkstream: when set to true, do not create the stream if it does not already exist.
minid: trimming threshold. Entries whose id is lower than minid are evicted from the stream. Can't be specified with maxlen. Optional.
limit: maximum number of entries to evict in one trimming pass.
ref_policy: optional reference policy for consumer groups when trimming:
- KEEPREF (default): when trimming, preserves references in consumer groups' PEL.
- DELREF: when trimming, removes all references from consumer groups' PEL.
- ACKED: when trimming, only removes entries acknowledged by all consumer groups.
Parameters
name (Union[bytes, str, memoryview])
fields (Dict[Union[bytes, bytearray, memoryview, str, int, float], Union[bytes, bytearray, memoryview, str, int, float]])
id (Union[int, bytes, str, memoryview])
maxlen (Optional[int])
approximate (bool)
nomkstream (bool)
minid (Optional[Union[int, bytes, str, memoryview]])
limit (Optional[int])
ref_policy (Optional[Literal['KEEPREF', 'DELREF', 'ACKED']])
Return type
Union[Awaitable[Any], Any]
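The following sketch shows how `maxlen` and `approximate` might be used together to keep a stream from growing without bound. `add_event` and `max_entries` are hypothetical names introduced for illustration; the only redis-py call used is `xadd()` with the parameters listed above, and the client is passed in so that the helper does not open a connection itself.

```python
from typing import Any, Dict, Optional


def add_event(client: Any, stream: str, fields: Dict[str, Any],
              max_entries: Optional[int] = None):
    """Append one entry and optionally cap the stream at about max_entries."""
    if not fields:
        raise ValueError("fields must contain at least one field/value pair")
    # id keeps its default '*', so the server assigns the entry id.
    # approximate=True lets the server trim lazily, so the stream may
    # briefly hold slightly more than max_entries entries.
    return client.xadd(stream, fields, maxlen=max_entries, approximate=True)


# Usage against a local server (needs the redis package and a running Redis):
#   import redis
#   r = redis.Redis(host="localhost", port=6379, db=0)
#   add_event(r, "forever_stream", {"event": "signup", "user_id": 123},
#             max_entries=1000)
```

Passing `max_entries=None` leaves the stream untrimmed, which matches the default behaviour of `xadd()`.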
2.2. CoreCommands.xlen() (producer side)
https://redis.io/docs/latest/commands/xlen/
xlen(name)
Returns the number of elements in a given stream.
Parameters
name (Union[bytes, str, memoryview])
Return type
Union[Awaitable[Any], Any]
2.3. CoreCommands.xdel() (producer side)
https://redis.io/docs/latest/commands/xdel/
xdel(name, *ids)
Deletes one or more messages from a stream.
Parameters
name (Union[bytes, str, memoryview]) - name of the stream.
*ids (Union[int, bytes, str, memoryview]) - message ids to delete.
Return type
Union[Awaitable[Any], Any]
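As a rough illustration of how `xdel()` and `xlen()` fit together, the sketch below deletes a few entries and then reports how many remain. `delete_entries` is a hypothetical helper; it relies on `xdel()` returning the number of entries that were actually removed, which can be smaller than the number of ids passed in when some of them do not exist.

```python
from typing import Any, Tuple


def delete_entries(client: Any, stream: str, *entry_ids) -> Tuple[int, int]:
    """Delete entries by id and return (deleted_count, remaining_length)."""
    if not entry_ids:
        # Nothing to delete, so only report the current length.
        return 0, client.xlen(stream)
    deleted = client.xdel(stream, *entry_ids)
    return deleted, client.xlen(stream)


# Usage against a local server (needs the redis package and a running Redis):
#   import redis
#   r = redis.Redis(host="localhost", port=6379, db=0)
#   entry_id = r.xadd("forever_stream", {"event": "signup"})
#   print(delete_entries(r, "forever_stream", entry_id))
```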
2.4. CoreCommands.xrange() (producer side)
https://redis.io/docs/latest/commands/xrange/
xrange(name, min='-', max='+', count=None)
Read stream values within an interval.
name: name of the stream.
min: first stream ID (called start in the upstream docstring). Defaults to '-', meaning the earliest available.
max: last stream ID (called finish in the upstream docstring). Defaults to '+', meaning the latest available.
count: if set, only return this many items, beginning with the earliest available.
Parameters
name (Union[bytes, str, memoryview])
min (Union[int, bytes, str, memoryview])
max (Union[int, bytes, str, memoryview])
count (Optional[int])
Return type
Union[Awaitable[Any], Any]
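For a long stream, reading everything with a single `xrange()` call can return a very large list. One possible approach is to page through the stream with `count`, restarting each call just after the last id seen. The sketch below assumes a server that supports exclusive range bounds written with a `(` prefix (Redis 6.2 or later); `iter_stream` and `page_size` are hypothetical names.

```python
from typing import Any, Dict, Iterator, Tuple


def iter_stream(client: Any, stream: str,
                page_size: int = 100) -> Iterator[Tuple[Any, Dict]]:
    """Yield every entry of a stream, fetching at most page_size per call."""
    start = "-"  # '-' means the earliest available id
    while True:
        page = client.xrange(stream, min=start, max="+", count=page_size)
        if not page:
            return
        yield from page
        last_id = page[-1][0]
        if isinstance(last_id, bytes):
            last_id = last_id.decode("ascii")
        # Assumption: Redis 6.2+; the '(' prefix makes the lower bound
        # exclusive, so the last entry of this page is not read again.
        start = "(" + last_id


# Usage against a local server (needs the redis package and a running Redis):
#   import redis
#   r = redis.Redis(host="localhost", port=6379, db=0)
#   for entry_id, fields in iter_stream(r, "forever_stream", page_size=2):
#       print(entry_id, fields)
```

The loop ends with one extra call that returns an empty page; that keeps the stop condition simple at the cost of one round trip.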
(base) yongqiang@yongqiang:~$ sudo service redis-server start
[sudo] password for yongqiang:
Starting redis-server: redis-server.
(base) yongqiang@yongqiang:~$
(base) yongqiang@yongqiang:~$ sudo service redis-server status
 * redis-server is running
(base) yongqiang@yongqiang:~$
#!/usr/bin/env python
# coding=utf-8

import redis

# Connect to the Redis server
r = redis.Redis(host='localhost', port=6379, db=0)

# Name of the stream
stream_key = "forever_stream"

# Use xadd to append events to the stream
r.xadd(stream_key, {"event": "signup", "user_id": 123})
r.xadd(stream_key, {"event": "login", "user_id": 456})
r.xadd(stream_key, {"event": "logout", "user_id": 789})

# Print all entries currently in the stream
entries = r.xrange(stream_key)
for entry in entries:
    print(entry)
/home/yongqiang/miniconda3/bin/python /home/yongqiang/stable_diffusion_work/stable_diffusion_diffusers/yongqiang.py
(b'1756565556646-0', {b'event': b'signup', b'user_id': b'123'})
(b'1756565556646-1', {b'event': b'login', b'user_id': b'456'})
(b'1756565556646-2', {b'event': b'logout', b'user_id': b'789'})

Process finished with exit code 0
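The output above shows ids, field names, and values as bytes, which is what redis-py returns by default. A minimal sketch for turning such a result into plain strings follows; `decode_entries` is a hypothetical helper, and it assumes the stored data is UTF-8 text. Creating the client with `decode_responses=True` is another way to get strings directly.

```python
from typing import Any, Dict, List, Tuple


def _to_str(value: Any) -> Any:
    """Decode bytes as UTF-8 and leave every other type unchanged."""
    return value.decode("utf-8") if isinstance(value, bytes) else value


def decode_entries(entries: List[Tuple[Any, Dict]]) -> List[Tuple[Any, Dict]]:
    """Convert [(b'id', {b'field': b'value'}), ...] into str-based entries."""
    return [
        (_to_str(entry_id), {_to_str(k): _to_str(v) for k, v in fields.items()})
        for entry_id, fields in entries
    ]


raw = [(b"1756565556646-0", {b"event": b"signup", b"user_id": b"123"})]
print(decode_entries(raw))
```

Note that `user_id` comes back as the string `'123'`, not the integer `123` that was passed to `xadd()`; stream field values are stored as strings.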
2.5. RedisClusterCommands.delete()
delete(*keys)
Deletes the given keys in the cluster. The keys are first split up into slots and then a DEL command is sent for every slot.
Non-existent keys are ignored. Returns the number of keys that were deleted.
Parameters
keys (Union[bytes, str, memoryview])
Return type
Union[Awaitable[Any], Any]
#!/usr/bin/env python
# coding=utf-8

import redis

# Connect to the Redis server
redis_client = redis.Redis(host='localhost', port=6379, db=0)

# Name of the stream
stream_key = "forever_stream"

# Use xadd to append events to the stream
redis_client.xadd(stream_key, {"event": "signup", "user_id": 123})
redis_client.xadd(stream_key, {"event": "login", "user_id": 456})
redis_client.xadd(stream_key, {"event": "logout", "user_id": 789})

# Print all entries currently in the stream
entries = redis_client.xrange(stream_key)
print(f"length: {len(entries)}")
for entry in entries:
    print(entry)

# Delete the whole stream key, then read the stream again
redis_client.delete(stream_key)
entries = redis_client.xrange(stream_key)
print(f"length: {len(entries)}")
/home/yongqiang/miniconda3/bin/python /home/yongqiang/stable_diffusion_work/stable_diffusion_diffusers/yongqiang.py
length: 3
(b'1756567113334-0', {b'event': b'signup', b'user_id': b'123'})
(b'1756567113334-1', {b'event': b'login', b'user_id': b'456'})
(b'1756567113334-2', {b'event': b'logout', b'user_id': b'789'})
length: 0

Process finished with exit code 0
DEL
https://redis.io/docs/latest/commands/del/
Syntax
DEL key [key ...]
Removes the specified keys. A key is ignored if it does not exist.
Deleting the key removes the entire Redis Stream.
(base) yongqiang@yongqiang:~$ redis-cli
127.0.0.1:6379> del "forever_stream"
(integer) 1
127.0.0.1:6379>
127.0.0.1:6379> del "forever_stream"
(integer) 0
127.0.0.1:6379> exit
(base) yongqiang@yongqiang:~$
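The same check can be made from Python. In the session above, the first DEL returns 1 and the second returns 0 because the key is already gone. The sketch below wraps that return value; `drop_stream` is a hypothetical helper that only calls the client's `delete()` method.

```python
from typing import Any


def drop_stream(client: Any, stream: str) -> bool:
    """Delete the whole stream key; True means a key was actually removed."""
    # delete() returns the number of keys deleted: 1 if the stream
    # existed, 0 if there was nothing to delete.
    return client.delete(stream) == 1


# Usage against a local server (needs the redis package and a running Redis):
#   import redis
#   r = redis.Redis(host="localhost", port=6379, db=0)
#   print(drop_stream(r, "forever_stream"))
#   print(drop_stream(r, "forever_stream"))
```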
3. Redis Stream Examples
https://redis.readthedocs.io/en/stable/examples/redis-stream-example.html
https://redis-py.pythonlang.cn/en/stable/examples/redis-stream-example.html
References
[1] Yongqiang Cheng, https://yongqiang.blog.csdn.net/
[2] What is Redis? https://www.ibm.com/think/topics/redis
[3] Redis Tutorial, https://redis.com.cn/