Python中如何在Scrapy框架下实现MongoDB去重与下载管道管理
from scrapy.pipelines.files import FilesPipeline
from scrapy import Request
from scrapy.conf import settings
import pymongo
class XiaoMiQuanPipeLines(object):
    """Scrapy item pipeline that deduplicates items against MongoDB.

    An item is a duplicate when a document with the same (url, name)
    pair already exists in the configured collection.  Duplicates pass
    through unchanged; new pairs are inserted before being returned.
    """

    def __init__(self):
        # Connection parameters come from the Scrapy settings object
        # (imported at module level from scrapy.conf).
        # NOTE: the original method was named ``init`` and therefore
        # never ran as a constructor.
        host = settings["MONGODB_HOST"]
        port = settings["MONGODB_PORT"]
        dbname = settings["MONGODB_DBNAME"]
        sheetname = settings["MONGODB_SHEETNAME"]
        client = pymongo.MongoClient(host=host, port=port)
        mydb = client[dbname]
        self.post = mydb[sheetname]

    def process_item(self, item, spider=None):
        """Store the (url, name) pair unless it is already present.

        Scrapy invokes this hook as ``process_item(item, spider)``; the
        original signature was missing ``spider``, which is accepted
        here with a default for backward compatibility.
        """
        url = item['file_url']
        name = item['name']
        # A direct lookup replaces the original aggregate() call: the
        # $group stage never filtered on the current item, and the
        # returned CommandCursor is always truthy, so the duplicate
        # check could never take the "pass" branch correctly.
        if self.post.find_one({"url": url, "name": name}) is None:
            # insert_one() replaces the deprecated Collection.insert().
            self.post.insert_one({"url": url, "name": name})
        return item
class DownLoadPipelines(FilesPipeline):
    """FilesPipeline subclass that saves each downloaded file under the
    name carried in the request meta instead of the default SHA1 path.

    The original paste used typographic quotes around the string
    literals, which is a SyntaxError in Python; restored to ASCII.
    """

    def file_path(self, request, response=None, info=None):
        # Use the filename stashed by get_media_requests(); fall back
        # to an empty string when it is missing.
        return request.meta.get('filename', '')

    def get_media_requests(self, item, info):
        # Schedule the download, passing the desired filename along in
        # the request meta so file_path() can pick it up.
        file_url = item['file_url']
        meta = {'filename': item['name']}
        yield Request(url=file_url, meta=meta)
这里写两个管道:先判断,如果重复则不下载;如果不重复,先写入数据库,然后下载。这里用 aggregate 按联合键(url + name)去重。
Python中如何在Scrapy框架下实现MongoDB去重与下载管道管理
class MongoCache:
    """MongoDB-backed cache keyed by a caller-supplied unique_key.

    Stores the raw data plus per-field summaries (computed by the
    module-level ``generator_summary`` helper) so change detection can
    compare cheap summaries instead of full values.  The database
    handle is shared through the class attribute ``db``.
    """

    # Shared database handle, set lazily by create_instance().
    db = None

    def __init__(self):
        # The original guard tested hasattr(MongoCache, 'pool'), an
        # attribute that is never set anywhere, so a fresh client was
        # created on every instantiation.  Guard on the attribute that
        # create_instance() actually assigns.
        if MongoCache.db is None:
            MongoCache.create_instance()

    @staticmethod
    def create_instance():
        # ``config`` and ``MongoClient`` come from the surrounding
        # module.  @staticmethod restores the decorator lost in the
        # paste (the method takes no self/cls).
        client = MongoClient(config.MONGO_URL)
        MongoCache.db = client['spider']

    def create(self, table, unique_key, origin_data):
        """Insert origin_data under unique_key; no-op if it exists."""
        if self.exists(table, unique_key):
            return None
        summaries = {k: generator_summary(v) for (k, v) in origin_data.items()}
        return self.db[table].insert({
            'unique_key': unique_key,
            'data': origin_data,
            'summaries': summaries
        })

    def get(self, table, unique_key):
        """Return the stored data for unique_key, or None if absent."""
        data = self.db[table].find_one({'unique_key': unique_key})
        if data is None:
            return None
        return data['data']

    def exists(self, table, unique_key):
        """True if a document with unique_key is stored in table."""
        data = self.db[table].find_one({'unique_key': unique_key})
        return data is not None

    def is_changed(self, table, unique_key, origin_data):
        """True if any field's summary differs from the stored one."""
        if not self.exists(table, unique_key):
            return True
        last_summaries = self.db[table].find_one({'unique_key': unique_key})['summaries']
        for (k, v) in origin_data.items():
            summary = generator_summary(v)
            last_summary = last_summaries.get(k, None)
            if last_summary is None or last_summary != summary:
                return True
        return False

    def change_fields(self, table, unique_key, origin_data):
        """Return the subset of origin_data whose summaries changed."""
        if not self.exists(table, unique_key):
            return origin_data
        changes = {}
        last_summaries = self.db[table].find_one({'unique_key': unique_key})['summaries']
        for (k, v) in origin_data.items():
            last_summary = last_summaries.get(k, None)
            if last_summary is None or last_summary != generator_summary(v):
                changes[k] = v
        return changes

    def update(self, table, unique_key, origin_data):
        """Overwrite data and summaries for unique_key; return the data.

        The original paste split ``return origin_data`` across a line
        break; rejoined here.
        """
        if not self.exists(table, unique_key):
            return origin_data
        new_summaries = {k: generator_summary(v) for (k, v) in origin_data.items()}
        self.db[table].update_one({'unique_key': unique_key},
                                  {'$set': {'data': origin_data, 'summaries': new_summaries}})
        return origin_data
