Python中Scrapy爬虫使用MongoDB存储IP时,如何判断并避免重复插入数据?
pipelines.py 中的关键代码为:
if db.mycollection.find({“ip”:item[“ip”]}):
print “{} has already existed.”.format(item[“ip”])
print db.mycollection.find()
else:
db.mycollection.insert_one(data)
print db.mycollection.count({})
执行的结果总是:
120.25.164.134 has already existed.
<pymongo.cursor.Cursor object at 0x0000000005B330F0>
……
提示我抓取到的每一条数据都是存在的,我的 mycollection 聚集中最开始是空的,为什么会出现这种情况呢?
Python中Scrapy爬虫使用MongoDB存储IP时,如何判断并避免重复插入数据?
怎么没有前辈来指教一下呢?
核心方案:使用MongoDB的 _id 字段或创建唯一索引来保证数据唯一性。
在Scrapy的pipeline里,最直接有效的方法是利用MongoDB自身的机制去重,而不是在插入前手动查询。手动查询在高并发下效率低且可能因竞态条件导致重复。
推荐做法:在 open_spider 中创建唯一索引,在 process_item 中使用 update_one 实现“存在则更新,不存在则插入”。
下面是完整的pipeline代码示例:
import pymongo
from scrapy import Item
from itemadapter import ItemAdapter
class MongoDBPipeline:
def __init__(self, mongo_uri, mongo_db, collection_name):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db
self.collection_name = collection_name
@classmethod
def from_crawler(cls, crawler):
return cls(
mongo_uri=crawler.settings.get('MONGO_URI'),
mongo_db=crawler.settings.get('MONGO_DATABASE', 'scrapy_db'),
collection_name=crawler.settings.get('MONGO_COLLECTION', 'proxy_ips')
)
def open_spider(self, spider):
self.client = pymongo.MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]
self.collection = self.db[self.collection_name]
# 创建唯一索引,假设用 'ip' 和 'port' 字段组合来判断唯一性
# 如果已经存在,MongoDB会忽略重复创建
self.collection.create_index([('ip', 1), ('port', 1)], unique=True)
def close_spider(self, spider):
self.client.close()
def process_item(self, item, spider):
# 将Item转换为字典
item_dict = ItemAdapter(item).asdict()
# 定义用于判断文档唯一性的查询条件
# 这里以 ip 和 port 为例,你可以根据你的item结构调整
query = {
'ip': item_dict['ip'],
'port': item_dict['port']
}
# 使用 update_one 方法,设置 upsert=True
# 如果找到匹配的文档,则更新;如果没找到,则插入新文档
self.collection.update_one(
query,
{'$set': item_dict},
upsert=True
)
return item
关键点解释:
- 创建唯一索引 (
create_index): 在open_spider中为判断IP唯一性的字段组合(例如ip+port)创建唯一索引。这是数据库层面的强约束,能从根本上防止重复。 - 使用
update_one与upsert=True: 在process_item中,根据唯一性条件构建查询字典query,然后调用update_one(query, {'$set': item_dict}, upsert=True)。这个操作是原子性的,完美解决了“先查后插”的竞态问题,并且代码简洁高效。
在 settings.py 中配置:
ITEM_PIPELINES = {
'your_project.pipelines.MongoDBPipeline': 300,
}
MONGO_URI = 'mongodb://localhost:27017'
MONGO_DATABASE = 'your_database_name'
MONGO_COLLECTION = 'your_collection_name'
总结:让数据库去做去重,别在应用层自己搞。
这样,无论爬虫并发多高,同一个IP都只会有一条记录,后续的爬取会自动更新已有记录。
大佬都在度假,没空 8 小时之外看 v 站
大佬都在度假,没空 8 小时之外看 v 站
看了下文档,find()总是返回的是 Cursor, 猜测应该用 db.mycollection.find({“ip”:item[“ip”]}).count() 吧。
用 upsert
.find() 返回的是 cursor,用 find().toArray() 返回的是查找结果的数组,如果不存在就是个空数组。
你也可以试试 .findOneAndUpdate() 并设置 upsert: true,这样如果找到记录则更新(更新内容为空),找不到则新建一条记录。
返回的是 cursor,肯定为 True 呀,你得调用.next()才可能获得一个空结果吧。
另外,这种约束条件一般是这样实现:在 ip 上建立一个 unique 的索引,然后每次都直接插入;如果已有,则会报错 duplicate,忽略即可。你这种 ifelse 不是“原子”操作,如果有多个进程同时工作,可能插入多条相同 ip 的。
find_one
动态语言就是有这个问题
如果你需要集合为空:
那么使用 if collection.isEmpty 或者是 if collection.size() == 0
如果你需要集合为 null/none:
那么使用 if collection== null/none
这样写代码的时候意图清楚, 看代码的人也轻松.
你的建议可以实现。谢谢。
嗯,你说的对,这一点确实没有考虑到。
嗯,受教了。谢谢前辈。
find 方法返回的都是集合对象。
下面我的实际代码,是需要先把查询回来的结果对象使用 count 方法去查看具体数量
def process_item(self, item, spider):
result = self.db[self.mongo_collection].find({‘source_url’: item[‘source_url’]})
if result.count() != 0:
raise DropItem(“Duplicate item found: %s” % item)
else:
return item

