Python中如何解决MySQL表数据去重及导入失败的问题

前几天遇到个新需求,需要不定时将数据导入 MySQL 数据库。既然是数据导入,就会牵扯到数据重复导入和数据导入失败问题。 1.在解决数据重复导入问题上 方案 1:使用原生 sql,利用临时表进行去重后,数据合入原表。首先临时表重复数据去重:使用 select distinct, 然后在合入原表时再使用 insert into tablename(col1, col2, col3...) select col1, col2, col3... from template_tablename where not exists(select col1, col2, col3 from tablename where tablename.col1 = col1 and tablename.col2=col2 and tablename.col3=col3) 进行二次去重,即可达到去重效果。 方案 2:使用 sqlalchmy 方式去重,查询判断表中不存在当前条数据即插入,多条数据,则需要考虑分批插入。 2.数据导入数据库失败: 方案 1:在导入前进行数据规则校验,并在导入后进行查询,如果存在,则跳过,不存在就记录并尝试重新导入。


Python中如何解决MySQL表数据去重及导入失败的问题

11 回复

如果有好的想法,欢迎留言,大家集思广益,多多提建议,一起搞个技术贴模块的讨论区间出来。鄙人建了个技术群欢迎加入,在这里一起搞事情。902788038


帖子回复:

这个问题很典型,处理MySQL数据去重和导入失败,核心思路是 “先清洗,后导入”。直接硬导入很容易撞上主键或唯一约束错误。下面分步解决:

1. 数据去重 (关键步骤) 去重最好在导入前完成。如果你已经有数据在Python里(比如从CSV、另一个库来的),用pandas最省事:

import pandas as pd

# 假设你的数据在一个DataFrame `df` 里
# 根据业务逻辑确定去重字段,比如按'user_id'和'email'组合去重
df_clean = df.drop_duplicates(subset=['user_id', 'email'], keep='first')  # 保留第一条

# 或者更彻底,对所有列完全相同的行去重
df_clean = df.drop_duplicates()

如果数据已经在另一张MySQL表source_table里,需要导入到target_table并去重,直接用SQL查询导入更高效:

-- 假设目标表有唯一索引或主键,使用 INSERT IGNORE 或 REPLACE
-- 方法1: INSERT IGNORE (重复则跳过)
INSERT IGNORE INTO target_table (col1, col2, col3)
SELECT DISTINCT col1, col2, col3 FROM source_table;

-- 方法2: REPLACE (重复则删除旧记录,插入新记录)
REPLACE INTO target_table (col1, col2, col3)
SELECT DISTINCT col1, col2, col3 FROM source_table;

-- 方法3: 如果数据量大,先建临时表去重
CREATE TABLE temp_table LIKE target_table;
INSERT INTO temp_table (col1, col2, col3)
SELECT DISTINCT col1, col2, col3 FROM source_table;
-- 然后清空目标表,再导入临时表数据 (根据业务需求选择)

2. 处理导入失败 在Python中用pymysqlsqlalchemy导入清洗后的数据时,使用事务和批处理,并捕获异常:

import pymysql
from sqlalchemy import create_engine
import pandas as pd

# 方法A: 使用pymysql,手动处理重复
connection = pymysql.connect(host='localhost', user='user', password='pass', database='db')
try:
    with connection.cursor() as cursor:
        # 假设df_clean是清洗后的DataFrame
        for index, row in df_clean.iterrows():
            sql = "INSERT IGNORE INTO your_table (col1, col2) VALUES (%s, %s)"
            # 使用INSERT IGNORE避免重复报错
            cursor.execute(sql, (row['col1'], row['col2']))
    connection.commit()
except Exception as e:
    print(f"导入出错: {e}")
    connection.rollback()
finally:
    connection.close()

# 方法B: 使用sqlalchemy + pandas (推荐,自动适应重复策略)
engine = create_engine('mysql+pymysql://user:pass[@localhost](/user/localhost)/db')
# `if_exists='append'` 追加数据,遇到重复索引会报错
# 可以配合`method='multi'`提升速度
df_clean.to_sql('your_table', con=engine, if_exists='append', index=False, method='multi')

如果to_sql因重复报错,可以在导入前设置临时关闭唯一键检查,但务必谨慎

SET UNIQUE_CHECKS=0;
-- 执行导入...
SET UNIQUE_CHECKS=1;

总结建议: 去重要在导入前用SQL或pandas搞定,导入时用INSERT IGNOREREPLACE避开重复错误。

为什么不考虑唯一索引

将列设置为主键,拷贝表(忽略主键重复的)

直接在新表设置联合唯一约束,再往新表导入数据,成功的就是唯一的,后面有重复的就会导入失败。

数据量大建议入库后去重,数据量小入库时去重。

可以使用唯一索引的,其实在刚开始的时候用的就是唯一索引,后面改成了 insert into select where…,然后又改成了 sqlalchemy 来操作,原因就是原生 sql 不易维护,且版本影响较大,比如关键字 ignore 在 5.7 之后就不再使用。

不如唯一索引。

是因为数据量大易造成数据库假死吗?

看定时的频率和使用频率
使用频率高:根据判断重复的字段加索引,然后自己写一个 findOrCreate 方法。每一条去执行这个方法。
定时频率高:可以在使用的时候 select x from y order by id(created_at) desc ,这个就可以直接拿到你最近一次存入的值。
可以再详细说一下使用场景,也许会有不同的方案。

使用场景就是 a,b,c 三个字段,每个字段都会有重复,但同时出现 a1,b1,c1 的情况只在数据库中存一次,加索引的话,要在 a,b,c 三个字段加联合唯一索引。

回到顶部