Python中如何使用Redis 5.0新特性Stream数据
转载自: https://blog.csdn.net/copyangle/article/details/81975975
7 月份 redis5.0 隆重推出重量级新特性 Stream !各位码农是不是早就磨拳擦掌跃跃欲试呢。下面介绍如何在 python 中使用 Stream 新特性。
Stream 的命令行使用方法参考:Redis5.0 重量级特性 Stream 尝鲜
用到的包:redis-py-cloud
通常使用的操作 redis 集群包 redis-py-cluster 现在并不支持 Stream 数据,所以 redis-py-cloud 就是在 redis-py-cluster 基础上实现的针对 Stream 数据类型的 python 包。
TPS:redis-py-cloud 可以在 python 2.7.5 版本使用,但是推荐 3.0 以上版本,其他版本未测试。
python 中首先安装 redis-py-cloud 包。git 地址如下:
git 安装:
https://github.com/ChinaGoldBear/redis-py-cloud
下载完成, 执行 python setup.py install 安装。
pip 安装:
pip install redis-py-cloud 然后就可以用了!配置集群机器:
from rediscluster import RedisCluster
'''连接集群''' startup_nodes = [{"host": "ip3", "port": "7000"}, {"host": "ip2", "port": "7000"}, {"host": "ip1", "port": "7000"}, ]
REDIS_CLUSTER = RedisCluster(startup_nodes=startup_nodes, decode_responses=True) 测试:
REDIS_CLUSTER.xadd(STREAM_NAME, "*", 100,{"name": "data"}) OK,大功告成!
详细教程参考:使用 redis-py-cloud 操作 redis5 stream 数据 https://blog.csdn.net/copyangle/article/details/81975975
Python中如何使用Redis 5.0新特性Stream数据
要使用Redis 5.0的Stream,你得先装个redis-py库,版本最好>=3.0。Stream的核心就是按时间顺序存消息,每个消息有唯一ID和键值对数据。
基本操作就这几步:
- 连接Redis:
import redis; r = redis.Redis() - 生产消息(XADD):用
xadd发消息,*让Redis自动生成ID。msg_id = r.xadd('mystream', {'field1': 'value1', 'temperature': '22.5'}) print(f"消息已发送,ID: {msg_id}") - 消费消息(XREAD):
- 从头读:指定起始ID为
0。messages = r.xread({'mystream': '0'}) - 阻塞读新消息:用
block=0一直等,$表示只读最新的。messages = r.xread({'mystream': '$'}, block=0) for stream, message_list in messages: for msg_id, fields in message_list: print(f"收到消息 {msg_id}: {fields}")
- 从头读:指定起始ID为
- 管理消费者组(XGROUP):这是关键,让多个消费者分担。
组里读用# 创建组(如果流不存在,用mkstream创建) r.xgroup_create('mystream', 'mygroup', id='0', mkstream=True) # 消费者从组里读 messages = r.xreadgroup('mygroup', 'consumer1', {'mystream': '>'})xreadgroup,>表示没分配给别人的消息。读完得确认(xack)。
简单说,发消息用xadd,简单读用xread,正经多消费者协作就用消费者组(xgroup_create, xreadgroup, xack)。先跑通这几个基础命令,Stream基本就上手了。
no test. no benchmark. no pull request.
no document.
5 星给 1 星。
饭端到你面前了还得喂你嘴里? 真是哪都有无脑喷子

