Python Flask接口如何异步执行长时间运行的Shell任务并立即返回响应?
嗯, 已经了解要用到多线程或多进程.
主要是不知道有没有好的方法在 flask 里生成新的线程执行 shell
Python Flask接口如何异步执行长时间运行的Shell任务并立即返回响应?
celery ?
在Flask里处理长时间运行的Shell任务,可以用subprocess.Popen配合threading或multiprocessing。这里给你个直接能用的方案,用ThreadPoolExecutor来管理后台线程:
from flask import Flask, jsonify
from concurrent.futures import ThreadPoolExecutor
import subprocess
import uuid
app = Flask(__name__)
executor = ThreadPoolExecutor(max_workers=4)
task_status = {}
def run_shell_task(task_id, command):
try:
task_status[task_id] = {'status': 'running'}
result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=3600)
task_status[task_id] = {
'status': 'completed',
'returncode': result.returncode,
'stdout': result.stdout,
'stderr': result.stderr
}
except subprocess.TimeoutExpired:
task_status[task_id] = {'status': 'timeout'}
except Exception as e:
task_status[task_id] = {'status': 'error', 'message': str(e)}
@app.route('/run-task', methods=['POST'])
def run_task():
task_id = str(uuid.uuid4())
command = "your_long_running_shell_command_here"
executor.submit(run_shell_task, task_id, command)
return jsonify({
'task_id': task_id,
'status': 'started',
'message': 'Task submitted successfully'
})
@app.route('/task-status/<task_id>')
def get_status(task_id):
status = task_status.get(task_id, {'status': 'not_found'})
return jsonify(status)
if __name__ == '__main__':
app.run(debug=True)
核心思路是:接口收到请求时生成唯一任务ID,用线程池异步执行Shell命令,立即返回任务ID。任务状态存在内存字典里,可以通过另一个接口查询进度。
注意这个方案把状态存在内存里,重启服务会丢失。如果需要持久化,可以换成数据库存储状态。
简单说就是:用线程池异步跑Shell,通过任务ID查询结果。
发异步任务
这是个内部的接口, 只会有一个任务执行, 是不是有更简单的方法
这是个内部的接口, 只会有一个任务执行, 是不是有更简单的方法
有,ThreadPoolExecutor
你这需求建议用 SimpleHTTPServer 加 threading mixin 不要用 flask
用 Process 这个类就可以啦
可以用 threading 库啊,用别的线程去执行生成 shell 的代码。
在数据库里面放 2 个表
1 任务表,任务 ID + 任务结果
2 用户消息表
用户请求来了
先在任务表创建一个任务记录拿到任务 ID
开个线程去后台执行 shell 把任务 id 传过去,以便任务完成后按任务 id 把结果填回 任务表,
给用户消息表插条消息,放个链接用任务 id 去任务临时表取结果。
subprocess 可以生成一个新的进程执行 shell 吗
线程或者进程开出去不就行了
创建一个新的线程去执行 shell 任务,当前线程返回执行完成。
我之前也碰到,我用的 django。业务场景跟你差不多,我后来用了类似 8 楼的方法。我单独开了个进程去查询数据库,没有想到其他的方法。
开个新线程不就可以了,问题在哪?
不要开线程去 直接 fork 两次 exec 就完事
你又不关心执行结果 不要开线程
能说具体些吗?我没太懂, 怎么 fork 两次 exec
flask_rq2 可以说很简单了
用 jenkins 调用
用 rq,轻量级的很好用
不好意思,我莫名其妙就回复到你了…
自己 google 一下
你执行的 shell 函数,最终实现是 fork 了一次,阻塞 wait,所以卡住
改成 fork 两次,直接 os._exit(0), 给 pid 1 接管你的子进程就是
这和写守护进程的原理是一样的
所有的外部进程调用都是 fork exec 的组合,这是 linux 编程基础,只盯着 python 自然一头雾水
from concurrent.futures import ThreadPoolExecutor 这个应该是最简单的解决方案了,随便搜一下就知道怎么用
好的我去了解一下这个扩展
👌
fork and exec +1
subprocess.Popen
subprocess.Popen(’/path/shell.sh’)
任务队列是正道,如果觉得 celery 麻烦,可以试试轻量化的 Huey,redis 默认配置就能跑

