在Celery里实现秒级定时任务,用crontab不够精确,得用beat_schedule配合秒级间隔。下面是个完整示例:
# tasks.py
from celery import Celery
from datetime import datetime, timedelta
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def my_secondly_task():
print(f"秒级任务执行于: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# 配置秒级定时
app.conf.update(
beat_schedule={
'run-every-10-seconds': {
'task': 'tasks.my_secondly_task',
'schedule': 10.0, # 每10秒执行一次
'args': (),
'kwargs': {},
'options': {'queue': 'default'},
},
},
timezone='Asia/Shanghai',
)
启动命令:
# 启动worker
celery -A tasks worker --loglevel=info
# 启动beat调度器
celery -A tasks beat --loglevel=info
关键点:
schedule参数直接传浮点数(秒数)就能实现秒级调度
- 用Redis做broker响应更快
- 记得同时启动worker和beat进程
要指定时间段执行的话,可以在任务函数里加时间判断:
@app.task
def my_time_window_task():
now = datetime.now().time()
start = datetime.strptime("09:00:00", "%H:%M:%S").time()
end = datetime.strptime("17:00:00", "%H:%M:%S").time()
if start <= now <= end:
print(f"在指定时间段内执行: {now}")
用schedule参数设置秒数最直接。