Python中如何处理Web服务端超长时间任务的有条件停止问题

之前服务端遇到定时任务或者异步任务,都会用 celery 解决。

但是这次需求是:用户在 a 时间点发送一个请求到 web 服务端,服务端就需要开始某个人物 taska,然后 ,可能在几个小时后,再发送一个请求到 web 服务端,(此次请求会包含一些任务停止的标准,比如:对 taska 处理结果希望到达某个值,或者某个时间点),之后,任务根据这次的参数判断什么时候停止。

看了下 celery 貌似不符合这种场景,请教下,这种场景下有什么方案?谢谢! PS:停止的条件不是一开始就知道的(就是不能在任务开始的时候就知道任务结束条件的),需要任务开始一段时间,才能够得到,所以第二个请求一定要发的。


Python中如何处理Web服务端超长时间任务的有条件停止问题

2 回复

这个问题在Web开发里挺常见的,特别是做后台处理或者数据导出的时候。核心思路就是把任务执行和请求响应拆开,用异步任务队列,然后给每个任务一个独立的ID,通过这个ID去控制它。

我一般用Celery,它比较成熟。下面是一个简单的实现方案:

# tasks.py - Celery任务定义
from celery import Celery, current_task
import time

app = Celery('tasks', broker='redis://localhost:6379/0')

# 存储任务停止标志的字典,生产环境建议用Redis
task_stop_flags = {}

@app.task(bind=True)
def long_running_task(self, task_id):
    """长时间运行的任务"""
    # 初始化停止标志
    task_stop_flags[task_id] = False
    
    for i in range(100):  # 模拟长时间任务
        if task_stop_flags.get(task_id, False):
            # 如果收到停止信号,清理并退出
            del task_stop_flags[task_id]
            return f"Task {task_id} stopped by user"
        
        # 模拟工作
        time.sleep(1)
        self.update_state(state='PROGRESS', meta={'current': i, 'total': 100})
    
    del task_stop_flags[task_id]
    return f"Task {task_id} completed"

def stop_task(task_id):
    """停止指定任务"""
    if task_id in task_stop_flags:
        task_stop_flags[task_id] = True
        return True
    return False
# views.py - Web接口部分(以Flask为例)
from flask import Flask, jsonify, request
from tasks import long_running_task, stop_task
import uuid

app = Flask(__name__)

@app.route('/start-task', methods=['POST'])
def start_task():
    """启动长时间任务"""
    task_id = str(uuid.uuid4())
    task = long_running_task.delay(task_id)
    return jsonify({
        'task_id': task_id,
        'celery_task_id': task.id,
        'status': 'started'
    })

@app.route('/stop-task', methods=['POST'])
def stop_task_endpoint():
    """停止任务"""
    task_id = request.json.get('task_id')
    if not task_id:
        return jsonify({'error': 'task_id required'}), 400
    
    if stop_task(task_id):
        return jsonify({'status': 'stop_signal_sent'})
    return jsonify({'error': 'task not found'}), 404

@app.route('/task-status/<celery_task_id>')
def task_status(celery_task_id):
    """查询任务状态"""
    task = long_running_task.AsyncResult(celery_task_id)
    
    if task.state == 'PENDING':
        response = {'state': task.state, 'status': 'Pending...'}
    elif task.state == 'PROGRESS':
        response = {
            'state': task.state,
            'current': task.info.get('current', 0),
            'total': task.info.get('total', 1)
        }
    else:  # SUCCESS or FAILURE
        response = {
            'state': task.state,
            'result': task.info if task.info else str(task.result)
        }
    
    return jsonify(response)

这个方案的关键点:

  1. 任务分离:Web请求快速返回,实际任务在后台执行
  2. 任务标识:用UUID给每个任务唯一ID,用于控制
  3. 状态检查:任务内部定期检查停止标志
  4. 状态查询:提供单独的接口查询任务进度

启动服务后:

  • 调用 /start-task 开始任务,会返回 task_id
  • 调用 /stop-task 并传入 task_id 可以发送停止信号
  • 通过 /task-status/<celery_task_id> 查看任务状态

简单说就是用任务队列加状态检查机制。


显然是需要在任务执行过程中不断检查结束任务条件是否达成,是则退出
或者,任务启动之后再创建一个线程来执行实际任务,主线程则不断检查停止标记是否达成,是则停止任务线程执行

回到顶部