关于Python中scrapy使用twisted.enterprise.adbapi执行SQL的问题

问题是这样的,在 scrapy 中使用 pipeline 保存数据到 MySQL。
执行 SQL 语句例如
execute(
“”"
INSERT INTO table_name(field1, field2, field3, field4)
VALUES(%s, %s, %s, %s)

“”", % (item.name, item.age, item.sex, item.loc))
会提示 SYNTAX NEAR ‘INSERT INTO table_name(filed1, fi’ 错误,这里的 SQL 会被提示折断。当修改 SQL 减少插入的字段数量以缩短单行文本长度后,BUG 又会莫名的小时。

每次运行 scrapy crawl 都会提示这样的错误信息,但是经过几次运行数据库居然也有了十几条数据。
请问有人遇到类似的问题吗,python 新手,如果问题实在太低端请轻喷。
关于Python中scrapy使用twisted.enterprise.adbapi执行SQL的问题


3 回复

在Scrapy项目里用twisted.enterprise.adbapi执行异步SQL操作是个标准做法,能有效防止数据库I/O阻塞爬虫的并发请求。核心是创建一个数据库连接池,然后在process_item管道方法里用runInteraction来执行具体的SQL逻辑。

下面是一个完整的示例,假设你要把爬取到的数据存到MySQL。首先确保安装了twistedpymysql(或者其他你用的数据库驱动)。

# 在pipelines.py文件中
import pymysql
from twisted.enterprise import adbapi

class AsyncMySQLPipeline:
    def __init__(self, db_settings):
        # 保存配置,稍后创建连接池
        self.db_settings = db_settings
        self.dbpool = None

    @classmethod
    def from_crawler(cls, crawler):
        # 从settings.py获取配置的标准方法
        db_settings = {
            'host': crawler.settings.get('MYSQL_HOST', 'localhost'),
            'port': crawler.settings.get('MYSQL_PORT', 3306),
            'user': crawler.settings.get('MYSQL_USER'),
            'password': crawler.settings.get('MYSQL_PASSWORD'),
            'db': crawler.settings.get('MYSQL_DATABASE'),
            'charset': 'utf8mb4',
            'cursorclass': pymysql.cursors.DictCursor # 可选,指定游标类型
        }
        return cls(db_settings)

    def open_spider(self, spider):
        # 爬虫启动时创建连接池
        # adbapi.ConnectionPool 第一个参数是数据库驱动模块名,比如'pymysql'
        # 后续参数是传给驱动connect()方法的参数
        self.dbpool = adbapi.ConnectionPool('pymysql', **self.db_settings)

    def close_spider(self, spider):
        # 爬虫关闭时关闭连接池
        if self.dbpool:
            self.dbpool.close()

    def process_item(self, item, spider):
        # 这是处理每个item的核心方法
        # 我们把实际的插入操作交给 _do_insert 函数
        # runInteraction 会在线程池中执行 _do_insert,并返回一个Deferred对象
        query = self.dbpool.runInteraction(self._do_insert, item)
        # 可以添加错误处理回调
        query.addErrback(self._handle_error, item, spider)
        # 返回item,即使插入是异步的,这一步也会立刻返回,不阻塞
        return item

    def _do_insert(self, transaction, item):
        # 这个函数在单独的线程中运行,接收一个transaction对象(其实就是驱动的cursor)
        # 构造你的SQL语句
        sql = """
            INSERT INTO your_table_name (field1, field2, field3)
            VALUES (%s, %s, %s)
        """
        # 从item对象中提取值,假设你的item有这些字段
        values = (item.get('field1'), item.get('field2'), item.get('field3'))
        # 执行SQL
        transaction.execute(sql, values)
        # 注意:通常连接池会处理提交,但有些情况下可能需要手动transaction.connection.commit()

    def _handle_error(self, failure, item, spider):
        # 处理插入过程中发生的错误
        spider.logger.error(f"Error inserting item {item}: {failure}")

然后在settings.py里启用这个管道并配置数据库:

ITEM_PIPELINES = {
   'your_project.pipelines.AsyncMySQLPipeline': 300,
}
MYSQL_HOST = 'localhost'
MYSQL_USER = 'your_user'
MYSQL_PASSWORD = 'your_password'
MYSQL_DATABASE = 'your_database'
# 其他配置...

关键点解释:

  1. adbapi.ConnectionPool: 它管理一个线程池,所有数据库操作都在这些线程中执行,避免阻塞主reactor线程。
  2. runInteraction: 这是主力方法。它接收一个函数(如_do_insert)和参数。这个函数会在线程池的线程里被调用,并接收一个transaction参数(可以当作cursor用)。
  3. 异步流程process_item方法调用runInteraction后立即返回item,不等待数据库操作完成。Scrapy会继续处理其他请求。数据库操作在后台线程完成,通过Twisted的Deferred机制与主循环协调。
  4. 错误处理:务必添加errback(如_handle_error)来记录插入失败,否则错误会被静默吞掉。

一句话总结:用adbapi创建连接池,在process_item里调用runInteraction来异步执行SQL。


数据可以正常爬取并导出到 JSON 文件。打印过了抓取的网页也没有问题,目前怀疑是多行文本即 string literal 出了问题,对 python 不够熟悉还请高人指点。

不知道具体原因,我的代码给你看一下:

sql_insert = “insert into articles (category,tags,url,title,author) values (%s,%s,%s,%s,%s)”
sql_insert_params = (category, item[‘tags’],item[‘link’],item[‘title’],item[‘author’])
tx.execute(sql_insert, sql_insert_params)

多试几次吧

回到顶部