Python中Websocket Client的实现与常见问题请教
网上的文档太少了,先问第一个问题,如何订阅指定频道?
翻遍了谷歌也找不到,我看到不少文章都是写的:
ws.send(json.dumps({"event":"subscribe","channel":"trades"})
想不通,这个只是普通发消息的方法,哪门子是订阅啊?
Python中Websocket Client的实现与常见问题请教
5 回复
帖子回复:
要搞一个WebSocket客户端,直接用websockets库最省事。先装包:pip install websockets。
下面是基础实现,包含连接、收发消息、异常处理和重连:
import asyncio
import websockets
import json
from typing import Optional
class WebSocketClient:
def __init__(self, uri: str, max_reconnect_attempts: int = 5):
self.uri = uri
self.connection: Optional[websockets.WebSocketClientProtocol] = None
self.max_reconnect_attempts = max_reconnect_attempts
self.reconnect_count = 0
async def connect(self):
"""建立连接并处理重试逻辑"""
while self.reconnect_count < self.max_reconnect_attempts:
try:
print(f"尝试连接到 {self.uri}")
self.connection = await websockets.connect(self.uri)
self.reconnect_count = 0
print("连接成功")
return
except (websockets.WebSocketException, OSError) as e:
self.reconnect_count += 1
wait_time = min(2 ** self.reconnect_count, 30)
print(f"连接失败 ({e}),{wait_time}秒后重试...")
await asyncio.sleep(wait_time)
raise ConnectionError(f"无法连接到 {self.uri},已重试{self.max_reconnect_attempts}次")
async def send_message(self, message: dict):
"""发送JSON格式消息"""
if not self.connection:
raise RuntimeError("未建立连接")
try:
await self.connection.send(json.dumps(message))
except websockets.ConnectionClosed:
print("连接已关闭,尝试重连...")
await self.reconnect()
await self.send_message(message) # 重连后重新发送
async def receive_messages(self):
"""持续接收消息"""
while True:
try:
message = await self.connection.recv()
data = json.loads(message)
print(f"收到消息: {data}")
# 在这里处理业务逻辑
except websockets.ConnectionClosed:
print("连接断开")
await self.reconnect()
except json.JSONDecodeError:
print(f"收到非JSON消息: {message}")
async def reconnect(self):
"""重新连接"""
if self.connection:
await self.connection.close()
await self.connect()
async def run(self):
"""主运行循环"""
await self.connect()
# 启动接收任务
receive_task = asyncio.create_task(self.receive_messages())
# 示例:定时发送心跳
try:
while True:
await self.send_message({"type": "ping", "data": "heartbeat"})
await asyncio.sleep(30)
finally:
receive_task.cancel()
if self.connection:
await self.connection.close()
# 使用示例
async def main():
client = WebSocketClient("wss://echo.websocket.org")
await client.run()
if __name__ == "__main__":
asyncio.run(main())
常见问题处理:
- 连接断开:代码里已经加了自动重连机制,指数退避避免服务器压力
- 消息格式:统一用JSON序列化,收到非JSON消息会捕获异常
- 并发问题:用
asyncio.create_task处理并发接收,不会阻塞发送 - 资源清理:finally块确保连接关闭,避免资源泄漏
总结建议:用异步处理加自动重连能解决大部分问题。
订阅能在 path 里直接指定?就是类似 ws.subscribe(url) 的方法也不需要?
主要看服务端是怎么弄的,要么是 url 中指定,要么连接成功后发送一个消息过去。
已经找到实现方法了,跟服务的无关,是框架的机制。


