To crawl a million pages in a day, the keys are concurrency and resource management. The core idea is async IO with a connection pool, plus careful control of request rate.
Here's an example based on aiohttp and asyncio:
```python
import asyncio
import logging
import time

from aiohttp import ClientSession, ClientTimeout, TCPConnector

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class MassCrawler:
    def __init__(self, urls_file, max_concurrent=500):
        self.urls_file = urls_file
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.processed = 0
        self.start_time = None

    async def fetch_page(self, session: ClientSession, url: str):
        async with self.semaphore:  # cap the number of in-flight requests
            try:
                async with session.get(url, timeout=ClientTimeout(total=30)) as response:
                    html = await response.text()
                    # Process the page here, e.g. save it to a file or database
                    self.processed += 1
                    if self.processed % 1000 == 0:
                        elapsed = time.time() - self.start_time
                        rate = self.processed / elapsed
                        logger.info(f"Processed {self.processed} pages, rate: {rate:.2f} pages/s")
                    return html
            except Exception as e:
                logger.error(f"Failed to fetch {url}: {e}")
                return None

    async def worker(self, session: ClientSession, url_queue: asyncio.Queue):
        while True:
            try:
                url = await url_queue.get()
            except asyncio.CancelledError:
                break
            try:
                await self.fetch_page(session, url)
            finally:
                url_queue.task_done()  # always mark done so join() can finish

    async def run(self):
        self.start_time = time.time()
        # Read the URL list
        with open(self.urls_file, 'r') as f:
            urls = [line.strip() for line in f if line.strip()]
        # Connection pool: cap total and per-host connections
        connector = TCPConnector(limit=1000, limit_per_host=50)
        async with ClientSession(connector=connector) as session:
            # Bounded queue as a buffer between feeding and fetching
            url_queue = asyncio.Queue(maxsize=10000)
            # Start the workers BEFORE feeding the queue: with a bounded
            # queue, putting a million URLs with no consumers running
            # would block forever once the queue fills up
            workers = [
                asyncio.create_task(self.worker(session, url_queue))
                for _ in range(self.max_concurrent)
            ]
            # Feed URLs; put() applies backpressure when the queue is full
            for url in urls[:1000000]:  # cap at the first million
                await url_queue.put(url)
            # Wait until every queued URL has been processed
            await url_queue.join()
            # Shut the workers down
            for w in workers:
                w.cancel()
            # Wait for all workers to finish
            await asyncio.gather(*workers, return_exceptions=True)
        elapsed = time.time() - self.start_time
        logger.info(f"Done! Processed {self.processed} pages in {elapsed:.2f}s")


# Usage
async def main():
    crawler = MassCrawler('urls.txt', max_concurrent=800)
    await crawler.run()


if __name__ == '__main__':
    asyncio.run(main())
```
A few key points:
- Async IO: asyncio + aiohttp avoids thread overhead; a single event loop can handle a huge number of concurrent connections
- Connection pool management: TCPConnector caps total connections and per-host connections so you don't knock the target servers over
- Semaphore throttling: the Semaphore caps in-flight requests so memory doesn't blow up (note it bounds concurrency, not requests per second; see the rate-limiter sketch after this list)
- Queue buffering: asyncio.Queue as a producer-consumer buffer balances how fast URLs are fed in against how fast they're fetched
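One thing the crawler above doesn't do is enforce an actual requests-per-second cap. If you need a hard rate limit on top of the concurrency cap, a minimal token-bucket sketch could look like this; `AsyncRateLimiter` and the 100 req/s figure are illustrative, not part of the code above:

```python
import asyncio
import time


class AsyncRateLimiter:
    """Token bucket: allows at most `rate` acquisitions per second."""

    def __init__(self, rate: float):
        self.rate = rate                 # tokens added per second
        self.tokens = rate               # start with a full bucket
        self.updated = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            while True:
                now = time.monotonic()
                # Refill for the elapsed time, capped at bucket size
                self.tokens = min(self.rate,
                                  self.tokens + (now - self.updated) * self.rate)
                self.updated = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                # Sleep just long enough for the next token to arrive
                await asyncio.sleep((1 - self.tokens) / self.rate)


# Usage inside fetch_page (hypothetical cap of 100 requests/second):
# limiter = AsyncRateLimiter(100)
# await limiter.acquire()
# async with session.get(url, ...) as response: ...
```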
To actually sustain 1M/day (about 11.6 pages/s on average), you also need:
- Enough bandwidth and IP resources
- Target servers that can absorb the load
- A proxy pool to spread requests out (the fetch sketch further below shows one way to plug proxies in)
- Solid error handling and a retry mechanism (a sketch follows this list)
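On the retry point, here's a minimal sketch of a fetch helper with exponential backoff. The `fetch_with_retry` name, the 3-attempt cap, and the choice of retryable statuses are my assumptions, not part of the crawler above:

```python
import asyncio

import aiohttp


async def fetch_with_retry(session: aiohttp.ClientSession, url: str,
                           retries: int = 3, backoff: float = 1.0):
    """Retry transient failures with exponential backoff (hypothetical helper)."""
    for attempt in range(retries):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as resp:
                # Retry on typical transient statuses; treat everything else as final
                if resp.status in (429, 500, 502, 503, 504):
                    raise aiohttp.ClientResponseError(
                        resp.request_info, resp.history, status=resp.status)
                return await resp.text()
        except (aiohttp.ClientError, asyncio.TimeoutError):
            if attempt == retries - 1:
                raise                                    # out of attempts, surface the error
            await asyncio.sleep(backoff * 2 ** attempt)  # 1s, 2s, 4s, ...
```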
The code leaves extension points: fetch_page is where you'd plug in proxy rotation, content parsing, data storage, and so on.
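As one illustration of those extension points, here's a hedged sketch of a standalone fetch variant with proxy rotation and a disk-storage hook. `fetch_and_store`, `get_next_proxy`, and the `pages/` output scheme are placeholders you'd supply yourself, and a production version would offload the blocking file write (e.g. via `asyncio.to_thread`):

```python
import asyncio
import hashlib
import logging
from pathlib import Path

import aiohttp

OUTPUT_DIR = Path('pages')          # assumption: raw HTML dumped one file per page
OUTPUT_DIR.mkdir(exist_ok=True)


async def fetch_and_store(session: aiohttp.ClientSession, url: str,
                          semaphore: asyncio.Semaphore, get_next_proxy):
    """fetch_page variant with proxy rotation and a storage hook (illustrative)."""
    async with semaphore:
        proxy = get_next_proxy()    # placeholder: e.g. round-robin over a proxy pool
        try:
            async with session.get(url, proxy=proxy,
                                   timeout=aiohttp.ClientTimeout(total=30)) as resp:
                html = await resp.text()
                # Storage hook: file named by a hash of the URL
                name = hashlib.sha1(url.encode()).hexdigest() + '.html'
                (OUTPUT_DIR / name).write_text(html, encoding='utf-8')
                return html
        except Exception as e:
            logging.getLogger(__name__).error(f"Failed to fetch {url} via {proxy}: {e}")
            return None
```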
Bottom line: async IO plus connection-pool control is what makes this work.