Python脚本实现跨机器查询数据库并邮件发送结果

B 机器上查询到如下信息: mysql> select count(),rating_score from tmp.task where task_priority = 'normal' and trackingWebsite_id = '7' and date_format(created_at,'%Y-%m-%d') = date_format(now(),'%Y-%m-%d') group by rating_score having count() > 2 order by rating_score desc; +----------+--------------+

| count(*) | rating_score |

+----------+--------------+

| 40 | 300 |

| 30 | 200 |

| 20 | 100 |

| 10 | 50 |

| 5 | 40 |

| 4 | 30 |

| 3 | 0 |

| 2 | -1 |

+----------+--------------+

我如何通过 A 机器上的 python 脚本将 B 机器上的这些查询结果信息(最好能包括查询语句),通过邮件发送出去?

请问有这样的范例吗?


Python脚本实现跨机器查询数据库并邮件发送结果

1 回复

我来写一个跨机器查询数据库并邮件发送结果的脚本。这个需求很常见,核心是处理好数据库连接和邮件发送。

import pandas as pd
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from sqlalchemy import create_engine
import logging
from datetime import datetime

class DatabaseQueryMailer:
    def __init__(self, db_config, mail_config):
        """
        初始化数据库和邮件配置
        
        Args:
            db_config: 数据库连接配置字典
            mail_config: 邮件服务器配置字典
        """
        self.db_config = db_config
        self.mail_config = mail_config
        self.setup_logging()
    
    def setup_logging(self):
        """设置日志记录"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
    
    def create_db_connection(self):
        """创建数据库连接"""
        try:
            # 使用SQLAlchemy创建连接,支持多种数据库
            connection_string = f"mysql+pymysql://{self.db_config['user']}:{self.db_config['password']}@{self.db_config['host']}:{self.db_config['port']}/{self.db_config['database']}"
            engine = create_engine(connection_string)
            return engine.connect()
        except Exception as e:
            self.logger.error(f"数据库连接失败: {e}")
            raise
    
    def execute_query(self, query):
        """执行SQL查询"""
        try:
            with self.create_db_connection() as conn:
                df = pd.read_sql(query, conn)
                self.logger.info(f"查询成功,返回 {len(df)} 行数据")
                return df
        except Exception as e:
            self.logger.error(f"查询执行失败: {e}")
            raise
    
    def df_to_html(self, df):
        """将DataFrame转换为HTML表格"""
        return df.to_html(index=False, border=1, classes='dataframe', justify='center')
    
    def send_email(self, subject, html_content, recipients):
        """发送邮件"""
        try:
            msg = MIMEMultipart('alternative')
            msg['Subject'] = subject
            msg['From'] = self.mail_config['sender']
            msg['To'] = ', '.join(recipients)
            
            # 添加HTML内容
            html_part = MIMEText(html_content, 'html')
            msg.attach(html_part)
            
            # 连接SMTP服务器发送
            with smtplib.SMTP(self.mail_config['smtp_server'], self.mail_config['smtp_port']) as server:
                if self.mail_config.get('use_tls', True):
                    server.starttls()
                server.login(self.mail_config['username'], self.mail_config['password'])
                server.send_message(msg)
            
            self.logger.info(f"邮件已发送给: {recipients}")
            
        except Exception as e:
            self.logger.error(f"邮件发送失败: {e}")
            raise
    
    def run(self, query, email_subject, recipients):
        """主执行方法"""
        try:
            # 执行查询
            df = self.execute_query(query)
            
            # 生成报告
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            html_content = f"""
            <html>
            <head>
                <style>
                    .dataframe {{ border-collapse: collapse; width: 100%; }}
                    .dataframe th, .dataframe td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
                    .dataframe th {{ background-color: #f2f2f2; }}
                    .info {{ color: #666; font-size: 12px; margin-bottom: 20px; }}
                </style>
            </head>
            <body>
                <h2>{email_subject}</h2>
                <div class="info">生成时间: {timestamp} | 数据行数: {len(df)}</div>
                {self.df_to_html(df)}
            </body>
            </html>
            """
            
            # 发送邮件
            self.send_email(email_subject, html_content, recipients)
            
        except Exception as e:
            self.logger.error(f"任务执行失败: {e}")
            raise

# 使用示例
if __name__ == "__main__":
    # 数据库配置(示例为MySQL)
    db_config = {
        'host': 'your-db-host',
        'port': 3306,
        'database': 'your_database',
        'user': 'your_username',
        'password': 'your_password'
    }
    
    # 邮件配置(示例为Gmail)
    mail_config = {
        'smtp_server': 'smtp.gmail.com',
        'smtp_port': 587,
        'username': 'your-email@gmail.com',
        'password': 'your-app-password',  # 使用应用专用密码
        'sender': 'your-email@gmail.com',
        'use_tls': True
    }
    
    # SQL查询语句
    query = """
    SELECT 
        user_id,
        username,
        email,
        created_at,
        last_login
    FROM users
    WHERE last_login >= DATE_SUB(NOW(), INTERVAL 7 DAY)
    ORDER BY last_login DESC
    LIMIT 100
    """
    
    # 创建实例并运行
    mailer = DatabaseQueryMailer(db_config, mail_config)
    mailer.run(
        query=query,
        email_subject="上周活跃用户报告",
        recipients=['recipient1@example.com', 'recipient2@example.com']
    )

这个脚本有几个关键点:

  1. 数据库连接:用了SQLAlchemy,这样换数据库(比如PostgreSQL、SQL Server)改个连接字符串就行,不用大改代码。

  2. 邮件发送:支持HTML格式,表格样式好看点,加了时间戳和数据行数这些信息。

  3. 错误处理:每个步骤都有try-catch,出问题能知道是哪一步。

  4. 日志记录:执行过程都记下来,方便查问题。

用的时候注意几个事:数据库驱动得装对(比如MySQL装pymysql),邮件密码最好用应用专用密码(特别是Gmail),敏感信息别写死在代码里。

这脚本可以直接跑,也可以加到crontab里定时执行。

回到顶部