Python爬虫框架Scrapy如何解决写入数据库的性能问题
大家好,请问:
目前一个项目,scrapy 目前一分钟抓取 7000 条。需要把数据写入数据库。
而 Pipelines 好像是分析一条 insert into 一条,性能很低。
INSERT INTO table_name (列 1, 列 2,…) VALUES (值 1, 值 2,…)
请问是我的操作方法有问题,还是说可以把数据一批批写入提高效率,如:
INSERT INTO table_name (列 1, 列 2,…) VALUES (值 1, 值 2,…), (值 1, 值 2,…), (值 1, 值 2,…), (值 1, 值 2,…)
Python爬虫框架Scrapy如何解决写入数据库的性能问题
要是我就换 Redis
Scrapy的数据库写入性能瓶颈通常出现在pipeline的同步操作上。我常用的优化方案是结合异步数据库驱动和批量插入。
核心思路是把同步的pymysql换成异步的aiomysql,然后在pipeline里做批量处理。下面是个MySQL的示例:
# pipelines.py
import asyncio
import aiomysql
from itemadapter import ItemAdapter
from twisted.internet import defer
class AsyncMySQLPipeline:
def __init__(self, settings):
self.settings = settings
self.items_buffer = []
self.batch_size = 100 # 批量大小
self.conn_pool = None
@classmethod
def from_crawler(cls, crawler):
return cls(crawler.settings)
async def open_spider(self, spider):
# 创建连接池
self.conn_pool = await aiomysql.create_pool(
host=self.settings.get('MYSQL_HOST'),
port=self.settings.get('MYSQL_PORT', 3306),
user=self.settings.get('MYSQL_USER'),
password=self.settings.get('MYSQL_PASSWORD'),
db=self.settings.get('MYSQL_DB'),
minsize=5, # 最小连接数
maxsize=20, # 最大连接数
autocommit=True
)
async def close_spider(self, spider):
# 关闭前处理剩余数据
if self.items_buffer:
await self._flush_items()
if self.conn_pool:
self.conn_pool.close()
await self.conn_pool.wait_closed()
async def process_item(self, item, spider):
self.items_buffer.append(item)
if len(self.items_buffer) >= self.batch_size:
await self._flush_items()
return item
async def _flush_items(self):
if not self.items_buffer:
return
async with self.conn_pool.acquire() as conn:
async with conn.cursor() as cursor:
# 构建批量插入SQL
values = []
sql = "INSERT INTO your_table (field1, field2) VALUES "
placeholders = []
for item in self.items_buffer:
adapter = ItemAdapter(item)
values.extend([adapter['field1'], adapter['field2']])
placeholders.append("(%s, %s)")
sql += ",".join(placeholders)
await cursor.execute(sql, values)
self.items_buffer.clear()
关键优化点:
- 异步连接池:用
aiomysql.create_pool管理连接,避免频繁创建连接的开销 - 批量插入:攒够一定数量(比如100条)再一次性写入,减少数据库交互次数
- 连接复用:连接池自动管理连接,不需要每次都建立新连接
在settings.py里启用这个pipeline:
ITEM_PIPELINES = {
'your_project.pipelines.AsyncMySQLPipeline': 300,
}
如果数据量特别大,可以考虑再加个消息队列(比如RabbitMQ)做缓冲,把数据先扔到队列里,再用单独的消费者写入数据库,这样爬虫只管抓取,写入压力由消费者承担。
总结:异步驱动+批量插入是提升Scrapy数据库写入性能的有效方案。
m
先丢进 Redis,再写个中间件处理后再入库吧……
scrapy 批量化 写入方案 怎么样呢?如果想实现的话如何实现呢?
你这数据量算少了。insert 批量插入就行
请问在 scrapy 里批量 的思路是什么?
简单的话就是把每个 value 存着,然后搞个全局计数器,到了一定数量后拼成一个大 SQL,执行就行
异步 mysql 驱动。或者用发队列,避免阻塞
executemany
executemany 看上去与 NSERT INTO table_name (列 1, 列 2,…) VALUES (值 1, 值 2,…), (值 1, 值 2,…), (值 1, 值 2,…), (值 1, 值 2,…) 类似,只是更清晰。
先往队列里写,然后再写 consumer 消费掉,这样就不影响采集速度了
问题应该解决了:
MySQLStorePipeline 定义了一个 article_items 集合用于存储 spider 爬到的 item,当 items 数量达到 1000 时,批量写入数据库。如果接受到 item 就单条写入数据库,会比批量写入慢很对,爬虫的效率会慢一个数量级。
http://kekefund.com/2016/03/31/scrapy-learn/
7000 条一分钟,数据库写压力 117QPS
这么低的压力都嫌慢,说明配置超级垃圾
用这么低配的 MySQL,上 Redis 的意义在哪,浪费资源么,23333
不是啊,用的阿里云 RDS,远程的。你这样提醒了我,一会弄一个本地的中转一下。
哈哈,没有想到这种情况
也有阿里云的 Redis,不过远程的估计。。。。我试试本地的 MySQL。
你先试试批量提交能到多少。。
扔到MQ这种异步队列里,再异步插入不就行了
嗯嗯,谢谢大家。最终解决方法是:
因为每次一条 insert into 插入速度很慢,用了一个全局变量存着值,5000 条 executemany 写入一次远程阿里云数据库。
批量后完全满足一分钟过滤 7000 条的需求。(本地数据库也受不了一条条插入。)


