Python Flask接口如何异步执行长时间运行的Shell任务并立即返回响应?

嗯, 已经了解要用到多线程或多进程.
主要是不知道有没有好的方法在 flask 里生成新的线程执行 shell
Python Flask接口如何异步执行长时间运行的Shell任务并立即返回响应?

30 回复

celery ?


在Flask里处理长时间运行的Shell任务,可以用subprocess.Popen配合threadingmultiprocessing。这里给你个直接能用的方案,用ThreadPoolExecutor来管理后台线程:

from flask import Flask, jsonify
from concurrent.futures import ThreadPoolExecutor
import subprocess
import uuid

app = Flask(__name__)
executor = ThreadPoolExecutor(max_workers=4)
task_status = {}

def run_shell_task(task_id, command):
    try:
        task_status[task_id] = {'status': 'running'}
        result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=3600)
        task_status[task_id] = {
            'status': 'completed',
            'returncode': result.returncode,
            'stdout': result.stdout,
            'stderr': result.stderr
        }
    except subprocess.TimeoutExpired:
        task_status[task_id] = {'status': 'timeout'}
    except Exception as e:
        task_status[task_id] = {'status': 'error', 'message': str(e)}

@app.route('/run-task', methods=['POST'])
def run_task():
    task_id = str(uuid.uuid4())
    command = "your_long_running_shell_command_here"
    
    executor.submit(run_shell_task, task_id, command)
    
    return jsonify({
        'task_id': task_id,
        'status': 'started',
        'message': 'Task submitted successfully'
    })

@app.route('/task-status/<task_id>')
def get_status(task_id):
    status = task_status.get(task_id, {'status': 'not_found'})
    return jsonify(status)

if __name__ == '__main__':
    app.run(debug=True)

核心思路是:接口收到请求时生成唯一任务ID,用线程池异步执行Shell命令,立即返回任务ID。任务状态存在内存字典里,可以通过另一个接口查询进度。

注意这个方案把状态存在内存里,重启服务会丢失。如果需要持久化,可以换成数据库存储状态。

简单说就是:用线程池异步跑Shell,通过任务ID查询结果。

发异步任务

这是个内部的接口, 只会有一个任务执行, 是不是有更简单的方法

这是个内部的接口, 只会有一个任务执行, 是不是有更简单的方法

有,ThreadPoolExecutor

你这需求建议用 SimpleHTTPServer 加 threading mixin 不要用 flask

用 Process 这个类就可以啦

可以用 threading 库啊,用别的线程去执行生成 shell 的代码。

在数据库里面放 2 个表
1 任务表,任务 ID + 任务结果
2 用户消息表
用户请求来了
先在任务表创建一个任务记录拿到任务 ID
开个线程去后台执行 shell 把任务 id 传过去,以便任务完成后按任务 id 把结果填回 任务表,
给用户消息表插条消息,放个链接用任务 id 去任务临时表取结果。

subprocess 可以生成一个新的进程执行 shell 吗

线程或者进程开出去不就行了

创建一个新的线程去执行 shell 任务,当前线程返回执行完成。

我之前也碰到,我用的 django。业务场景跟你差不多,我后来用了类似 8 楼的方法。我单独开了个进程去查询数据库,没有想到其他的方法。

开个新线程不就可以了,问题在哪?

不要开线程去 直接 fork 两次 exec 就完事
你又不关心执行结果 不要开线程

能说具体些吗?我没太懂, 怎么 fork 两次 exec

flask_rq2 可以说很简单了

用 jenkins 调用

用 rq,轻量级的很好用

不好意思,我莫名其妙就回复到你了…



自己 google 一下
你执行的 shell 函数,最终实现是 fork 了一次,阻塞 wait,所以卡住
改成 fork 两次,直接 os._exit(0), 给 pid 1 接管你的子进程就是
这和写守护进程的原理是一样的

所有的外部进程调用都是 fork exec 的组合,这是 linux 编程基础,只盯着 python 自然一头雾水

from concurrent.futures import ThreadPoolExecutor 这个应该是最简单的解决方案了,随便搜一下就知道怎么用



好的我去了解一下这个扩展

fork and exec +1

subprocess.Popen

subprocess.Popen(’/path/shell.sh’)

任务队列是正道,如果觉得 celery 麻烦,可以试试轻量化的 Huey,redis 默认配置就能跑

回到顶部