Python中如何实现Celery分布式任务队列的监控脚本
# coding=utf-8
import celery
import celery.states
import celery.events
import collections
from itertools import chain
import logging
import prometheus_client
import sys
from threading import Thread
import time
import json
import os
from app.tasks import celery as app
# Metrics exported by this monitor.

# Number of alive workers.
WORKERS = prometheus_client.Gauge('celery_workers', 'Number of alive workers')
# Number of tasks per state.
TASKS = prometheus_client.Gauge('celery_tasks', 'Number of tasks per state', ['state'])
# Number of tasks per state and name: an overview of all celery tasks.
TASKS_NAME = prometheus_client.Gauge('celery_tasks_by_name', 'Number of tasks per state and name', ['state', 'name'])
# Runtime of each task, to watch task performance (e.g. for SQL tuning).
TASKS_RUNTIME = prometheus_client.Histogram('celery_tasks_runtime_seconds', 'Task runtime (seconds)', ['name'])
# Latency between "received" and "started", to detect backlog and tune the
# number of workers.
LATENCY = prometheus_client.Histogram('celery_task_latency', 'Seconds between a task is received and started.')

# BUG FIX: was logging.getLogger(name) -- `name` is undefined; the module
# logger convention is __name__.
logger = logging.getLogger(__name__)
class WorkerMonitoring:
    """Periodically publishes the number of responding workers to WORKERS."""

    def __init__(self, app):
        self._app = app

    def run(self):
        """Refresh the worker count forever, once every 5 seconds."""
        while True:
            self.update_workers_count()
            time.sleep(5)

    def update_workers_count(self):
        """Ping all workers and record how many answered within 5 seconds."""
        try:
            alive = self._app.control.ping(timeout=5)
            WORKERS.set(len(alive))
        except Exception:
            logger.exception("Error while pinging workers")
class TaskMonitoring:
    """Consumes celery events from the broker and exports task metrics."""

    def __init__(self, app):
        self._app = app
        # Local replica of cluster state, fed by the events we receive.
        self._state = self._app.events.State()
        # Label values already exported, so stale labels can be kept at their
        # last value instead of disappearing.
        self._known_states = set()
        self._known_states_names = set()

    def run(self):
        self._monitor()

    def _process_event(self, event):
        # BUG FIX: was a bare print(event) debug leftover; use debug logging.
        logger.debug("Received event: %s", event)
        # Events may arrive concurrently -- guard shared state with the mutex.
        with self._state._mutex:
            if event['type'] != 'worker-heartbeat':
                # Map e.g. 'task-succeeded' -> 'succeeded' -> SUCCESS.
                event_type = event['type'][5:]
                state = celery.events.state.TASK_EVENT_TO_STATE[event_type]
                if state == celery.states.STARTED:
                    # Observe queue latency when a task starts.
                    self._observe_latency(event)
                self._collect_tasks(event, state)

    def _observe_latency(self, event):
        """Record the delay between RECEIVED and STARTED for this task."""
        try:
            prev_evt = self._state.tasks[event['uuid']]
        except KeyError:
            # First time we see this task: no baseline to measure against.
            pass
        else:
            if prev_evt.state == celery.states.RECEIVED:
                LATENCY.observe(
                    event['local_received'] - prev_evt.local_received)

    def _collect_tasks(self, event, state):
        if state in celery.states.READY_STATES:
            self._incr_ready_task(event, state)
        else:
            # Record the in-flight task in local state.
            self._state._event(event)
        self._collect_unready_tasks()

    def _incr_ready_task(self, event, state):
        # Ready states: FAILURE, REVOKED, SUCCESS.
        TASKS.labels(state=state).inc()
        try:
            # Forget the finished task and observe its runtime.
            name = self._state.tasks.pop(event['uuid']).name
            runtime = event.get('runtime')
            if name is not None and runtime is not None:
                TASKS_RUNTIME.labels(name=name).observe(runtime)
        except (KeyError, AttributeError):
            pass

    def _collect_unready_tasks(self):
        # Unready states: PENDING, RECEIVED, REJECTED, RETRY, STARTED.
        cnt = collections.Counter(t.state for t in self._state.tasks.values())
        self._known_states.update(cnt.elements())
        for task_state in self._known_states:
            TASKS.labels(state=task_state).set(cnt[task_state])
        cnt = collections.Counter(
            (t.state, t.name) for t in self._state.tasks.values() if t.name)
        self._known_states_names.update(cnt.elements())
        for task_state in self._known_states_names:
            TASKS_NAME.labels(
                state=task_state[0], name=task_state[1]).set(cnt[task_state])

    def _monitor(self):
        """Receive every event from the broker, dispatching to _process_event;
        reconnect (and reset metrics) after 5s on any failure."""
        while True:
            try:
                with self._app.connection() as conn:
                    logger.info("Try to connect to broker")
                    recv = self._app.events.Receiver(
                        conn, handlers={'*': self._process_event})
                    setup_metrics(self._app)
                    # BUG FIX: this log line used to sit AFTER capture(),
                    # which blocks forever, so it was never emitted.
                    logger.info("Connected to broker")
                    recv.capture(limit=None, timeout=None, wakeup=True)
            except Exception:
                logger.exception("Queue connection failed")
                setup_metrics(self._app)
                time.sleep(5)
