Python分布式任务队列Celery中worker资源初始化的问题
Hi,请教大家一个问题:
由于异步的 celery 任务依赖 MySQL 连接以及 Redis 连接,因此需要在 worker 启动后初始化这些连接信息,但是看网上的文档,celery 的 worker 启动的时候使用的是 prefork 的方式,如果初始化连接太早会导致多个进程使用的是同一个 MySQL 和 Redis 连接,这样因该是有问题的,那么各位都是在什么时候初始化连接的呢?
Python分布式任务队列Celery中worker资源初始化的问题
orm
在Celery里给worker做资源初始化,最靠谱的方法是用worker_process_init信号。这玩意儿在每个worker进程启动时触发,正好用来建数据库连接池、加载大模型这些重活儿。
比如你要搞个数据库连接池,就这么干:
from celery import Celery, signals
import redis
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
app = Celery('myapp')
# 全局放连接池,别在每个task里新建
db_engine = None
redis_pool = None
@signals.worker_process_init.connect
def setup_worker_resources(sender=None, **kwargs):
"""每个worker进程启动时跑这个"""
global db_engine, redis_pool
# 建数据库连接池
db_engine = create_engine(
'postgresql://user:pass@localhost/db',
pool_size=5,
max_overflow=10
)
# Redis连接池
redis_pool = redis.ConnectionPool(
host='localhost',
port=6379,
decode_responses=True
)
print(f"Worker {sender} 资源初始化完成")
@app.task
def process_data(item_id):
# 用全局的连接池
redis_conn = redis.Redis(connection_pool=redis_pool)
# ... 干活儿
return f"处理了 {item_id}"
# 启动worker时记得带上这个初始化
# celery -A myapp worker --loglevel=info
关键点:
- 用
worker_process_init信号,别用worker_init(那个只在主进程跑一次) - 全局变量存连接池,task里直接拿来用
- 池子大小根据worker并发数调,别开太大把数据库搞崩了
如果你用prefork模式(默认),每个worker进程都会独立跑一次初始化。用--concurrency控制进程数,一个进程一个池子,互不干扰。
要是资源特别贵(比如GPU模型),考虑用--concurrency=1单进程跑,或者上worker_prefetch_multiplier控制任务预取。
一句话建议:用worker_process_init信号做进程级初始化,全局变量存资源。
没啥问题啊,就该使用同一个
初始化部分不归你管,你写不了什么的
prefork 模式只是先 spawn 出子进程,这时候子进程就像一个空的 python interpreter。 然后你的代码会在每个子进程中 load 一次, 所以说每个子进程中的连接是独立的。
mysql 的话,如果你使用了连接池,可以用 show full processlist;来查看当前的连接, 根据里面的端口查找进程, 你会发现 celery 各个子进程都有自己的连接。
redis 的话,如果你的 redis 是用作 broker 的,那么子进程是不会直接连到 broker 的。如果 redis 用作业务存储,那么与 mysql 类似。
所以,问题答案什么时候初始化连接的呢?—— 依赖于你代码的实现

