Python中如何在Flask搭建的Web应用中监听RabbitMQ订阅者返回信息,并将成功信息返回客户端?

flask 搭建的 web 应用,想使用 RabbitMQ 异步处理请求,结果服务层使用轮询模式监听异步信息之后,请求服务器,报错视图函数没有返回响应,该怎样处理呢,求解。


Python中如何在Flask搭建的Web应用中监听RabbitMQ订阅者返回信息,并将成功信息返回客户端?
2 回复

在Flask里监听RabbitMQ消息并实时返回给客户端,可以用WebSocket或者SSE(Server-Sent Events)。这里给你个用Flask-SocketIO的完整例子,它比较适合实时双向通信。

首先安装依赖:

pip install flask flask-socketio pika eventlet

然后是这个完整的应用代码:

from flask import Flask, render_template
from flask_socketio import SocketIO, emit
import pika
import threading
import json
import time

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
socketio = SocketIO(app, cors_allowed_origins="*")

# RabbitMQ连接配置
RABBITMQ_HOST = 'localhost'
QUEUE_NAME = 'response_queue'

def setup_rabbitmq_consumer():
    """建立RabbitMQ连接并开始消费消息"""
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=RABBITMQ_HOST)
    )
    channel = connection.channel()
    
    # 声明队列,确保它存在
    channel.queue_declare(queue=QUEUE_NAME, durable=True)
    
    def callback(ch, method, properties, body):
        try:
            # 解析消息
            message = json.loads(body)
            print(f"收到RabbitMQ消息: {message}")
            
            # 通过SocketIO发送给所有连接的客户端
            socketio.emit('rabbitmq_response', {
                'status': 'success',
                'data': message,
                'timestamp': time.time()
            })
            
            # 确认消息已处理
            ch.basic_ack(delivery_tag=method.delivery_tag)
            
        except Exception as e:
            print(f"处理消息时出错: {e}")
    
    # 设置消费者
    channel.basic_consume(
        queue=QUEUE_NAME,
        on_message_callback=callback,
        auto_ack=False
    )
    
    print("开始监听RabbitMQ消息...")
    channel.start_consuming()

@app.route('/')
def index():
    """示例页面"""
    return render_template('index.html')

@socketio.on('connect')
def handle_connect():
    print('客户端已连接')
    emit('connection_response', {'status': 'connected'})

@socketio.on('disconnect')
def handle_disconnect():
    print('客户端断开连接')

if __name__ == '__main__':
    # 在后台线程中启动RabbitMQ消费者
    rabbitmq_thread = threading.Thread(target=setup_rabbitmq_consumer, daemon=True)
    rabbitmq_thread.start()
    
    # 启动Flask应用
    socketio.run(app, debug=True, port=5000)

还需要一个简单的HTML页面(templates/index.html):

<!DOCTYPE html>
<html>
<head>
    <title>RabbitMQ消息监听</title>
    <script src="https://cdn.socket.io/4.5.0/socket.io.min.js"></script>
</head>
<body>
    <h1>RabbitMQ消息监听器</h1>
    <div id="messages"></div>
    
    <script>
        const socket = io();
        const messagesDiv = document.getElementById('messages');
        
        socket.on('connect', function() {
            addMessage('已连接到服务器');
        });
        
        socket.on('rabbitmq_response', function(data) {
            addMessage(`收到消息: ${JSON.stringify(data)}`);
        });
        
        function addMessage(text) {
            const p = document.createElement('p');
            p.textContent = text;
            messagesDiv.appendChild(p);
        }
    </script>
</body>
</html>

这个方案的工作原理:

  1. Flask-SocketIO建立WebSocket连接,让服务器能主动推消息给浏览器
  2. 单独开个线程跑RabbitMQ消费者,避免阻塞Flask主线程
  3. 收到MQ消息后,用socketio.emit()实时推给所有连接的客户端
  4. 前端用JavaScript接收并显示这些消息

关键点:RabbitMQ消费者跑在独立线程里,通过SocketIO这个桥梁把消息转发给网页客户端。记得处理连接异常和消息确认,生产环境还要加重连机制。

简单说就是用WebSocket桥接Flask和RabbitMQ。


webshock

回到顶部