Python分布式任务队列Celery中worker资源初始化的问题

Hi,请教大家一个问题:
由于异步的 celery 任务依赖 MySQL 连接以及 Redis 连接,因此需要在 worker 启动后初始化这些连接信息,但是看网上的文档,celery 的 worker 启动的时候使用的是 prefork 的方式,如果初始化连接太早会导致多个进程使用的是同一个 MySQL 和 Redis 连接,这样因该是有问题的,那么各位都是在什么时候初始化连接的呢?
Python分布式任务队列Celery中worker资源初始化的问题

4 回复

在Celery里给worker做资源初始化,最靠谱的方法是用worker_process_init信号。这玩意儿在每个worker进程启动时触发,正好用来建数据库连接池、加载大模型这些重活儿。

比如你要搞个数据库连接池,就这么干:

from celery import Celery, signals
import redis
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

app = Celery('myapp')

# 全局放连接池,别在每个task里新建
db_engine = None
redis_pool = None

@signals.worker_process_init.connect
def setup_worker_resources(sender=None, **kwargs):
    """每个worker进程启动时跑这个"""
    global db_engine, redis_pool
    
    # 建数据库连接池
    db_engine = create_engine(
        'postgresql://user:pass@localhost/db',
        pool_size=5,
        max_overflow=10
    )
    
    # Redis连接池
    redis_pool = redis.ConnectionPool(
        host='localhost',
        port=6379,
        decode_responses=True
    )
    
    print(f"Worker {sender} 资源初始化完成")

@app.task
def process_data(item_id):
    # 用全局的连接池
    redis_conn = redis.Redis(connection_pool=redis_pool)
    # ... 干活儿
    return f"处理了 {item_id}"

# 启动worker时记得带上这个初始化
# celery -A myapp worker --loglevel=info

关键点:

  1. worker_process_init信号,别用worker_init(那个只在主进程跑一次)
  2. 全局变量存连接池,task里直接拿来用
  3. 池子大小根据worker并发数调,别开太大把数据库搞崩了

如果你用prefork模式(默认),每个worker进程都会独立跑一次初始化。用--concurrency控制进程数,一个进程一个池子,互不干扰。

要是资源特别贵(比如GPU模型),考虑用--concurrency=1单进程跑,或者上worker_prefetch_multiplier控制任务预取。

一句话建议:用worker_process_init信号做进程级初始化,全局变量存资源。

没啥问题啊,就该使用同一个
初始化部分不归你管,你写不了什么的

prefork 模式只是先 spawn 出子进程,这时候子进程就像一个空的 python interpreter。 然后你的代码会在每个子进程中 load 一次, 所以说每个子进程中的连接是独立的。
mysql 的话,如果你使用了连接池,可以用 show full processlist;来查看当前的连接, 根据里面的端口查找进程, 你会发现 celery 各个子进程都有自己的连接。

redis 的话,如果你的 redis 是用作 broker 的,那么子进程是不会直接连到 broker 的。如果 redis 用作业务存储,那么与 mysql 类似。

所以,问题答案什么时候初始化连接的呢?—— 依赖于你代码的实现

回到顶部