def setup_metrics(app):
    """Reset all gauges to 0, ideally one sample per (state, task name).

    Falls back to zeroing only the label combinations already exported when
    the broker/workers cannot be inspected.
    """
    WORKERS.set(0)
    try:
        registered_tasks = app.control.inspect().registered_tasks().values()
    except Exception:
        # Workers unreachable: zero out every label combination we have
        # already exported so stale values don't linger.
        # NOTE(review): 3-tuple unpacking assumes prometheus_client's legacy
        # (name, labels, value) samples -- confirm against installed version.
        for metric in TASKS.collect():
            for name, labels, cnt in metric.samples:
                TASKS.labels(**labels).set(0)
        for metric in TASKS_NAME.collect():
            for name, labels, cnt in metric.samples:
                TASKS_NAME.labels(**labels).set(0)
    else:
        # Hoisted out of the loop: the set of task names is loop-invariant.
        task_names = set(chain.from_iterable(registered_tasks))
        # States: FAILURE, PENDING, RECEIVED, RETRY, REVOKED, STARTED, SUCCESS.
        for state in celery.states.ALL_STATES:
            TASKS.labels(state=state).set(0)
            # BUG FIX: this loop must be nested inside the state loop (it
            # uses `state`); the flattened paste had lost the nesting.
            for task_name in task_names:
                TASKS_NAME.labels(state=state, name=task_name).set(0)
class EnableEvents:
    """Keeps event sending switched on for all workers.

    Workaround: even with CELERY_SEND_EVENTS configured, workers sometimes do
    not send events, so we re-enable them every 5 seconds.
    """

    def __init__(self, app):
        self._app = app

    def run(self):  # pragma: no cover
        while True:
            try:
                self.enable_events()
            except Exception:
                # BUG FIX: was self.log.exception(...) -- the class has no
                # `log` attribute; use the module logger.
                logger.exception("Error while trying to enable events")
            time.sleep(5)

    def enable_events(self):
        self._app.control.enable_events()
def start_httpd(addr):
    """Start the Prometheus metrics HTTP endpoint.

    :param addr: ``'host:port'`` string, e.g. ``'0.0.0.0:49792'``.
    """
    # BUG FIX: the pasted version used smart quotes, which are a SyntaxError.
    host, port = addr.split(':')
    logging.info('Starting HTTPD on {}:{}'.format(host, port))
    prometheus_client.start_http_server(int(port), host)
def celery_monitoring():
    """Reset metrics, launch the monitor threads, and serve the HTTP endpoint."""
    setup_metrics(app)
    # Start event-enabling, worker-count, and task-event monitors as daemons.
    threads = []
    runners = (EnableEvents(app).run, WorkerMonitoring(app).run,
               TaskMonitoring(app).run)
    for target in runners:
        th = Thread(target=target)
        th.daemon = True
        th.start()
        threads.append(th)
    start_httpd('0.0.0.0:49792')
    # Join in reverse start order (task monitor first), as before.
    for th in reversed(threads):
        th.join()
@manager.command
def start_celery_monitoring():
    """Run the Celery Prometheus exporter in the foreground.

    Usage::

        nohup python manage.py start_celery_monitoring &
    """
    # BUG FIX: the docstring quotes were smart quotes (SyntaxError).
    celery_monitoring()
