Python中如何使用Celery往指定的Exchange发送消息

celery 文档上的用法好像是声明好 exchange,queue,绑定好之后往队列里发,但是我现在的场景是直接发到 exchange 里就可以,queue 谁来绑定我并不关心,我也不要去做声明用 kombu 自己实现很容易,但是 celery 有什么方法做到吗,我看到的都是直接往 queue 里发.消息队列用的 rmq,类型是 topic,顺便求 flask+celery+rmq 的的最佳实践.以下是我用 kombu 的实现

def send_as_task( exchange_name, args=(), kwargs={}, routing_key=''):
    exchange = Exchange(name=exchange_name, type='topic', durable=True, auto_delete=False)
    payload = {'args': args, 'kwargs': kwargs}
    with producers[config_use.get_connection()].acquire(block=True) as producer:
        producer.publish(body=payload,
                         serializer='json',
                         compression='bzip2',
                         exchange=exchange,
                         declare=[exchange],
                         routing_key=routing_key,
                         retry=True,
                         retry_policy={
                             'interval_start': 0,  # First retry immediately,
                             'interval_step': 2,  # then increase by 2s for every retry.
                             'interval_max': 30,  # but don't exceed 30s between retries.)
                             'max_retries': 30,  # give up after 30 tries.
                         },
                         )

Python中如何使用Celery往指定的Exchange发送消息

1 回复

# 首先确保安装了celery和对应的broker(如RabbitMQ)
# pip install celery

from celery import Celery
import kombu

# 1. 创建Celery应用时指定broker
app = Celery('tasks', broker='pyamqp://guest@localhost//')

# 2. 定义自定义的Exchange
custom_exchange = kombu.Exchange(
    'my_custom_exchange',  # Exchange名称
    type='direct',         # Exchange类型:direct/topic/fanout/headers
    durable=True           # 是否持久化
)

# 3. 发送消息到指定Exchange
def send_to_custom_exchange():
    # 方法1:使用send_task直接指定exchange
    app.send_task(
        'tasks.process_data',  # 任务名称
        args=['some_data'],    # 参数
        kwargs={},
        exchange=custom_exchange,  # 指定Exchange
        routing_key='my_routing_key',  # 路由键
        serializer='json'
    )
    
    # 方法2:通过自定义Queue绑定到指定Exchange
    custom_queue = kombu.Queue(
        'my_custom_queue',
        exchange=custom_exchange,
        routing_key='my_routing_key'
    )
    
    # 将队列注册到Celery
    app.conf.task_queues = (custom_queue,)
    
    # 发送到该队列
    app.send_task(
        'tasks.process_data',
        args=['data_for_custom_queue'],
        queue='my_custom_queue'
    )

# 4. 定义处理任务(可选,如果只是发消息不需要)
@app.task
def process_data(data):
    print(f"Processing: {data}")
    return f"Processed {data}"

# 5. 运行发送函数
if __name__ == '__main__':
    send_to_custom_exchange()

关键点:

  1. kombu.Exchange创建自定义Exchange,指定名称、类型和持久化选项
  2. 发送时通过exchange参数直接指定,或者创建绑定到该Exchange的Queue
  3. Exchange类型根据需求选:direct(精确匹配routing_key)、topic(模式匹配)、fanout(广播)、headers(头匹配)
  4. 记得启动对应的worker:celery -A your_module worker --loglevel=info

总结:创建Exchange对象并在send_task时指定即可。

回到顶部