Python爬虫框架Scrapy如何解决写入数据库的性能问题

大家好,请问:
目前一个项目,scrapy 目前一分钟抓取 7000 条。需要把数据写入数据库。

而 Pipelines 好像是分析一条 insert into 一条,性能很低。
INSERT INTO table_name (列 1, 列 2,…) VALUES (值 1, 值 2,…)

请问是我的操作方法有问题,还是说可以把数据一批批写入提高效率,如:
INSERT INTO table_name (列 1, 列 2,…) VALUES (值 1, 值 2,…), (值 1, 值 2,…), (值 1, 值 2,…), (值 1, 值 2,…)
Python爬虫框架Scrapy如何解决写入数据库的性能问题


21 回复

要是我就换 Redis


Scrapy的数据库写入性能瓶颈通常出现在pipeline的同步操作上。我常用的优化方案是结合异步数据库驱动和批量插入。

核心思路是把同步的pymysql换成异步的aiomysql,然后在pipeline里做批量处理。下面是个MySQL的示例:

# pipelines.py
import asyncio
import aiomysql
from itemadapter import ItemAdapter
from twisted.internet import defer

class AsyncMySQLPipeline:
    def __init__(self, settings):
        self.settings = settings
        self.items_buffer = []
        self.batch_size = 100  # 批量大小
        self.conn_pool = None
        
    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings)
    
    async def open_spider(self, spider):
        # 创建连接池
        self.conn_pool = await aiomysql.create_pool(
            host=self.settings.get('MYSQL_HOST'),
            port=self.settings.get('MYSQL_PORT', 3306),
            user=self.settings.get('MYSQL_USER'),
            password=self.settings.get('MYSQL_PASSWORD'),
            db=self.settings.get('MYSQL_DB'),
            minsize=5,  # 最小连接数
            maxsize=20,  # 最大连接数
            autocommit=True
        )
    
    async def close_spider(self, spider):
        # 关闭前处理剩余数据
        if self.items_buffer:
            await self._flush_items()
        if self.conn_pool:
            self.conn_pool.close()
            await self.conn_pool.wait_closed()
    
    async def process_item(self, item, spider):
        self.items_buffer.append(item)
        if len(self.items_buffer) >= self.batch_size:
            await self._flush_items()
        return item
    
    async def _flush_items(self):
        if not self.items_buffer:
            return
            
        async with self.conn_pool.acquire() as conn:
            async with conn.cursor() as cursor:
                # 构建批量插入SQL
                values = []
                sql = "INSERT INTO your_table (field1, field2) VALUES "
                placeholders = []
                
                for item in self.items_buffer:
                    adapter = ItemAdapter(item)
                    values.extend([adapter['field1'], adapter['field2']])
                    placeholders.append("(%s, %s)")
                
                sql += ",".join(placeholders)
                await cursor.execute(sql, values)
            
        self.items_buffer.clear()

关键优化点:

  1. 异步连接池:用aiomysql.create_pool管理连接,避免频繁创建连接的开销
  2. 批量插入:攒够一定数量(比如100条)再一次性写入,减少数据库交互次数
  3. 连接复用:连接池自动管理连接,不需要每次都建立新连接

在settings.py里启用这个pipeline:

ITEM_PIPELINES = {
    'your_project.pipelines.AsyncMySQLPipeline': 300,
}

如果数据量特别大,可以考虑再加个消息队列(比如RabbitMQ)做缓冲,把数据先扔到队列里,再用单独的消费者写入数据库,这样爬虫只管抓取,写入压力由消费者承担。

总结:异步驱动+批量插入是提升Scrapy数据库写入性能的有效方案。

先丢进 Redis,再写个中间件处理后再入库吧……

scrapy 批量化 写入方案 怎么样呢?如果想实现的话如何实现呢?

你这数据量算少了。insert 批量插入就行

请问在 scrapy 里批量 的思路是什么?

简单的话就是把每个 value 存着,然后搞个全局计数器,到了一定数量后拼成一个大 SQL,执行就行

异步 mysql 驱动。或者用发队列,避免阻塞

executemany

executemany 看上去与 NSERT INTO table_name (列 1, 列 2,…) VALUES (值 1, 值 2,…), (值 1, 值 2,…), (值 1, 值 2,…), (值 1, 值 2,…) 类似,只是更清晰。

请问如何将 每个 value 存着 ,是通过一个全局变量吗?还是在 Pipelines 中实现?

先往队列里写,然后再写 consumer 消费掉,这样就不影响采集速度了

问题应该解决了:

MySQLStorePipeline 定义了一个 article_items 集合用于存储 spider 爬到的 item,当 items 数量达到 1000 时,批量写入数据库。如果接受到 item 就单条写入数据库,会比批量写入慢很对,爬虫的效率会慢一个数量级。

http://kekefund.com/2016/03/31/scrapy-learn/

7000 条一分钟,数据库写压力 117QPS
这么低的压力都嫌慢,说明配置超级垃圾
用这么低配的 MySQL,上 Redis 的意义在哪,浪费资源么,23333

不是啊,用的阿里云 RDS,远程的。你这样提醒了我,一会弄一个本地的中转一下。

哈哈,没有想到这种情况

也有阿里云的 Redis,不过远程的估计。。。。我试试本地的 MySQL。

你先试试批量提交能到多少。。

扔到MQ这种异步队列里,再异步插入不就行了

嗯嗯,谢谢大家。最终解决方法是:
因为每次一条 insert into 插入速度很慢,用了一个全局变量存着值,5000 条 executemany 写入一次远程阿里云数据库。

批量后完全满足一分钟过滤 7000 条的需求。(本地数据库也受不了一条条插入。)

回到顶部