Python中如何实现MySQL多线程插入数据?
python3 写了个爬虫。
多线程爬取数据写入数据库
一开始使用的是 sqllite3,一直提示 database is blocked,后来发现 sqllite 不支持多线程读写
然后谷歌搜了一下,看到了别人推荐使用 mysql 来做这种任务...我只会基础的 CURD 操作,用了简单的多线程来操作,然而 mysql 貌似也不能直接多线程插入数据,stackoverflow 里面的老哥们说是需要建立连接池...头疼,之前只接触过基础的操作,请各位 V2er 赐教
python3 代码:
def getExin():
while not Q.empty():
i = Q.get()
.*
try:
.*
except :
continue
try:
c.execute("INSERT INTO exin (username, password) \
VALUES (%s,'%s')" % (username, password))
conn.commit()
except:
pass
for i in range(0, 99475):
if len(str(i)) < 5:
i = (5 - len(str(i))) * “0” + str(i)
Q.put(i)
threads = []
conn = connect(user=‘root’, password=‘root’, database=‘test’)
c = conn.cursor()
for i in range(800):
x = threading.Thread(target=getExin,args=())
threads.append(x)
for t in threads:
t.start()
c.close()
print(“All done”)
Python中如何实现MySQL多线程插入数据?
一个线程对应一个链接就行了
import threading
import queue
import pymysql
import time
class MySQLMultiThreadInsert:
def __init__(self, host, user, password, database, thread_num=5, batch_size=100):
self.host = host
self.user = user
self.password = password
self.database = database
self.thread_num = thread_num
self.batch_size = batch_size
self.data_queue = queue.Queue()
self.threads = []
self._running = False
def start(self):
"""启动所有工作线程"""
self._running = True
for i in range(self.thread_num):
thread = threading.Thread(
target=self._worker,
name=f"MySQL-Worker-{i}",
daemon=True
)
thread.start()
self.threads.append(thread)
def stop(self):
"""停止所有工作线程"""
self._running = False
self.data_queue.join() # 等待队列中的所有任务完成
for thread in self.threads:
thread.join(timeout=5)
def add_data(self, data_list):
"""添加数据到队列,自动分批"""
for i in range(0, len(data_list), self.batch_size):
batch = data_list[i:i + self.batch_size]
self.data_queue.put(batch)
def _worker(self):
"""工作线程函数"""
# 每个线程创建自己的数据库连接
conn = pymysql.connect(
host=self.host,
user=self.user,
password=self.password,
database=self.database,
charset='utf8mb4',
autocommit=True
)
cursor = conn.cursor()
try:
while self._running or not self.data_queue.empty():
try:
# 设置超时避免线程永久阻塞
batch_data = self.data_queue.get(timeout=1)
except queue.Empty:
continue
try:
# 批量插入数据
sql = "INSERT INTO your_table (col1, col2, col3) VALUES (%s, %s, %s)"
cursor.executemany(sql, batch_data)
conn.commit()
except Exception as e:
print(f"插入失败: {e}")
conn.rollback()
finally:
self.data_queue.task_done()
finally:
cursor.close()
conn.close()
def wait_complete(self):
"""等待所有数据处理完成"""
self.data_queue.join()
# 使用示例
if __name__ == "__main__":
# 配置数据库连接
inserter = MySQLMultiThreadInsert(
host="localhost",
user="root",
password="password",
database="test_db",
thread_num=4, # 4个工作线程
batch_size=500 # 每批500条数据
)
# 启动工作线程
inserter.start()
# 模拟生成测试数据
test_data = []
for i in range(10000):
test_data.append((f"value1_{i}", f"value2_{i}", f"value3_{i}"))
# 添加数据到队列
inserter.add_data(test_data)
# 等待所有数据插入完成
inserter.wait_complete()
# 停止工作线程
inserter.stop()
这个实现的核心要点:
- 线程安全队列:使用
queue.Queue作为线程安全的数据缓冲区 - 连接池模式:每个工作线程维护独立的数据库连接,避免连接竞争
- 批量插入:通过
executemany()实现批量插入,减少网络往返 - 优雅关闭:支持安全停止所有工作线程
关键参数说明:
thread_num:控制并发线程数,建议根据CPU核心数和数据库负载调整batch_size:每批插入的数据量,影响插入效率和内存使用
表结构需要提前创建:
CREATE TABLE your_table (
id INT AUTO_INCREMENT PRIMARY KEY,
col1 VARCHAR(255),
col2 VARCHAR(255),
col3 VARCHAR(255)
);
总结:用生产者-消费者模式配合批量插入最有效。
尝试过会提示连接不上,我这里是 1000 线程
executemany 不行么 是什么场景非要多线程多个事务去写入哪
访问一次目标获取一次信息,然后写入数据库,就这么简单的逻辑,但是单线程太慢了…
要么一次一次写 redis 要么把取回的数据先存成 list 或者 tuple 到了一定的量再批量插入 mysql 你这么搞 太费资源了
资源不是问题,只是不想把简单问题复杂化,谢谢老哥啦
你这只是一个连接多线程使用吧
你现在 Q 是共享的,那 Q 的 get,put 是不是阻塞的?然后多个线程也是操作一个连接作数据插入,这样多线程有什么意义呢,除非你的数据准备阶段占时间开销的大头,否则这样并不能提升速度
老生常谈提示下:得用 InnoDB
我尝试过多线程每个都链接一次,但是 mysql 会拒绝我的访问…我开了 800~1000 的线程
Q 是 queue,应该不会阻塞吧,速度提升确实肉眼可见啊…
executemany 应该不慢吧
数据量挺大的,直接放入字典,我怕内存会爆啊
看看 MySQL 默认最大连接数多少,改下配置
这种情况,批量 insert 最佳
插入速度优化上百倍
你如果不批量插入,开再多线程,HDD 上一秒也只能提交 100 个事务。
你开多线程获得的性能提升未必有批量插入大。
我瞎比说说:
for i in range(800):
x = threading.Thread(target=getExin,args=())
t.setDaemon(True)
x.start()
不懂 py … 不过你这么搞肯定有问题 线程开太高 mysql 最大连接数会有问题 就算连得上 你 IO 还是会有问题 和#18 说的一样 你开多线程不一定有拼接 sql 批量插入来的实在
中间用个队列把数据存起来 然后起个线程批量插入就好了
这种事吧,celery 就好啦 gevent 开起来妈妈再也不用担心并发上不去了
搞 thread 多麻烦
之前没有写过协程、异步。昨晚看了一下,启发很大


