Python如何实现一个简单的基于数据上报的监控系统?

大家平时是否有过这样的场景呢:

(1)定时备份数据到备份机

(2)后台进程统计一些数据

(3)每隔一段时间执行一些逻辑(同步数据等)

但是,你是怎么确保:

(1)代码执行

(2)执行是否成功

简单的方式可以是代码执行完后触发一个通知(短信、邮件、微信等),看起来好像也可以满足需求. 但是,仔细想想有如下弊端:

1.假如有几十个类似的服务(分布在不同机器),每个服务执行完都触发通知. 这样相当于通知功能重复了几十遍(配置短信、微信、邮件等).

2.要是一些通知方式修改了(邮件等账号变了或短信服务器变了等),意味着需要修改分布很多地方的代码

3.难以统计分析,比如要分许某个服务一段时间内稳定性等.只能手动搜索邮件等去人肉查看了.

我简单实现了一个针对这样场景的监控系统 https://github.com/510908220/heartbeats 有需要的可以交流改进~


Python如何实现一个简单的基于数据上报的监控系统?

6 回复

我来给你一个简单的基于数据上报的监控系统实现。这个系统包含数据上报、存储、查询和告警功能。

import time
import json
import threading
from datetime import datetime
from collections import defaultdict
from typing import Dict, List, Optional
import sqlite3
import logging

class MetricCollector:
    """指标收集器"""
    def __init__(self):
        self.metrics = defaultdict(list)
        self.lock = threading.Lock()
        
    def report(self, metric_name: str, value: float, tags: Dict = None):
        """上报指标数据"""
        timestamp = datetime.now()
        metric_data = {
            'timestamp': timestamp,
            'value': value,
            'tags': tags or {}
        }
        
        with self.lock:
            self.metrics[metric_name].append(metric_data)
        
        logging.info(f"Metric reported: {metric_name}={value}")

