How to write a proxy scraper and maintain a proxy pool in Python
Scraping proxies:
Project: fate0/getproxy
How it works is simple: it crawls proxy sites, filters out the usable IPs, and saves them to a text file. See the README.md for more details.
The proxy pool:
Project: fate0/proxylist
- Uses fate0/getproxy to scrape the proxies
- Runs that program on Travis CI and pushes the results back into the repository
- Uses webtask.io as a cron to trigger a Travis CI build every 15 minutes (see the sketch below)
Under normal conditions the pool holds somewhere between 2000 and 3000 proxies.
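The trigger itself boils down to a single HTTP call. A minimal sketch of that call, assuming the Travis CI API v3 build-trigger endpoint (the repo slug and token below are placeholders, and the real webtask script is not shown here):

import requests

# placeholders -- use your own repo slug ("/" encoded as %2F) and Travis API token
REPO_SLUG = 'fate0%2Fproxylist'
TRAVIS_TOKEN = '<travis-api-token>'

def trigger_travis_build():
    """Ask Travis CI to start a new build on master via the v3 API."""
    resp = requests.post(
        f'https://api.travis-ci.org/repo/{REPO_SLUG}/requests',
        headers={
            'Travis-API-Version': '3',
            'Content-Type': 'application/json',
            'Authorization': f'token {TRAVIS_TOKEN}',
        },
        json={'request': {'branch': 'master'}},
        timeout=10,
    )
    resp.raise_for_status()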
Finally:
Stars, issues, and PRs are all welcome (。・`ω´・)
+1, this is exactly the kind of thing I need.
To build a proxy pool, the core is three parts: fetching, validation, and management. I usually fetch with requests + BeautifulSoup, validate asynchronously with aiohttp, and store the working proxies in Redis or SQLite.
Let's start with fetching. Here are a couple of commonly used free proxy sites:
import requests
from bs4 import BeautifulSoup
import re
import time

def fetch_proxies():
    """Scrape proxies from a few free proxy sites."""
    proxies = []
    headers = {'User-Agent': 'Mozilla/5.0'}

    # xicidaili
    try:
        url = 'https://www.xicidaili.com/nn/'
        resp = requests.get(url, headers=headers, timeout=10)
        soup = BeautifulSoup(resp.text, 'html.parser')
        trs = soup.select('#ip_list tr')[1:]  # skip the table header
        for tr in trs[:20]:  # take the first 20
            tds = tr.find_all('td')
            if len(tds) >= 3:  # need both the IP and the port column
                ip = tds[1].text.strip()
                port = tds[2].text.strip()
                proxies.append(f'{ip}:{port}')
    except Exception:
        pass

    # kuaidaili
    try:
        url = 'https://www.kuaidaili.com/free/inha/'
        resp = requests.get(url, headers=headers, timeout=10)
        ips = re.findall(r'data-title="IP">(.*?)</td>', resp.text)
        ports = re.findall(r'data-title="PORT">(.*?)</td>', resp.text)
        for ip, port in zip(ips[:15], ports[:15]):
            proxies.append(f'{ip}:{port}')
    except Exception:
        pass

    return list(set(proxies))  # deduplicate
Once fetched, the proxies have to be validated. Checking them synchronously is too slow, so do it asynchronously:
import aiohttp
import asyncio

async def check_proxy(proxy, test_url='http://httpbin.org/ip', timeout=5):
    """Check asynchronously whether a proxy works."""
    try:
        connector = aiohttp.TCPConnector(ssl=False)
        async with aiohttp.ClientSession(connector=connector) as session:
            async with session.get(test_url, proxy=f'http://{proxy}',
                                   timeout=aiohttp.ClientTimeout(total=timeout)) as resp:
                if resp.status == 200:
                    text = await resp.text()
                    # make sure the IP reported back really is the proxy's IP
                    if proxy.split(':')[0] in text:
                        return proxy
    except Exception:
        pass
    return None

async def validate_proxies(proxies):
    """Validate a batch of proxies and return the working ones."""
    tasks = [check_proxy(proxy) for proxy in proxies]
    results = await asyncio.gather(*tasks)
    return [r for r in results if r]
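With a pool of a couple of thousand proxies, gathering every check at once can open a lot of sockets at the same time. One way to cap that is a semaphore; a small sketch on top of check_proxy above (the limit of 100 is an arbitrary choice):

async def validate_proxies_limited(proxies, limit=100):
    """Like validate_proxies, but at most `limit` checks run concurrently."""
    sem = asyncio.Semaphore(limit)

    async def guarded(proxy):
        async with sem:
            return await check_proxy(proxy)

    results = await asyncio.gather(*(guarded(p) for p in proxies))
    return [r for r in results if r]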
The management part is easiest with Redis: keep a sorted set and rank the proxies by score:
import redis
import random

class ProxyPool:
    def __init__(self, host='localhost', port=6379):
        self.redis = redis.Redis(host=host, port=port, decode_responses=True)
        self.key = 'proxy_pool'

    def add_proxy(self, proxy, score=10):
        """Add a proxy with an initial score of 10."""
        self.redis.zadd(self.key, {proxy: score})

    def get_random_proxy(self):
        """Pick a random proxy with a decent score."""
        # only consider proxies with score >= 5 (skip the low-quality ones)
        proxies = self.redis.zrangebyscore(self.key, 5, 100)
        return random.choice(proxies) if proxies else None

    def decrease_score(self, proxy):
        """Lower a proxy's score; drop it once the score gets too low."""
        score = self.redis.zscore(self.key, proxy)
        if score is not None:
            if score <= 1:
                self.redis.zrem(self.key, proxy)  # score too low, remove it
            else:
                self.redis.zincrby(self.key, -1, proxy)

    def increase_score(self, proxy):
        """Raise a proxy's score, capped at 100."""
        score = self.redis.zscore(self.key, proxy)
        if score is not None and score < 100:
            self.redis.zincrby(self.key, 1, proxy)

    def get_all_proxies(self):
        """Return every proxy in the pool."""
        return self.redis.zrange(self.key, 0, -1)
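If running Redis feels like overkill, the same score idea maps onto SQLite with a single table. A rough sketch (the table and column names here are my own, not from any library):

import sqlite3
import random

class SqliteProxyPool:
    """Same idea as ProxyPool above, but backed by a single SQLite file."""
    def __init__(self, path='proxies.db'):
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            'CREATE TABLE IF NOT EXISTS proxies (proxy TEXT PRIMARY KEY, score INTEGER)')
        self.conn.commit()

    def add_proxy(self, proxy, score=10):
        self.conn.execute(
            'INSERT OR IGNORE INTO proxies (proxy, score) VALUES (?, ?)', (proxy, score))
        self.conn.commit()

    def get_random_proxy(self):
        rows = self.conn.execute(
            'SELECT proxy FROM proxies WHERE score >= 5').fetchall()
        return random.choice(rows)[0] if rows else None

    def decrease_score(self, proxy):
        self.conn.execute('UPDATE proxies SET score = score - 1 WHERE proxy = ?', (proxy,))
        self.conn.execute('DELETE FROM proxies WHERE score <= 0')
        self.conn.commit()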
Finally, a scheduler that fetches and re-checks on a timer:
import schedule
import threading

def scheduler():
    """Run the periodic fetch and check jobs."""
    pool = ProxyPool()

    def fetch_job():
        print('Fetching proxies...')
        proxies = fetch_proxies()
        print(f'Fetched {len(proxies)} proxies')
        # validate them asynchronously
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        valid_proxies = loop.run_until_complete(validate_proxies(proxies))
        for proxy in valid_proxies:
            pool.add_proxy(proxy)
        print(f'{len(valid_proxies)} proxies passed validation')

    def check_job():
        """Re-check the whole pool and penalize dead proxies."""
        print('Checking the proxy pool...')
        all_proxies = pool.get_all_proxies()
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        valid_proxies = loop.run_until_complete(validate_proxies(all_proxies))
        # find the proxies that stopped working
        valid_set = set(valid_proxies)
        for proxy in all_proxies:
            if proxy not in valid_set:
                pool.decrease_score(proxy)

    # fetch every 10 minutes
    schedule.every(10).minutes.do(fetch_job)
    # check every 5 minutes
    schedule.every(5).minutes.do(check_job)
    # run once immediately
    fetch_job()
    while True:
        schedule.run_pending()
        time.sleep(60)

# run the scheduler in a background thread
# (it's a daemon thread, so the main program has to stay alive)
thread = threading.Thread(target=scheduler, daemon=True)
thread.start()
When you need a proxy, just grab one from the pool:
pool = ProxyPool()
proxy = pool.get_random_proxy()
if proxy:
    proxies = {'http': f'http://{proxy}', 'https': f'http://{proxy}'}
    response = requests.get('http://example.com', proxies=proxies, timeout=10)
    if response.status_code == 200:
        pool.increase_score(proxy)  # reward it on success
    else:
        pool.decrease_score(proxy)  # penalize it on failure
That is basically enough for most uses; the key points are the periodic maintenance and the score mechanism.
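On top of that, it helps to have a small wrapper that fetches through a random proxy, retries with a different one on failure, and feeds the outcome back into the scores. A sketch (the retry count of 3 is an arbitrary choice):

def get_with_proxy(url, pool, retries=3, timeout=10):
    """Fetch `url` through proxies from the pool, adjusting scores as we go."""
    for _ in range(retries):
        proxy = pool.get_random_proxy()
        if not proxy:
            break  # pool is empty, fall through to a direct request
        try:
            resp = requests.get(
                url,
                proxies={'http': f'http://{proxy}', 'https': f'http://{proxy}'},
                timeout=timeout,
            )
            if resp.status_code == 200:
                pool.increase_score(proxy)
                return resp
            pool.decrease_score(proxy)
        except requests.RequestException:
            pool.decrease_score(proxy)
    return requests.get(url, timeout=timeout)  # last resort: no proxy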
+1
INFO:getproxy.getproxy:[*] Init
INFO:getproxy.getproxy:[*] Current Ip Address: 36.102.227.142
INFO:getproxy.getproxy:[*] Load input proxies
INFO:getproxy.getproxy:[*] Validate input proxies
INFO:getproxy.getproxy:[*] Load plugins
INFO:getproxy.getproxy:[*] Grab proxies
ERROR:getproxy.plugin.cnproxy:[-] Request page 1 error: ('Connection aborted.', ConnectionAbortedError(10053, 'An established connection was aborted by the software in your host machine.', None, 10053, None))
ERROR:getproxy.plugin.freeproxylist:[-] Request page 0 error: HTTPSConnectionPool(host='free-proxy-list.net', port=443): Max retries exceeded with url: / (Caused by ConnectTimeoutError(<urllib3.connection.VerifiedHTTPSConnection object at 0x03529A30>, 'Connection to free-proxy-list.net timed out. (connect timeout=10)'))
ERROR:getproxy.plugin.proxylist:[-] Request page 1 error: HTTPConnectionPool(host='proxy-list.org', port=80): Max retries exceeded with url: /english/index.php?p=1 (Caused by ConnectTimeoutError(<urllib3.connection.HTTPConnection object at 0x03531730>, 'Connection to proxy-list.org timed out. (connect timeout=10)'))
ERROR:getproxy.plugin.txt:[-] Request url http://www.proxylists.net/http_highanon.txt error: HTTPConnectionPool(host='www.proxylists.net', port=80): Max retries exceeded with url: /http_highanon.txt (Caused by ConnectTimeoutError(<urllib3.connection.HTTPConnection object at 0x035423F0>, 'Connection to www.proxylists.net timed out. (connect timeout=10)'))
INFO:getproxy.getproxy:[*] Validate web proxies
Environment: Python 3 on Windows 7
+1, already starred
Looking at that Travis output made me want to swear.
Those sites are simply blocked inside mainland China; just wait and see the results from the other sites.
Using Travis for this actually feels pretty good: it's free, convenient, and not blocked =。=
Nice.
Starred
Starred
OP, would you consider adding proxydb.net?
Sure, that can be added.
Damn, that's awesome.