Python中如何使用Flask实现Web页面展示异步任务的执行过程

小弟不才,目前手上有个运维的项目在自己做,功能就是 web 上请求执行一个异步任务,使用 celery 作为任务队列管理并执行。目前需求是如何在 web 界面上展示这些任务执行过程,任务执行的时候会写 log 日志,我的想法是后台直接读对应的 log 日志,然后使用 websocket 推送到前端,这部分有没有什么推荐的方案?还要能支持多客户端请求查看不同任务情况


Python中如何使用Flask实现Web页面展示异步任务的执行过程
21 回复

socketio , 不同的任务发送到不同的频道就可以啦


这个问题很常见,用Flask展示异步任务进度,核心是结合服务器推送事件任务队列。直接给你一个能跑通的方案。

核心思路:

  1. Celery 处理后台任务。
  2. Flask 提供两个路由:一个启动任务,一个推送进度。
  3. SSE 让浏览器能实时接收进度更新。

1. 安装依赖

pip install flask celery redis eventlet

需要运行一个Redis服务(localhost:6379)作为Celery的消息代理。

2. 项目代码 app.py (主Flask应用)

from flask import Flask, render_template, Response, jsonify
import celery_app
import time

app = Flask(__name__)

@app.route('/')
def index():
    """渲染主页面"""
    return render_template('index.html')

@app.route('/start_task')
def start_task():
    """启动一个后台任务,并返回任务ID"""
    task = celery_app.long_running_task.delay()
    return jsonify({'task_id': task.id})

@app.route('/progress/<task_id>')
def progress(task_id):
    """SSE端点,用于向客户端推送指定任务的进度"""
    def generate():
        # 获取Celery任务对象
        task = celery_app.long_running_task.AsyncResult(task_id)
        while not task.ready():  # 当任务未完成时
            # 获取当前进度信息,这里假设任务会更新自己的状态
            progress_data = task.info if isinstance(task.info, dict) else {}
            current = progress_data.get('current', 0)
            total = progress_data.get('total', 100)
            percent = int((current / total) * 100) if total > 0 else 0
            # 按照SSE格式发送数据
            data = f"data: {percent}\n\n"
            yield data
            time.sleep(0.5)  # 每0.5秒检查一次
        # 任务完成时发送100%
        yield f"data: 100\n\n"
    return Response(generate(), mimetype='text/event-stream')

celery_app.py (Celery配置和任务定义)

from celery import Celery
import time

# 配置Celery,使用Redis作为消息代理
celery_app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@celery_app.task(bind=True)  # bind=True 允许任务访问self(任务实例)
def long_running_task(self):
    """模拟一个长时间运行的任务,并更新自己的进度状态"""
    total = 10
    for i in range(total):
        time.sleep(1)  # 模拟耗时操作
        # 更新任务状态,这是一个字典,会被发送给前端
        self.update_state(state='PROGRESS',
                          meta={'current': i + 1, 'total': total})
    return {'result': 'Task completed!'}

templates/index.html (前端页面)

<!DOCTYPE html>
<html>
<head>
    <title>任务进度展示</title>
</head>
<body>
    <h2>异步任务执行进度</h2>
    <button onclick="startTask()">开始任务</button>
    <div>
        <progress id="progressBar" value="0" max="100" style="width:300px;"></progress>
        <span id="progressText">0%</span>
    </div>
    <div id="status"></div>

    <script>
        function startTask() {
            document.getElementById('status').innerText = '任务启动中...';
            fetch('/start_task')
                .then(response => response.json())
                .then(data => {
                    const taskId = data.task_id;
                    document.getElementById('status').innerText = `任务ID: ${taskId}`;
                    // 建立SSE连接,监听进度
                    const eventSource = new EventSource(`/progress/${taskId}`);
                    eventSource.onmessage = function(event) {
                        const percent = event.data;
                        document.getElementById('progressBar').value = percent;
                        document.getElementById('progressText').innerText = percent + '%';
                        if (percent >= 100) {
                            eventSource.close();
                            document.getElementById('status').innerText = '任务完成!';
                        }
                    };
                });
        }
    </script>
</body>
</html>

3. 如何运行

  1. 启动Redis服务。
  2. 在一个终端启动Celery worker:
    celery -A celery_app.celery_app worker --loglevel=info --pool=eventlet
    
  3. 在另一个终端启动Flask应用:
    python app.py
    
  4. 浏览器打开 http://127.0.0.1:5000,点击“开始任务”按钮,就能看到进度条实时更新了。

总结: 用Celery+SSE是Flask里做任务进度反馈的标准做法。

我试过这个,不同任务发送到不同的 room 里是可以,但是多个人一起用就有问题,就会其中一个人 web 上会显示,其他人查看其他任务的时候就会等待很久才会显示,我尝试用多线程,但是 python 多线程又毕竟蛋疼

前端定时刷新查询任务状态的接口?

用 flask-sockets 搞过类似的项目,感觉还行

以前挂在 supervisor 上的时候, 默认的 WEB UI 那个用 Unix socket 实现的倒是可以看, 中文友好问题得自己配置点东西
后来挂在 systemd 上以后, 用 Cockpit 看部分日志, 不过和你说的不太一样
然后工作时候是托管在阿里云日志里了… 也不是你想要的…

总而言之, Google 搜关键词吧: Python realtime log web ui

看一看大项目的实现,jenkins 是通过轮询 接口+标志位实现的日志实时刷新添加,Python 那 Websocket 的并发实在不够看,建议采用 jenkins 的实现方式

前端轮询就可以了,后端只负责记录进度状态,前端谁查谁看结果

考虑过前端轮循,打算将任务运行过程保存到 redis,然后再前端轮循,就是不知道有没有更好的方案

看来这种是比较合适的了,就是后端记录任务日志采用哪种方式比较合理,异步任务执行的时候只会写 log,后端读取日志然后存入 redis 供前端轮循?

fluentd 读取日志,自己写下配置,很简单

之前做过一个类似的功能,是启动一个子进程 tail -f 来实现的

你这个功能,XXL-JOB 已经实现了,实时查看任务滚动日志,可以参考下。

http://www.xuxueli.com/xxl-job/

启子进程,那不是每一个客户端请求,就要启一个子进程 tail -f 了?

这个是很不错,可惜是 java 的,有 python 的类似项目吗?谢啦

#14 他里面有 PYTHON 的模式啊 还有 SHELL 的 甚至还有 NODE 的

是的,主要是内部项目,访问量不大,所以不需要考虑性能问题。

flush 不熟悉,django 的看看

谢啦,我学习一下

学习学习

回到顶部