Python中如何解决MySQL表数据去重及导入失败的问题
前几天遇到个新需求,需要不定时将数据导入 MySQL 数据库。既然是数据导入,就会牵扯到数据重复导入和数据导入失败问题。 1.在解决数据重复导入问题上 方案 1:使用原生 sql,利用临时表进行去重后,数据合入原表。首先临时表重复数据去重:使用 select distinct, 然后在合入原表时再使用 insert into tablename(col1, col2, col3...) select col1, col2, col3... from template_tablename where not exists(select col1, col2, col3 from tablename where tablename.col1 = col1 and tablename.col2=col2 and tablename.col3=col3) 进行二次去重,即可达到去重效果。 方案 2:使用 sqlalchmy 方式去重,查询判断表中不存在当前条数据即插入,多条数据,则需要考虑分批插入。 2.数据导入数据库失败: 方案 1:在导入前进行数据规则校验,并在导入后进行查询,如果存在,则跳过,不存在就记录并尝试重新导入。
Python中如何解决MySQL表数据去重及导入失败的问题
如果有好的想法,欢迎留言,大家集思广益,多多提建议,一起搞个技术贴模块的讨论区间出来。鄙人建了个技术群欢迎加入,在这里一起搞事情。902788038
帖子回复:
这个问题很典型,处理MySQL数据去重和导入失败,核心思路是 “先清洗,后导入”。直接硬导入很容易撞上主键或唯一约束错误。下面分步解决:
1. 数据去重 (关键步骤) 去重最好在导入前完成。如果你已经有数据在Python里(比如从CSV、另一个库来的),用pandas最省事:
import pandas as pd
# 假设你的数据在一个DataFrame `df` 里
# 根据业务逻辑确定去重字段,比如按'user_id'和'email'组合去重
df_clean = df.drop_duplicates(subset=['user_id', 'email'], keep='first') # 保留第一条
# 或者更彻底,对所有列完全相同的行去重
df_clean = df.drop_duplicates()
如果数据已经在另一张MySQL表source_table里,需要导入到target_table并去重,直接用SQL查询导入更高效:
-- 假设目标表有唯一索引或主键,使用 INSERT IGNORE 或 REPLACE
-- 方法1: INSERT IGNORE (重复则跳过)
INSERT IGNORE INTO target_table (col1, col2, col3)
SELECT DISTINCT col1, col2, col3 FROM source_table;
-- 方法2: REPLACE (重复则删除旧记录,插入新记录)
REPLACE INTO target_table (col1, col2, col3)
SELECT DISTINCT col1, col2, col3 FROM source_table;
-- 方法3: 如果数据量大,先建临时表去重
CREATE TABLE temp_table LIKE target_table;
INSERT INTO temp_table (col1, col2, col3)
SELECT DISTINCT col1, col2, col3 FROM source_table;
-- 然后清空目标表,再导入临时表数据 (根据业务需求选择)
2. 处理导入失败
在Python中用pymysql或sqlalchemy导入清洗后的数据时,使用事务和批处理,并捕获异常:
import pymysql
from sqlalchemy import create_engine
import pandas as pd
# 方法A: 使用pymysql,手动处理重复
connection = pymysql.connect(host='localhost', user='user', password='pass', database='db')
try:
with connection.cursor() as cursor:
# 假设df_clean是清洗后的DataFrame
for index, row in df_clean.iterrows():
sql = "INSERT IGNORE INTO your_table (col1, col2) VALUES (%s, %s)"
# 使用INSERT IGNORE避免重复报错
cursor.execute(sql, (row['col1'], row['col2']))
connection.commit()
except Exception as e:
print(f"导入出错: {e}")
connection.rollback()
finally:
connection.close()
# 方法B: 使用sqlalchemy + pandas (推荐,自动适应重复策略)
engine = create_engine('mysql+pymysql://user:pass[@localhost](/user/localhost)/db')
# `if_exists='append'` 追加数据,遇到重复索引会报错
# 可以配合`method='multi'`提升速度
df_clean.to_sql('your_table', con=engine, if_exists='append', index=False, method='multi')
如果to_sql因重复报错,可以在导入前设置临时关闭唯一键检查,但务必谨慎:
SET UNIQUE_CHECKS=0;
-- 执行导入...
SET UNIQUE_CHECKS=1;
总结建议: 去重要在导入前用SQL或pandas搞定,导入时用INSERT IGNORE或REPLACE避开重复错误。
为什么不考虑唯一索引
将列设置为主键,拷贝表(忽略主键重复的)
直接在新表设置联合唯一约束,再往新表导入数据,成功的就是唯一的,后面有重复的就会导入失败。
数据量大建议入库后去重,数据量小入库时去重。
不如唯一索引。
是因为数据量大易造成数据库假死吗?
看定时的频率和使用频率
使用频率高:根据判断重复的字段加索引,然后自己写一个 findOrCreate 方法。每一条去执行这个方法。
定时频率高:可以在使用的时候 select x from y order by id(created_at) desc ,这个就可以直接拿到你最近一次存入的值。
可以再详细说一下使用场景,也许会有不同的方案。
使用场景就是 a,b,c 三个字段,每个字段都会有重复,但同时出现 a1,b1,c1 的情况只在数据库中存一次,加索引的话,要在 a,b,c 三个字段加联合唯一索引。


