Python中Scrapy爬取数据直接存到Hadoop有什么方法吗?
之前都是存到本机 MySql 数据库,这一次需要存到远程的 Hadoop 服务器中,实现边爬边存,不落到本机,请问有什么方法吗?有什么插件可以实现 Scrapy 和 hdfs 的连接呢,还是说需要自己写一个?求大神解答
Python中Scrapy爬取数据直接存到Hadoop有什么方法吗?
自己写个
Scrapy爬虫数据直接存Hadoop,最直接的方法是用HDFS的REST API(WebHDFS/HttpFS)。这里给你个完整示例:
import json
from scrapy.exporters import BaseItemExporter
import requests
class HDFSItemExporter(BaseItemExporter):
def __init__(self, hdfs_url, file_path, **kwargs):
super().__init__(**kwargs)
self.hdfs_url = hdfs_url.rstrip('/')
self.file_path = file_path
self.buffer = []
self.batch_size = 100 # 批量写入
def start_exporting(self):
# 创建HDFS文件
create_url = f"{self.hdfs_url}/webhdfs/v1{self.file_path}?op=CREATE&overwrite=true"
response = requests.put(create_url, allow_redirects=False)
if response.status_code == 307: # 重定向到DataNode
self.write_url = response.headers['Location']
else:
raise Exception(f"HDFS创建失败: {response.status_code}")
def export_item(self, item):
self.buffer.append(dict(self._get_serialized_fields(item)))
if len(self.buffer) >= self.batch_size:
self._flush_buffer()
def finish_exporting(self):
if self.buffer:
self._flush_buffer()
def _flush_buffer(self):
data = '\n'.join(json.dumps(item) for item in self.buffer)
response = requests.post(
self.write_url,
data=data,
headers={'Content-Type': 'application/json'}
)
if response.status_code != 200:
raise Exception(f"写入HDFS失败: {response.status_code}")
self.buffer.clear()
在Scrapy的pipelines.py中使用:
class HDFSPipeline:
def __init__(self, hdfs_url, file_path):
self.hdfs_url = hdfs_url
self.file_path = file_path
@classmethod
def from_crawler(cls, crawler):
return cls(
hdfs_url=crawler.settings.get('HDFS_URL'),
file_path=crawler.settings.get('HDFS_FILE_PATH', '/scrapy_data.json')
)
def open_spider(self, spider):
self.exporter = HDFSItemExporter(self.hdfs_url, self.file_path)
self.exporter.start_exporting()
def process_item(self, item, spider):
self.exporter.export_item(item)
return item
def close_spider(self, spider):
self.exporter.finish_exporting()
settings.py配置:
ITEM_PIPELINES = {
'your_project.pipelines.HDFSPipeline': 300,
}
HDFS_URL = 'http://namenode:50070'
HDFS_FILE_PATH = '/data/scrapy_output.json'
另一种方案是用PyArrow直接写HDFS:
import pyarrow as pa
import pyarrow.hdfs as hdfs
class PyArrowHDFSPipeline:
def open_spider(self, spider):
self.hdfs = hdfs.connect()
self.buffer = []
def process_item(self, item, spider):
self.buffer.append(dict(item))
if len(self.buffer) >= 1000:
self._write_to_hdfs()
return item
def _write_to_hdfs(self):
table = pa.Table.from_pandas(pd.DataFrame(self.buffer))
with self.hdfs.open('/scrapy_data.parquet', 'wb') as f:
pa.parquet.write_table(table, f)
self.buffer.clear()
WebHDFS方案最通用,PyArrow性能更好但需要集群环境。记得在Hadoop配置里启用WebHDFS(core-site.xml中设置dfs.webhdfs.enabled=true)。
总结:用WebHDFS REST API最直接。
这就是个 pipeline 吧 就跟换成 mongoDB 一个意思
thrift
或者用 java 写个接口接收数据。
我之前也遇到过类似的问题, 至少我没有找到 python 直接连接 hdfs 的库。
初学者,有点吃力。还是想找造好的轮子
这个能再详细指点一下吗?好像是没有现成的…
谢谢指点,我去试一试。python 直接连 hdfs 查到了一个 pyhdfs,不过也还没找到 scrapy 直接连的办法。
https://doc.scrapy.org/en/latest/topics/item-pipeline.html 可以看’Write items to MongoDB’ 或者 json 那一节,这边你要先和 hadoop 建立连接,然后自己实现‘ process_item ’这个方法。因为scrapy是解耦的,并不关心你的存储后端。
#4 看了下这个 pyhdfs,只有 10 个 star 呀,你确定没坑?
不过话说回来,只要这个 pyhdfs 可以工作,你就可以用呀。scrapy 接收到数据,然后调用这个库直接写入数据就行了。异步这块要处理一下,不然影响效率。
你要是想找 scrapy-hdfs 这种库肯定是找不到的。我还是建议你用 java 直接写个接口。
不要直接写 hdfs,效率低到哭
搞个管道,爬完写到管道里,那边消费到 hdfs
Mq 加 logstash 直接搞定,scrapy 的 pipeline 里面把数据打到 mq 里,logstash 的 input 是 mq,output 是 hdfs
这个是正解。 写 kafka
存在 hdfs,我理解的 hdfs 不是一个文件系统吗?? 数据难道不是存在各种数据库?要直接写的话,就是改 pipeline 吧。
hdfs+hbase,这个路数。或者直接 kafka+spark,计算去了。
Scrapy 爬取的数据存成日志
再加一个 Flume 把日志同步存储到 HDFS
需要一个消息队列
直接写 flume 呗,flume 用个 http 的 source 和 hdfs 的 sink,不会增加什么开发量


