Python多线程爬虫写入MySQL速度特别慢如何优化

null
Python多线程爬虫写入MySQL速度特别慢如何优化

31 回复

那你让它快点写


我遇到过多线程爬虫写MySQL慢的问题,核心瓶颈通常是数据库连接和事务处理。给你一个优化方案:

import threading
import queue
import pymysql
from pymysql import cursors
from concurrent.futures import ThreadPoolExecutor
import time

class MySQLBatchWriter:
    def __init__(self, host, user, password, database, batch_size=1000, max_workers=5):
        self.batch_size = batch_size
        self.data_queue = queue.Queue(maxsize=5000)
        self.lock = threading.Lock()
        self.connection_pool = []
        
        # 创建连接池
        for _ in range(max_workers):
            conn = pymysql.connect(
                host=host,
                user=user,
                password=password,
                database=database,
                charset='utf8mb4',
                cursorclass=cursors.DictCursor,
                autocommit=False  # 关闭自动提交
            )
            self.connection_pool.append(conn)
        
        # 启动写入线程
        self.writer_thread = threading.Thread(target=self._batch_writer, daemon=True)
        self.writer_thread.start()
    
    def add_data(self, data_dict):
        """生产者线程调用此方法添加数据"""
        self.data_queue.put(data_dict)
    
    def _batch_writer(self):
        """专门的写入线程,批量提交"""
        batch = []
        conn_index = 0
        
        while True:
            try:
                # 从队列获取数据
                data = self.data_queue.get(timeout=1)
                batch.append(data)
                
                # 达到批量大小或队列为空时写入
                if len(batch) >= self.batch_size or self.data_queue.empty():
                    if batch:
                        self._insert_batch(batch, conn_index)
                        batch = []
                        conn_index = (conn_index + 1) % len(self.connection_pool)
                        
            except queue.Empty:
                if batch:
                    self._insert_batch(batch, conn_index)
                    batch = []
                time.sleep(0.1)
    
    def _insert_batch(self, batch, conn_index):
        """批量插入数据"""
        conn = self.connection_pool[conn_index]
        try:
            with conn.cursor() as cursor:
                # 构建批量插入SQL
                placeholders = ', '.join(['%s'] * len(batch[0]))
                columns = ', '.join(batch[0].keys())
                sql = f"INSERT INTO your_table ({columns}) VALUES ({placeholders})"
                
                # 准备数据
                values = [tuple(item.values()) for item in batch]
                
                # 执行批量插入
                cursor.executemany(sql, values)
                conn.commit()
                
                print(f"批量插入 {len(batch)} 条数据成功")
                
        except Exception as e:
            conn.rollback()
            print(f"批量插入失败: {e}")
            # 这里可以添加重试或错误处理逻辑
    
    def close(self):
        """关闭所有连接"""
        for conn in self.connection_pool:
            conn.close()

# 使用示例
def crawler_worker(writer, page_range):
    """爬虫工作线程"""
    for page in page_range:
        # 模拟爬取数据
        data = {
            'title': f'page_{page}',
            'url': f'http://example.com/page/{page}',
            'content': '...'
        }
        writer.add_data(data)

# 主程序
if __name__ == '__main__':
    # 创建批量写入器
    writer = MySQLBatchWriter(
        host='localhost',
        user='root',
        password='password',
        database='test_db',
        batch_size=500,  # 每500条批量提交一次
        max_workers=3    # 3个数据库连接
    )
    
    # 创建爬虫线程池
    with ThreadPoolExecutor(max_workers=10) as executor:
        # 分配爬取任务
        page_ranges = [(i*100, (i+1)*100) for i in range(10)]
        futures = []
        
        for start, end in page_ranges:
            future = executor.submit(crawler_worker, writer, range(start, end))
            futures.append(future)
        
        # 等待所有爬虫任务完成
        for future in futures:
            future.result()
    
    # 等待队列中的数据全部写入
    time.sleep(2)
    writer.close()

