Python 如何分批次导入 100W 条数据到 MySQL
问题 1 :现在我需要使用 Python 脚本导入 100W 条 mysql 数据,如果要每一次插入 1W 条数据(即每1W条数据提交1次),共插入 100 次,在批量操作中不开启事务, python 脚本应该如何来写?
问题 2 :对于下面的语句,想问一下 conn.commit()是不是把上面的 sql 语句做为事务来提交?
#coding=utf-8 import MySQLdb
conn = MySQLdb.connect(host='localhost',port = 3306,user='admin',passwd='admin',db='google') cur = conn.cursor() try: create_tb_cmd=''' create table if not exists tmp.Video_2017bak (id int(10) unsigned, asset_id int(10) unsigned, company_id int(10), ); ''' cur.execute(create_tb_cmd) finally: insert_tb_cmd=''' insert ignore into tmp.Video_2017bak (select * from google.Video where created_at < date_sub(now(), interval 2 year));''' delete_tb_cmd=''' delete from google.Video where created_at < date_sub(now(), interval 2 year);''' cur.execute(insert_tb_cmd) cur.execute(delete_tb_cmd)
cur.close() conn.commit() conn.close()
Python 如何分批次导入 100W 条数据到 MySQL
直接用 MySQL 插入,没必要用 Python
对于导入百万级数据到MySQL,我通常用executemany配合分批处理。这是最直接有效的方法。
import mysql.connector
from mysql.connector import Error
def batch_insert_data(data, batch_size=10000):
"""
分批插入数据到MySQL
Args:
data: 要插入的数据列表,每个元素是一个元组
batch_size: 每批插入的数据量,默认10000
"""
try:
# 建立数据库连接
connection = mysql.connector.connect(
host='localhost',
database='your_database',
user='your_username',
password='your_password'
)
if connection.is_connected():
cursor = connection.cursor()
# 准备插入语句
insert_query = """
INSERT INTO your_table (column1, column2, column3)
VALUES (%s, %s, %s)
"""
# 分批处理数据
total_records = len(data)
for i in range(0, total_records, batch_size):
batch = data[i:i + batch_size]
cursor.executemany(insert_query, batch)
connection.commit()
print(f"已插入 {min(i + batch_size, total_records)}/{total_records} 条记录")
print("数据插入完成!")
except Error as e:
print(f"数据库错误: {e}")
finally:
if connection.is_connected():
cursor.close()
connection.close()
# 示例:生成100万条测试数据
def generate_test_data(num_records=1000000):
"""生成测试数据"""
data = []
for i in range(num_records):
# 假设有三列数据
data.append((f"value1_{i}", f"value2_{i}", f"value3_{i}"))
return data
# 使用示例
if __name__ == "__main__":
# 生成100万条测试数据
print("正在生成测试数据...")
test_data = generate_test_data(1000000)
# 分批插入数据
print("开始插入数据...")
batch_insert_data(test_data, batch_size=10000)
关键点说明:
-
executemany()方法:这是MySQL Connector/Python的批量操作方法,比逐条插入快得多。它把多个INSERT语句打包成一次数据库调用。
-
分批处理逻辑:通过
range(0, total_records, batch_size)实现数据切片,每批处理batch_size条记录。 -
事务控制:每批数据插入后立即提交(
connection.commit()),这样即使中途出错,已提交的数据也不会丢失。 -
连接管理:使用try-except-finally确保数据库连接正确关闭。
-
参数化查询:使用
%s占位符防止SQL注入,数据通过元组列表传递。
实际使用时,你需要:
- 替换数据库连接参数
- 根据你的表结构调整INSERT语句
- 调整batch_size(通常5000-20000效果较好)
如果数据源是文件(如CSV),可以结合pandas的read_csv的chunksize参数来分批读取和处理。
简单总结:用executemany加循环分批提交最稳妥。
我想使用脚本让它每周执行一次,所以想使用 python 脚本来做。
这代码没法看
我搞不清楚 autocommit 默认是不是开启的,所以我会显示地禁用掉。
禁用 autocommit 的情况下,就是写个循环执行 INSERT, 然后搞个计数器,每 10000 条执行一次 conn.commit() 就行了
下面是我自己写的脚本,下面的脚本还能够优化吗?
#coding=utf-8
import MySQLdb
import numpy as np
conn = MySQLdb.connect(host=‘localhost’,port = 3306,user=‘root’,passwd=‘123456’,db=‘google’)
cur = conn.cursor()
conn.autocommit(1)
query_sql = “select id from google.Video where created_at < date_sub(now(), interval 1 year);”
insert_sql = "insert ignore into tmp.Video_2017bak (select * from google.Video where id = %s);"
delete_sql = "delete from google.Video where id = %s;"
cur.execute(query_sql)
dataList = cur.fetchall()
aaa = np.array(dataList)
ids = []
for i in range(len(aaa)):
ids.append(aaa[i])
if (i+1)%100==0 :
cur.executemany(insert_sql,ids)
ids = []
cur.executemany(insert_sql,ids)
ids = []
for i in range(len(aaa)):
ids.append(aaa[i])
if (i+1)%100==0 :
cur.executemany(delete_sql,ids)
ids = []
cur.executemany(delete_sql,ids)
ids = []
cur.close()
conn.close()

