Python分布式任务队列Celery自动停止运行怎么解决

项目采用了 celery 进行异步操作,但是 celery 运行很多任务后自动停止运行假死,目前我采取的是手动重新启动 celery,有的大神说这是 celery 的 bug,怎么解决? backend 用的是 redis.


Python分布式任务队列Celery自动停止运行怎么解决
8 回复

定时重启,这个问题很普遍


Celery worker自动停止通常有几个常见原因,我来给你分析一下:

1. 内存超限被系统杀死 这是最常见的情况。检查系统日志:

# 查看系统日志
sudo journalctl -u celery | tail -50
# 或查看OOM killer日志
dmesg | grep -i "killed process"

2. 任务超时 如果任务执行时间超过worker超时设置,worker会重启:

# celery.py配置
app.conf.update(
    worker_max_tasks_per_child=1000,  # 每个worker子进程最多执行任务数
    worker_max_memory_per_child=200000,  # 内存限制(KB)
    task_soft_time_limit=300,  # 软超时(秒)
    task_time_limit=600,  # 硬超时(秒)
)

3. 心跳丢失 检查broker连接是否稳定,添加重连机制:

from celery import Celery
from kombu import Connection

app = Celery('tasks', broker='redis://localhost:6379/0')

# 配置broker连接重试
app.conf.broker_connection_retry_on_startup = True
app.conf.broker_connection_max_retries = 10

4. 使用supervisor管理 用supervisor守护进程:

; /etc/supervisor/conf.d/celery.conf
[program:celery]
command=/path/to/venv/bin/celery -A proj worker --loglevel=info
directory=/path/to/project
user=www-data
numprocs=1
autostart=true
autorestart=true
startsecs=10
stopwaitsecs=600

5. 检查任务死循环 确保任务代码没有无限循环或内存泄漏:

# 添加任务执行时间监控
@app.task(bind=True)
def my_task(self):
    start_time = time.time()
    try:
        # 你的任务逻辑
        result = do_work()
        if time.time() - start_time > 300:  # 超过5分钟记录警告
            self.request.chain = None  # 防止任务链继续
        return result
    except Exception as e:
        logger.error(f"Task failed: {e}")
        raise

快速排查步骤:

  1. celery -A proj status 检查worker状态
  2. celery -A proj inspect active 查看正在执行的任务
  3. 检查日志文件中的错误信息

建议先检查系统日志确认是否被OOM killer终止。

你这个自动停止运行假死 是个什么哦,我遇到过一个定时任务延迟或长时间执行不了的问题,是队列里的任务阻塞导致的。不知道你的是什么情况。

#1 Celery 还提供 CELERYD_FORCE_EXECV = True 这个设置来防止假死,但是然并卵,只是减少次数,但是还是会假死

你的 celery 开机是怎么启动的?

服务器内存使用率是怎样的。看下有没有配置 CELERYD_MAX_TASKS_PER_CHILD ( worker 最多执行多少个任务,超过就销毁),官方默认是不限制的

celery 生产环境用 rmq 好些吧,CELERYD_MAX_TASKS_PER_CHILD 这个参数按照楼上的设置个值试试,或者 celery 版本换成 3.6 的最后一个版本?楼主用的 4.2 ?
仅供参考

celery 的启动工作数量设置
CELERY_WORKER_CONCURRENCY = 5
# celery 的 worker 执行多少个任务后进行重启操作
CELERY_WORKER_MAX_TASKS_PER_CHILD = 200
这个在配置文件里写清楚,不然塞内存
核心逻辑:排队等死

回到顶部