How to implement a monitoring script for the Celery distributed task queue in Python

# coding=utf-8
import celery
import celery.states
import celery.events
import collections
from itertools import chain
import logging
import prometheus_client
import sys
from threading import Thread
import time
import json
import os
from app.tasks import celery as app

# Metrics to monitor

# Number of workers currently online
WORKERS = prometheus_client.Gauge('celery_workers', 'Number of alive workers')

# Number of tasks in each state
TASKS = prometheus_client.Gauge('celery_tasks', 'Number of tasks per state', ['state'])

# Number of tasks per state and name; an overview of all Celery tasks
TASKS_NAME = prometheus_client.Gauge('celery_tasks_by_name', 'Number of tasks per state and name', ['state', 'name'])

# Runtime of each task, to track task performance (e.g. for optimizing SQL)
TASKS_RUNTIME = prometheus_client.Histogram('celery_tasks_runtime_seconds', 'Task runtime (seconds)', ['name'])

# Startup latency of each task, to spot queue blocking and tune the number of workers
LATENCY = prometheus_client.Histogram('celery_task_latency', 'Seconds between a task is received and started.')

logger = logging.getLogger(__name__)

class WorkerMonitoring:

    def __init__(self, app):
        self._app = app

    def run(self):
        while True:
            self.update_workers_count()
            time.sleep(5)

    def update_workers_count(self):
        try:
            WORKERS.set(len(self._app.control.ping(timeout=5)))
        except Exception:
            logger.exception("Error while pinging workers")

class TaskMonitoring:

    def __init__(self, app):
        self._app = app
        self._state = self._app.events.State()
        self._known_states = set()
        self._known_states_names = set()

    def run(self):
        self._monitor()

    def _process_event(self, event):
        print(event)
        # Events may arrive concurrently, so take the lock
        with self._state._mutex:
            if event['type'] != 'worker-heartbeat':
                event_type = event['type'][5:]
                state = celery.events.state.TASK_EVENT_TO_STATE[event_type]
                if state == celery.states.STARTED:
                    # Observe the startup latency
                    self._observe_latency(event)

                self._collect_tasks(event, state)

    def _observe_latency(self, event):
        try:
            prev_evt = self._state.tasks[event['uuid']]
        except KeyError:
            pass
        else:
            if prev_evt.state == celery.states.RECEIVED:
                LATENCY.observe(
                    event['local_received'] - prev_evt.local_received)

    def _collect_tasks(self, event, state):
        if state in celery.states.READY_STATES:
            self._incr_ready_task(event, state)
        else:
            self._state._event(event)

        self._collect_unready_tasks()

    def _incr_ready_task(self, event, state):
        # 'FAILURE', 'REVOKED', 'SUCCESS' task info
        TASKS.labels(state=state).inc()
        try:
            name = self._state.tasks.pop(event['uuid']).name
            runtime = event.get('runtime')
            if name is not None and runtime is not None:
                TASKS_RUNTIME.labels(name=name).observe(runtime)
        except (KeyError, AttributeError):
            pass

    def _collect_unready_tasks(self):
        # 'PENDING', 'RECEIVED', 'REJECTED', 'RETRY', 'STARTED' task info
        cnt = collections.Counter(t.state for t in self._state.tasks.values())
        self._known_states.update(cnt.elements())
        for task_state in self._known_states:
            TASKS.labels(state=task_state).set(cnt[task_state])

        cnt = collections.Counter((t.state, t.name) for t in self._state.tasks.values() if t.name)
        self._known_states_names.update(cnt.elements())
        for task_state in self._known_states_names:
            TASKS_NAME.labels(state=task_state[0], name=task_state[1]).set(cnt[task_state])

    def _monitor(self):
        while True:
            try:
                with self._app.connection() as conn:
                    # Receive all events from the broker and hand them to _process_event
                    logger.info("Try to connect to broker")
                    recv = self._app.events.Receiver(conn, handlers={'*': self._process_event})

                    setup_metrics(self._app)
                    # Log before capture(): it blocks until the connection drops
                    logger.info("Connected to broker")
                    recv.capture(limit=None, timeout=None, wakeup=True)

            except Exception:
                logger.exception("Queue connection failed")
                setup_metrics(self._app)
                time.sleep(5)

def setup_metrics(app):
    WORKERS.set(0)
    try:
        registered_tasks = app.control.inspect().registered_tasks().values()
    except Exception:
        # Broker unreachable: zero out every label combination seen so far
        for metric in TASKS.collect():
            for name, labels, cnt in metric.samples:
                TASKS.labels(**labels).set(0)
        for metric in TASKS_NAME.collect():
            for name, labels, cnt in metric.samples:
                TASKS_NAME.labels(**labels).set(0)
    else:
        # 'FAILURE', 'PENDING', 'RECEIVED', 'RETRY', 'REVOKED', 'STARTED', 'SUCCESS'
        for state in celery.states.ALL_STATES:
            TASKS.labels(state=state).set(0)
            for task_name in set(chain.from_iterable(registered_tasks)):
                TASKS_NAME.labels(state=state, name=task_name).set(0)

class EnableEvents:
    # Celery has a quirk: even with CELERY_SEND_EVENTS configured it may not
    # send events, so enable them explicitly and periodically.

    def __init__(self, app):
        self._app = app

    def run(self):  # pragma: no cover
        while True:
            try:
                self.enable_events()
            except Exception:
                logger.exception("Error while trying to enable events")
            time.sleep(5)

    def enable_events(self):
        self._app.control.enable_events()

def start_httpd(addr):
    host, port = addr.split(':')
    logging.info('Starting HTTPD on {}:{}'.format(host, port))
    prometheus_client.start_http_server(int(port), host)

def celery_monitoring():
    setup_metrics(app)

    e = Thread(target=EnableEvents(app).run)
    e.daemon = True
    e.start()

    w = Thread(target=WorkerMonitoring(app).run)
    w.daemon = True
    w.start()

    t = Thread(target=TaskMonitoring(app).run)
    t.daemon = True
    t.start()

    start_httpd('0.0.0.0:49792')

    t.join()
    w.join()
    e.join()

@manager.command
def start_celery_monitoring():
    """nohup python manage.py start_celery_monitoring &"""
    celery_monitoring()
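
Once the script is running, the HTTP endpoint started above (port 49792) serves the standard Prometheus text exposition format. Assuming two live workers and a few completed tasks, a scrape would look roughly like this (the numbers are illustrative):

$ curl -s http://localhost:49792/metrics
# HELP celery_workers Number of alive workers
# TYPE celery_workers gauge
celery_workers 2.0
# HELP celery_tasks Number of tasks per state
# TYPE celery_tasks gauge
celery_tasks{state="SUCCESS"} 42.0
celery_tasks{state="STARTED"} 3.0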



5 replies

I'd suggest the OP upload this to GitHub and just give us a link.


To monitor a Celery distributed task queue, you can combine Flower with a custom script. Flower is the officially recommended web monitoring tool, but sometimes you need to fetch the data programmatically. Below is a complete monitoring script example:

# celery_monitor.py
import json
import time
from celery import Celery
from celery.result import AsyncResult

class CeleryMonitor:
    def __init__(self, broker_url='redis://localhost:6379/0'):
        self.app = Celery('monitor', broker=broker_url)
        self.inspect = self.app.control.inspect()
    
    def get_active_tasks(self):
        """Get the active tasks on all worker nodes."""
        active = self.inspect.active()
        if not active:
            return []

        tasks = []
        for worker, task_list in active.items():
            for task in task_list:
                tasks.append({
                    'worker': worker,
                    'task_id': task['id'],
                    'name': task['name'],
                    'args': task['args'],
                    'started': task['time_start']
                })
        return tasks
    
    def get_queue_stats(self):
        """Get per-worker statistics."""
        stats = self.inspect.stats()
        if not stats:
            return {}

        queue_info = {}
        for worker, info in stats.items():
            queue_info[worker] = {
                'total_tasks': info.get('total', {}),
                'active': info.get('active', 0),
                'processed': info.get('processed', 0)
            }
        return queue_info
    
    def get_task_result(self, task_id):
        """Get the result of a specific task."""
        result = AsyncResult(task_id, app=self.app)
        return {
            'task_id': task_id,
            'status': result.status,
            'result': result.result if result.ready() else None,
            'traceback': result.traceback
        }
    
    def monitor_loop(self, interval=10):
        """Continuous monitoring loop."""
        try:
            while True:
                print(f"\n=== Celery Monitor {time.strftime('%Y-%m-%d %H:%M:%S')} ===")

                # 1. Active tasks
                active = self.get_active_tasks()
                print(f"Active tasks: {len(active)}")
                for task in active:
                    print(f"  - {task['name']} on {task['worker']}")

                # 2. Queue statistics
                stats = self.get_queue_stats()
                for worker, info in stats.items():
                    print(f"{worker}: active={info['active']}, processed={info['processed']}")

                time.sleep(interval)

        except KeyboardInterrupt:
            print("\nMonitoring stopped")

# Usage example
if __name__ == '__main__':
    # Configure your broker URL here
    monitor = CeleryMonitor('redis://localhost:6379/0')

    # One-off check
    print("Active tasks:", json.dumps(monitor.get_active_tasks(), indent=2))
    print("\nQueue stats:", json.dumps(monitor.get_queue_stats(), indent=2))

    # Or run the continuous monitoring loop
    # monitor.monitor_loop(interval=5)

This script provides several key features:

  1. get_active_tasks() - fetch the tasks currently being executed
  2. get_queue_stats() - fetch per-worker statistics
  3. get_task_result() - query the result of a specific task (see the sketch below)
  4. monitor_loop() - a continuous monitoring loop
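
get_task_result() is the one method the __main__ block doesn't exercise; a minimal sketch of using it follows. Note that reading task state requires a result backend, which the constructor above doesn't configure, so the backend URL here is an assumption about your deployment, and the task id is a placeholder:

from celery_monitor import CeleryMonitor

monitor = CeleryMonitor('redis://localhost:6379/0')
# AsyncResult needs a result backend; the CeleryMonitor constructor only sets
# a broker, so point the backend at Redis too (assumed deployment detail).
monitor.app.conf.result_backend = 'redis://localhost:6379/1'

# Placeholder id; use one returned by delay()/apply_async().
info = monitor.get_task_result('00000000-0000-0000-0000-000000000000')
print(info['status'], info['result'])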

You can also extend this script to:

  • Export Prometheus metrics (a sketch follows this list)
  • Add alerting (email/Slack)
  • Record historical data in a database
  • Monitor task timeouts and retries
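
For the first bullet, here is a minimal sketch of a Prometheus export layered on top of CeleryMonitor; the metric name, port, and interval are arbitrary choices, not an established convention:

import time
import prometheus_client

# Gauge tracking currently active tasks per worker, refreshed by polling.
ACTIVE_TASKS = prometheus_client.Gauge(
    'celery_monitor_active_tasks', 'Active tasks per worker', ['worker'])

def export_loop(monitor, port=9808, interval=10):
    # Serve /metrics, then refresh the gauge by polling the control API.
    prometheus_client.start_http_server(port)
    while True:
        for worker, info in monitor.get_queue_stats().items():
            ACTIVE_TASKS.labels(worker=worker).set(info['active'])
        time.sleep(interval)

Unlike the event-based exporter in the original post, this polls the control API, which is simpler but coarser-grained.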

For production, it's best to combine Flower's web UI with this kind of programmatic monitoring script: Flower gives you richer visualization, while a custom script can be integrated into your own ops tooling.

In short: use Celery's control API to fetch runtime data, and pair it with Flower for visualization.

Bro, did you see my post?

I did~ I need to deal with the same thing soon~

It'd be best to add a README ~
