Python中如何在Flask搭建的Web应用中监听RabbitMQ订阅者返回信息,并将成功信息返回客户端?
flask 搭建的 web 应用,想使用 RabbitMQ 异步处理请求,结果服务层使用轮询模式监听异步信息之后,请求服务器,报错视图函数没有返回响应,该怎样处理呢,求解。
Python中如何在Flask搭建的Web应用中监听RabbitMQ订阅者返回信息,并将成功信息返回客户端?
2 回复
在Flask里监听RabbitMQ消息并实时返回给客户端,可以用WebSocket或者SSE(Server-Sent Events)。这里给你个用Flask-SocketIO的完整例子,它比较适合实时双向通信。
首先安装依赖:
pip install flask flask-socketio pika eventlet
然后是这个完整的应用代码:
from flask import Flask, render_template
from flask_socketio import SocketIO, emit
import pika
import threading
import json
import time
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
socketio = SocketIO(app, cors_allowed_origins="*")
# RabbitMQ连接配置
RABBITMQ_HOST = 'localhost'
QUEUE_NAME = 'response_queue'
def setup_rabbitmq_consumer():
"""建立RabbitMQ连接并开始消费消息"""
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=RABBITMQ_HOST)
)
channel = connection.channel()
# 声明队列,确保它存在
channel.queue_declare(queue=QUEUE_NAME, durable=True)
def callback(ch, method, properties, body):
try:
# 解析消息
message = json.loads(body)
print(f"收到RabbitMQ消息: {message}")
# 通过SocketIO发送给所有连接的客户端
socketio.emit('rabbitmq_response', {
'status': 'success',
'data': message,
'timestamp': time.time()
})
# 确认消息已处理
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"处理消息时出错: {e}")
# 设置消费者
channel.basic_consume(
queue=QUEUE_NAME,
on_message_callback=callback,
auto_ack=False
)
print("开始监听RabbitMQ消息...")
channel.start_consuming()
@app.route('/')
def index():
"""示例页面"""
return render_template('index.html')
@socketio.on('connect')
def handle_connect():
print('客户端已连接')
emit('connection_response', {'status': 'connected'})
@socketio.on('disconnect')
def handle_disconnect():
print('客户端断开连接')
if __name__ == '__main__':
# 在后台线程中启动RabbitMQ消费者
rabbitmq_thread = threading.Thread(target=setup_rabbitmq_consumer, daemon=True)
rabbitmq_thread.start()
# 启动Flask应用
socketio.run(app, debug=True, port=5000)
还需要一个简单的HTML页面(templates/index.html):
<!DOCTYPE html>
<html>
<head>
<title>RabbitMQ消息监听</title>
<script src="https://cdn.socket.io/4.5.0/socket.io.min.js"></script>
</head>
<body>
<h1>RabbitMQ消息监听器</h1>
<div id="messages"></div>
<script>
const socket = io();
const messagesDiv = document.getElementById('messages');
socket.on('connect', function() {
addMessage('已连接到服务器');
});
socket.on('rabbitmq_response', function(data) {
addMessage(`收到消息: ${JSON.stringify(data)}`);
});
function addMessage(text) {
const p = document.createElement('p');
p.textContent = text;
messagesDiv.appendChild(p);
}
</script>
</body>
</html>
这个方案的工作原理:
- Flask-SocketIO建立WebSocket连接,让服务器能主动推消息给浏览器
- 单独开个线程跑RabbitMQ消费者,避免阻塞Flask主线程
- 收到MQ消息后,用
socketio.emit()实时推给所有连接的客户端 - 前端用JavaScript接收并显示这些消息
关键点:RabbitMQ消费者跑在独立线程里,通过SocketIO这个桥梁把消息转发给网页客户端。记得处理连接异常和消息确认,生产环境还要加重连机制。
简单说就是用WebSocket桥接Flask和RabbitMQ。
webshock

