Python中websocket-client库无法从外部调用发送事件的问题

A 内容

# -*- coding:utf-8 -*-
import websocket 
import requests
import re
import json
import time

def on_message(ws, message): # 服务器有数据更新时,主动推送过来的数据 # message = json.loads(message) print(message)

def on_error(ws, error): # 程序报错时,就会触发 on_error 事件 # print(error) time.sleep(10) # 延时十秒,预防假死 Start() # 重连

def on_close(ws): print(“Connection closed ……”)

def on_open(ws): # 连接到服务器之后就会触发 on_open 事件,这里用于 send 数据 # print(req) ws.send(1)

def Start(): # websocket.enableTrace(True) # 调试开启和关闭 ws = websocket.WebSocketApp(“ws://192.168.31.100:9393”, on_message=on_message, on_error=on_error, on_close=on_close) ws.on_open = on_open ws.run_forever(ping_timeout=30)

if name == “main”: Start()

我的 websocket 客户端代码是酱紫的;
那么问题来了,比如是有一另外一个 py 叫 B;
我如何在 B 里面调用 A 的 websocket 发送内容?

ws.send('字符串') 是发送内容;
我如何在外部进行调用 然后发送呢?
因为我只想在我需要的时候主动发送某些内容给服务器;
而不是一直等待 on_message 的响应后才发送;


Python中websocket-client库无法从外部调用发送事件的问题

8 回复

-- coding:utf-8 --
import websocket
import requests
import re
import json
import time,threading


def on_message(ws, message): # 服务器有数据更新时,主动推送过来的数据
# message = json.loads(message)
print(message)


def on_error(ws, error): # 程序报错时,就会触发 on_error 事件
# print(error)
time.sleep(10) # 延时十秒,预防假死
Start() # 重连


def on_close(ws):
print(“Connection closed ……”)


def on_open(ws): # 连接到服务器之后就会触发 on_open 事件,这里用于 send 数据
# print(req)
ws.send(1)

def get_msg():
return '内容’

def send_msg(ws):
msg=get_msg()
ws.send(msg)



def Start():
# websocket.enableTrace(True) # 调试开启和关闭
ws = websocket.WebSocketApp(“ws://192.168.31.100:9393”, on_message=on_message, on_error=on_error, on_close=on_close)
ws.on_open = on_open
threading.Thread(target=send_msg,args=(ws,)).start()
ws.run_forever(ping_timeout=30)


if name == “main”:
Start()


这个问题我遇到过。核心原因是websocket-client的回调函数(比如on_message)默认在它自己的线程里运行,你直接在主线程或者其他地方调用发送函数会出线程安全问题。

最直接的解决办法是用一个线程安全的队列(比如queue.Queue)来做通信。让on_message回调只负责把收到的消息放进队列,然后你的主逻辑从队列里取消息处理。反过来,你要发送消息时,也把消息放进另一个发送队列,然后让websocket-client在一个专门的发送线程里从这个队列取消息发出去。

下面是一个完整的例子,展示了怎么用队列来解耦接收、发送和你的主业务逻辑:

import threading
import queue
import time
from websocket import WebSocketApp

class WebSocketManager:
    def __init__(self, url):
        self.url = url
        self.ws = None
        self.receive_queue = queue.Queue()
        self.send_queue = queue.Queue()
        self.connected = False
        
    def on_message(self, ws, message):
        """收到消息时,只放入接收队列"""
        self.receive_queue.put(message)
        
    def on_open(self, ws):
        """连接建立后,启动发送线程"""
        self.connected = True
        self.ws = ws
        # 启动发送线程
        send_thread = threading.Thread(target=self._send_worker, daemon=True)
        send_thread.start()
        print("连接已建立,发送线程已启动")
        
    def on_close(self, ws, close_status_code, close_msg):
        self.connected = False
        print("连接关闭")
        
    def _send_worker(self):
        """发送工作线程,专门从发送队列取消息并发送"""
        while self.connected:
            try:
                # 阻塞等待发送队列中的消息
                message = self.send_queue.get(timeout=1)
                if self.ws and self.connected:
                    self.ws.send(message)
                    print(f"已发送: {message}")
            except queue.Empty:
                continue
            except Exception as e:
                print(f"发送出错: {e}")
                
    def send_message(self, message):
        """外部调用此方法发送消息(线程安全)"""
        if self.connected:
            self.send_queue.put(message)
        else:
            print("未连接,无法发送")
            
    def get_message(self):
        """外部调用此方法获取接收到的消息"""
        try:
            return self.receive_queue.get_nowait()
        except queue.Empty:
            return None
            
    def connect(self):
        """启动WebSocket连接"""
        self.ws = WebSocketApp(
            self.url,
            on_open=self.on_open,
            on_message=self.on_message,
            on_close=self.on_close
        )
        # 在后台线程运行WebSocket
        wst = threading.Thread(target=self.ws.run_forever, daemon=True)
        wst.start()

# 使用示例
if __name__ == "__main__":
    manager = WebSocketManager("wss://echo.websocket.events")
    manager.connect()
    
    # 等待连接建立
    time.sleep(2)
    
    # 外部发送消息
    manager.send_message("Hello WebSocket!")
    
    # 主循环处理接收到的消息
    for i in range(5):
        msg = manager.get_message()
        if msg:
            print(f"收到消息: {msg}")
        time.sleep(1)

这个模式的关键点:

  1. on_message回调里不做复杂处理,只把消息丢进接收队列
  2. 单独开一个发送线程,专门从发送队列取消息并通过ws.send()发送
  3. 外部通过send_message()方法把要发的消息放进发送队列,这样就安全了
  4. 通过get_message()从接收队列取消息处理

这样设计之后,你的业务逻辑和WebSocket的收发就完全解耦了,可以从任何地方安全地调用发送方法。

总结:用队列做中间层来解耦收发逻辑。

加一个线程 传递 Ws 对象即可

![]( )

大概好像不行哎。。

不对。好像是可以的。好像是哪里有点问题。说不上来。

这加个线程,只是多了一个函数方法,可是如何调用这个方法呢?

比如怎么调用 send_msg(‘字符串内容’)

前辈你别消失了啊。。

回到顶部