Python中Scrapy爬取数据直接存到Hadoop有什么方法吗?

之前都是存到本机 MySql 数据库,这一次需要存到远程的 Hadoop 服务器中,实现边爬边存,不落到本机,请问有什么方法吗?有什么插件可以实现 Scrapy 和 hdfs 的连接呢,还是说需要自己写一个?求大神解答


Python中Scrapy爬取数据直接存到Hadoop有什么方法吗?
16 回复

自己写个


Scrapy爬虫数据直接存Hadoop,最直接的方法是用HDFS的REST API(WebHDFS/HttpFS)。这里给你个完整示例:

import json
from scrapy.exporters import BaseItemExporter
import requests

class HDFSItemExporter(BaseItemExporter):
    def __init__(self, hdfs_url, file_path, **kwargs):
        super().__init__(**kwargs)
        self.hdfs_url = hdfs_url.rstrip('/')
        self.file_path = file_path
        self.buffer = []
        self.batch_size = 100  # 批量写入
        
    def start_exporting(self):
        # 创建HDFS文件
        create_url = f"{self.hdfs_url}/webhdfs/v1{self.file_path}?op=CREATE&overwrite=true"
        response = requests.put(create_url, allow_redirects=False)
        
        if response.status_code == 307:  # 重定向到DataNode
            self.write_url = response.headers['Location']
        else:
            raise Exception(f"HDFS创建失败: {response.status_code}")
    
    def export_item(self, item):
        self.buffer.append(dict(self._get_serialized_fields(item)))
        if len(self.buffer) >= self.batch_size:
            self._flush_buffer()
    
    def finish_exporting(self):
        if self.buffer:
            self._flush_buffer()
    
    def _flush_buffer(self):
        data = '\n'.join(json.dumps(item) for item in self.buffer)
        response = requests.post(
            self.write_url,
            data=data,
            headers={'Content-Type': 'application/json'}
        )
        if response.status_code != 200:
            raise Exception(f"写入HDFS失败: {response.status_code}")
        self.buffer.clear()

在Scrapy的pipelines.py中使用:

class HDFSPipeline:
    def __init__(self, hdfs_url, file_path):
        self.hdfs_url = hdfs_url
        self.file_path = file_path
    
    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            hdfs_url=crawler.settings.get('HDFS_URL'),
            file_path=crawler.settings.get('HDFS_FILE_PATH', '/scrapy_data.json')
        )
    
    def open_spider(self, spider):
        self.exporter = HDFSItemExporter(self.hdfs_url, self.file_path)
        self.exporter.start_exporting()
    
    def process_item(self, item, spider):
        self.exporter.export_item(item)
        return item
    
    def close_spider(self, spider):
        self.exporter.finish_exporting()

settings.py配置:

ITEM_PIPELINES = {
    'your_project.pipelines.HDFSPipeline': 300,
}
HDFS_URL = 'http://namenode:50070'
HDFS_FILE_PATH = '/data/scrapy_output.json'

另一种方案是用PyArrow直接写HDFS:

import pyarrow as pa
import pyarrow.hdfs as hdfs

class PyArrowHDFSPipeline:
    def open_spider(self, spider):
        self.hdfs = hdfs.connect()
        self.buffer = []
    
    def process_item(self, item, spider):
        self.buffer.append(dict(item))
        if len(self.buffer) >= 1000:
            self._write_to_hdfs()
        return item
    
    def _write_to_hdfs(self):
        table = pa.Table.from_pandas(pd.DataFrame(self.buffer))
        with self.hdfs.open('/scrapy_data.parquet', 'wb') as f:
            pa.parquet.write_table(table, f)
        self.buffer.clear()

WebHDFS方案最通用,PyArrow性能更好但需要集群环境。记得在Hadoop配置里启用WebHDFS(core-site.xml中设置dfs.webhdfs.enabled=true)。

总结:用WebHDFS REST API最直接。

这就是个 pipeline 吧 就跟换成 mongoDB 一个意思

thrift
或者用 java 写个接口接收数据。
我之前也遇到过类似的问题, 至少我没有找到 python 直接连接 hdfs 的库。

初学者,有点吃力。还是想找造好的轮子
这个能再详细指点一下吗?好像是没有现成的…
谢谢指点,我去试一试。python 直接连 hdfs 查到了一个 pyhdfs,不过也还没找到 scrapy 直接连的办法。

https://doc.scrapy.org/en/latest/topics/item-pipeline.html 可以看’Write items to MongoDB’ 或者 json 那一节,这边你要先和 hadoop 建立连接,然后自己实现‘ process_item ’这个方法。因为scrapy是解耦的,并不关心你的存储后端。

#4 看了下这个 pyhdfs,只有 10 个 star 呀,你确定没坑?

不过话说回来,只要这个 pyhdfs 可以工作,你就可以用呀。scrapy 接收到数据,然后调用这个库直接写入数据就行了。异步这块要处理一下,不然影响效率。

你要是想找 scrapy-hdfs 这种库肯定是找不到的。我还是建议你用 java 直接写个接口。

不要直接写 hdfs,效率低到哭
搞个管道,爬完写到管道里,那边消费到 hdfs

哎,地球这么大,给个可行方案吧。
建立个 kafka 队列,然后 python 写 kafka 的轮子必然是有了。
kafka 写 hdfs 那是顺理成章,这样就搞定了。

Mq 加 logstash 直接搞定,scrapy 的 pipeline 里面把数据打到 mq 里,logstash 的 input 是 mq,output 是 hdfs

这个是正解。 写 kafka

存在 hdfs,我理解的 hdfs 不是一个文件系统吗?? 数据难道不是存在各种数据库?要直接写的话,就是改 pipeline 吧。

hdfs+hbase,这个路数。或者直接 kafka+spark,计算去了。

Scrapy 爬取的数据存成日志
再加一个 Flume 把日志同步存储到 HDFS

需要一个消息队列

直接写 flume 呗,flume 用个 http 的 source 和 hdfs 的 sink,不会增加什么开发量

回到顶部