Python中websocket-client库无法从外部调用发送事件的问题
A 内容
# -*- coding:utf-8 -*-
import websocket
import requests
import re
import json
import time
def on_message(ws, message): # 服务器有数据更新时,主动推送过来的数据
# message = json.loads(message)
print(message)
def on_error(ws, error): # 程序报错时,就会触发 on_error 事件
# print(error)
time.sleep(10) # 延时十秒,预防假死
Start() # 重连
def on_close(ws):
print(“Connection closed ……”)
def on_open(ws): # 连接到服务器之后就会触发 on_open 事件,这里用于 send 数据
# print(req)
ws.send(1)
def Start():
# websocket.enableTrace(True) # 调试开启和关闭
ws = websocket.WebSocketApp(“ws://192.168.31.100:9393”, on_message=on_message, on_error=on_error, on_close=on_close)
ws.on_open = on_open
ws.run_forever(ping_timeout=30)
if name == “main”:
Start()
我的 websocket 客户端代码是酱紫的;
那么问题来了,比如是有一另外一个 py 叫 B;
我如何在 B 里面调用 A 的 websocket 发送内容?
ws.send('字符串') 是发送内容;
我如何在外部进行调用 然后发送呢?
因为我只想在我需要的时候主动发送某些内容给服务器;
而不是一直等待 on_message 的响应后才发送;
Python中websocket-client库无法从外部调用发送事件的问题
-- coding:utf-8 --
import websocket
import requests
import re
import json
import time,threading
def on_message(ws, message): # 服务器有数据更新时,主动推送过来的数据
# message = json.loads(message)
print(message)
def on_error(ws, error): # 程序报错时,就会触发 on_error 事件
# print(error)
time.sleep(10) # 延时十秒,预防假死
Start() # 重连
def on_close(ws):
print(“Connection closed ……”)
def on_open(ws): # 连接到服务器之后就会触发 on_open 事件,这里用于 send 数据
# print(req)
ws.send(1)
def get_msg():
return '内容’
def send_msg(ws):
msg=get_msg()
ws.send(msg)
def Start():
# websocket.enableTrace(True) # 调试开启和关闭
ws = websocket.WebSocketApp(“ws://192.168.31.100:9393”, on_message=on_message, on_error=on_error, on_close=on_close)
ws.on_open = on_open
threading.Thread(target=send_msg,args=(ws,)).start()
ws.run_forever(ping_timeout=30)
if name == “main”:
Start()
这个问题我遇到过。核心原因是websocket-client的回调函数(比如on_message)默认在它自己的线程里运行,你直接在主线程或者其他地方调用发送函数会出线程安全问题。
最直接的解决办法是用一个线程安全的队列(比如queue.Queue)来做通信。让on_message回调只负责把收到的消息放进队列,然后你的主逻辑从队列里取消息处理。反过来,你要发送消息时,也把消息放进另一个发送队列,然后让websocket-client在一个专门的发送线程里从这个队列取消息发出去。
下面是一个完整的例子,展示了怎么用队列来解耦接收、发送和你的主业务逻辑:
import threading
import queue
import time
from websocket import WebSocketApp
class WebSocketManager:
def __init__(self, url):
self.url = url
self.ws = None
self.receive_queue = queue.Queue()
self.send_queue = queue.Queue()
self.connected = False
def on_message(self, ws, message):
"""收到消息时,只放入接收队列"""
self.receive_queue.put(message)
def on_open(self, ws):
"""连接建立后,启动发送线程"""
self.connected = True
self.ws = ws
# 启动发送线程
send_thread = threading.Thread(target=self._send_worker, daemon=True)
send_thread.start()
print("连接已建立,发送线程已启动")
def on_close(self, ws, close_status_code, close_msg):
self.connected = False
print("连接关闭")
def _send_worker(self):
"""发送工作线程,专门从发送队列取消息并发送"""
while self.connected:
try:
# 阻塞等待发送队列中的消息
message = self.send_queue.get(timeout=1)
if self.ws and self.connected:
self.ws.send(message)
print(f"已发送: {message}")
except queue.Empty:
continue
except Exception as e:
print(f"发送出错: {e}")
def send_message(self, message):
"""外部调用此方法发送消息(线程安全)"""
if self.connected:
self.send_queue.put(message)
else:
print("未连接,无法发送")
def get_message(self):
"""外部调用此方法获取接收到的消息"""
try:
return self.receive_queue.get_nowait()
except queue.Empty:
return None
def connect(self):
"""启动WebSocket连接"""
self.ws = WebSocketApp(
self.url,
on_open=self.on_open,
on_message=self.on_message,
on_close=self.on_close
)
# 在后台线程运行WebSocket
wst = threading.Thread(target=self.ws.run_forever, daemon=True)
wst.start()
# 使用示例
if __name__ == "__main__":
manager = WebSocketManager("wss://echo.websocket.events")
manager.connect()
# 等待连接建立
time.sleep(2)
# 外部发送消息
manager.send_message("Hello WebSocket!")
# 主循环处理接收到的消息
for i in range(5):
msg = manager.get_message()
if msg:
print(f"收到消息: {msg}")
time.sleep(1)
这个模式的关键点:
on_message回调里不做复杂处理,只把消息丢进接收队列- 单独开一个发送线程,专门从发送队列取消息并通过
ws.send()发送 - 外部通过
send_message()方法把要发的消息放进发送队列,这样就安全了 - 通过
get_message()从接收队列取消息处理
这样设计之后,你的业务逻辑和WebSocket的收发就完全解耦了,可以从任何地方安全地调用发送方法。
总结:用队列做中间层来解耦收发逻辑。
加一个线程 传递 Ws 对象即可

大概好像不行哎。。
不对。好像是可以的。好像是哪里有点问题。说不上来。
这加个线程,只是多了一个函数方法,可是如何调用这个方法呢?
比如怎么调用 send_msg(‘字符串内容’)
前辈你别消失了啊。。

