遇到ZeroMQ连接问题,先别慌,多半是配置或环境问题。下面我直接给几个常见场景的排查代码,你挨个试试。
1. 基础连接测试(先确认基础通信)
import zmq
import time
def test_basic_connection():
context = zmq.Context()
# 服务端
server_socket = context.socket(zmq.REP)
server_socket.bind("tcp://*:5555")
print("Server started on port 5555")
# 客户端
client_socket = context.socket(zmq.REQ)
client_socket.connect("tcp://localhost:5555")
# 测试消息
client_socket.send(b"ping")
print("Client sent: ping")
if server_socket.poll(3000): # 3秒超时
msg = server_socket.recv()
print(f"Server received: {msg}")
server_socket.send(b"pong")
if client_socket.poll(3000):
reply = client_socket.recv()
print(f"Client received: {reply}")
print("✓ Basic connection successful!")
else:
print("✗ Client timeout waiting for reply")
else:
print("✗ Server timeout waiting for message")
server_socket.close()
client_socket.close()
context.term()
if __name__ == "__main__":
test_basic_connection()
2. 网络配置检查(防火墙/地址问题)
import socket
import zmq
def check_network_config():
# 检查端口是否可达
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
try:
result = sock.connect_ex(('localhost', 5555))
if result == 0:
print("✓ Port 5555 is reachable")
else:
print(f"✗ Port 5555 unreachable (error: {result})")
finally:
sock.close()
# 检查ZeroMQ版本
print(f"ZeroMQ version: {zmq.zmq_version()}")
print(f"pyzmq version: {zmq.__version__}")
# 测试不同绑定地址
context = zmq.Context()
test_socket = context.socket(zmq.REP)
addresses = [
"tcp://127.0.0.1:5556",
"tcp://localhost:5557",
"tcp://0.0.0.0:5558" # 监听所有接口
]
for addr in addresses:
try:
test_socket.bind(addr)
print(f"✓ Successfully bound to {addr}")
test_socket.unbind(addr)
except zmq.ZMQError as e:
print(f"✗ Failed to bind {addr}: {e}")
test_socket.close()
context.term()
check_network_config()
3. 完整带错误处理的客户端示例
import zmq
import sys
def robust_client(server_addr="tcp://localhost:5555", retries=3):
context = zmq.Context()
client = context.socket(zmq.REQ)
client.setsockopt(zmq.LINGER, 0) # 立即关闭
client.setsockopt(zmq.RCVTIMEO, 5000) # 5秒接收超时
print(f"Connecting to {server_addr}...")
client.connect(server_addr)
for attempt in range(retries):
try:
client.send(b"Hello")
print(f"Attempt {attempt+1}: Sent request")
reply = client.recv()
print(f"✓ Received: {reply.decode()}")
break
except zmq.Again:
print(f"✗ Attempt {attempt+1}: Timeout, retrying...")
if attempt == retries - 1:
print("✗ All retries failed")
sys.exit(1)
except zmq.ZMQError as e:
print(f"✗ ZMQ error: {e}")
sys.exit(1)
client.close()
context.term()
if __name__ == "__main__":
# 使用方式
# robust_client("tcp://192.168.1.100:5555") # 远程地址
robust_client() # 本地默认
快速排查步骤:
- 先运行第一个基础测试,确认本地ZeroMQ工作正常
- 如果连本地都不通,检查pyzmq安装:
pip list | grep pyzmq
- 远程连接时,确保服务器防火墙开放了对应端口
- 检查地址格式是否正确,特别是Windows下用
tcp://127.0.0.1:端口比localhost更可靠
一句话建议: 先本地测试,再逐层排查网络配置。