Python中如何使用Tornado WebSocket + Redis实现广播功能,解决单客户端连接阻塞问题,以及如何存储用户连接并持续推送信息

引入需要的模块
import os

from tornado.web import RequestHandler, Application,Cookie
from tornado.httpserver import HTTPServer
from tornado.websocket import WebSocketHandler
from tornado.ioloop import IOLoop

from redisp import redisPool
from tornado.httpclient import AsyncHTTPClient #异步的 http 请求客户端

#
# from queue import Queue
# q = Queue()

class BaseHandler(RequestHandler):
pass

# 定义首页视图处理类,提示用户登录
class IndexHandler(BaseHandler):
def get(self):
self.render(‘index.html’)


# 定义登录视图处理类
class LoginHandler(BaseHandler):
def get(self):
# 获取用户登录的昵称
nickname=self.get_argument(‘nickname’)
# 将用户登录的昵称保存在 cookie 中,安全 cookie
self.set_secure_cookie(‘nickname’,nickname,expires_days=None)
self.render(‘chat.html’,nickname=nickname)


# 定义接收 /发送聊天消息的视图处理类,继承自 websocket 的 WebSocketHandler
class ChatHandler(WebSocketHandler):
# 定义一个集合,用来保存在线的所有用户
online_users = set()
uselist = []
# 从客户端获取 cookie 信息
redisPool = redisPool()
rp = redisPool.getRedisPool()

@classmethod
def send_trick(cls,rp,user):
old_send_message = ‘’
while True:
send_message = rp.get(‘SSEMarketData’)
if old_send_message != send_message:
# for user in self.online_users:
user.write_message(send_message)
old_send_message = send_message

# 重写 open 方法,当有新的聊天用户进入的时候自动触发该函数
def open(self):
# 当有新的用户上线,将该用户加入集合中
self.online_users.add(self)
self.uselist.append(self)
# 将新用户加入的信息发送给所有的在线用户
for user in self.online_users:
user_name = self.get_secure_cookie(“nickname”).decode(‘utf-8’)
user.write_message(’ [%s ] 进入了聊天室’ % user_name)


# 重写 on_message 方法,当聊天消息有更新时自动触发的函数
def on_message(self, message):
# 将在线用户发送的消息通过服务器转发给所有的在线用户

for user in self.online_users:
user_name = self.get_secure_cookie(“nickname”).decode(‘utf-8’)
user.write_message(’%s:%s’ % (user_name, message))
if message:
old_send_message = ‘’
while True:
send_message = self.rp.get(‘SSEMarketData’)
if old_send_message != send_message:
# for user in self.online_users:
self.write_message(send_message)
old_send_message = send_message
# ChatHandler.send_trick(self.rp,self.uselist[-1])


# 重写 on_close 方法,当有用户离开时自动触发的函数
def on_close(self):
# 先将用户从列表中移除
self.online_users.remove(self)
# 将该用户离开的消息发送给所有在线的用户
for user in self.online_users:
user.write_message(’ [%s ] 离开了聊天室~’ % self.request.remote_ip)


def check_origin(self, origin):
return True


# 程序运行入口
if name==‘main’:

BASE_DIR = os.path.dirname(file)
app = Application([
(r’/’, IndexHandler),
(r’/login’, LoginHandler),
(r’/chat’, ChatHandler),
],
template_path=os.path.join(BASE_DIR, ‘templates’),
static_path=os.path.join(BASE_DIR, ‘static’),
debug=True,
login_url=’/login’,
cookie_secret=‘CPFL2FJOTQGzR/8DXZEyfAjxS9hSTk0Bs0f/A12RMnwI8UvYUk5NAbNH/wNdkTJ8’)
server = HTTPServer(app)
server.listen(8000)
IOLoop.current().start()
Python中如何使用Tornado WebSocket + Redis实现广播功能,解决单客户端连接阻塞问题,以及如何存储用户连接并持续推送信息


2 回复

我来给你一个完整的实现方案。核心思路是:用Redis的Pub/Sub做消息分发,避免单连接阻塞;用字典存储WebSocket连接,实现用户管理。

import tornado.ioloop
import tornado.web
import tornado.websocket
import asyncio
import redis.asyncio as redis
import json
import uuid

