Python中服务端处理耗时任务的思路与实现方法
需求是客户会上传单个或者很多文件,要求提供的接口把处理结果返回客户端。 我目前的思路有: 对于单个文件可以直接等待处理完成,然后把结果返回客户端。 对于批量文件可以通过 celery 把处理任务发送其他 worker 去处理,然后,返回一个新的接口供客户端进行结果轮询。
(服务端处理这些文件需要好几分钟) 现在有几个问题就是:
- 如果要求一次请求就给出客户端返回结果,那么服务端就会阻塞住接口或者占用一个处理线程。那么用户量多的时候,一直被阻塞住怎么办吗?
- 如果不要求一次请求返回结果,我上面的思路是否有问题。还有其他的解决思路吗?
谢谢!
Python中服务端处理耗时任务的思路与实现方法
异步任务 && IO 多路复用
对于服务端处理耗时任务,核心思路是把耗时操作异步化,避免阻塞主线程。这里给你一个基于asyncio和concurrent.futures的完整实现方案。
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from typing import Any, Callable
class AsyncTaskHandler:
def __init__(self):
# 线程池用于IO密集型任务
self.thread_pool = ThreadPoolExecutor(max_workers=10)
# 进程池用于CPU密集型任务
self.process_pool = ProcessPoolExecutor(max_workers=4)
async def handle_io_task(self, task_func: Callable, *args) -> Any:
"""处理IO密集型任务(如网络请求、文件操作)"""
loop = asyncio.get_event_loop()
# 将阻塞的IO操作放到线程池中执行
result = await loop.run_in_executor(
self.thread_pool,
lambda: task_func(*args)
)
return result
async def handle_cpu_task(self, task_func: Callable, *args) -> Any:
"""处理CPU密集型任务(如数据处理、计算)"""
loop = asyncio.get_event_loop()
# 将CPU密集型任务放到进程池中执行
result = await loop.run_in_executor(
self.process_pool,
lambda: task_func(*args)
)
return result
async def long_running_task(self, task_id: str, duration: int):
"""模拟长时间运行的任务"""
print(f"任务 {task_id} 开始执行,预计耗时 {duration} 秒")
await asyncio.sleep(duration) # 模拟耗时操作
print(f"任务 {task_id} 执行完成")
return f"任务 {task_id} 结果"
# 使用示例
async def main():
handler = AsyncTaskHandler()
# 启动多个耗时任务
tasks = [
handler.long_running_task("task1", 3),
handler.long_running_task("task2", 2),
handler.long_running_task("task3", 1)
]
# 并发执行所有任务
results = await asyncio.gather(*tasks)
print(f"所有任务完成: {results}")
# 对于Web服务,可以这样集成
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
task_handler = AsyncTaskHandler()
@app.post("/run-task")
async def run_task(background_tasks: BackgroundTasks):
"""将耗时任务放到后台执行"""
background_tasks.add_task(
task_handler.long_running_task,
"background_task",
5
)
return {"message": "任务已提交到后台执行"}
@app.get("/async-task")
async def async_task():
"""异步处理耗时任务"""
result = await task_handler.long_running_task("async_task", 3)
return {"result": result}
if __name__ == "__main__":
# 运行示例
asyncio.run(main())
这个方案的关键点:
- IO密集型任务用线程池处理,避免GIL影响
- CPU密集型任务用进程池处理,充分利用多核
- 异步框架(如FastAPI)天然支持这种模式
- BackgroundTasks适合不需要即时返回结果的任务
对于更复杂的场景,可以考虑使用Celery或RQ做分布式任务队列,但上面这个方案能满足大部分需求。
总结:根据任务类型选择合适的并发模型。
与其在这多耗脑细胞不如多买几台服务器
服务器就是钱啊。。。
感觉用户量多的时候无解,只能做集群来提升任务处理能力
那有什么异步框架可以推荐下?
还有几个疑问请教:就是如果服务端接口采用异步框架处理这个任务。
(假设这耗时任务使用了 5 分钟)
1.如果客户端使用同步请求,那么客户端是否就一直阻塞五分钟?这样不会出现请求超时,连接中断问题吗?
2.如果客户端使用异步请求,那么客户端五分钟之后就能接到处理结果?这五分钟之内是怎么保持连接不中断的?
谢谢!
他说的是直传
可以看看优酷,YouTube 对上传视频转码是如何处理的,上传后出现到列表,状态显示正在处理,用户可以随时上来看处理完了没
我觉得只要用 websocket 让客户知道处理的进度就好了
接口直接返回一个 upload_id,异步处理上传,再给一个查询 upload_id 进度的接口
文件存储用第三方