Python中如何实现Celery分布式任务队列的监控脚本
推荐楼主传到 github 上给我们个链接就行了
要监控Celery分布式任务队列,可以结合Flower和自定义脚本。Flower是官方推荐的Web监控工具,但有时需要程序化获取数据。下面是一个完整的监控脚本示例:
# celery_monitor.py
import json
import time
from celery import Celery
from celery.result import AsyncResult
class CeleryMonitor:
    """Programmatic Celery monitor built on the control/inspect API.

    Complements Flower's web UI when monitoring data must be consumed by
    other code.
    """

    def __init__(self, broker_url='redis://localhost:6379/0'):
        self.app = Celery('monitor', broker=broker_url)
        self.inspect = self.app.control.inspect()

    def get_active_tasks(self):
        """Return the tasks currently executing on every worker.

        Each item carries the worker name plus the task's id, name, args
        and start timestamp.  Returns an empty list when no worker replies.
        """
        active = self.inspect.active()
        if not active:
            # BUG FIX: used to return {} here while the normal path returns
            # a list -- keep the return type consistent for callers.
            return []
        tasks = []
        for worker, task_list in active.items():
            for task in task_list:
                tasks.append({
                    'worker': worker,
                    'task_id': task['id'],
                    'name': task['name'],
                    'args': task['args'],
                    'started': task['time_start'],
                })
        return tasks

    def get_queue_stats(self):
        """Return per-worker statistics; empty dict when no worker replies."""
        stats = self.inspect.stats()
        if not stats:
            return {}
        queue_info = {}
        for worker, info in stats.items():
            # NOTE(review): assumes 'total'/'active'/'processed' keys in the
            # stats payload -- confirm against the running Celery version.
            queue_info[worker] = {
                'total_tasks': info.get('total', {}),
                'active': info.get('active', 0),
                'processed': info.get('processed', 0),
            }
        return queue_info

    def get_task_result(self, task_id):
        """Look up status, result and traceback of one task by id."""
        result = AsyncResult(task_id, app=self.app)
        return {
            'task_id': task_id,
            'status': result.status,
            'result': result.result if result.ready() else None,
            'traceback': result.traceback,
        }

    def monitor_loop(self, interval=10):
        """Print active tasks and worker stats every *interval* seconds
        until interrupted with Ctrl-C."""
        try:
            while True:
                print(f"\n=== Celery监控 {time.strftime('%Y-%m-%d %H:%M:%S')} ===")
                # 1. Active tasks.
                active = self.get_active_tasks()
                print(f"活跃任务数: {len(active)}")
                for task in active:
                    print(f" - {task['name']} on {task['worker']}")
                # 2. Queue statistics.
                stats = self.get_queue_stats()
                for worker, info in stats.items():
                    print(f"{worker}: 活跃={info['active']}, 已处理={info['processed']}")
                time.sleep(interval)
        except KeyboardInterrupt:
            print("\n监控已停止")
# Usage example
if __name__ == '__main__':
    # Point this at your own broker.
    mon = CeleryMonitor('redis://localhost:6379/0')

    # One-shot snapshot of the cluster.
    print("活跃任务:", json.dumps(mon.get_active_tasks(), indent=2))
    print("\n队列统计:", json.dumps(mon.get_queue_stats(), indent=2))

    # Alternatively, keep polling:
    # mon.monitor_loop(interval=5)
这个脚本提供了几个关键功能:
- `get_active_tasks()` - 获取当前正在执行的任务
- `get_queue_stats()` - 获取工作节点统计信息
- `get_task_result(task_id)` - 查询特定任务结果
- `monitor_loop()` - 持续监控循环
你还可以扩展这个脚本来:
- 集成Prometheus指标导出
- 添加异常告警(邮件/Slack)
- 记录历史数据到数据库
- 监控任务超时和重试情况
对于生产环境,建议结合Flower的Web界面和这种程序化监控脚本。Flower提供更丰富的可视化,而自定义脚本可以集成到你的运维系统中。
总结:用Celery的control API获取运行时数据,结合Flower做可视化。
兄弟是看我帖子了吗
看了~我最近也要处理这个事情~
最好加上 readme ~