class WebSocketHandler(tornado.websocket.WebSocketHandler):
    # 存储所有活跃连接 {connection_id: WebSocketHandler实例}
    connections = {}
    
    def initialize(self, redis_client):
        self.redis_client = redis_client
        self.connection_id = str(uuid.uuid4())
        self.pubsub = None
        
    async def open(self):
        """客户端连接时调用"""
        # 存储连接
        WebSocketHandler.connections[self.connection_id] = self
        print(f"新连接: {self.connection_id}, 当前连接数: {len(WebSocketHandler.connections)}")
        
        # 订阅Redis频道
        self.pubsub = self.redis_client.pubsub()
        await self.pubsub.subscribe('broadcast_channel')
        
        # 启动消息监听任务
        asyncio.create_task(self.listen_redis())
        
    async def listen_redis(self):
        """监听Redis消息并推送给客户端"""
        try:
            async for message in self.pubsub.listen():
                if message['type'] == 'message':
                    data = message['data']
                    if isinstance(data, bytes):
                        data = data.decode('utf-8')
                    
                    # 推送给当前客户端
                    if self.ws_connection:
                        await self.write_message(data)
        except Exception as e:
            print(f"监听Redis出错: {e}")
        finally:
            if self.pubsub:
                await self.pubsub.unsubscribe()
                await self.pubsub.close()
    
    async def on_message(self, message):
        """接收客户端消息"""
        try:
            data = json.loads(message)
            # 这里可以处理客户端发送的消息
            # 例如:{"type": "chat", "content": "hello"}
            print(f"收到消息: {data}")
            
            # 如果需要广播给所有客户端,可以发布到Redis
            # await self.redis_client.publish('broadcast_channel', json.dumps(data))
            
        except json.JSONDecodeError:
            print(f"无效的JSON: {message}")
    
    def on_close(self):
        """客户端断开连接时调用"""
        if self.connection_id in WebSocketHandler.connections:
            del WebSocketHandler.connections[self.connection_id]
        print(f"连接关闭: {self.connection_id}, 剩余连接数: {len(WebSocketHandler.connections)}")
        
        # 关闭Redis订阅
        if self.pubsub:
            asyncio.create_task(self.pubsub.unsubscribe())

class BroadcastHandler(tornado.web.RequestHandler):
    """HTTP接口,用于触发广播"""
    
    def initialize(self, redis_client):
        self.redis_client = redis_client
    
    async def post(self):
        """接收广播消息并发布到Redis"""
        try:
            message = self.get_argument('message', '')
            if not message:
                self.write({'error': '消息不能为空'})
                return
                
            # 发布到Redis频道
            await self.redis_client.publish('broadcast_channel', message)
            
            self.write({'success': True, 'message': '广播已发送'})
        except Exception as e:
            self.write({'error': str(e)})

async def make_app():
    """创建应用"""
    # 创建Redis客户端
    redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
    
    # 测试Redis连接
    try:
        await redis_client.ping()
        print("Redis连接成功")
    except Exception as e:
        print(f"Redis连接失败: {e}")
        raise
    
    return tornado.web.Application([
        (r'/ws', WebSocketHandler, {'redis_client': redis_client}),
        (r'/broadcast', BroadcastHandler, {'redis_client': redis_client}),
    ])

async def main():
    """主函数"""
    app = await make_app()
    app.listen(8888)
    print("WebSocket服务器启动在: ws://localhost:8888/ws")
    print("广播接口: POST http://localhost:8888/broadcast?message=你的消息")
    
    # 启动Tornado事件循环
    await asyncio.Event().wait()

if __name__ == '__main__':
    asyncio.run(main())

关键点解释:

  1. Redis Pub/Sub解耦:每个WebSocket连接独立订阅Redis频道,消息通过Redis广播,避免单连接阻塞影响其他客户端。

  2. 连接管理:用类变量connections字典存储所有活跃连接,key是唯一的connection_id,value是WebSocketHandler实例。

  3. 异步处理:使用async/await确保非阻塞,listen_redis()在后台任务中运行,持续监听Redis消息。

  4. 消息推送:收到Redis消息后,通过write_message()推送给对应客户端。

  5. 资源清理:连接关闭时,从connections字典移除,并取消Redis订阅。

使用方式:

  1. 启动Redis服务器
  2. 运行Python脚本
  3. 客户端连接:ws://localhost:8888/ws
  4. 发送广播:POST请求到http://localhost:8888/broadcast?message=你的消息

总结:用Redis做消息中转,连接字典做用户管理,异步避免阻塞。


tornado 有自带的 websocket,,可以实现你现在的功能,,网上好像有现成的代码,,你搜一下

回到顶部