Python中Celery+RabbitMQ为什么每次都会生成一个新的队列?
#abmp.py:
from celery import Celery
app = Celery(‘abmp’,
backend=‘amqp://guest@localhost’,
broker=‘amqp://guest@localhost’
)
@app.task(bind=True)
def add(self, a, b):
return a + b
# execute_test.py
from abmp import add
result = add.apply_async(
args=(5,7),
queue=‘push_tasks’,
exchange=‘push_tasks’,
routing_key=‘push_tasks’
)
#后台执行
celery -A abmp worker -E -Q push_tasks -l info
最后到 rabbitmq 后台查看,发现每次执行 execute_test.py 都生成了一个新的 queue,而不是把任务丢给 push_tasks 中。
Python中Celery+RabbitMQ为什么每次都会生成一个新的队列?
是我配置错误了吗
我理解你的问题。在使用Celery+RabbitMQ时,每次启动worker都会生成新队列,通常是因为配置了exclusive或auto_delete参数,或者使用了默认的celery队列名称。
最常见的原因是Celery的默认配置。默认情况下,Celery会为每个worker进程创建一个唯一的队列。这是通过task_default_queue设置和worker的自动命名机制实现的。
解决方案:
- 固定队列名称:在Celery配置中明确指定队列名称,而不是让Celery自动生成。
# celery_config.py
from kombu import Queue
app.conf.task_queues = (
Queue('my_queue', routing_key='my_queue'),
)
app.conf.task_default_queue = 'my_queue'
app.conf.task_default_exchange = 'my_queue'
app.conf.task_default_routing_key = 'my_queue'
- 检查worker启动参数:确保启动worker时没有使用
--exclusive或类似的参数。
# 不要使用--exclusive
celery -A your_app worker --loglevel=info -Q my_queue
- 配置broker_url:确保RabbitMQ连接URL正确,并且没有设置特殊参数。
app.conf.broker_url = 'amqp://user:password@localhost:5672//'
- 查看现有队列:检查RabbitMQ中已有的队列:
# 查看RabbitMQ队列
rabbitmqctl list_queues name durable auto_delete
关键点:Celery默认会为每个worker节点创建独立队列来实现负载均衡。如果你想要固定的共享队列,需要显式配置队列定义并禁用自动队列创建。
建议明确配置任务队列而不是依赖默认行为。
新的叫啥
一个随机的队列名,看着像 UUID 的字符串。
lz 后来怎么解决了?

