关于Python中scrapy使用twisted.enterprise.adbapi执行SQL的问题
问题是这样的,在 scrapy 中使用 pipeline 保存数据到 MySQL。
执行 SQL 语句例如
execute(
“”"
INSERT INTO table_name(field1, field2, field3, field4)
VALUES(%s, %s, %s, %s)
“”", % (item.name, item.age, item.sex, item.loc))
会提示 SYNTAX NEAR ‘INSERT INTO table_name(filed1, fi’ 错误,这里的 SQL 会被提示折断。当修改 SQL 减少插入的字段数量以缩短单行文本长度后,BUG 又会莫名的小时。
每次运行 scrapy crawl 都会提示这样的错误信息,但是经过几次运行数据库居然也有了十几条数据。
请问有人遇到类似的问题吗,python 新手,如果问题实在太低端请轻喷。
关于Python中scrapy使用twisted.enterprise.adbapi执行SQL的问题
在Scrapy项目里用twisted.enterprise.adbapi执行异步SQL操作是个标准做法,能有效防止数据库I/O阻塞爬虫的并发请求。核心是创建一个数据库连接池,然后在process_item管道方法里用runInteraction来执行具体的SQL逻辑。
下面是一个完整的示例,假设你要把爬取到的数据存到MySQL。首先确保安装了twisted和pymysql(或者其他你用的数据库驱动)。
# 在pipelines.py文件中
import pymysql
from twisted.enterprise import adbapi
class AsyncMySQLPipeline:
def __init__(self, db_settings):
# 保存配置,稍后创建连接池
self.db_settings = db_settings
self.dbpool = None
@classmethod
def from_crawler(cls, crawler):
# 从settings.py获取配置的标准方法
db_settings = {
'host': crawler.settings.get('MYSQL_HOST', 'localhost'),
'port': crawler.settings.get('MYSQL_PORT', 3306),
'user': crawler.settings.get('MYSQL_USER'),
'password': crawler.settings.get('MYSQL_PASSWORD'),
'db': crawler.settings.get('MYSQL_DATABASE'),
'charset': 'utf8mb4',
'cursorclass': pymysql.cursors.DictCursor # 可选,指定游标类型
}
return cls(db_settings)
def open_spider(self, spider):
# 爬虫启动时创建连接池
# adbapi.ConnectionPool 第一个参数是数据库驱动模块名,比如'pymysql'
# 后续参数是传给驱动connect()方法的参数
self.dbpool = adbapi.ConnectionPool('pymysql', **self.db_settings)
def close_spider(self, spider):
# 爬虫关闭时关闭连接池
if self.dbpool:
self.dbpool.close()
def process_item(self, item, spider):
# 这是处理每个item的核心方法
# 我们把实际的插入操作交给 _do_insert 函数
# runInteraction 会在线程池中执行 _do_insert,并返回一个Deferred对象
query = self.dbpool.runInteraction(self._do_insert, item)
# 可以添加错误处理回调
query.addErrback(self._handle_error, item, spider)
# 返回item,即使插入是异步的,这一步也会立刻返回,不阻塞
return item
def _do_insert(self, transaction, item):
# 这个函数在单独的线程中运行,接收一个transaction对象(其实就是驱动的cursor)
# 构造你的SQL语句
sql = """
INSERT INTO your_table_name (field1, field2, field3)
VALUES (%s, %s, %s)
"""
# 从item对象中提取值,假设你的item有这些字段
values = (item.get('field1'), item.get('field2'), item.get('field3'))
# 执行SQL
transaction.execute(sql, values)
# 注意:通常连接池会处理提交,但有些情况下可能需要手动transaction.connection.commit()
def _handle_error(self, failure, item, spider):
# 处理插入过程中发生的错误
spider.logger.error(f"Error inserting item {item}: {failure}")
然后在settings.py里启用这个管道并配置数据库:
ITEM_PIPELINES = {
'your_project.pipelines.AsyncMySQLPipeline': 300,
}
MYSQL_HOST = 'localhost'
MYSQL_USER = 'your_user'
MYSQL_PASSWORD = 'your_password'
MYSQL_DATABASE = 'your_database'
# 其他配置...
关键点解释:
adbapi.ConnectionPool: 它管理一个线程池,所有数据库操作都在这些线程中执行,避免阻塞主reactor线程。runInteraction: 这是主力方法。它接收一个函数(如_do_insert)和参数。这个函数会在线程池的线程里被调用,并接收一个transaction参数(可以当作cursor用)。- 异步流程:
process_item方法调用runInteraction后立即返回item,不等待数据库操作完成。Scrapy会继续处理其他请求。数据库操作在后台线程完成,通过Twisted的Deferred机制与主循环协调。 - 错误处理:务必添加
errback(如_handle_error)来记录插入失败,否则错误会被静默吞掉。
一句话总结:用adbapi创建连接池,在process_item里调用runInteraction来异步执行SQL。
数据可以正常爬取并导出到 JSON 文件。打印过了抓取的网页也没有问题,目前怀疑是多行文本即 string literal 出了问题,对 python 不够熟悉还请高人指点。
不知道具体原因,我的代码给你看一下:
sql_insert = “insert into articles (category,tags,url,title,author) values (%s,%s,%s,%s,%s)”
sql_insert_params = (category, item[‘tags’],item[‘link’],item[‘title’],item[‘author’])
tx.execute(sql_insert, sql_insert_params)
多试几次吧

