Python中如何实现异步抓取验证的IP代理池开源项目?

项目地址 hproxy ❤️️️

特性

  • [x] 多种方式进行数据存储,易扩展:

  • [x] 自定义爬虫基础部件,上手简单,统一代码风格:

  • [x] 提供 API 获取代理,启动后访问 127.0.0.1:8001/api

    • 'delete/:proxy': 删除代理
    • 'get': 随机选择一个代理
    • 'list':列出全部代理
    • ...
  • [x] 从代理池随机选取一个代理提供 html 源码抓取服务

  • [x] 定时抓取、更新、自动验证

功能描述

代理获取

本项目的爬虫代码全部集中于目录spider,在/spider/proxy_spider/目录下定义了一系列代理网站的爬虫,所有爬虫基于/spider/base/proxy_spider.py里定义的规范编写,参考这些,就可以很方便的扩展一系列代理爬虫

运行spider_console.py文件,即可启动全部爬虫进行代理的获取,无需定义新加的爬虫脚本,只需按照规范命名,即可自动获取爬虫模块然后运行

代理接口

// http://127.0.0.1:8001/api/get?valid=1
// 返回成功,开启验证参数 valid=1 的话 speed 会有值,并且默认是开启的
// types 1:高匿 2:匿名 3:透明
{
    "status": 1,
    "info": {
        "proxy": "101.37.79.125:3128",
        "types": 3
    },
    "msg": "success",
    "speed": 2.4909408092
}
// http://127.0.0.1:8001/api/list 列出所有代理,没有一个个验证
{
    "status": 1,
    "info": {
        "180.168.184.179:53128": {
            "proxy": "180.168.184.179:53128",
            "types": 3
        },
        "101.37.79.125:3128": {
            "proxy": "101.37.79.125:3128",
            "types": 3
        }
    },
    "msg": "success"
}
// http://127.0.0.1:8001/api/delete/171.39.45.6:8123
{
    "status": 1,
    "msg": "success"
}
// http://127.0.0.1:8001/api/valid/183.159.91.75:18118
{
    "status": 1,
    "msg": "success",
    "speed": 0.3361008167
}
// http://127.0.0.1:8001/api/html?url=https://www.v2ex.com
// 随机选取代理抓取 v2ex
{
    "status": 1,
    "info": {
        "html": "html 源码",
        "proxy": "120.77.254.116:3128"
    },
    "msg": "success"
}

希望对你有帮助,大佬勿喷,顺便问一句,爬取亚马逊有比较好的方案么?


Python中如何实现异步抓取验证的IP代理池开源项目?

6 回复

同道


要搞一个异步抓取的IP代理池,核心就是用aiohttp做异步请求,asyncio管理协程。下面这个简化版示例展示了基本架构:

import asyncio
import aiohttp
from aiohttp import ClientSession, ClientTimeout
import logging
from typing import List, Optional
import random

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class AsyncProxyPool:
    def __init__(self, test_url: str = "http://httpbin.org/ip", timeout: int = 10):
        self.test_url = test_url
        self.timeout = ClientTimeout(total=timeout)
        self.valid_proxies = []
        self.sources = [
            "http://www.proxysource1.com/list",  # 替换为实际代理源
            "http://www.proxysource2.com/list"
        ]
    
    async def fetch_proxies(self, session: ClientSession, url: str) -> List[str]:
        """从指定源获取代理列表"""
        try:
            async with session.get(url, timeout=self.timeout) as response:
                if response.status == 200:
                    text = await response.text()
                    # 这里需要根据实际网页结构解析代理IP
                    # 示例:简单按行分割
                    proxies = [line.strip() for line in text.split('\n') if line.strip()]
                    return proxies
        except Exception as e:
            logger.error(f"获取代理失败 {url}: {e}")
        return []
    
    async def validate_proxy(self, session: ClientSession, proxy: str) -> bool:
        """验证单个代理是否可用"""
        try:
            proxy_url = f"http://{proxy}"
            async with session.get(
                self.test_url, 
                proxy=proxy_url,
                timeout=self.timeout
            ) as response:
                if response.status == 200:
                    data = await response.json()
                    logger.info(f"代理 {proxy} 验证成功: {data.get('origin')}")
                    return True
        except Exception as e:
            logger.debug(f"代理 {proxy} 验证失败: {e}")
        return False
    
    async def validate_all_proxies(self, proxies: List[str]) -> List[str]:
        """并发验证所有代理"""
        async with aiohttp.ClientSession() as session:
            tasks = [self.validate_proxy(session, proxy) for proxy in proxies]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            
            valid_proxies = [
                proxy for proxy, is_valid in zip(proxies, results) 
                if isinstance(is_valid, bool) and is_valid
            ]
            return valid_proxies
    
    async def update_pool(self):
        """更新代理池:抓取 + 验证"""
        all_proxies = []
        
        # 并发从多个源抓取代理
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch_proxies(session, url) for url in self.sources]
            results = await asyncio.gather(*tasks)
            
            for proxies in results:
                all_proxies.extend(proxies)
        
        # 去重
        unique_proxies = list(set(all_proxies))
        logger.info(f"抓取到 {len(unique_proxies)} 个唯一代理")
        
        # 并发验证所有代理
        valid_proxies = await self.validate_all_proxies(unique_proxies)
        
        self.valid_proxies = valid_proxies
        logger.info(f"验证通过 {len(valid_proxies)} 个代理")
    
    def get_random_proxy(self) -> Optional[str]:
        """随机获取一个可用代理"""
        if self.valid_proxies:
            return random.choice(self.valid_proxies)
        return None

async def main():
    # 使用示例
    proxy_pool = AsyncProxyPool()
    
    # 更新代理池
    await proxy_pool.update_pool()
    
    # 获取随机代理
    proxy = proxy_pool.get_random_proxy()
    if proxy:
        print(f"随机代理: {proxy}")
        
        # 使用代理发起请求示例
        async with aiohttp.ClientSession() as session:
            try:
                async with session.get(
                    "http://httpbin.org/headers",
                    proxy=f"http://{proxy}"
                ) as response:
                    print(f"使用代理请求状态: {response.status}")
            except Exception as e:
                print(f"代理请求失败: {e}")

if __name__ == "__main__":
    asyncio.run(main())

关键点:

  1. 异步架构:用asyncioaiohttp实现全异步操作,比同步请求快得多
  2. 并发验证asyncio.gather()同时验证多个代理,效率高
  3. 代理源管理:可以扩展多个代理源,提高代理数量
  4. 验证机制:通过实际请求测试代理可用性

实际项目中还需要添加:

  • 代理存储(Redis/数据库)
  • 定时更新任务
  • 代理评分机制(响应速度、成功率)
  • 更健壮的代理源解析

建议直接参考开源项目如proxy_poolproxypool,在其基础上改异步。

这是爬的网上的免费代理吗?质量怎么样?

希望能给一个在线的地址直接使用 (手动滑稽)

只能说接口保证返回一个有用的

可以有

回到顶部