Python中如何在Celery任务中运行Flask-SocketIO?

项目结构如下:

website/
	celery/
    	task.py
    socketio/
    	socket.py
    views/
    	index.py
    init.py
manage.py

我的 flask app 在 init 里初始化,celery 在 task 里初始化,socket 在 socket.py 里初始化。 然后在 index 里创建一个路由

@app.route('/index')
def indexx():
    from website.celery.tasks import test
    test.delay()
    return 'a'

test 如下:

from website.socketio.socket import socket
[@celery](/user/celery).task
def test():
    # socketio = SocketIO(message_queue='redis://localhost:6379/1')
    socketio.emit('response',{'data':'sdasda'},broadcast=True)
    print "aaaa"

然后运行,celery shell 显示任务执行了,但是 socket 发的消息并没有在前端收到怎么办呢?好急 请教各位大神啦!


Python中如何在Celery任务中运行Flask-SocketIO?

10 回复

并不是同一个 socket


在Celery任务中运行Flask-SocketIO的核心挑战是:Celery工作进程是独立的,无法直接访问主Flask应用的SocketIO实例。你需要通过消息队列(如Redis)来桥接它们。

这里的关键是使用socketio.AsyncRedisManagersocketio.KombuManager,让Celery任务能向同一个消息队列发送事件,然后由Flask-SocketIO服务器接收并广播给客户端。

下面是一个完整示例:

首先,安装必要的包:

pip install flask flask-socketio celery redis eventlet

项目结构:

your_project/
├── app.py          # Flask主应用
├── celery_app.py   # Celery配置
└── tasks.py        # Celery任务

1. Celery配置 (celery_app.py):

from celery import Celery

celery_app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

2. Flask应用与SocketIO配置 (app.py):

from flask import Flask
from flask_socketio import SocketIO
import eventlet
eventlet.monkey_patch()

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'

# 使用Redis作为消息队列
socketio = SocketIO(
    app,
    message_queue='redis://localhost:6379/0',
    cors_allowed_origins="*"
)

@app.route('/')
def index():
    return 'Flask-SocketIO Server'

@socketio.on('connect')
def handle_connect():
    print('Client connected')

if __name__ == '__main__':
    socketio.run(app, debug=True)

3. Celery任务 (tasks.py):

from celery_app import celery_app
from flask_socketio import SocketIO
import eventlet
eventlet.monkey_patch()

# 创建独立的SocketIO实例用于发射事件
socketio = SocketIO(message_queue='redis://localhost:6379/0')

@celery_app.task
def background_task(data):
    """在Celery任务中发射SocketIO事件"""
    # 执行一些耗时操作...
    result = f"Processed: {data}"
    
    # 通过消息队列发射事件到Flask-SocketIO服务器
    socketio.emit('task_update', 
                 {'data': result, 'status': 'completed'},
                 namespace='/')
    
    return result

4. 启动服务:

  • 启动Redis:redis-server
  • 启动Flask应用:python app.py
  • 启动Celery worker:celery -A tasks worker --loglevel=info

5. 客户端示例 (HTML):

<script src="https://cdn.socket.io/4.5.0/socket.io.min.js"></script>
<script>
    const socket = io('http://localhost:5000');
    
    socket.on('task_update', function(data) {
        console.log('Task update:', data);
        document.getElementById('output').innerHTML = 
            `Result: ${data.data}, Status: ${data.status}`;
    });
    
    // 触发Celery任务
    function startTask() {
        fetch('/start-task', { method: 'POST' });
    }
</script>
<button onclick="startTask()">Start Task</button>
<div id="output"></div>

工作原理:

  1. Celery任务中的socketio.emit()将事件发送到Redis消息队列
  2. Flask-SocketIO服务器从同一个Redis队列接收到事件
  3. 服务器将事件广播给所有连接的客户端

关键点:

  • 必须使用相同的消息队列URL(这里是Redis)
  • 在Celery任务中创建独立的SocketIO实例,只用于发射事件
  • 确保安装了eventletgevent用于异步支持

一句话总结: 用Redis消息队列桥接Celery任务和Flask-SocketIO服务器。

Socket 的后端用的是什么?

大神求助,后端是 flask-socketio,celery 中的 socket 实例是从 socket.py 里面 import 进来的

因为 celery 和 web 是分开的 process 所以你需要有一个共同的后端……

手机看了一下文档 If using multiple processes, a message queue service is used by the processes to coordinate operations such as broadcasting. The supported queues are Redis, RabbitMQ, and any other message queues supported by the Kombu package.

我的 socket 是这样初始化的 socketio = SocketIO(app) socketio.run(app),celery 是使用 redis://localhost:6379/0,尝试 socketio = SocketIO(app,message_queue=“redis://localhost:6379/0”)这样初始化,结果初始化后连接不到 socket 了

你可以在 index.py 里起一个 route,这个 route 里执行 emit,test 里直接 request 这个 route 就可以了

这样相当于中转了一下,确实可行哈,感谢

no thanks,我最近也在弄这个,有另一种实现方式 https://github.com/miguelgrinberg/Flask-SocketIO/issues/47

居然是 3 年前的问题了😂

回到顶部