Python中如何使用Tornado WebSocket + Redis实现广播功能,解决单客户端连接阻塞问题,以及如何存储用户连接并持续推送信息
引入需要的模块
import os
from tornado.web import RequestHandler, Application,Cookie
from tornado.httpserver import HTTPServer
from tornado.websocket import WebSocketHandler
from tornado.ioloop import IOLoop
from redisp import redisPool
from tornado.httpclient import AsyncHTTPClient #异步的 http 请求客户端
#
# from queue import Queue
# q = Queue()
class BaseHandler(RequestHandler):
pass
# 定义首页视图处理类,提示用户登录
class IndexHandler(BaseHandler):
def get(self):
self.render(‘index.html’)
# 定义登录视图处理类
class LoginHandler(BaseHandler):
def get(self):
# 获取用户登录的昵称
nickname=self.get_argument(‘nickname’)
# 将用户登录的昵称保存在 cookie 中,安全 cookie
self.set_secure_cookie(‘nickname’,nickname,expires_days=None)
self.render(‘chat.html’,nickname=nickname)
# 定义接收 /发送聊天消息的视图处理类,继承自 websocket 的 WebSocketHandler
class ChatHandler(WebSocketHandler):
# 定义一个集合,用来保存在线的所有用户
online_users = set()
uselist = []
# 从客户端获取 cookie 信息
redisPool = redisPool()
rp = redisPool.getRedisPool()
@classmethod
def send_trick(cls,rp,user):
old_send_message = ‘’
while True:
send_message = rp.get(‘SSEMarketData’)
if old_send_message != send_message:
# for user in self.online_users:
user.write_message(send_message)
old_send_message = send_message
# 重写 open 方法,当有新的聊天用户进入的时候自动触发该函数
def open(self):
# 当有新的用户上线,将该用户加入集合中
self.online_users.add(self)
self.uselist.append(self)
# 将新用户加入的信息发送给所有的在线用户
for user in self.online_users:
user_name = self.get_secure_cookie(“nickname”).decode(‘utf-8’)
user.write_message(’ [%s ] 进入了聊天室’ % user_name)
# 重写 on_message 方法,当聊天消息有更新时自动触发的函数
def on_message(self, message):
# 将在线用户发送的消息通过服务器转发给所有的在线用户
for user in self.online_users:
user_name = self.get_secure_cookie(“nickname”).decode(‘utf-8’)
user.write_message(’%s:%s’ % (user_name, message))
if message:
old_send_message = ‘’
while True:
send_message = self.rp.get(‘SSEMarketData’)
if old_send_message != send_message:
# for user in self.online_users:
self.write_message(send_message)
old_send_message = send_message
# ChatHandler.send_trick(self.rp,self.uselist[-1])
# 重写 on_close 方法,当有用户离开时自动触发的函数
def on_close(self):
# 先将用户从列表中移除
self.online_users.remove(self)
# 将该用户离开的消息发送给所有在线的用户
for user in self.online_users:
user.write_message(’ [%s ] 离开了聊天室~’ % self.request.remote_ip)
def check_origin(self, origin):
return True
# 程序运行入口
if name==‘main’:
BASE_DIR = os.path.dirname(file)
app = Application([
(r’/’, IndexHandler),
(r’/login’, LoginHandler),
(r’/chat’, ChatHandler),
],
template_path=os.path.join(BASE_DIR, ‘templates’),
static_path=os.path.join(BASE_DIR, ‘static’),
debug=True,
login_url=’/login’,
cookie_secret=‘CPFL2FJOTQGzR/8DXZEyfAjxS9hSTk0Bs0f/A12RMnwI8UvYUk5NAbNH/wNdkTJ8’)
server = HTTPServer(app)
server.listen(8000)
IOLoop.current().start()
Python中如何使用Tornado WebSocket + Redis实现广播功能,解决单客户端连接阻塞问题,以及如何存储用户连接并持续推送信息
2 回复
我来给你一个完整的实现方案。核心思路是:用Redis的Pub/Sub做消息分发,避免单连接阻塞;用字典存储WebSocket连接,实现用户管理。
import tornado.ioloop
import tornado.web
import tornado.websocket
import asyncio
import redis.asyncio as redis
import json
import uuid
class WebSocketHandler(tornado.websocket.WebSocketHandler):
# 存储所有活跃连接 {connection_id: WebSocketHandler实例}
connections = {}
def initialize(self, redis_client):
self.redis_client = redis_client
self.connection_id = str(uuid.uuid4())
self.pubsub = None
async def open(self):
"""客户端连接时调用"""
# 存储连接
WebSocketHandler.connections[self.connection_id] = self
print(f"新连接: {self.connection_id}, 当前连接数: {len(WebSocketHandler.connections)}")
# 订阅Redis频道
self.pubsub = self.redis_client.pubsub()
await self.pubsub.subscribe('broadcast_channel')
# 启动消息监听任务
asyncio.create_task(self.listen_redis())
async def listen_redis(self):
"""监听Redis消息并推送给客户端"""
try:
async for message in self.pubsub.listen():
if message['type'] == 'message':
data = message['data']
if isinstance(data, bytes):
data = data.decode('utf-8')
# 推送给当前客户端
if self.ws_connection:
await self.write_message(data)
except Exception as e:
print(f"监听Redis出错: {e}")
finally:
if self.pubsub:
await self.pubsub.unsubscribe()
await self.pubsub.close()
async def on_message(self, message):
"""接收客户端消息"""
try:
data = json.loads(message)
# 这里可以处理客户端发送的消息
# 例如:{"type": "chat", "content": "hello"}
print(f"收到消息: {data}")
# 如果需要广播给所有客户端,可以发布到Redis
# await self.redis_client.publish('broadcast_channel', json.dumps(data))
except json.JSONDecodeError:
print(f"无效的JSON: {message}")
def on_close(self):
"""客户端断开连接时调用"""
if self.connection_id in WebSocketHandler.connections:
del WebSocketHandler.connections[self.connection_id]
print(f"连接关闭: {self.connection_id}, 剩余连接数: {len(WebSocketHandler.connections)}")
# 关闭Redis订阅
if self.pubsub:
asyncio.create_task(self.pubsub.unsubscribe())
class BroadcastHandler(tornado.web.RequestHandler):
"""HTTP接口,用于触发广播"""
def initialize(self, redis_client):
self.redis_client = redis_client
async def post(self):
"""接收广播消息并发布到Redis"""
try:
message = self.get_argument('message', '')
if not message:
self.write({'error': '消息不能为空'})
return
# 发布到Redis频道
await self.redis_client.publish('broadcast_channel', message)
self.write({'success': True, 'message': '广播已发送'})
except Exception as e:
self.write({'error': str(e)})
async def make_app():
"""创建应用"""
# 创建Redis客户端
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 测试Redis连接
try:
await redis_client.ping()
print("Redis连接成功")
except Exception as e:
print(f"Redis连接失败: {e}")
raise
return tornado.web.Application([
(r'/ws', WebSocketHandler, {'redis_client': redis_client}),
(r'/broadcast', BroadcastHandler, {'redis_client': redis_client}),
])
async def main():
"""主函数"""
app = await make_app()
app.listen(8888)
print("WebSocket服务器启动在: ws://localhost:8888/ws")
print("广播接口: POST http://localhost:8888/broadcast?message=你的消息")
# 启动Tornado事件循环
await asyncio.Event().wait()
if __name__ == '__main__':
asyncio.run(main())
关键点解释:
-
Redis Pub/Sub解耦:每个WebSocket连接独立订阅Redis频道,消息通过Redis广播,避免单连接阻塞影响其他客户端。
-
连接管理:用类变量
connections字典存储所有活跃连接,key是唯一的connection_id,value是WebSocketHandler实例。 -
异步处理:使用
async/await确保非阻塞,listen_redis()在后台任务中运行,持续监听Redis消息。 -
消息推送:收到Redis消息后,通过
write_message()推送给对应客户端。 -
资源清理:连接关闭时,从connections字典移除,并取消Redis订阅。
使用方式:
- 启动Redis服务器
- 运行Python脚本
- 客户端连接:
ws://localhost:8888/ws - 发送广播:POST请求到
http://localhost:8888/broadcast?message=你的消息
总结:用Redis做消息中转,连接字典做用户管理,异步避免阻塞。
tornado 有自带的 websocket,,可以实现你现在的功能,,网上好像有现成的代码,,你搜一下