在Scrapy里用MongoDB做去重和管道管理,核心是自定义Item Pipeline和结合scrapy.dupefilters。下面给你一个完整的实现方案。
首先,确保安装了必要的包:
pip install pymongo scrapy
1. 去重过滤器 (DupeFilter)
在middlewares.py或单独文件(如dupefilter.py)中创建基于MongoDB的去重中间件:
# dupefilter.py
from scrapy.dupefilters import BaseDupeFilter
from pymongo import MongoClient
from scrapy.utils.request import request_fingerprint
class MongoDupeFilter(BaseDupeFilter):
    """Request dupefilter that persists fingerprints in MongoDB.

    Each seen request fingerprint is inserted into a collection with a
    unique index; an insert rejected by that index means the request
    was already seen, so the dedup state survives crawler restarts.
    """

    def __init__(self, mongo_uri, mongo_db, collection_name):
        self.client = MongoClient(mongo_uri)
        self.db = self.client[mongo_db]
        self.collection = self.db[collection_name]
        # The unique index makes request_seen() race-free: concurrent
        # inserts of the same fingerprint cannot both succeed.
        self.collection.create_index('fingerprint', unique=True)

    @classmethod
    def from_crawler(cls, crawler):
        """Build the filter from the crawler's settings."""
        settings = crawler.settings
        return cls(
            mongo_uri=settings.get('MONGO_URI', 'mongodb://localhost:27017'),
            mongo_db=settings.get('MONGO_DATABASE', 'scrapy_db'),
            collection_name=settings.get('DUPEFILTER_COLLECTION', 'seen_urls')
        )

    def request_seen(self, request):
        """Return True if this request's fingerprint is already stored."""
        # Local import keeps the snippet's top-of-file imports unchanged.
        from pymongo.errors import DuplicateKeyError
        fp = request_fingerprint(request)
        try:
            self.collection.insert_one({'fingerprint': fp})
            return False
        except DuplicateKeyError:
            # Only a duplicate-key violation means "seen".  The original
            # bare ``except:`` also swallowed connection errors, silently
            # dropping requests whenever MongoDB was unreachable.
            return True

    def close(self, reason):
        """Release the MongoDB client when the crawl ends."""
        self.client.close()
2. 下载管道 (Pipeline)
在pipelines.py中创建MongoDB存储管道:
# pipelines.py
from pymongo import MongoClient
from itemadapter import ItemAdapter
class MongoPipeline:
    """Item pipeline that writes every scraped item into MongoDB.

    One collection per spider: the spider's name doubles as the
    collection name.  Connection details are read from the project
    settings through from_crawler().
    """

    def __init__(self, mongo_uri, mongo_db):
        # Only remember the parameters here; the actual connection is
        # opened in open_spider() and closed in close_spider().
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        """Instantiate the pipeline from the crawler's settings."""
        cfg = crawler.settings
        return cls(
            mongo_uri=cfg.get('MONGO_URI'),
            mongo_db=cfg.get('MONGO_DATABASE', 'scrapy_db'),
        )

    def open_spider(self, spider):
        # Connect once per spider run.
        self.client = MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        # Drop the connection when the spider finishes.
        self.client.close()

    def process_item(self, item, spider):
        # Or read item['collection'] to pick the target collection.
        collection_name = spider.name
        self.db[collection_name].insert_one(ItemAdapter(item).asdict())
        return item
# settings.py
# MongoDB connection settings shared by the dupefilter and the pipeline.
MONGO_URI = 'mongodb://localhost:27017'
MONGO_DATABASE = 'scrapy_db'
DUPEFILTER_COLLECTION = 'seen_urls'
# Enable the custom MongoDB-backed duplicate filter.
DUPEFILTER_CLASS = 'your_project.dupefilter.MongoDupeFilter'
# Enable the pipeline and set its priority (lower numbers run earlier).
ITEM_PIPELINES = {
'your_project.pipelines.MongoPipeline': 300,
}
工作原理:
- MongoDupeFilter 会在爬取前检查请求指纹是否已存在于 MongoDB,实现去重。
- MongoPipeline 负责将清洗后的数据存储到 MongoDB。
- 通过 settings.py 统一管理配置,便于维护。
一句话建议: 记得在MongoDB中为去重集合建立唯一索引提升性能。
v2ex 不支持 markdown…
https://gist.github.com/watsy0007/779c27fb0ceab283cc434b5eec10b7c4
封装了针对数据处理的公共方法.
我是直接 mongo 加 unique 索引,并捕捉索引冲突异常。。
你的是联合键吗?我说的是 url 和 name 一起
#3 可以多个 key 做索引 https://docs.mongodb.com/manual/core/index-multikey/
db.XiaoMiQuan.find()
{ “_id” : ObjectId(“5bbf14dbc96b5b3f5627d11d”), “file_url” : “https://baogaocos.seedsufe.com/2018/07/19/doc_1532004923556.pdf”, “name” : “AMCHAM-中国的“一带一路”:对美国企业的影响(英文)-2018.6-8 页.pdf” }我现在是这样写的
这是对的?
用不用把他全站 pdf 发你
人家网站是更新的吧
是啊,直接给你服务器