关键优化点:

  1. 批量写入:使用executemany()批量插入,减少网络往返
  2. 连接池:复用数据库连接,避免频繁创建连接的开销
  3. 生产者-消费者模式:爬虫线程只负责生产数据,专门的写入线程负责批量提交
  4. 关闭自动提交:手动控制事务提交,批量操作完成后一次性提交
  5. 队列缓冲:使用队列解耦爬取和写入,避免相互阻塞

这样改造后,写入速度能提升几十倍。记得根据实际情况调整batch_size和连接数。

磁盘不行?

用 redis 缓存一层再持久(滑稽)

#!/usr/local/bin/python2.7
# -- coding:utf-8 --
import re, time, random, hashlib, urllib, requests, os, math, json, sys, base64, torndb, uuid, threading, sys, itertools, copy, traceback
import requests;requests.packages.urllib3.disable_warnings()
from common import
from config import

mkdir(‘jobs’)

class MyWorker(Worker):
table_coms = ‘zp_coms’
table_jobs = ‘zp_jobs’
htmlfolder = ‘jobs’
def crawlTask(self):
try:
url = self.getAddr() # 获取 url
htm = self.getHtml(url, None, 0)#获取 html
#getJob 解析 html 获取职位数据
self.record.update(self.getJob(htm))
self.record[‘flag’] = 10
except:
self.record[‘flag’] = 99
print traceback.format_exc()
finally:
#向每条职位数据中添加字段
self.record[‘job_link_href’] = url
self.record[‘company_type’]=’’
self.record[‘company_tel’]=’’
self.record[‘company_email’]=’’
self.record[‘job_type’]=’’
self.record[‘job_type_code’]=’’
self.record[‘company_fax’]=’’
#本次任务处理完成后更新数据库
self.update(self.table_jobs, self.record)
def getJob(self, job):
self.record[‘job_title’] = grep(u’<h1>(.+?)</h1>’, job)
self.record[‘job_salary’] = grep(u’<span class=“red”>(.+?)</span>’, job)
self.record[‘job_date’] = grep(u’<span.
?>发布于(.+?)</span>’, job)
infos = grep(u’<div class=“info-primary”>.+?<p>(.+?)</p>’, job, re.S).replace(’<em class=“vline”></em>’, ‘|’).split(’|’)
for info in infos:
if u’城市’ in info:
self.record[‘city_text’] = info[3:]
self.record[‘job_city_code’] = info[3:]
break
cmp_infos = grep(u’h3 class=“name”.+?p>(.+?)</p>’,job,re.S).replace(’<em class=“vline”></em>’, ‘|’).split(’|’)
for info in cmp_infos:
info = re.sub(’<.+?>’, ‘’,info)
if info in paramx[‘s’].values():
self.record[‘company_size’] = info
continue
if info in paramx[‘i’].values():
self.record[‘job_industry_code’] = info
self.record[‘company_industry’] = info
continue
self.record[‘company_linkman’] = re.sub(’<.
?>’, ‘’,grep(u’<h2 class=“name”>(.+?)</h2>’,job,re.S)).strip()
self.record[‘address’] = grep(u’div class=“location-address”>(.+?)</div>’,job,re.S)
self.record[‘company_link_url’] = ‘https://www.zhipin.com’+grep(u’ka=“job-detail-company” href="(.+?)"’, job,re.S)
self.record[‘company_name’] = grep(u’<h3 class=“name”.+?>(.+?)<.?/h3>’, job)
self.record[‘cmp_company_id’]=grep(u’“job-detail-company” href="/gongsi/(.+?).html"’, job)
def checkHtml(self, html):#用于检查页面是否有数据需要解析
if html.find(u’<title>BOSS 直聘验证码</title>’ )>=0:return 0
if html.find(u’您暂时无法继续访问~’ )>=0:return 0
return 1
def reqHtml(self, addr, data=None):#获取 response
headers = {
‘accept’: 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,
/*;q=0.8’,
‘accept-encoding’: ‘gzip, deflate, br’,
‘accept-language’: ‘zh-CN,zh;q=0.9,en;q=0.8’,
‘user-agent’: ‘Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36’,
}
return requests.get(addr, timeout=5, proxies=random.choice(proxies), headers=headers).content.decode(‘utf8’, errors=‘ignore’)
def getAddr(self):#构建 url
return ‘https://www.zhipin.com/job_detail/%(cid)s.html?’ % self.record

class MyLeader(Leader):
table_coms = ‘zp_coms’
table_jobs = ‘zp_jobs’
def getTaskFromDatabase(self):#从数据库中获取一万条任务
return self.dbconn.query(“SELECT id,flag,cid FROM %s WHERE flag IN (0) LIMIT 10000;” % self.table_jobs)
if name==‘main’:
dbconn = torndb.Connection(**dbconf)
MyLeader().runWork(MyWorker,1)

#Leader 类 runWork()方法为开启 MyWorker 多线程的方法,数字参数为指定线程数量
#getTaskFromDatabase()从数据库取出一万条数据(cid 用于职位链接,flag 用于标识抓取进度,id 用于更新数据库)
#MyWorker 为多线程类,run 方法中,self.leader.getTask()方法循环取出一条任务绑定 self.record,然后 crawlTask 函数处理任务,处理完成后 self.leader.popTask(self.record)删除此任务
#crawlTask()函数内部解析任务获取职位数据,并更新 self.record,最后 update()方法存入数据库

妈耶~代码插进去好乱~

加缓存,批量写。

一般瓶颈在硬盘,用 redis 缓存一下

妈呀,爬虫速度比数据库还慢,大哥,你给存数据库提取出来一个服务,在内存排队多线程提交,前台脚本只负责爬数据,转交给数据库保存服务就行了,流水线才好提速

原本是这么做的,被数据库保存搞了好几天搞得有些懵,原来一个爬一个存,但是爬的快存的慢跑步了多久整个进程就崩了。。然后我给改成了爬完就存多开些线程,进程倒是不会崩了,整体速度更慢了

爬虫写数据不要直接写 MySQL,速度慢,并发高还会出现锁错误。建议用 MongoDB,会快非常多。

请关注我的爬虫书: https://www.v2ex.com/t/493016#reply615

你的数据库 IO 扛不住了还是锁的原因?
自己性能分析一下 再找对应解决方案

首先请确认你的 Mysql 适配器支不支持多线程,据我了解,多数适配都不是线程安全的,多线程写会出现问题。
其二,最好还是先汇总,然后由一个单一 worker 写入,因为写入压力全在磁盘,多线程写入不会提高性能。

你可以在提交任务列表上加一个长度控制,如果内存快满了,就让服务调用方堵塞一会,先暂停爬取

批量 insert 效率高

爬的数据先放内存,然后走批量

爬虫数据刚拉回来又没清洗存关系数据库做啥? 直接上 nosql

缓存爬到的数据,批量写入,缓存太大时暂停爬取

我也很想。。。甲方爸爸要 mysql

排队,队列来弄。

twisted 异步写

加队列嘛,然后用时间间隔和待入库数据量做批量插入。

先写 nosql 后面再同步到 mysql

我。。。。换了个数据库,重构了一下表结构,速度上去了

可以详细说说吗?

请教一下,服务器中跑的服务很多,瞬息万变,可能一转眼系统就开始杀进程了,该如何把握时机把数据存入数据库呢?

#4 请问可以把源码贴一份到 gist 吗?想学习一下,多谢

对于中小量数据,直接用 pandas
pd.Dataframe([]) 暂存内存后直接 to_sql,大量数据 用 scrapy+mysql/mongodb

测试数据库没有配置好,换到了正式库上面,重新建了个表,索引主键唯一值什么的定义好,代码原封不动跑一遍,速度上来了。
虽然还是很慢。。。。。

140 万条数据,大小估摸在 600M 左右,试过 pandas,114M 数据导入 mysql 的时候会卡死(试了 3 次,每次都卡一个小时没反应,数据库也没变化)。

杀进程,,,你给你的程序注册一个 kill 事件响应?

对哦!😮

回到顶部