Python中如何使用pymysql进行数据库操作求助
用 pymysql 对数据库进行迁移,从一个库插入数据到另一个库,一个表里大概有 7000 多条数据,运行第一次会失败,第二次或者第三次运行可能会成功,多运行几次可能会成功也可能失败,原表数据没有动过,而且每次报错都是不同的错,有时候说是格式错误,有时候说是 sql 语法错误,有大佬知道是什么原因吗?
错误 1:pymysql.err.InternalError: (1292, “172.21.12.110:3336/GW_DB_TEST_0001 | Incorrect datetime value: ‘2013-�4-20 13:44:10’ for column ‘UPDATE_DATE’ at row 30”) —表里的时间字段是没有问题的,有时候插入到几百行的时候报错,有时候在插入几千行的时候报错
错误 2:You have an error in your SQL syntax; check sql syntax error --也会不知道在第几条数据的时候报错
求助啊!!
Python中如何使用pymysql进行数据库操作求助
如果还我处理,在不知道 bug 时候我会吧 pymsql 换成 mysqlclient 再来测试一次,如果都出错,那是我迁移有问题,那就换方法迁移或者排错
import pymysql
# 1. 建立数据库连接
connection = pymysql.connect(
host='localhost', # 数据库地址
user='your_username', # 用户名
password='your_password', # 密码
database='your_database', # 数据库名
charset='utf8mb4', # 字符编码
cursorclass=pymysql.cursors.DictCursor # 返回字典格式的结果
)
try:
# 2. 创建游标对象
with connection.cursor() as cursor:
# 3. 执行SQL查询
# 示例1: 创建表
create_table_sql = """
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL UNIQUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
cursor.execute(create_table_sql)
# 示例2: 插入数据
insert_sql = "INSERT INTO users (name, email) VALUES (%s, %s)"
# 插入单条数据
cursor.execute(insert_sql, ('张三', 'zhangsan@example.com'))
# 插入多条数据
users_data = [
('李四', 'lisi@example.com'),
('王五', 'wangwu@example.com')
]
cursor.executemany(insert_sql, users_data)
# 示例3: 查询数据
select_sql = "SELECT * FROM users WHERE name LIKE %s"
cursor.execute(select_sql, ('%张%',))
# 获取所有结果
results = cursor.fetchall()
print("查询结果:")
for row in results:
print(f"ID: {row['id']}, 姓名: {row['name']}, 邮箱: {row['email']}")
# 获取单条结果
cursor.execute("SELECT * FROM users WHERE id = %s", (1,))
single_result = cursor.fetchone()
print(f"单条记录: {single_result}")
# 示例4: 更新数据
update_sql = "UPDATE users SET email = %s WHERE id = %s"
cursor.execute(update_sql, ('new_email@example.com', 1))
# 示例5: 删除数据
delete_sql = "DELETE FROM users WHERE id = %s"
cursor.execute(delete_sql, (3,))
# 4. 提交事务
connection.commit()
# 示例6: 带条件查询和排序
cursor.execute("""
SELECT * FROM users
WHERE created_at > '2023-01-01'
ORDER BY created_at DESC
LIMIT 10
""")
recent_users = cursor.fetchall()
finally:
# 5. 关闭连接
connection.close()
# 使用上下文管理器简化连接管理(推荐)
def query_users():
with pymysql.connect(
host='localhost',
user='your_username',
password='your_password',
database='your_database',
charset='utf8mb4'
) as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT * FROM users")
return cursor.fetchall()
# 错误处理示例
try:
conn = pymysql.connect(host='localhost', user='root', password='', database='test')
with conn.cursor() as cursor:
cursor.execute("SELECT * FROM non_existent_table")
except pymysql.Error as e:
print(f"数据库错误: {e}")
finally:
if 'conn' in locals():
conn.close()
核心要点:
pymysql.connect()建立连接,参数包括主机、用户名、密码、数据库名connection.cursor()创建游标,DictCursor让结果以字典形式返回cursor.execute()执行SQL,用参数化查询防止SQL注入cursor.fetchall()/fetchone()获取查询结果connection.commit()提交事务,connection.rollback()回滚- 使用
with语句自动管理连接和游标的关闭
简单总结: 连接→游标→执行SQL→处理结果→提交→关闭连接,记得用参数化查询防注入。
建议你直接从源库用数据库的导出功能直接导出 SQL 文件,再到目标库创建相同数据结构直接用 SQL 文件导入,然后在目标库里用 sql 直接来处理。
如果有非常逻辑复杂的预处理,那么直接用 python 生成适合目标库的 sql 批插入 sql 文件,在目标库用 sql 文件导入。
这样每步都可控好排错。
随缘 debug 吗?打 log 或者设置断点
编码问题?
不建议用 pymysql 做迁移。
宁可用 bash 写,莫用 pymysql (个人经历)
没记错的话,各种编码坑,各种带不带引号有坑,各种因为字段内容需要转义有坑。
迁移的话,percona-xtrabackup 考虑一下
才 7000 多个,拿出 txt 直接拼 sql 了
percona-xtrabackup 考虑一下

