Python爬虫入门:如何爬取ztree树上的节点并异步全部展开导出

http://www.treejs.cn/v3/demo/cn/core/simpleData.html

目前只能简单爬取已经展开的节点。模拟点击一层层加载怎么实现?

''''

def get_html_src(url):
    """Crawl the currently visible (already expanded) zTree nodes at *url*
    and append each node's text as one row to tmp/ztree.csv.

    NOTE(review): this only captures nodes the page has already rendered;
    collapsed branches are not loaded.
    """
    driver = webdriver.Chrome("/Users/qk/Downloads/chromedriver")
    driver.get(url)

    parent_element = driver.find_element_by_id('treeDemo')
    element_list = parent_element.find_elements_by_tag_name("li")

    # "with" closes the file automatically -- no explicit close() needed.
    # (The original used curly "smart quotes", which is a SyntaxError.)
    with open("tmp/ztree.csv", "a") as csvfile:
        writer = csv.writer(csvfile)
        for e in element_list:
            writer.writerow([e.text])
            print('节点名称:', e.text)

    time.sleep(10)
    driver.close()


if __name__ == "__main__":
    get_html_src('http://www.treejs.cn/v3/demo/cn/core/simpleData.html')

''''


Python爬虫入门:如何爬取ztree树上的节点并异步全部展开导出

3 回复

我来帮你写一个爬取 zTree 并异步展开所有节点的完整解决方案。

import asyncio
import aiohttp
from pyquery import PyQuery as pq
import json
import logging

# Show INFO-level progress messages (node expanded, export finished, ...).
logging.basicConfig(level=logging.INFO)
# Module-level logger, stdlib convention: named after the importing module.
logger = logging.getLogger(__name__)

