Python websocket-client 如何保持长连接呢?

最近在了解 websocket,但是发现死活不能保持长连接,用 tornado 实现了一个 server:

# coding=utf8

from tornado.websocket import WebSocketHandler from tornado.web import asynchronous, Application from tornado.httpserver import HTTPServer from tornado.ioloop import IOLoop from tornado.gen import coroutine, sleep

class WSHandler(WebSocketHandler):

@asynchronous
def open(self, *args, **kwargs):
    print '{0} connected'.format(self.request.remote_ip)

@coroutine
def on_message(self, message):
    if message:
        if message:
            yield sleep(5)
            self.write_message("it\'s is done")

def on_close(self):
    print 'client is exit'

def main():

app = Application([(r'/', WSHandler)],
                  static_path='static')
server = HTTPServer(app)
server.listen(8088)
IOLoop.instance().start()

if name == ‘main’: main()

客户端代码:

import socket
from websocket import create_connection
import websocket
while 1:
    ws = create_connection("ws://localhost:8088/", timeout=5)
    if ws.connected:
        ws.send('am coming', opcode=websocket.ABNF.OPCODE_TEXT)
        print ws.recv()
    # ws.close()

当 client 调用recv()函数后就退出了,有什么办法可以做到一直保持链接,然后客户端得到消息可以正常处理


Python websocket-client 如何保持长连接呢?

7 回复

心跳包试试


websocket-client 库保持长连接,核心就是处理网络异常和实现重连逻辑。直接上代码,这个例子能自动重连,还处理了心跳:

import websocket
import threading
import time
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class StableWebSocketClient:
    def __init__(self, url, reconnect_interval=5):
        self.url = url
        self.reconnect_interval = reconnect_interval
        self.ws = None
        self.connected = False
        self.should_reconnect = True
        
    def on_message(self, ws, message):
        """收到消息的处理"""
        logger.info(f"收到消息: {message}")
        # 这里处理你的业务逻辑
        try:
            data = json.loads(message)
            # 处理数据...
        except json.JSONDecodeError:
            pass
            
    def on_error(self, ws, error):
        """错误处理"""
        logger.error(f"WebSocket错误: {error}")
        
    def on_close(self, ws, close_status_code, close_msg):
        """连接关闭处理"""
        logger.warning(f"连接关闭: {close_status_code} - {close_msg}")
        self.connected = False
        
        if self.should_reconnect:
            logger.info(f"{self.reconnect_interval}秒后尝试重连...")
            time.sleep(self.reconnect_interval)
            self.connect()
            
    def on_open(self, ws):
        """连接成功回调"""
        logger.info("连接成功")
        self.connected = True
        
        # 启动心跳线程
        heartbeat_thread = threading.Thread(target=self.send_heartbeat)
        heartbeat_thread.daemon = True
        heartbeat_thread.start()
        
    def send_heartbeat(self):
        """发送心跳保持连接"""
        while self.connected and self.ws:
            try:
                # 发送心跳消息,根据你的服务器协议调整
                self.ws.send(json.dumps({"type": "ping"}))
                time.sleep(30)  # 每30秒发送一次心跳
            except Exception as e:
                logger.error(f"发送心跳失败: {e}")
                break
                
    def connect(self):
        """建立连接"""
        try:
            logger.info(f"正在连接到 {self.url}")
            
            # 创建WebSocket连接
            self.ws = websocket.WebSocketApp(
                self.url,
                on_open=self.on_open,
                on_message=self.on_message,
                on_error=self.on_error,
                on_close=self.on_close
            )
            
            # 设置连接选项
            self.ws.run_forever(
                ping_interval=20,    # 每20秒发送一次ping
                ping_timeout=10,     # ping超时时间
                reconnect=5          # 自动重连次数
            )
            
        except Exception as e:
            logger.error(f"连接失败: {e}")
            if self.should_reconnect:
                time.sleep(self.reconnect_interval)
                self.connect()
                
    def send(self, message):
        """发送消息"""
        if self.connected and self.ws:
            try:
                self.ws.send(message)
            except Exception as e:
                logger.error(f"发送消息失败: {e}")
                
    def disconnect(self):
        """断开连接"""
        self.should_reconnect = False
        if self.ws:
            self.ws.close()
            
    def run(self):
        """运行客户端"""
        connect_thread = threading.Thread(target=self.connect)
        connect_thread.daemon = True
        connect_thread.start()

# 使用示例
if __name__ == "__main__":
    client = StableWebSocketClient("wss://echo.websocket.org")
    client.run()
    
    # 主线程可以继续做其他事情
    try:
        while True:
            # 这里可以添加发送消息的逻辑
            # client.send('{"type": "data", "content": "hello"}')
            time.sleep(1)
    except KeyboardInterrupt:
        client.disconnect()
        logger.info("客户端已停止")

关键点:

  1. run_forever() 参数ping_intervalping_timeout 是保持TCP层连接的关键
  2. 重连机制:在 on_close 回调里实现,连接断开后自动重连
  3. 应用层心跳:有些服务器需要业务心跳,用单独的线程发送
  4. 异常处理:所有网络操作都要有 try-except
  5. 线程安全:用 threading 处理心跳和重连,避免阻塞主循环

简单总结: 用好 run_forever 的参数加上自动重连,就能稳定保持长连接。

逻辑没写对吧,每次循环都创建新的连接。建议看看官方给的例子。

把 if 改成 while,下面再 sleep 1 秒,试试看。

改用 socketio

create_connection 不能自动复用已有链接吧?你这每次 while 都新建一个链接

create_connection 放循环外去啊…

回到顶部