Python中WebSocket连接后立即断开的原因及解决方法
服务端用的 flask_socketio,
客户端参照 flask_socketio 的官方文档引用
cdn.cloudflare.com/ajax/libs/socket.io/1.3.6/socket.io.min.js
这个 Socket.IO 库连接,没有问题,然鹅用 websocket 测试工具http://www.blue-zero.com/WebSocket/ 测试发现,一连接上就断开,请教各位大大
Python中WebSocket连接后立即断开的原因及解决方法
4 回复
请搜索并阅读《提问的智慧》
Python中WebSocket连接后立即断开的常见原因及解决方法
这个问题通常由几个关键因素导致,下面我结合代码示例来说明。
1. 最常见原因:缺少心跳机制或PING/PONG处理 许多WebSocket服务器(如Nginx、某些云服务)有闲置超时设置。如果客户端不发送PING帧,连接会被主动断开。
import asyncio
import websockets
async def keep_alive(websocket):
"""心跳保持连接"""
try:
while True:
await asyncio.sleep(20) # 每20秒发送一次PING
await websocket.ping()
except websockets.exceptions.ConnectionClosed:
pass
async def main():
uri = "ws://your-server.com/socket"
async with websockets.connect(uri) as websocket:
# 启动心跳任务
keep_alive_task = asyncio.create_task(keep_alive(websocket))
try:
async for message in websocket:
print(f"收到消息: {message}")
finally:
keep_alive_task.cancel()
await keep_alive_task
asyncio.run(main())
2. 协议版本或头部问题 某些服务器对WebSocket版本或头部有严格要求。
import websockets
async def connect_with_custom_headers():
uri = "ws://your-server.com/socket"
headers = {
"User-Agent": "MyClient/1.0",
"Sec-WebSocket-Version": "13", # 明确指定版本
}
async with websockets.connect(
uri,
extra_headers=headers,
ping_interval=20, # 自动PING间隔
ping_timeout=30 # PONG等待超时
) as websocket:
# 连接建立后的业务逻辑
await websocket.send("Hello")
response = await websocket.recv()
print(f"服务器响应: {response}")
asyncio.run(connect_with_custom_headers())
3. SSL/TLS证书验证问题(WSS连接)
import ssl
import websockets
async def connect_with_ssl():
uri = "wss://your-secure-server.com/socket"
# 如果需要自定义SSL上下文
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False # 仅测试环境使用
ssl_context.verify_mode = ssl.CERT_NONE
async with websockets.connect(
uri,
ssl=ssl_context, # 传入SSL上下文
ping_interval=None # 禁用自动PING,手动处理
) as websocket:
# 手动发送PING
await websocket.ping()
await websocket.send("Data")
# 设置接收超时
try:
response = await asyncio.wait_for(websocket.recv(), timeout=10)
print(f"收到: {response}")
except asyncio.TimeoutError:
print("接收超时")
asyncio.run(connect_with_ssl())
4. 服务器端断开连接的处理
import websockets
from websockets.exceptions import ConnectionClosed
async def robust_connection():
uri = "ws://your-server.com/socket"
reconnect_attempts = 0
max_reconnect = 5
while reconnect_attempts < max_reconnect:
try:
async with websockets.connect(uri) as websocket:
print("连接成功")
reconnect_attempts = 0 # 重置重连计数
# 业务循环
while True:
try:
message = await websocket.recv()
print(f"消息: {message}")
except ConnectionClosed as e:
print(f"连接关闭: {e.code} - {e.reason}")
break
except (OSError, websockets.exceptions.InvalidURI) as e:
reconnect_attempts += 1
wait_time = min(2 ** reconnect_attempts, 30) # 指数退避
print(f"连接失败,{wait_time}秒后重试...")
await asyncio.sleep(wait_time)
print("达到最大重连次数,放弃连接")
asyncio.run(robust_connection())
快速检查清单:
- 检查服务器是否有闲置超时设置
- 实现PING/PONG心跳机制
- 验证WebSocket协议版本(通常为13)
- 检查SSL/TLS证书(WSS连接时)
- 查看服务器返回的关闭代码和原因
总结: 加个心跳保活,检查协议头和SSL配置基本就能解决。
客户端只能用 socketio
Python 有 websocket 客户端
用这来调试服务端 调好了剩下都是 js 的事情
不用管鹅厂什么测试工具

