Python如何实现一个简单的基于数据上报的监控系统?
大家平时是否有过这样的场景呢:
(1)定时备份数据到备份机
(2)后台进程统计一些数据
(3)每隔一段时间执行一些逻辑(同步数据等)
但是,你是怎么确保:
(1)代码执行
(2)执行是否成功
简单的方式可以是代码执行完后触发一个通知(短信、邮件、微信等),看起来好像也可以满足需求. 但是,仔细想想有如下弊端:
1.假如有几十个类似的服务(分布在不同机器),每个服务执行完都触发通知. 这样相当于通知功能重复了几十遍(配置短信、微信、邮件等).
2.要是一些通知方式修改了(邮件等账号变了或短信服务器变了等),意味着需要修改分布很多地方的代码
3.难以统计分析,比如要分许某个服务一段时间内稳定性等.只能手动搜索邮件等去人肉查看了.
我简单实现了一个针对这样场景的监控系统 https://github.com/510908220/heartbeats 有需要的可以交流改进~
Python如何实现一个简单的基于数据上报的监控系统?
Mark
我来给你一个简单的基于数据上报的监控系统实现。这个系统包含数据上报、存储、查询和告警功能。
import time
import json
import threading
from datetime import datetime
from collections import defaultdict
from typing import Dict, List, Optional
import sqlite3
import logging
class MetricCollector:
"""指标收集器"""
def __init__(self):
self.metrics = defaultdict(list)
self.lock = threading.Lock()
def report(self, metric_name: str, value: float, tags: Dict = None):
"""上报指标数据"""
timestamp = datetime.now()
metric_data = {
'timestamp': timestamp,
'value': value,
'tags': tags or {}
}
with self.lock:
self.metrics[metric_name].append(metric_data)
logging.info(f"Metric reported: {metric_name}={value}")
class StorageManager:
"""存储管理器"""
def __init__(self, db_path='monitoring.db'):
self.db_path = db_path
self._init_database()
def _init_database(self):
"""初始化数据库"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 创建指标表
cursor.execute('''
CREATE TABLE IF NOT EXISTS metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
metric_name TEXT NOT NULL,
value REAL NOT NULL,
tags TEXT,
timestamp DATETIME NOT NULL
)
''')
# 创建索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_metric_name ON metrics(metric_name)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_timestamp ON metrics(timestamp)')
conn.commit()
conn.close()
def store_metric(self, metric_name: str, value: float, tags: Dict, timestamp: datetime):
"""存储指标到数据库"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO metrics (metric_name, value, tags, timestamp)
VALUES (?, ?, ?, ?)
''', (metric_name, value, json.dumps(tags), timestamp))
conn.commit()
conn.close()
class AlertManager:
"""告警管理器"""
def __init__(self):
self.alerts = []
self.thresholds = {}
def set_threshold(self, metric_name: str, threshold: float, condition: str = '>'):
"""设置告警阈值"""
self.thresholds[metric_name] = {
'threshold': threshold,
'condition': condition
}
def check_alert(self, metric_name: str, value: float) -> Optional[str]:
"""检查是否需要告警"""
if metric_name not in self.thresholds:
return None
threshold_info = self.thresholds[metric_name]
threshold = threshold_info['threshold']
condition = threshold_info['condition']
alert_triggered = False
if condition == '>' and value > threshold:
alert_triggered = True
elif condition == '<' and value < threshold:
alert_triggered = True
elif condition == '==' and value == threshold:
alert_triggered = True
if alert_triggered:
alert_msg = f"ALERT: {metric_name}={value} {condition} {threshold}"
self.alerts.append({
'timestamp': datetime.now(),
'message': alert_msg,
'metric': metric_name,
'value': value
})
return alert_msg
return None
class MonitoringSystem:
"""监控系统主类"""
def __init__(self):
self.collector = MetricCollector()
self.storage = StorageManager()
self.alert_manager = AlertManager()
self.running = False
self.flush_thread = None
def start(self):
"""启动监控系统"""
self.running = True
self.flush_thread = threading.Thread(target=self._flush_worker, daemon=True)
self.flush_thread.start()
logging.info("Monitoring system started")
def stop(self):
"""停止监控系统"""
self.running = False
if self.flush_thread:
self.flush_thread.join(timeout=5)
logging.info("Monitoring system stopped")
def _flush_worker(self):
"""后台线程:定期将数据刷入存储"""
while self.running:
time.sleep(10) # 每10秒刷一次数据
with self.collector.lock:
for metric_name, data_list in self.collector.metrics.items():
for data in data_list:
# 存储到数据库
self.storage.store_metric(
metric_name,
data['value'],
data['tags'],
data['timestamp']
)
# 检查告警
alert = self.alert_manager.check_alert(metric_name, data['value'])
if alert:
logging.warning(alert)
# 清空已处理的数据
self.collector.metrics.clear()
def report_metric(self, metric_name: str, value: float, tags: Dict = None):
"""上报指标"""
self.collector.report(metric_name, value, tags)
def query_metrics(self, metric_name: str, start_time: datetime = None,
end_time: datetime = None) -> List[Dict]:
"""查询指标数据"""
conn = sqlite3.connect(self.storage.db_path)
cursor = conn.cursor()
query = 'SELECT metric_name, value, tags, timestamp FROM metrics WHERE metric_name = ?'
params = [metric_name]
if start_time:
query += ' AND timestamp >= ?'
params.append(start_time)
if end_time:
query += ' AND timestamp <= ?'
params.append(end_time)
query += ' ORDER BY timestamp DESC LIMIT 100'
cursor.execute(query, params)
results = []
for row in cursor.fetchall():
results.append({
'metric_name': row[0],
'value': row[1],
'tags': json.loads(row[2]) if row[2] else {},
'timestamp': row[3]
})
conn.close()
return results
# 使用示例
def main():
# 配置日志
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
# 创建监控系统
monitor = MonitoringSystem()
# 设置告警阈值
monitor.alert_manager.set_threshold('cpu_usage', 80.0, '>')
monitor.alert_manager.set_threshold('memory_usage', 90.0, '>')
# 启动系统
monitor.start()
try:
# 模拟上报一些指标
for i in range(20):
# 模拟CPU使用率
cpu_usage = 70 + i % 30 # 70-99之间波动
monitor.report_metric('cpu_usage', cpu_usage, {'host': 'server1'})
# 模拟内存使用率
memory_usage = 85 + i % 15 # 85-99之间波动
monitor.report_metric('memory_usage', memory_usage, {'host': 'server1'})
# 模拟请求数
request_count = 100 + i * 10
monitor.report_metric('request_count', request_count,
{'host': 'server1', 'endpoint': '/api'})
time.sleep(1)
# 等待数据刷入
time.sleep(15)
# 查询数据
print("\n查询CPU使用率数据:")
cpu_data = monitor.query_metrics('cpu_usage')
for data in cpu_data[:5]: # 显示前5条
print(f"{data['timestamp']} - {data['metric_name']}: {data['value']}%")
print("\n查询内存使用率数据:")
memory_data = monitor.query_metrics('memory_usage')
for data in memory_data[:5]:
print(f"{data['timestamp']} - {data['metric_name']}: {data['value']}%")
print("\n最近的告警:")
for alert in monitor.alert_manager.alerts[-5:]:
print(f"{alert['timestamp']} - {alert['message']}")
finally:
# 停止系统
monitor.stop()
if __name__ == '__main__':
main()
这个监控系统包含几个核心组件:
-
MetricCollector:负责收集和暂存上报的指标数据,使用线程安全的方式管理内存中的数据。
-
StorageManager:使用SQLite数据库持久化存储指标数据,包含时间戳、指标名、值和标签。
-
AlertManager:管理告警规则,当指标超过阈值时触发告警。
-
MonitoringSystem:主控制器,协调各个组件的工作,包含后台线程定期将数据刷入存储。
系统的工作流程:
- 应用通过
report_metric()方法上报指标 - 数据暂存在内存中,后台线程每10秒批量写入数据库
- 写入时检查是否触发告警
- 可以通过
query_metrics()查询历史数据
这个实现足够简单,可以直接运行,也容易扩展。你可以根据需要添加更多的指标类型、更复杂的告警规则,或者替换为其他存储后端。
总结:这个基础监控系统实现了数据上报、存储和告警的核心功能。
类似于这种的服务?
https://cronhub.io/
https://healthchecks.io/
感谢分享
看着确实游戏类似啊。healthchecks 我使用过,cronhub 第一次见,看着不错。从最近一段时间使用,类似这样的监控还是很好用的. 集中监控这类服务,非常有用
mark

