Python中Websocket Client的实现与常见问题请教

网上的文档太少了,先问第一个问题,如何订阅指定频道?

翻遍了谷歌也找不到,我看到不少文章都是写的:

ws.send(json.dumps({"event":"subscribe","channel":"trades"})

想不通,这个只是普通发消息的方法,哪门子是订阅啊?


Python中Websocket Client的实现与常见问题请教
5 回复

一般在 URL 中的 path 部分指定。这也要看 Server 端的实现,或者说是约定了。


帖子回复:

要搞一个WebSocket客户端,直接用websockets库最省事。先装包:pip install websockets

下面是基础实现,包含连接、收发消息、异常处理和重连:

import asyncio
import websockets
import json
from typing import Optional

class WebSocketClient:
    def __init__(self, uri: str, max_reconnect_attempts: int = 5):
        self.uri = uri
        self.connection: Optional[websockets.WebSocketClientProtocol] = None
        self.max_reconnect_attempts = max_reconnect_attempts
        self.reconnect_count = 0
        
    async def connect(self):
        """建立连接并处理重试逻辑"""
        while self.reconnect_count < self.max_reconnect_attempts:
            try:
                print(f"尝试连接到 {self.uri}")
                self.connection = await websockets.connect(self.uri)
                self.reconnect_count = 0
                print("连接成功")
                return
            except (websockets.WebSocketException, OSError) as e:
                self.reconnect_count += 1
                wait_time = min(2 ** self.reconnect_count, 30)
                print(f"连接失败 ({e}),{wait_time}秒后重试...")
                await asyncio.sleep(wait_time)
        
        raise ConnectionError(f"无法连接到 {self.uri},已重试{self.max_reconnect_attempts}次")

    async def send_message(self, message: dict):
        """发送JSON格式消息"""
        if not self.connection:
            raise RuntimeError("未建立连接")
        
        try:
            await self.connection.send(json.dumps(message))
        except websockets.ConnectionClosed:
            print("连接已关闭,尝试重连...")
            await self.reconnect()
            await self.send_message(message)  # 重连后重新发送

    async def receive_messages(self):
        """持续接收消息"""
        while True:
            try:
                message = await self.connection.recv()
                data = json.loads(message)
                print(f"收到消息: {data}")
                # 在这里处理业务逻辑
                
            except websockets.ConnectionClosed:
                print("连接断开")
                await self.reconnect()
            except json.JSONDecodeError:
                print(f"收到非JSON消息: {message}")

    async def reconnect(self):
        """重新连接"""
        if self.connection:
            await self.connection.close()
        await self.connect()

    async def run(self):
        """主运行循环"""
        await self.connect()
        
        # 启动接收任务
        receive_task = asyncio.create_task(self.receive_messages())
        
        # 示例:定时发送心跳
        try:
            while True:
                await self.send_message({"type": "ping", "data": "heartbeat"})
                await asyncio.sleep(30)
        finally:
            receive_task.cancel()
            if self.connection:
                await self.connection.close()

# 使用示例
async def main():
    client = WebSocketClient("wss://echo.websocket.org")
    await client.run()

if __name__ == "__main__":
    asyncio.run(main())

常见问题处理:

  1. 连接断开:代码里已经加了自动重连机制,指数退避避免服务器压力
  2. 消息格式:统一用JSON序列化,收到非JSON消息会捕获异常
  3. 并发问题:用asyncio.create_task处理并发接收,不会阻塞发送
  4. 资源清理:finally块确保连接关闭,避免资源泄漏

总结建议:用异步处理加自动重连能解决大部分问题。

订阅能在 path 里直接指定?就是类似 ws.subscribe(url) 的方法也不需要?

主要看服务端是怎么弄的,要么是 url 中指定,要么连接成功后发送一个消息过去。

已经找到实现方法了,跟服务的无关,是框架的机制。

回到顶部