Python中MySQL升级脚本倒序执行的问题如何解决
升级用的 python 里面的 mysql 原生写的
数据库有 3 个状态 0 1 2 ,原状态为 0
第一个 sql 升级状态把 0 升级为 1(处理中)
之后写了一个事物
处理成功后把数据库状态升级为 2 失败则升级数据库为 0 (都同时升级其他字段)
运行有的时候网络波动 会出现处理完成后状态为 1 并且事物所处理的字段也填充上了,这是什么鬼
Python中MySQL升级脚本倒序执行的问题如何解决
4 回复
感觉楼主描述的有点绕啊,根据我的理解,你这个现象很正常啊,说明回写为 2 的操作并没有执行成功啊,
在你写的事务中应该有这样的伪代码
start transaction
done something
if done:
回写 2
transaction commit
else:
回写 0
transaction rollback
这个问题我遇到过,核心在于MySQL的information_schema里没有直接记录执行顺序。你得自己设计版本管理机制。
最直接的做法是给每个升级脚本加个版本号,然后建个表来记录执行历史:
import mysql.connector
from pathlib import Path
class MigrationManager:
def __init__(self, db_config):
self.conn = mysql.connector.connect(**db_config)
self.cursor = self.conn.cursor()
self._init_migration_table()
def _init_migration_table(self):
"""创建迁移记录表"""
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS migration_history (
id INT AUTO_INCREMENT PRIMARY KEY,
version VARCHAR(50) UNIQUE NOT NULL,
filename VARCHAR(255) NOT NULL,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
self.conn.commit()
def get_applied_versions(self):
"""获取已执行的版本"""
self.cursor.execute("SELECT version FROM migration_history ORDER BY applied_at DESC")
return [row[0] for row in self.cursor.fetchall()]
def apply_migration(self, filepath):
"""执行升级脚本"""
with open(filepath, 'r') as f:
sql = f.read()
version = Path(filepath).stem # 用文件名作为版本号,比如 001_initial.sql
try:
# 执行SQL
for statement in sql.split(';'):
if statement.strip():
self.cursor.execute(statement)
# 记录执行历史
self.cursor.execute(
"INSERT INTO migration_history (version, filename) VALUES (%s, %s)",
(version, Path(filepath).name)
)
self.conn.commit()
print(f"Applied: {filepath}")
except Exception as e:
self.conn.rollback()
print(f"Failed to apply {filepath}: {e}")
raise
def rollback_migration(self, version):
"""回滚到指定版本"""
# 获取需要回滚的版本(比指定版本新的都要回滚)
self.cursor.execute("""
SELECT version, filename FROM migration_history
WHERE version > %s
ORDER BY applied_at DESC
""", (version,))
migrations_to_rollback = self.cursor.fetchall()
for mig_version, filename in migrations_to_rollback:
# 这里需要你为每个升级脚本编写对应的回滚脚本
# 比如 upgrade_001.sql 对应 downgrade_001.sql
rollback_file = f"downgrade_{mig_version}.sql"
if Path(rollback_file).exists():
with open(rollback_file, 'r') as f:
sql = f.read()
try:
for statement in sql.split(';'):
if statement.strip():
self.cursor.execute(statement)
# 删除记录
self.cursor.execute(
"DELETE FROM migration_history WHERE version = %s",
(mig_version,)
)
self.conn.commit()
print(f"Rolled back: {mig_version}")
except Exception as e:
self.conn.rollback()
print(f"Failed to rollback {mig_version}: {e}")
raise
else:
print(f"Warning: No rollback script for {mig_version}")
def close(self):
self.cursor.close()
self.conn.close()
# 使用示例
if __name__ == "__main__":
db_config = {
'host': 'localhost',
'user': 'root',
'password': 'password',
'database': 'myapp'
}
manager = MigrationManager(db_config)
# 应用所有新迁移
migration_files = sorted(Path('migrations').glob('upgrade_*.sql'))
applied = manager.get_applied_versions()
for file in migration_files:
version = file.stem.replace('upgrade_', '')
if version not in applied:
manager.apply_migration(file)
# 回滚到指定版本
# manager.rollback_migration('002')
manager.close()
关键点:
- 每个升级脚本都要有对应的回滚脚本
- 用版本号控制执行顺序
- 记录执行历史方便追踪
建议用现成的迁移工具比如Alembic。
处理成功后把数据库状态升级为 2 失败则升级数据库为 0 (都同时升级其他字段)
不太理解你说的,用数据库事务去处理如果失败则回滚到前一个状态,应该是 1 啊,你失败则升级为 0 是什么鬼。
可能有多线程,或者重复数据导致了覆盖
解决办法是第一个 sql 加个条件,只处理状态为 0 的数据

