Python中如何使用django-celery实现动态添加周期任务
项目中现在有个需求是用户可以在前端页面添加定时或者周期任务,
后端的话目前采用 django-celery 来实现此类任务,
请问如何实现动态添加任务且不用重启 celery 相关进程?
Python中如何使用django-celery实现动态添加周期任务
在Django项目中用django-celery实现动态添加周期任务,核心是操作django_celery_beat.models.PeriodicTask和CrontabSchedule模型。下面是一个完整示例:
首先确保已安装并配置好django-celery-beat:
pip install django-celery-beat
在settings.py中添加:
INSTALLED_APPS = [
...
'django_celery_beat',
]
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
然后创建动态任务的代码:
from django_celery_beat.models import PeriodicTask, CrontabSchedule
import json
def add_periodic_task(task_name, task_path, crontab_time, args=None, kwargs=None):
"""
动态添加周期任务
Args:
task_name: 任务名称(唯一标识)
task_path: Celery任务路径,如'myapp.tasks.my_task'
crontab_time: 定时设置,格式为(minute, hour, day_of_month, month_of_year, day_of_week)
args: 位置参数列表
kwargs: 关键字参数字典
"""
# 创建或获取crontab计划
minute, hour, day_of_month, month_of_year, day_of_week = crontab_time
schedule, _ = CrontabSchedule.objects.get_or_create(
minute=minute,
hour=hour,
day_of_month=day_of_month,
month_of_year=month_of_year,
day_of_week=day_of_week,
timezone='Asia/Shanghai' # 根据实际情况调整时区
)
# 创建周期任务
task_args = json.dumps(args) if args else '[]'
task_kwargs = json.dumps(kwargs) if kwargs else '{}'
PeriodicTask.objects.update_or_create(
name=task_name,
defaults={
'task': task_path,
'crontab': schedule,
'enabled': True,
'args': task_args,
'kwargs': task_kwargs,
}
)
使用示例:
# 添加一个每天凌晨2点执行的任务
add_periodic_task(
task_name='daily_report',
task_path='reports.tasks.generate_daily_report',
crontab_time=('0', '2', '*', '*', '*'), # 分钟, 小时, 日, 月, 周几
kwargs={'report_type': 'summary'}
)
# 添加一个每周一上午9点的任务
add_periodic_task(
task_name='weekly_cleanup',
task_path='maintenance.tasks.cleanup_old_data',
crontab_time=('0', '9', '*', '*', '1') # 周一
)
要删除或禁用任务:
# 禁用任务
task = PeriodicTask.objects.get(name='daily_report')
task.enabled = False
task.save()
# 删除任务
PeriodicTask.objects.filter(name='weekly_cleanup').delete()
关键点:
- 使用
CrontabSchedule定义执行时间 PeriodicTask关联具体Celery任务和计划- 通过
update_or_create避免重复创建 - 参数需要JSON序列化存储
动态任务添加后,Celery beat会自动检测数据库变化并调整调度。记得重启beat服务使新任务生效。
一句话总结:直接操作django_celery_beat的模型来管理动态周期任务。
ditto
可以看看 django-celery-beat 这个项目
https://github.com/sibson/redbeat 可以看下这个项目,最近在用,基于 redis 做的,比较通用
如果任务比较轻,时间短直接 while 循环吧,celery 有时假死在那,或者后端 borker 出故障也会出问题。繁重任务还是用专门的消息队列处理吧
可以参考一下我的爬虫项目 crawlab,http://www.github.com/tikazyq/crawlab,之前折腾 celery beat 没有成功,后来转用 apscheduler 就可以动态添加定时任务了 ;-)
文件地址: https://github.com/tikazyq/crawlab/blob/master/crawlab/tasks/scheduler.py
我看的一个项目叫 django-Q https://django-q.readthedocs.io/en/latest/ 因为周期任务都在数据库中,只要填函数名字和参数就可以添加任务


