Python中Celery任务被强制kill中断后,Flower监控中丢失的任务状态仍显示为STARTED如何解决

RT,我这边在测 celery 的意外容错的强健性的时候,我在每个 worker 强行执行了类似下面的命令:

ps -ef |grep 'worker' | grep -v grep|cut -c 9-16 | xargs kill -9

然后再将我手里的三个 worker 重启:

celery -A hellscan worker -E -l INFO -n xxx.%h --concurrency=2

在我的配置文件里,以前设置过:

TASK_REJECT_ON_WORKER_LOST = True
CELERY_ACKS_LATE = True

按理说是支持这种重启后,会继续加载运行任务的。结果我发现在日志里,worker 并没有失败或者结束。 在 flower 里查看,那几个中断的任务一直处于 STARTED 状态,强行 terminate 也没用。

而且似乎占用了我取得任务的名额,我这里设置--concurrency=2,按理说总共算起来,一共可以同时执行六个任务,flower 里显示的任务里 active 的也没有他们。

丢失了那三个任务,似乎现在只能同时执行三个任务了,

不知道大佬们有没有解决办法?除了重启 flower 以外,我猜这样任务丢失的可能会更多。


Python中Celery任务被强制kill中断后,Flower监控中丢失的任务状态仍显示为STARTED如何解决

3 回复

我遇到过同样的问题。当Celery worker进程被强制kill(比如用kill -9)时,任务状态会卡在STARTED,因为worker没机会更新状态到FAILURE。这确实是Celery的一个已知痛点。

核心原因是:Celery的任务状态更新依赖于worker进程的正常退出流程。kill -9直接终止进程,worker来不及发送状态更新消息。

解决方案:

  1. 使用worker_shutdown信号处理(推荐) 在worker启动时注册信号处理,捕获终止信号并优雅关闭:

    from celery.signals import worker_shutdown
    import logging
    
    logger = logging.getLogger(__name__)
    
    [@worker_shutdown](/user/worker_shutdown).connect
    def worker_shutdown_handler(sender, **kwargs):
        """处理worker关闭时的任务状态清理"""
        from celery import current_app
        
        # 获取当前worker所有正在执行的任务
        active_tasks = current_app.control.inspect().active()
        
        if active_tasks:
            for worker_name, tasks in active_tasks.items():
                for task in tasks:
                    task_id = task['id']
                    try:
                        # 将任务标记为失败
                        from celery.result import AsyncResult
                        result = AsyncResult(task_id)
                        result.state = 'FAILURE'
                        result.info = 'Worker was terminated unexpectedly'
                        
                        logger.warning(f"Marked task {task_id} as FAILURE due to worker shutdown")
                    except Exception as e:
                        logger.error(f"Failed to update task {task_id}: {e}")
    
  2. 配置任务超时和确认机制 在Celery配置中设置:

    # celery_config.py
    broker_url = 'redis://localhost:6379/0'
    result_backend = 'redis://localhost:6379/0'
    
    # 重要配置
    worker_max_tasks_per_child = 1000  # 每个worker子进程执行1000个任务后重启
    task_acks_late = True  # 任务完成后才确认,避免丢失
    task_reject_on_worker_lost = True  # worker丢失时重新排队
    task_time_limit = 300  # 任务超时时间(秒)
    task_soft_time_limit = 280  # 软超时时间
    
  3. 使用监控工具自动清理 写个定时任务清理僵尸任务:

    from celery import Celery
    from datetime import datetime, timedelta
    
    app = Celery('cleanup')
    
    [@app](/user/app).task
    def cleanup_stale_tasks():
        """清理超过1小时仍为STARTED状态的任务"""
        from celery.result import AsyncResult
        
        # 这里需要根据你的实际存储方式获取任务ID列表
        # 假设使用Redis作为结果后端
        import redis
        r = redis.Redis()
        
        # 获取所有任务键(根据实际键模式调整)
        task_keys = r.keys('celery-task-meta-*')
        
        for key in task_keys:
            task_data = r.get(key)
            if task_data:
                import json
                data = json.loads(task_data)
                
                # 检查状态和开始时间
                if data.get('status') == 'STARTED':
                    task_start = datetime.fromisoformat(data.get('date_done'))
                    if datetime.now() - task_start > timedelta(hours=1):
                        # 更新为失败状态
                        data['status'] = 'FAILURE'
                        data['result'] = 'Task timeout after worker termination'
                        r.set(key, json.dumps(data))
    
  4. 使用--time-limit启动参数 启动worker时设置超时:

    celery -A proj worker --time-limit=300
    

实际部署建议: 我通常用方案1+2的组合。在Docker环境中,确保容器使用SIGTERM而不是SIGKILL停止,给worker留出清理时间。

简单说就是:配置优雅关闭和任务超时,避免硬终止worker。


flower 读得是 redis 的数据

不过我以前试过先清空 redis,flower 还是能看见数据的,说明有其他方式缓存。

回到顶部