Python 监听 Redis 的 channel 问题如何解决?

在 flask 项目中,单独开一个进程来订阅第三方 redis 的 channel,代码如下

r = redis.Redis(connection_pool=pool)
p = r.pubsub()
p.subscribe([channel])
for sub in p.listen():
    print(sub)

在项目启动之后该监听服务正常,但是在十几分钟之后该服务不再工作,有老哥知道原因吗?是不是有个配置能够保证该 redis 连接为长连接。


Python 监听 Redis 的 channel 问题如何解决?

2 回复

在Python里监听Redis的channel,直接用redis-py库的pub/sub功能就行。下面是个完整例子:

import redis
import threading
import time

def subscriber():
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)
    p = r.pubsub()
    p.subscribe('mychannel')
    
    print("开始监听频道...")
    for message in p.listen():
        if message['type'] == 'message':
            print(f"收到消息: {message['data']}")

def publisher():
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)
    for i in range(5):
        r.publish('mychannel', f'消息 {i}')
        time.sleep(1)

if __name__ == '__main__':
    # 启动订阅者线程
    sub_thread = threading.Thread(target=subscriber)
    sub_thread.daemon = True
    sub_thread.start()
    
    # 等订阅者准备好
    time.sleep(1)
    
    # 发布消息
    publisher()

关键点就几个:

  1. 创建pubsub()对象来管理订阅
  2. subscribe()方法订阅频道
  3. listen()方法阻塞等待消息,返回的是个生成器
  4. 消息类型要判断,'message'才是正经数据

如果要订阅多个频道就用psubscribe('channel*')支持通配符。记得处理连接异常和重连,生产环境用连接池。

简单说就是创建pubsub对象,订阅频道,循环listen等消息。


再弄个线程,用 timer 每 5 分钟 keepalive 一下。:-)

回到顶部