Python中MySQL升级脚本倒序执行的问题如何解决

升级用的 python 里面的 mysql 原生写的
数据库有 3 个状态 0 1 2 ,原状态为 0
第一个 sql 升级状态把 0 升级为 1(处理中)
之后写了一个事物
处理成功后把数据库状态升级为 2 失败则升级数据库为 0 (都同时升级其他字段)

运行有的时候网络波动 会出现处理完成后状态为 1 并且事物所处理的字段也填充上了,这是什么鬼
Python中MySQL升级脚本倒序执行的问题如何解决

4 回复

感觉楼主描述的有点绕啊,根据我的理解,你这个现象很正常啊,说明回写为 2 的操作并没有执行成功啊,
在你写的事务中应该有这样的伪代码

start transaction
done something
if done:
回写 2
transaction commit
else:
回写 0
transaction rollback


这个问题我遇到过,核心在于MySQL的information_schema里没有直接记录执行顺序。你得自己设计版本管理机制。

最直接的做法是给每个升级脚本加个版本号,然后建个表来记录执行历史:

import mysql.connector
from pathlib import Path

class MigrationManager:
    def __init__(self, db_config):
        self.conn = mysql.connector.connect(**db_config)
        self.cursor = self.conn.cursor()
        self._init_migration_table()
    
    def _init_migration_table(self):
        """创建迁移记录表"""
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS migration_history (
                id INT AUTO_INCREMENT PRIMARY KEY,
                version VARCHAR(50) UNIQUE NOT NULL,
                filename VARCHAR(255) NOT NULL,
                applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        self.conn.commit()
    
    def get_applied_versions(self):
        """获取已执行的版本"""
        self.cursor.execute("SELECT version FROM migration_history ORDER BY applied_at DESC")
        return [row[0] for row in self.cursor.fetchall()]
    
    def apply_migration(self, filepath):
        """执行升级脚本"""
        with open(filepath, 'r') as f:
            sql = f.read()
        
        version = Path(filepath).stem  # 用文件名作为版本号,比如 001_initial.sql
        
        try:
            # 执行SQL
            for statement in sql.split(';'):
                if statement.strip():
                    self.cursor.execute(statement)
            
            # 记录执行历史
            self.cursor.execute(
                "INSERT INTO migration_history (version, filename) VALUES (%s, %s)",
                (version, Path(filepath).name)
            )
            self.conn.commit()
            print(f"Applied: {filepath}")
            
        except Exception as e:
            self.conn.rollback()
            print(f"Failed to apply {filepath}: {e}")
            raise
    
    def rollback_migration(self, version):
        """回滚到指定版本"""
        # 获取需要回滚的版本(比指定版本新的都要回滚)
        self.cursor.execute("""
            SELECT version, filename FROM migration_history 
            WHERE version > %s 
            ORDER BY applied_at DESC
        """, (version,))
        
        migrations_to_rollback = self.cursor.fetchall()
        
        for mig_version, filename in migrations_to_rollback:
            # 这里需要你为每个升级脚本编写对应的回滚脚本
            # 比如 upgrade_001.sql 对应 downgrade_001.sql
            rollback_file = f"downgrade_{mig_version}.sql"
            
            if Path(rollback_file).exists():
                with open(rollback_file, 'r') as f:
                    sql = f.read()
                
                try:
                    for statement in sql.split(';'):
                        if statement.strip():
                            self.cursor.execute(statement)
                    
                    # 删除记录
                    self.cursor.execute(
                        "DELETE FROM migration_history WHERE version = %s",
                        (mig_version,)
                    )
                    
                    self.conn.commit()
                    print(f"Rolled back: {mig_version}")
                    
                except Exception as e:
                    self.conn.rollback()
                    print(f"Failed to rollback {mig_version}: {e}")
                    raise
            else:
                print(f"Warning: No rollback script for {mig_version}")
    
    def close(self):
        self.cursor.close()
        self.conn.close()

# 使用示例
if __name__ == "__main__":
    db_config = {
        'host': 'localhost',
        'user': 'root',
        'password': 'password',
        'database': 'myapp'
    }
    
    manager = MigrationManager(db_config)
    
    # 应用所有新迁移
    migration_files = sorted(Path('migrations').glob('upgrade_*.sql'))
    applied = manager.get_applied_versions()
    
    for file in migration_files:
        version = file.stem.replace('upgrade_', '')
        if version not in applied:
            manager.apply_migration(file)
    
    # 回滚到指定版本
    # manager.rollback_migration('002')
    
    manager.close()

关键点:

  1. 每个升级脚本都要有对应的回滚脚本
  2. 用版本号控制执行顺序
  3. 记录执行历史方便追踪

建议用现成的迁移工具比如Alembic。

处理成功后把数据库状态升级为 2 失败则升级数据库为 0 (都同时升级其他字段)

不太理解你说的,用数据库事务去处理如果失败则回滚到前一个状态,应该是 1 啊,你失败则升级为 0 是什么鬼。

可能有多线程,或者重复数据导致了覆盖
解决办法是第一个 sql 加个条件,只处理状态为 0 的数据

回到顶部