Python中如何使用Celery重新加载数据或模块

第一次用 celery,系统中需要定时的下载数据文件并更新到内存中。
我定义了一个 reload_cache 的方法,加入到 beat schedule 里面,每次定时任务触发时,worker 的 MainProcess 会把 task 交给任意一个 SubProcess 去执行,导致其他 SubProcess 没有执行 reload_cache。如果用 gevent 应该没有这个问题。
想请教一下,有没有什么办法,保证 pool 里面的每个 SubProcess 都执行 reload_cache 方法。谢谢!
Python中如何使用Celery重新加载数据或模块

1 回复

在Celery中重新加载数据或模块,主要取决于你的具体场景。这里有几个常见的方法:

1. 重启Worker(最简单直接) 如果你修改了任务代码或配置,最可靠的方式是重启Celery worker:

# 使用supervisor
sudo supervisorctl restart celery-worker

# 直接kill然后重启
pkill -f "celery worker"
celery -A your_project worker --loglevel=info

2. 使用--autoreload参数(开发环境) 在开发时,可以启用自动重载,这样修改代码后worker会自动重启:

celery -A your_project worker --loglevel=info --autoreload

注意:生产环境不建议使用,因为会影响性能。

3. 动态导入模块 对于需要频繁更新的数据,可以在任务函数内部动态导入:

from celery import Celery
import importlib

app = Celery('tasks')

@app.task
def process_data():
    # 每次执行都重新导入模块
    data_module = importlib.import_module('your_data_module')
    importlib.reload(data_module)
    
    # 使用最新数据
    latest_data = data_module.get_data()
    # ...处理逻辑

4. 使用信号或外部存储 对于配置数据,可以放在数据库或缓存中,通过信号触发更新:

from django.core.cache import cache  # 如果使用Django

@app.task
def update_config():
    new_config = fetch_config_from_db()
    cache.set('app_config', new_config)

@app.task
def use_config():
    config = cache.get('app_config')
    # 使用配置

总结建议:代码改动重启worker,数据更新用动态加载或外部存储。

回到顶部