直接上代码。清空Celery队列最常用的方法是使用celery purge命令,但有时需要在代码里操作。这里给你几种方法:
方法1:命令行清空(最常用)
# 清空默认队列
celery -A your_project purge
# 清空指定队列
celery -A your_project purge -Q queue_name
# 强制确认(不提示)
celery -A your_project purge -f
方法2:Python代码清空
from celery import Celery
app = Celery('your_project')
# 方法2.1: 使用control命令
def purge_queue(queue_name='celery'):
with app.connection() as conn:
# 获取队列中的消息数量
count = conn.default_channel.queue_purge(queue_name)
print(f"Purged {count} messages from {queue_name}")
return count
# 方法2.2: 通过app.control
def purge_all_queues():
app.control.purge() # 清空所有队列
print("All queues purged")
# 方法2.3: 清空特定队列(需要broker支持)
def purge_specific_queues():
inspector = app.control.inspect()
active_queues = inspector.active_queues() or {}
for worker, queues in active_queues.items():
for queue_info in queues:
queue_name = queue_info['name']
with app.connection() as conn:
conn.default_channel.queue_purge(queue_name)
print(f"Purged queue: {queue_name} on worker: {worker}")
# 使用示例
if __name__ == '__main__':
purge_queue('default') # 清空默认队列
# 或者
purge_all_queues()
方法3:直接操作Redis(如果使用Redis作为broker)
import redis
def purge_redis_queue(queue_name='celery'):
r = redis.Redis(host='localhost', port=6379, db=0)
# Celery在Redis中使用的键名模式
key = f"celery" # 默认队列
# 或者指定队列: key = f"celery.{queue_name}"
# 删除所有任务
deleted = r.delete(key)
print(f"Deleted {deleted} keys for queue {queue_name}")
# 如果需要更精确,可以删除所有相关键
pattern = f"celery*{queue_name}*"
keys = r.keys(pattern)
if keys:
r.delete(*keys)
print(f"Deleted {len(keys)} keys matching {pattern}")
方法4:清空结果后端(如果需要)
def purge_results():
# 清空所有任务结果
app.control.discard_all()
# 或者使用结果后端的方法
from celery.result import AsyncResult
# 如果需要更细粒度的控制
# 这取决于你使用的结果后端(Redis, Django DB等)
if hasattr(app.backend, 'cleanup'):
app.backend.cleanup()
print("Results purged")
重要提醒:
- 清空队列会删除所有待处理任务,无法恢复
- 生产环境谨慎操作,最好先备份或确认队列内容
- 不同broker(RabbitMQ, Redis, SQS)的实现方式略有不同
- 确保Celery worker没有在运行,或者清空后重启worker
一句话总结:用celery purge命令最快,代码里用app.control.purge()最方便。