How to implement MongoDB deduplication and download pipeline management under the Scrapy framework in Python

from scrapy.pipelines.files import FilesPipeline
from scrapy import Request
from scrapy.exceptions import DropItem
from scrapy.utils.project import get_project_settings  # scrapy.conf is long removed
import pymongo

settings = get_project_settings()


class XiaoMiQuanPipeLines(object):
    def __init__(self):
        host = settings["MONGODB_HOST"]
        port = settings["MONGODB_PORT"]
        dbname = settings["MONGODB_DBNAME"]
        sheetname = settings["MONGODB_SHEETNAME"]

        client = pymongo.MongoClient(host=host, port=port)
        mydb = client[dbname]
        self.post = mydb[sheetname]

    def process_item(self, item, spider):
        url = item['file_url']
        name = item['name']

        # $match filters on the composite key (url, name); the original bare
        # $group over constants never actually queried for duplicates.
        result = list(self.post.aggregate([
            {"$match": {"url": url, "name": name}}
        ]))
        if result:
            # Already seen: drop the item so the download pipeline never runs.
            raise DropItem('duplicate: %s' % url)
        self.post.insert_one({"url": url, "name": name})
        return item


class DownLoadPipelines(FilesPipeline):

    def file_path(self, request, response=None, info=None):
        return request.meta.get('filename', '')

    def get_media_requests(self, item, info):
        file_url = item['file_url']
        meta = {'filename': item['name']}
        yield Request(url=file_url, meta=meta)


There are two pipelines here: the first checks for duplicates; if the item is a duplicate it is not downloaded, otherwise it is written to the database and then downloaded. Dedup uses aggregate on the composite key (url, name). They are wired up as in the settings sketch below.
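For the "dedup first, then download" order to hold, both pipelines have to be registered with the dedup pipeline at a lower number, and FilesPipeline needs FILES_STORE set. A minimal settings sketch, assuming the project module is named xiaomiquan (the module name and the Mongo values are placeholders):

# settings.py (sketch)
MONGODB_HOST = 'localhost'
MONGODB_PORT = 27017
MONGODB_DBNAME = 'spider'
MONGODB_SHEETNAME = 'XiaoMiQuan'

FILES_STORE = './downloads'  # required for FilesPipeline to save files

ITEM_PIPELINES = {
    # lower number runs first: check for duplicates before downloading
    'xiaomiquan.pipelines.XiaoMiQuanPipeLines': 300,
    'xiaomiquan.pipelines.DownLoadPipelines': 400,
}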


10 replies

from pymongo import MongoClient

# `config` and `generator_summary` are helpers from the poster's own project
# (see the gist linked below).


class MongoCache:
    db = None

    def __init__(self):
        if not hasattr(MongoCache, 'pool'):
            MongoCache.create_instance()

    @staticmethod
    def create_instance():
        client = MongoClient(config.MONGO_URL)
        MongoCache.db = client['spider']

    def create(self, table, unique_key, origin_data):
        if self.exists(table, unique_key):
            return None

        summaries = {k: generator_summary(v) for (k, v) in origin_data.items()}

        return self.db[table].insert({
            'unique_key': unique_key,
            'data': origin_data,
            'summaries': summaries
        })

    def get(self, table, unique_key):
        data = self.db[table].find_one({'unique_key': unique_key})
        if data is None:
            return None
        return data['data']

    def exists(self, table, unique_key):
        data = self.db[table].find_one({'unique_key': unique_key})
        return data is not None

    def is_changed(self, table, unique_key, origin_data):
        if not self.exists(table, unique_key):
            return True

        last_summaries = self.db[table].find_one({'unique_key': unique_key})['summaries']
        for (k, v) in origin_data.items():
            summary = generator_summary(v)
            last_summary = last_summaries.get(k, None)
            # print('{} -> {} | {} -> {}'.format(k, v, summary, last_summary))
            if last_summary is None or last_summary != summary:
                return True
        return False

    def change_fields(self, table, unique_key, origin_data):
        if not self.exists(table, unique_key):
            return origin_data
        changes = {}
        last_summaries = self.db[table].find_one({'unique_key': unique_key})['summaries']
        for (k, v) in origin_data.items():
            last_summary = last_summaries.get(k, None)
            if last_summary is None or last_summary != generator_summary(v):
                changes[k] = v
        return changes

    def update(self, table, unique_key, origin_data):
        if not self.exists(table, unique_key):
            return origin_data
        new_summaries = {k: generator_summary(v) for (k, v) in origin_data.items()}
        self.db[table].update_one({'unique_key': unique_key},
                                  {'$set': {'data': origin_data, 'summaries': new_summaries}})
        return origin_data


To handle dedup and pipeline management with MongoDB in Scrapy, the core pieces are a custom Item Pipeline combined with scrapy.dupefilters. Here is a complete implementation.

First, make sure the required packages are installed:

pip install pymongo scrapy

1. Dedup filter (DupeFilter): create a MongoDB-backed dedup filter in middlewares.py or a standalone file (e.g. dupefilter.py):

# dupefilter.py
from scrapy.dupefilters import BaseDupeFilter
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError
from scrapy.utils.request import request_fingerprint

class MongoDupeFilter(BaseDupeFilter):
    def __init__(self, mongo_uri, mongo_db, collection_name):
        self.client = MongoClient(mongo_uri)
        self.db = self.client[mongo_db]
        self.collection = self.db[collection_name]
        self.collection.create_index('fingerprint', unique=True)

    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        return cls(
            mongo_uri=settings.get('MONGO_URI', 'mongodb://localhost:27017'),
            mongo_db=settings.get('MONGO_DATABASE', 'scrapy_db'),
            collection_name=settings.get('DUPEFILTER_COLLECTION', 'seen_urls')
        )

    def request_seen(self, request):
        fp = request_fingerprint(request)
        try:
            self.collection.insert_one({'fingerprint': fp})
            return False
        except DuplicateKeyError:  # the unique index rejected a duplicate fingerprint
            return True

    def close(self, reason):
        self.client.close()

2. Storage pipeline (Pipeline): create a MongoDB storage pipeline in pipelines.py:

# pipelines.py
from pymongo import MongoClient
from itemadapter import ItemAdapter

class MongoPipeline:
    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI'),
            mongo_db=crawler.settings.get('MONGO_DATABASE', 'scrapy_db')
        )

    def open_spider(self, spider):
        self.client = MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        collection_name = spider.name  # or specify per item via item['collection']
        self.db[collection_name].insert_one(ItemAdapter(item).asdict())
        return item
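
As written, process_item inserts blindly, so re-running the spider duplicates documents. A hedged drop-in replacement for process_item that dedups at the item level on the composite key from this thread (the file_url/name field names are taken from the OP's items and are an assumption here):

    def process_item(self, item, spider):
        data = ItemAdapter(item).asdict()
        # upsert on the composite key (file_url, name): re-crawls update the
        # existing document instead of inserting a duplicate
        self.db[spider.name].update_one(
            {'file_url': data.get('file_url'), 'name': data.get('name')},
            {'$set': data},
            upsert=True,
        )
        return item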

3. Configure settings.py:

# settings.py
MONGO_URI = 'mongodb://localhost:27017'
MONGO_DATABASE = 'scrapy_db'
DUPEFILTER_COLLECTION = 'seen_urls'

# enable the custom dedup filter
DUPEFILTER_CLASS = 'your_project.dupefilter.MongoDupeFilter'

# enable the pipeline and set its priority
ITEM_PIPELINES = {
    'your_project.pipelines.MongoPipeline': 300,
}

How it works:

  • MongoDupeFilter checks each request fingerprint against MongoDB before crawling, deduplicating requests.
  • MongoPipeline stores the cleaned data in MongoDB.
  • All configuration lives in settings.py, which keeps it easy to maintain.

One-line tip: remember to create a unique index on the dedup collection in MongoDB for performance (the MongoDupeFilter above already does this in __init__).

v2ex doesn't support markdown…

https://gist.github.com/watsy0007/779c27fb0ceab283cc434b5eec10b7c4

It wraps up common helpers for data processing.
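
A hedged usage sketch of that MongoCache class (the table name follows the OP's collection; the URL, filename, and the composite-string key format are made up for illustration):

cache = MongoCache()

record = {'file_url': 'https://example.com/a.pdf', 'name': 'report.pdf'}
key = record['file_url'] + '|' + record['name']  # composite key folded into one string

# first sight of this (url, name) pair: stores the data plus a summary per field
cache.create('XiaoMiQuan', key, record)

# on a re-crawl: only touch the database if some field actually changed
if cache.is_changed('XiaoMiQuan', key, record):
    cache.update('XiaoMiQuan', key, record)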

I just put a unique index on the Mongo collection and catch the duplicate-key exception…
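
That approach would look roughly like this for the OP's composite key (a sketch; the collection and field names follow the OP's code):

from pymongo import MongoClient, ASCENDING
from pymongo.errors import DuplicateKeyError

client = MongoClient('localhost', 27017)
post = client['spider']['XiaoMiQuan']

# compound unique index on (url, name): Mongo itself rejects duplicates
post.create_index([('url', ASCENDING), ('name', ASCENDING)], unique=True)

def seen_before(url, name):
    try:
        post.insert_one({'url': url, 'name': name})
        return False
    except DuplicateKeyError:
        return True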

Is yours a composite key? I mean url and name together.

db.XiaoMiQuan.find()
{ "_id" : ObjectId("5bbf14dbc96b5b3f5627d11d"), "file_url" : "https://baogaocos.seedsufe.com/2018/07/19/doc_1532004923556.pdf", "name" : "AMCHAM-中国的“一带一路”:对美国企业的影响(英文)-2018.6-8 页.pdf" }

This is what I have now. Is this right?

Want me to send you their whole site's PDFs?

Their site keeps updating, though, right?

Yeah, might as well just hand you the server.
