Python中如何开发一个类似于Cronhub的监控服务?

开发了一个类似于 cronhub 的监控服务.有兴趣的话一块完善完善. 应用场景的话主要是:
1. 比如备份程序每天晚上 8 点执行,怎么确保程序没问题
2. 有个后台进程, 每 30 分钟同步一次数据,怎么确保没问题呢


https://510908220.github.io/cron-sentinel/
Python中如何开发一个类似于Cronhub的监控服务?

1 回复

要开发一个类似Cronhub的监控服务,核心是构建一个调度系统来定期执行并监控任务。下面是一个基于APScheduler和Flask的基础实现,包含任务调度、执行监控和简单的HTTP状态端点。

import logging
from datetime import datetime
from flask import Flask, jsonify
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor
import requests
import threading

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# 存储任务状态和结果
task_store = {}
store_lock = threading.Lock()

def monitor_job(job_id, url, method='GET', expected_status=200):
    """监控任务:访问URL并检查状态码"""
    start_time = datetime.now()
    status = 'success'
    error_msg = None
    
    try:
        response = requests.request(method, url, timeout=10)
        if response.status_code != expected_status:
            status = 'failed'
            error_msg = f'Expected status {expected_status}, got {response.status_code}'
    except Exception as e:
        status = 'failed'
        error_msg = str(e)
    
    end_time = datetime.now()
    duration = (end_time - start_time).total_seconds()
    
    # 记录任务结果
    with store_lock:
        task_store[job_id] = {
            'last_run': start_time.isoformat(),
            'status': status,
            'duration': duration,
            'error': error_msg,
            'next_run': scheduler.get_job(job_id).next_run_time.isoformat() if scheduler.get_job(job_id) else None
        }
    
    logger.info(f"Job {job_id}: {status.upper()} in {duration:.2f}s")

# 配置调度器
executors = {'default': ThreadPoolExecutor(20)}
scheduler = BackgroundScheduler(executors=executors)

@app.route('/jobs', methods=['GET'])
def list_jobs():
    """列出所有监控任务"""
    jobs = []
    for job in scheduler.get_jobs():
        job_data = {
            'id': job.id,
            'name': job.name,
            'next_run': job.next_run_time.isoformat() if job.next_run_time else None,
            'url': job.args[1] if len(job.args) > 1 else None
        }
        if job.id in task_store:
            job_data.update(task_store[job.id])
        jobs.append(job_data)
    return jsonify(jobs)

@app.route('/jobs/<job_id>/run', methods=['POST'])
def run_job_now(job_id):
    """立即运行指定任务"""
    job = scheduler.get_job(job_id)
    if not job:
        return jsonify({'error': 'Job not found'}), 404
    
    # 在后台线程中立即执行
    threading.Thread(target=job.func, args=job.args).start()
    return jsonify({'message': f'Job {job_id} triggered'})

def add_monitor_job(job_id, url, schedule='interval', **schedule_args):
    """添加监控任务"""
    if schedule == 'interval':
        scheduler.add_job(
            monitor_job,
            'interval',
            args=[job_id, url],
            id=job_id,
            name=f'Monitor {url}',
            **schedule_args
        )
    elif schedule == 'cron':
        scheduler.add_job(
            monitor_job,
            'cron',
            args=[job_id, url],
            id=job_id,
            name=f'Monitor {url}',
            **schedule_args
        )
    logger.info(f"Added job {job_id} monitoring {url}")

if __name__ == '__main__':
    # 启动调度器
    scheduler.start()
    
    # 添加示例任务
    add_monitor_job('example_1', 'https://httpbin.org/status/200', seconds=30)
    add_monitor_job('example_2', 'https://httpbin.org/status/404', seconds=45, expected_status=404)
    
    # 启动Flask应用
    app.run(host='0.0.0.0', port=5000, debug=False)

这个实现包含了几个关键部分:

  1. 任务调度:使用APScheduler处理定时任务,支持interval和cron两种调度方式。
  2. 监控执行monitor_job函数执行HTTP请求并验证响应状态。
  3. 状态存储:用字典存储任务执行结果,包括最后运行时间、状态和持续时间。
  4. API端点:提供RESTful接口来列出任务和手动触发执行。

要扩展成完整服务,你需要添加数据库持久化、用户认证、报警通知(邮件/Slack)、重试机制和Web界面。Celery可以作为更强大的任务队列替代APScheduler,SQLAlchemy或Django ORM处理数据存储。

用Flask+APScheduler快速搭建核心,再逐步完善功能。

回到顶部