Python中如何开发一个类似于Cronhub的监控服务?
开发了一个类似于 cronhub 的监控服务.有兴趣的话一块完善完善. 应用场景的话主要是:
1. 比如备份程序每天晚上 8 点执行,怎么确保程序没问题
2. 有个后台进程, 每 30 分钟同步一次数据,怎么确保没问题呢
https://510908220.github.io/cron-sentinel/
Python中如何开发一个类似于Cronhub的监控服务?
1 回复
要开发一个类似Cronhub的监控服务,核心是构建一个调度系统来定期执行并监控任务。下面是一个基于APScheduler和Flask的基础实现,包含任务调度、执行监控和简单的HTTP状态端点。
import logging
from datetime import datetime
from flask import Flask, jsonify
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor
import requests
import threading
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = Flask(__name__)
# 存储任务状态和结果
task_store = {}
store_lock = threading.Lock()
def monitor_job(job_id, url, method='GET', expected_status=200):
"""监控任务:访问URL并检查状态码"""
start_time = datetime.now()
status = 'success'
error_msg = None
try:
response = requests.request(method, url, timeout=10)
if response.status_code != expected_status:
status = 'failed'
error_msg = f'Expected status {expected_status}, got {response.status_code}'
except Exception as e:
status = 'failed'
error_msg = str(e)
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
# 记录任务结果
with store_lock:
task_store[job_id] = {
'last_run': start_time.isoformat(),
'status': status,
'duration': duration,
'error': error_msg,
'next_run': scheduler.get_job(job_id).next_run_time.isoformat() if scheduler.get_job(job_id) else None
}
logger.info(f"Job {job_id}: {status.upper()} in {duration:.2f}s")
# 配置调度器
executors = {'default': ThreadPoolExecutor(20)}
scheduler = BackgroundScheduler(executors=executors)
@app.route('/jobs', methods=['GET'])
def list_jobs():
"""列出所有监控任务"""
jobs = []
for job in scheduler.get_jobs():
job_data = {
'id': job.id,
'name': job.name,
'next_run': job.next_run_time.isoformat() if job.next_run_time else None,
'url': job.args[1] if len(job.args) > 1 else None
}
if job.id in task_store:
job_data.update(task_store[job.id])
jobs.append(job_data)
return jsonify(jobs)
@app.route('/jobs/<job_id>/run', methods=['POST'])
def run_job_now(job_id):
"""立即运行指定任务"""
job = scheduler.get_job(job_id)
if not job:
return jsonify({'error': 'Job not found'}), 404
# 在后台线程中立即执行
threading.Thread(target=job.func, args=job.args).start()
return jsonify({'message': f'Job {job_id} triggered'})
def add_monitor_job(job_id, url, schedule='interval', **schedule_args):
"""添加监控任务"""
if schedule == 'interval':
scheduler.add_job(
monitor_job,
'interval',
args=[job_id, url],
id=job_id,
name=f'Monitor {url}',
**schedule_args
)
elif schedule == 'cron':
scheduler.add_job(
monitor_job,
'cron',
args=[job_id, url],
id=job_id,
name=f'Monitor {url}',
**schedule_args
)
logger.info(f"Added job {job_id} monitoring {url}")
if __name__ == '__main__':
# 启动调度器
scheduler.start()
# 添加示例任务
add_monitor_job('example_1', 'https://httpbin.org/status/200', seconds=30)
add_monitor_job('example_2', 'https://httpbin.org/status/404', seconds=45, expected_status=404)
# 启动Flask应用
app.run(host='0.0.0.0', port=5000, debug=False)
这个实现包含了几个关键部分:
- 任务调度:使用APScheduler处理定时任务,支持interval和cron两种调度方式。
- 监控执行:
monitor_job函数执行HTTP请求并验证响应状态。 - 状态存储:用字典存储任务执行结果,包括最后运行时间、状态和持续时间。
- API端点:提供RESTful接口来列出任务和手动触发执行。
要扩展成完整服务,你需要添加数据库持久化、用户认证、报警通知(邮件/Slack)、重试机制和Web界面。Celery可以作为更强大的任务队列替代APScheduler,SQLAlchemy或Django ORM处理数据存储。
用Flask+APScheduler快速搭建核心,再逐步完善功能。

