Python 如何分批次导入 100W 条数据到 MySQL

问题 1 :现在我需要使用 Python 脚本导入 100W 条 mysql 数据,如果要每一次插入 1W 条数据(即每1W条数据提交1次),共插入 100 次,在批量操作中不开启事务, python 脚本应该如何来写?

问题 2 :对于下面的语句,想问一下 conn.commit()是不是把上面的 sql 语句做为事务来提交?

#coding=utf-8 import MySQLdb

conn = MySQLdb.connect(host='localhost',port = 3306,user='admin',passwd='admin',db='google') cur = conn.cursor() try: create_tb_cmd=''' create table if not exists tmp.Video_2017bak (id int(10) unsigned, asset_id int(10) unsigned, company_id int(10), ); ''' cur.execute(create_tb_cmd) finally: insert_tb_cmd=''' insert ignore into tmp.Video_2017bak (select * from google.Video where created_at < date_sub(now(), interval 2 year));''' delete_tb_cmd=''' delete from google.Video where created_at < date_sub(now(), interval 2 year);''' cur.execute(insert_tb_cmd) cur.execute(delete_tb_cmd)

cur.close() conn.commit() conn.close()


Python 如何分批次导入 100W 条数据到 MySQL

5 回复

直接用 MySQL 插入,没必要用 Python


对于导入百万级数据到MySQL,我通常用executemany配合分批处理。这是最直接有效的方法。

import mysql.connector
from mysql.connector import Error

def batch_insert_data(data, batch_size=10000):
    """
    分批插入数据到MySQL
    
    Args:
        data: 要插入的数据列表,每个元素是一个元组
        batch_size: 每批插入的数据量,默认10000
    """
    try:
        # 建立数据库连接
        connection = mysql.connector.connect(
            host='localhost',
            database='your_database',
            user='your_username',
            password='your_password'
        )
        
        if connection.is_connected():
            cursor = connection.cursor()
            
            # 准备插入语句
            insert_query = """
            INSERT INTO your_table (column1, column2, column3) 
            VALUES (%s, %s, %s)
            """
            
            # 分批处理数据
            total_records = len(data)
            for i in range(0, total_records, batch_size):
                batch = data[i:i + batch_size]
                cursor.executemany(insert_query, batch)
                connection.commit()
                
                print(f"已插入 {min(i + batch_size, total_records)}/{total_records} 条记录")
            
            print("数据插入完成!")
            
    except Error as e:
        print(f"数据库错误: {e}")
    finally:
        if connection.is_connected():
            cursor.close()
            connection.close()

# 示例:生成100万条测试数据
def generate_test_data(num_records=1000000):
    """生成测试数据"""
    data = []
    for i in range(num_records):
        # 假设有三列数据
        data.append((f"value1_{i}", f"value2_{i}", f"value3_{i}"))
    return data

# 使用示例
if __name__ == "__main__":
    # 生成100万条测试数据
    print("正在生成测试数据...")
    test_data = generate_test_data(1000000)
    
    # 分批插入数据
    print("开始插入数据...")
    batch_insert_data(test_data, batch_size=10000)

关键点说明:

  1. executemany()方法:这是MySQL Connector/Python的批量操作方法,比逐条插入快得多。它把多个INSERT语句打包成一次数据库调用。

  2. 分批处理逻辑:通过range(0, total_records, batch_size)实现数据切片,每批处理batch_size条记录。

  3. 事务控制:每批数据插入后立即提交(connection.commit()),这样即使中途出错,已提交的数据也不会丢失。

  4. 连接管理:使用try-except-finally确保数据库连接正确关闭。

  5. 参数化查询:使用%s占位符防止SQL注入,数据通过元组列表传递。

实际使用时,你需要:

  • 替换数据库连接参数
  • 根据你的表结构调整INSERT语句
  • 调整batch_size(通常5000-20000效果较好)

如果数据源是文件(如CSV),可以结合pandas的read_csv的chunksize参数来分批读取和处理。

简单总结:用executemany加循环分批提交最稳妥。

我想使用脚本让它每周执行一次,所以想使用 python 脚本来做。

这代码没法看
我搞不清楚 autocommit 默认是不是开启的,所以我会显示地禁用掉。
禁用 autocommit 的情况下,就是写个循环执行 INSERT, 然后搞个计数器,每 10000 条执行一次 conn.commit() 就行了

下面是我自己写的脚本,下面的脚本还能够优化吗?

#coding=utf-8

import MySQLdb

import numpy as np

conn = MySQLdb.connect(host=‘localhost’,port = 3306,user=‘root’,passwd=‘123456’,db=‘google’)
cur = conn.cursor()
conn.autocommit(1)

query_sql = “select id from google.Video where created_at < date_sub(now(), interval 1 year);”
insert_sql = "insert ignore into tmp.Video_2017bak (select * from google.Video where id = %s);"
delete_sql = "delete from google.Video where id = %s;"
cur.execute(query_sql)
dataList = cur.fetchall()
aaa = np.array(dataList)
ids = []

for i in range(len(aaa)):
ids.append(aaa[i])
if (i+1)%100==0 :
cur.executemany(insert_sql,ids)
ids = []
cur.executemany(insert_sql,ids)
ids = []

for i in range(len(aaa)):
ids.append(aaa[i])
if (i+1)%100==0 :
cur.executemany(delete_sql,ids)
ids = []
cur.executemany(delete_sql,ids)
ids = []

cur.close()
conn.close()

回到顶部