class ZTreeCrawler:
    """Asynchronously crawl a zTree widget: parse the rendered tree, expand
    collapsed nodes via the site's AJAX API, and export the flattened node
    list to JSON or CSV.

    NOTE(review): the expand endpoint in :meth:`expand_node` is a placeholder
    and must be adapted to the real target site.
    """

    def __init__(self, url, headers=None):
        self.url = url
        self.headers = headers or {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        self.all_nodes = []   # flat list of {'id', 'parent_id', 'text'} dicts
        self.session = None   # aiohttp session, created lazily in init_session()

    async def init_session(self):
        """Create the shared aiohttp client session."""
        self.session = aiohttp.ClientSession(headers=self.headers)

    async def close_session(self):
        """Close the client session if one was opened."""
        if self.session:
            await self.session.close()

    async def fetch_page(self, url):
        """Fetch *url* and return its body text, or None on any request error."""
        try:
            async with self.session.get(url) as response:
                return await response.text()
        except Exception as e:
            logger.error(f"请求失败: {url}, 错误: {e}")
            return None

    def parse_tree_structure(self, html):
        """Parse zTree containers out of *html*; return a list of node dicts."""
        doc = pq(html)
        tree_data = []

        # Match anything whose class contains "ztree" or whose id contains "tree".
        tree_containers = doc('[class*="ztree"], [id*="tree"]')

        for container in tree_containers.items():
            for node in container.find('li').items():
                node_data = self._parse_node(node)
                if node_data:
                    tree_data.append(node_data)

        return tree_data

    def _parse_node(self, node):
        """Parse a single <li> element into a node dict, or None on failure."""
        try:
            # BUG FIX: pyquery has no .first(); .eq(0) selects the first match.
            text_elem = node.find('a, span').eq(0)
            if not text_elem:
                return None

            node_text = text_elem.text().strip()

            # Node id / parent id come from id or data-id attributes, if any.
            node_id = node.attr('id') or node.attr('data-id') or ''
            parent_id = node.parent('ul').parent('li').attr('id') or ''

            # A node with a nested <ul> has (rendered) children.
            has_children = node.find('ul').length > 0
            # BUG FIX: attr('class', '') in pyquery SETS the attribute to ''
            # instead of reading it with a default -- read and fall back to ''.
            is_expanded = 'open' in (node.attr('class') or '')

            return {
                'id': node_id,
                'parent_id': parent_id,
                'text': node_text,
                'has_children': has_children,
                'is_expanded': is_expanded,
                'children': [],
            }
        except Exception as e:
            logger.error(f"解析节点失败: {e}")
            return None

    async def expand_node(self, node_data):
        """Expand a collapsed node by requesting the site's AJAX endpoint.

        NOTE(review): the URL below is an example; replace it with the real
        API the target site uses to lazy-load children.
        """
        # Already expanded, or a leaf: nothing to do. Use .get() because
        # children returned by the AJAX API may not carry these flags.
        if node_data.get('is_expanded') or not node_data.get('has_children'):
            return node_data

        expand_url = f"{self.url}/api/tree/expand?id={node_data['id']}"

        try:
            async with self.session.get(expand_url) as response:
                if response.status == 200:
                    children_data = await response.json()
                    node_data['children'] = children_data
                    node_data['is_expanded'] = True
                    logger.info(f"已展开节点: {node_data['text']}")
        except Exception as e:
            logger.warning(f"展开节点失败: {node_data['text']}, 错误: {e}")

        return node_data

    async def crawl_tree(self, nodes=None, parent_id=''):
        """Recursively walk *nodes* (or the root page on first call),
        accumulating every node into self.all_nodes; returns that list."""
        if nodes is None:
            # First call: fetch and parse the page itself.
            html = await self.fetch_page(self.url)
            if not html:
                return []
            nodes = self.parse_tree_structure(html)

        for node in nodes:
            # Read keys defensively: API-supplied children may lack them.
            self.all_nodes.append({
                'id': node.get('id', ''),
                'parent_id': parent_id,
                'text': node.get('text', ''),
            })

            # Collapsed branch: expand it, then recurse into its children.
            if node.get('has_children') and not node.get('is_expanded'):
                expanded_node = await self.expand_node(node)
                if expanded_node.get('children'):
                    await self.crawl_tree(expanded_node['children'],
                                          node.get('id', ''))

        return self.all_nodes

    async def export_to_json(self, filename='ztree_nodes.json'):
        """Dump the collected nodes to *filename* as pretty-printed JSON."""
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(self.all_nodes, f, ensure_ascii=False, indent=2)
        # BUG FIX: the original logged the literal "(unknown)" here.
        logger.info(f"已导出 {len(self.all_nodes)} 个节点到 {filename}")

    async def export_to_csv(self, filename='ztree_nodes.csv'):
        """Write the collected nodes to *filename* as a 3-column CSV."""
        import csv

        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(['ID', 'Parent ID', 'Text'])
            for node in self.all_nodes:
                writer.writerow([node['id'], node['parent_id'], node['text']])

        # BUG FIX: the original logged the literal "(unknown)" here.
        logger.info(f"已导出 {len(self.all_nodes)} 个节点到 {filename}")

async def main():
    """Example driver: crawl one zTree page, then export JSON and CSV."""
    target = "https://example.com/ztree-page"  # replace with the real URL

    crawler = ZTreeCrawler(target)
    try:
        await crawler.init_session()

        # Walk the whole tree, expanding collapsed branches along the way.
        nodes = await crawler.crawl_tree()

        # Persist the results in both formats.
        await crawler.export_to_json('ztree_data.json')
        await crawler.export_to_csv('ztree_data.csv')

        print(f"总共爬取到 {len(nodes)} 个节点")
    finally:
        # Always release the HTTP session, even if crawling failed.
        await crawler.close_session()


if __name__ == "__main__":
    asyncio.run(main())

关键点说明:

  1. 异步处理:使用 asyncio 和 aiohttp 实现异步请求,提高爬取效率
  2. 节点解析:通过CSS选择器定位zTree节点,提取ID、文本和层级关系
  3. 递归展开:检测折叠节点并模拟点击展开,获取所有子节点
  4. 数据导出:支持JSON和CSV两种格式导出

使用前需要调整的地方:

  1. 替换url为实际目标网址
  2. 根据网站实际情况调整parse_tree_structure方法中的选择器
  3. 如果网站通过AJAX加载子节点,需要修改expand_node方法中的API请求逻辑
  4. 可能需要处理登录、Cookie等认证信息

一句话建议: 先分析目标网站的zTree实现方式,再调整选择器和展开逻辑。


修改好了

# Export each visible node and try to expand it by clicking its switch icon.
# (The original paste used curly "smart quotes" and lost its indentation,
# both of which are syntax errors -- reconstructed below.)
with open("tmp/ztree.csv", "a") as csvfile:
    writer = csv.writer(csvfile)
    for num, e in enumerate(elementList, start=1):
        writer.writerow([e.text])
        try:
            print(num)
            print(e.get_attribute('innerHTML'))
            # Click the collapse/expand switch to lazy-load this node's
            # children; give the AJAX request time to finish.
            e.find_element_by_class_name('center_close').click()
            time.sleep(2)
        except WebDriverException:
            print("Element is not clickable")

        print(e.text)

下面要改成递归方法并纠错

回到顶部