Python中Scrapy爬虫使用MongoDB存储IP时,如何判断并避免重复插入数据?

pipelines.py 中的关键代码为:
if db.mycollection.find({“ip”:item[“ip”]}):
print “{} has already existed.”.format(item[“ip”])
print db.mycollection.find()
else:
db.mycollection.insert_one(data)
print db.mycollection.count({})
执行的结果总是:
120.25.164.134 has already existed.
<pymongo.cursor.Cursor object at 0x0000000005B330F0>
……
提示我抓取到的每一条数据都是存在的,我的 mycollection 聚集中最开始是空的,为什么会出现这种情况呢?
Python中Scrapy爬虫使用MongoDB存储IP时,如何判断并避免重复插入数据?


15 回复

怎么没有前辈来指教一下呢?


核心方案:使用MongoDB的 _id 字段或创建唯一索引来保证数据唯一性。

在Scrapy的pipeline里,最直接有效的方法是利用MongoDB自身的机制去重,而不是在插入前手动查询。手动查询在高并发下效率低且可能因竞态条件导致重复。

推荐做法:在 open_spider 中创建唯一索引,在 process_item 中使用 update_one 实现“存在则更新,不存在则插入”。

下面是完整的pipeline代码示例:

import pymongo
from scrapy import Item
from itemadapter import ItemAdapter

class MongoDBPipeline:
    def __init__(self, mongo_uri, mongo_db, collection_name):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db
        self.collection_name = collection_name

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI'),
            mongo_db=crawler.settings.get('MONGO_DATABASE', 'scrapy_db'),
            collection_name=crawler.settings.get('MONGO_COLLECTION', 'proxy_ips')
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]
        self.collection = self.db[self.collection_name]
        
        # 创建唯一索引,假设用 'ip' 和 'port' 字段组合来判断唯一性
        # 如果已经存在,MongoDB会忽略重复创建
        self.collection.create_index([('ip', 1), ('port', 1)], unique=True)

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        # 将Item转换为字典
        item_dict = ItemAdapter(item).asdict()
        
        # 定义用于判断文档唯一性的查询条件
        # 这里以 ip 和 port 为例,你可以根据你的item结构调整
        query = {
            'ip': item_dict['ip'],
            'port': item_dict['port']
        }
        
        # 使用 update_one 方法,设置 upsert=True
        # 如果找到匹配的文档,则更新;如果没找到,则插入新文档
        self.collection.update_one(
            query,
            {'$set': item_dict},
            upsert=True
        )
        return item

关键点解释:

  1. 创建唯一索引 (create_index): 在 open_spider 中为判断IP唯一性的字段组合(例如 ip + port)创建唯一索引。这是数据库层面的强约束,能从根本上防止重复。
  2. 使用 update_oneupsert=True: 在 process_item 中,根据唯一性条件构建查询字典 query,然后调用 update_one(query, {'$set': item_dict}, upsert=True)。这个操作是原子性的,完美解决了“先查后插”的竞态问题,并且代码简洁高效。

settings.py 中配置:

ITEM_PIPELINES = {
    'your_project.pipelines.MongoDBPipeline': 300,
}
MONGO_URI = 'mongodb://localhost:27017'
MONGO_DATABASE = 'your_database_name'
MONGO_COLLECTION = 'your_collection_name'

总结:让数据库去做去重,别在应用层自己搞。

这样,无论爬虫并发多高,同一个IP都只会有一条记录,后续的爬取会自动更新已有记录。

大佬都在度假,没空 8 小时之外看 v 站

大佬都在度假,没空 8 小时之外看 v 站

看了下文档,find()总是返回的是 Cursor, 猜测应该用 db.mycollection.find({“ip”:item[“ip”]}).count() 吧。

用 upsert

.find() 返回的是 cursor,用 find().toArray() 返回的是查找结果的数组,如果不存在就是个空数组。

你也可以试试 .findOneAndUpdate() 并设置 upsert: true,这样如果找到记录则更新(更新内容为空),找不到则新建一条记录。

返回的是 cursor,肯定为 True 呀,你得调用.next()才可能获得一个空结果吧。
另外,这种约束条件一般是这样实现:在 ip 上建立一个 unique 的索引,然后每次都直接插入;如果已有,则会报错 duplicate,忽略即可。你这种 ifelse 不是“原子”操作,如果有多个进程同时工作,可能插入多条相同 ip 的。

find_one

动态语言就是有这个问题
如果你需要集合为空:
那么使用 if collection.isEmpty 或者是 if collection.size() == 0

如果你需要集合为 null/none:
那么使用 if collection== null/none

这样写代码的时候意图清楚, 看代码的人也轻松.

你的建议可以实现。谢谢。

嗯,你说的对,这一点确实没有考虑到。

嗯,受教了。谢谢前辈。

find 方法返回的都是集合对象。

下面我的实际代码,是需要先把查询回来的结果对象使用 count 方法去查看具体数量

def process_item(self, item, spider):
result = self.db[self.mongo_collection].find({‘source_url’: item[‘source_url’]})
if result.count() != 0:
raise DropItem(“Duplicate item found: %s” % item)
else:
return item

回到顶部