Python爬虫入门:如何爬取ztree树上的节点并异步全部展开导出
http://www.treejs.cn/v3/demo/cn/core/simpleData.html
目前只能简单爬取已经展开的节点。模拟点击、一层层加载要怎么实现?
''''
def get_html_src(url):
    """Open *url* in Chrome and append the text of every <li> node under
    the ``#treeDemo`` zTree container to ``tmp/ztree.csv``.

    NOTE(review): hard-codes a personal chromedriver path; prefer putting
    chromedriver on PATH or using Selenium 4's ``Service`` object.
    """
    driver = webdriver.Chrome("/Users/qk/Downloads/chromedriver")
    try:
        driver.get(url)
        parent = driver.find_element_by_id('treeDemo')
        elements = parent.find_elements_by_tag_name("li")
        # Mode "a" keeps appending across runs; use "w" for a fresh file.
        with open("tmp/ztree.csv", "a") as csvfile:
            writer = csv.writer(csvfile)
            for e in elements:
                writer.writerow([e.text])
                print('节点名称:', e.text)
        # The `with` block already closed the file; no explicit close() needed.
        time.sleep(10)
    finally:
        # quit() (not close()) ends the whole browser process even on error,
        # so the driver is never leaked.
        driver.quit()


if __name__ == "__main__":
    get_html_src('http://www.treejs.cn/v3/demo/cn/core/simpleData.html')
''''
Python爬虫入门:如何爬取ztree树上的节点并异步全部展开导出
我来帮你写一个爬取 zTree 并异步展开所有节点的完整解决方案。
import asyncio
import aiohttp
from pyquery import PyQuery as pq
import json
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ZTreeCrawler:
    """Crawl a zTree widget: parse the visible nodes, expand collapsed ones
    via AJAX, and export the flattened node list to JSON/CSV.
    """

    def __init__(self, url, headers=None):
        self.url = url
        self.headers = headers or {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        self.all_nodes = []   # flat list of {'id', 'parent_id', 'text'} dicts
        self.session = None   # aiohttp session, created in init_session()

    async def init_session(self):
        """Create the shared aiohttp client session."""
        self.session = aiohttp.ClientSession(headers=self.headers)

    async def close_session(self):
        """Close the client session if one was opened."""
        if self.session:
            await self.session.close()

    async def fetch_page(self, url):
        """Return the page body as text, or None on any request error."""
        try:
            async with self.session.get(url) as response:
                return await response.text()
        except Exception as e:
            logger.error(f"请求失败: {url}, 错误: {e}")
            return None

    def parse_tree_structure(self, html):
        """Parse zTree containers out of *html* and return a list of node dicts."""
        doc = pq(html)
        tree_data = []
        # Anything that looks like a zTree container (class or id heuristic).
        tree_containers = doc('[class*="ztree"], [id*="tree"]')
        for container in tree_containers.items():
            for node in container.find('li').items():
                node_data = self._parse_node(node)
                if node_data:
                    tree_data.append(node_data)
        return tree_data

    def _parse_node(self, node):
        """Extract a single <li> into a dict; return None when unparseable."""
        try:
            # BUG FIX: pyquery has no jQuery-style .first(); use .eq(0).
            text_elem = node.find('a, span').eq(0)
            if not text_elem:
                return None
            node_text = text_elem.text().strip()
            # Node id / parent id from id or data attributes when present.
            node_id = node.attr('id') or node.attr('data-id') or ''
            parent_id = node.parent('ul').parent('li').attr('id') or ''
            # A nested <ul> means the node has (already-rendered) children.
            has_children = node.find('ul').length > 0
            # BUG FIX: attr('class', '') would *set* the attribute in pyquery,
            # and attr('class') may return None; read with an explicit default.
            is_expanded = 'open' in (node.attr('class') or '')
            return {
                'id': node_id,
                'parent_id': parent_id,
                'text': node_text,
                'has_children': has_children,
                'is_expanded': is_expanded,
                'children': []
            }
        except Exception as e:
            logger.error(f"解析节点失败: {e}")
            return None

    async def expand_node(self, node_data):
        """Expand a collapsed node via the site's AJAX endpoint.

        The URL below is a placeholder — adapt it to the target site's real
        child-loading API before use.
        """
        # Already expanded or a leaf: nothing to do.
        if node_data['is_expanded'] or not node_data['has_children']:
            return node_data
        expand_url = f"{self.url}/api/tree/expand?id={node_data['id']}"
        try:
            async with self.session.get(expand_url) as response:
                if response.status == 200:
                    children_data = await response.json()
                    node_data['children'] = children_data
                    node_data['is_expanded'] = True
                    logger.info(f"已展开节点: {node_data['text']}")
        except Exception as e:
            # Best effort: log and return the node unexpanded.
            logger.warning(f"展开节点失败: {node_data['text']}, 错误: {e}")
        return node_data

    async def crawl_tree(self, nodes=None, parent_id=''):
        """Recursively walk the tree, expanding collapsed nodes as needed.

        Appends every visited node to ``self.all_nodes`` and returns it.
        """
        if nodes is None:
            # First call: fetch and parse the root page.
            html = await self.fetch_page(self.url)
            if not html:
                return []
            nodes = self.parse_tree_structure(html)
        for node in nodes:
            self.all_nodes.append({
                'id': node['id'],
                'parent_id': parent_id,
                'text': node['text']
            })
            # Collapsed subtree: expand, then recurse into any children found.
            if node['has_children'] and not node['is_expanded']:
                expanded_node = await self.expand_node(node)
                if expanded_node.get('children'):
                    await self.crawl_tree(expanded_node['children'], node['id'])
        return self.all_nodes

    async def export_to_json(self, filename='ztree_nodes.json'):
        """Dump all collected nodes to *filename* as pretty-printed JSON."""
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(self.all_nodes, f, ensure_ascii=False, indent=2)
        # BUG FIX: message logged a literal "(unknown)" instead of the filename.
        logger.info(f"已导出 {len(self.all_nodes)} 个节点到 {filename}")

    async def export_to_csv(self, filename='ztree_nodes.csv'):
        """Dump all collected nodes to *filename* as a UTF-8 CSV."""
        import csv
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(['ID', 'Parent ID', 'Text'])
            for node in self.all_nodes:
                writer.writerow([node['id'], node['parent_id'], node['text']])
        # BUG FIX: message logged a literal "(unknown)" instead of the filename.
        logger.info(f"已导出 {len(self.all_nodes)} 个节点到 {filename}")
async def main():
    """Demo entry point: crawl every zTree node, then export JSON and CSV."""
    url = "https://example.com/ztree-page"  # replace with the real target URL
    spider = ZTreeCrawler(url)
    try:
        await spider.init_session()
        all_nodes = await spider.crawl_tree()
        # Persist the results in both supported formats.
        await spider.export_to_json('ztree_data.json')
        await spider.export_to_csv('ztree_data.csv')
        print(f"总共爬取到 {len(all_nodes)} 个节点")
    finally:
        # Always release the HTTP session, even if crawling failed.
        await spider.close_session()


if __name__ == "__main__":
    asyncio.run(main())
关键点说明:
- 异步处理:使用 asyncio 和 aiohttp 实现异步请求,提高爬取效率
- 节点解析:通过CSS选择器定位zTree节点,提取ID、文本和层级关系
- 递归展开:检测折叠节点并模拟点击展开,获取所有子节点
- 数据导出:支持JSON和CSV两种格式导出
使用前需要调整的地方:
- 替换 url 为实际目标网址
- 根据网站实际情况调整 parse_tree_structure 方法中的选择器
- 如果网站通过AJAX加载子节点,需要修改 expand_node 方法中的API请求逻辑
- 可能需要处理登录、Cookie等认证信息
一句话建议: 先分析目标网站的zTree实现方式,再调整选择器和展开逻辑。
修改好了
# Append each node's text to the CSV, then try to expand it by clicking
# its collapsed switch; nodes without a clickable switch are just printed.
with open("tmp/ztree.csv", "a") as csvfile:
    writer = csv.writer(csvfile)
    for num, e in enumerate(elementList, start=1):
        writer.writerow([e.text])
        try:
            print(num)
            print(e.get_attribute('innerHTML'))
            # 'center_close' is zTree's class for a collapsed middle node's
            # expand switch; clicking it triggers the lazy child load.
            e.find_element_by_class_name('center_close').click()
            time.sleep(2)  # give the AJAX-loaded children time to render
        except WebDriverException:
            print("Element is not clickable")
            print(e.text)
下面要改成递归方法,但是报错了