class StorageManager:
    """存储管理器"""
    def __init__(self, db_path='monitoring.db'):
        self.db_path = db_path
        self._init_database()
        
    def _init_database(self):
        """初始化数据库"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # 创建指标表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS metrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                metric_name TEXT NOT NULL,
                value REAL NOT NULL,
                tags TEXT,
                timestamp DATETIME NOT NULL
            )
        ''')
        
        # 创建索引
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_metric_name ON metrics(metric_name)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_timestamp ON metrics(timestamp)')
        
        conn.commit()
        conn.close()
    
    def store_metric(self, metric_name: str, value: float, tags: Dict, timestamp: datetime):
        """存储指标到数据库"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            INSERT INTO metrics (metric_name, value, tags, timestamp)
            VALUES (?, ?, ?, ?)
        ''', (metric_name, value, json.dumps(tags), timestamp))
        
        conn.commit()
        conn.close()

class AlertManager:
    """告警管理器"""
    def __init__(self):
        self.alerts = []
        self.thresholds = {}
        
    def set_threshold(self, metric_name: str, threshold: float, condition: str = '>'):
        """设置告警阈值"""
        self.thresholds[metric_name] = {
            'threshold': threshold,
            'condition': condition
        }
    
    def check_alert(self, metric_name: str, value: float) -> Optional[str]:
        """检查是否需要告警"""
        if metric_name not in self.thresholds:
            return None
            
        threshold_info = self.thresholds[metric_name]
        threshold = threshold_info['threshold']
        condition = threshold_info['condition']
        
        alert_triggered = False
        if condition == '>' and value > threshold:
            alert_triggered = True
        elif condition == '<' and value < threshold:
            alert_triggered = True
        elif condition == '==' and value == threshold:
            alert_triggered = True
            
        if alert_triggered:
            alert_msg = f"ALERT: {metric_name}={value} {condition} {threshold}"
            self.alerts.append({
                'timestamp': datetime.now(),
                'message': alert_msg,
                'metric': metric_name,
                'value': value
            })
            return alert_msg
        
        return None

class MonitoringSystem:
    """监控系统主类"""
    def __init__(self):
        self.collector = MetricCollector()
        self.storage = StorageManager()
        self.alert_manager = AlertManager()
        self.running = False
        self.flush_thread = None
        
    def start(self):
        """启动监控系统"""
        self.running = True
        self.flush_thread = threading.Thread(target=self._flush_worker, daemon=True)
        self.flush_thread.start()
        logging.info("Monitoring system started")
    
    def stop(self):
        """停止监控系统"""
        self.running = False
        if self.flush_thread:
            self.flush_thread.join(timeout=5)
        logging.info("Monitoring system stopped")
    
    def _flush_worker(self):
        """后台线程:定期将数据刷入存储"""
        while self.running:
            time.sleep(10)  # 每10秒刷一次数据
            
            with self.collector.lock:
                for metric_name, data_list in self.collector.metrics.items():
                    for data in data_list:
                        # 存储到数据库
                        self.storage.store_metric(
                            metric_name,
                            data['value'],
                            data['tags'],
                            data['timestamp']
                        )
                        
                        # 检查告警
                        alert = self.alert_manager.check_alert(metric_name, data['value'])
                        if alert:
                            logging.warning(alert)
                
                # 清空已处理的数据
                self.collector.metrics.clear()
    
    def report_metric(self, metric_name: str, value: float, tags: Dict = None):
        """上报指标"""
        self.collector.report(metric_name, value, tags)
    
    def query_metrics(self, metric_name: str, start_time: datetime = None, 
                     end_time: datetime = None) -> List[Dict]:
        """查询指标数据"""
        conn = sqlite3.connect(self.storage.db_path)
        cursor = conn.cursor()
        
        query = 'SELECT metric_name, value, tags, timestamp FROM metrics WHERE metric_name = ?'
        params = [metric_name]
        
        if start_time:
            query += ' AND timestamp >= ?'
            params.append(start_time)
        if end_time:
            query += ' AND timestamp <= ?'
            params.append(end_time)
        
        query += ' ORDER BY timestamp DESC LIMIT 100'
        
        cursor.execute(query, params)
        results = []
        
        for row in cursor.fetchall():
            results.append({
                'metric_name': row[0],
                'value': row[1],
                'tags': json.loads(row[2]) if row[2] else {},
                'timestamp': row[3]
            })
        
        conn.close()
        return results

# 使用示例
def main():
    # 配置日志
    logging.basicConfig(level=logging.INFO, 
                       format='%(asctime)s - %(levelname)s - %(message)s')
    
    # 创建监控系统
    monitor = MonitoringSystem()
    
    # 设置告警阈值
    monitor.alert_manager.set_threshold('cpu_usage', 80.0, '>')
    monitor.alert_manager.set_threshold('memory_usage', 90.0, '>')
    
    # 启动系统
    monitor.start()
    
    try:
        # 模拟上报一些指标
        for i in range(20):
            # 模拟CPU使用率
            cpu_usage = 70 + i % 30  # 70-99之间波动
            monitor.report_metric('cpu_usage', cpu_usage, {'host': 'server1'})
            
            # 模拟内存使用率
            memory_usage = 85 + i % 15  # 85-99之间波动
            monitor.report_metric('memory_usage', memory_usage, {'host': 'server1'})
            
            # 模拟请求数
            request_count = 100 + i * 10
            monitor.report_metric('request_count', request_count, 
                                {'host': 'server1', 'endpoint': '/api'})
            
            time.sleep(1)
        
        # 等待数据刷入
        time.sleep(15)
        
        # 查询数据
        print("\n查询CPU使用率数据:")
        cpu_data = monitor.query_metrics('cpu_usage')
        for data in cpu_data[:5]:  # 显示前5条
            print(f"{data['timestamp']} - {data['metric_name']}: {data['value']}%")
        
        print("\n查询内存使用率数据:")
        memory_data = monitor.query_metrics('memory_usage')
        for data in memory_data[:5]:
            print(f"{data['timestamp']} - {data['metric_name']}: {data['value']}%")
        
        print("\n最近的告警:")
        for alert in monitor.alert_manager.alerts[-5:]:
            print(f"{alert['timestamp']} - {alert['message']}")
            
    finally:
        # 停止系统
        monitor.stop()

if __name__ == '__main__':
    main()

这个监控系统包含几个核心组件:

  1. MetricCollector:负责收集和暂存上报的指标数据,使用线程安全的方式管理内存中的数据。

  2. StorageManager:使用SQLite数据库持久化存储指标数据,包含时间戳、指标名、值和标签。

  3. AlertManager:管理告警规则,当指标超过阈值时触发告警。

  4. MonitoringSystem:主控制器,协调各个组件的工作,包含后台线程定期将数据刷入存储。

系统的工作流程:

  • 应用通过report_metric()方法上报指标
  • 数据暂存在内存中,后台线程每10秒批量写入数据库
  • 写入时检查是否触发告警
  • 可以通过query_metrics()查询历史数据

这个实现足够简单,可以直接运行,也容易扩展。你可以根据需要添加更多的指标类型、更复杂的告警规则,或者替换为其他存储后端。

总结:这个基础监控系统实现了数据上报、存储和告警的核心功能。

类似于这种的服务?
https://cronhub.io/
https://healthchecks.io/

感谢分享

看着确实游戏类似啊。healthchecks 我使用过,cronhub 第一次见,看着不错。从最近一段时间使用,类似这样的监控还是很好用的. 集中监控这类服务,非常有用

回到顶部