Python中多个Celery worker执行同一类型任务时,如何获取任务具体在哪个worker上执行?
场景是这样的,假设我有一种类型的 celery task,
task name:scan.xxx ,
celery config 里面配置了 scan.xxx 这个任务路由到 scan_tasks 这个队列中(采用得是 rabbitMQ )
同时有多个 worker 同时在监听该队列,比如 host1 host2 host3,concurrent = 4 (也就是每个 worker 最大并发为 4 个进程)
现在的场景:
需要我知道某个任务被哪一个 worker (或者说哪一个 host )执行了,目的是想获取所有 worker (所有 host )的情况,比如负载百分之多少,运行了几个进程,当前状态(idle running)等
现在的方案:
任务使用 apply_async()函数被异步执行时传递指定的队列名,这样的话
queue1 -> host1
queue2 -> host2
queue3 -> host3
这样会有问题:
如果以后我要增加任务执行的效率,提高程序性能,不关闭程序的情况下,我要预先开辟多个队列(假设现在考虑最多 10 个 worker ) queue1、queue2、queue3、…、queue10,十分不优雅
还有一种方案是在任务执行时,get_ip()然后记录,得到是在哪台机器上执行的,但是同样感觉很不优雅
对了,最后还有一种方案是 celery flower,但是它的 api 不稳定,我现在看看,有没有不借助 flower 的方案呢
不知各位有没有这样的场景,都是怎么做的,总感觉会有优雅的方案。
Python中多个Celery worker执行同一类型任务时,如何获取任务具体在哪个worker上执行?
在Celery里,要让任务知道自己在哪个worker上跑,最直接的办法是用current_worker。不过得注意,这玩意儿只在任务执行的时候才有值。
给你个例子,在任务函数里这么写:
from celery import current_task
from celery.utils import get_full_cls_name
@app.task
def my_task():
# 获取当前worker的hostname
worker_hostname = current_task.request.hostname
print(f"这个任务正在worker {worker_hostname} 上执行")
return worker_hostname
跑任务的时候,每个worker打印出来的hostname就是它自己的。
如果你在任务外面也想跟踪,可以在发任务的时候把worker信息存下来,比如用bind=True:
@app.task(bind=True)
def my_bound_task(self):
worker = self.request.hostname
print(f"Bound task on worker: {worker}")
return worker
简单说,用request.hostname就能拿到worker标识。
如果一台机器运行 20 个,那不是要浪费 19 次吗?
为什么不单独监听呢,不管是其他监控报警和扩展都好处理
兄弟没听明白你的意思,我只在一台机器运行一个 worker,在 20 台机器运行 20 个 worker。。

