Python中使用aiohttp写爬虫,如何正确停止循环?
使用 aiohttp 试着写了一个爬虫,但是发现可能会出现 一级页面还在抓取的时候,由于队列为空,直接退出的情况。不知该如何去做这个判断?另外不知以下代码这么写是否有其他的问题??
# coding:utf-8
import asyncio
import aiohttp
class Task(object):
def init(self, info, priority):
self.priority = priority
self.info = info
def __lt__(self, other):
return self.priority < other.priority
class Spider(object):
def __init__(self, loop=None):
self.loop = loop
conn = aiohttp.TCPConnector(limit=3)
self.session = aiohttp.ClientSession(loop=loop, connector=conn)
self.queue = asyncio.PriorityQueue()
def start(self, page):
task_info = {'callback': self.parse_1, 'page': page}
return task_info
async def set_item(self, task):
pass
async def fetch(self, task):
await asyncio.sleep(2)
task['callback'](task['page'])
async def worker(self):
while True:
next_task = await self.queue.get()
if next_task.info.get('type') == 'item':
asyncio.ensure_future(self.set_item(next_task.info))
else:
asyncio.ensure_future(self.fetch(next_task.info))
self.queue.task_done()
# if self.queue.empty():
# await asyncio.sleep(1)
# if self.queue.empty():
# break
def run(self):
for page in range(1, 10):
self.queue.put_nowait(Task(self.start(page), 0))
self.loop.run_until_complete(self.worker())
def close(self):
if not self.session.closed:
if self.session._connector_owner:
self.session._connector.close()
self.session._connector = None
def parse_1(self, meta):
print('parse_1-----', meta)
for page in range(20, 30):
task = {'callback': self.parse_2, 'page': page}
self.queue.put_nowait(Task(task, 1))
def parse_2(self, meta):
print('parse2----', meta)
for page in range(30, 40):
task = {'callback': self.parse_3, 'page': page}
self.queue.put_nowait(Task(task, 0))
def parse_3(self, meta):
print('parse3----', meta)
loop = asyncio.get_event_loop()
sp = Spider(loop=loop)
sp.run()
sp.close()
Python中使用aiohttp写爬虫,如何正确停止循环?
我看别人写的是先把队列放到 redis 里,对 redis 进行判断从而进行开始或终止
用 asyncio.CancelledError 配合 asyncio.gather 或者 asyncio.wait 来优雅停止。核心是拿到任务对象然后取消。
假设你的爬虫主循环在跑多个并发任务,比如这样:
import asyncio
import aiohttp
import signal
async def fetch(session, url):
try:
async with session.get(url) as response:
return await response.text()
except asyncio.CancelledError:
# 清理资源,比如关闭响应
print(f"Cancelled fetch for {url}")
raise
async def worker(name, session, queue):
while True:
url = await queue.get()
try:
html = await fetch(session, url)
print(f"{name} fetched {url[:50]}...")
finally:
queue.task_done()
async def main():
queue = asyncio.Queue()
# 往队列里塞点任务
for url in ['http://example.com/page1', 'http://example.com/page2']:
await queue.put(url)
async with aiohttp.ClientSession() as session:
# 创建多个worker任务
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'Worker-{i}', session, queue))
tasks.append(task)
# 等队列清空,或者设置个停止条件
await queue.join()
# 停止所有worker任务
for task in tasks:
task.cancel()
# 等待它们被取消完成
await asyncio.gather(*tasks, return_exceptions=True)
if __name__ == '__main__':
asyncio.run(main())
更常见的做法是监听系统信号(比如Ctrl+C):
async def main_with_signal():
queue = asyncio.Queue()
# ... 初始化队列和session ...
tasks = []
async with aiohttp.ClientSession() as session:
for i in range(3):
task = asyncio.create_task(worker(f'Worker-{i}', session, queue))
tasks.append(task)
# 设置信号处理
loop = asyncio.get_running_loop()
stop_event = asyncio.Event()
def signal_handler():
print("\nShutting down...")
stop_event.set()
loop.add_signal_handler(signal.SIGINT, signal_handler)
try:
# 主循环,直到收到停止信号
while not stop_event.is_set():
# 这里可以加一些生产任务到队列的逻辑
await asyncio.sleep(0.1) # 防止空转
finally:
# 取消所有任务
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
# 运行这个版本
if __name__ == '__main__':
asyncio.run(main_with_signal())
关键就两点:1. 在任务里捕获 asyncio.CancelledError 做清理;2. 在外部用 task.cancel() 触发取消,然后 gather 等它结束。
简单说就是:取消任务并妥善处理取消异常。
感觉用 redis 应该也会有这个问题。。。除非是标记一下任务状态
woker 函数里别 while True;弄一个标记,如果为 True 继续,为 False 就停止,
但爬取何时完成,无法预料,也就无法给设置为 False 啊
加个 work in progress 标记
手机看的好难受,没看完,你试试 work 那里使用 wait_for 替换 ensure_future
判断任务队列用 join 而不是 empty
请教一下这个标记的动作应该放在哪里执行??
替换后无法执行。。。
呃,判断队列为空不是 empty 么?
empty 是判断队列是否为空,join 是阻塞至所有任务完成,也就是调用 task_done。你用 empty 判断队列为空,只是所有的任务都被 get 了,不代表已经完成了。你可以看看官方的例子 https://docs.python.org/3/library/asyncio-queue.html
#8 任何 worker 没处理完之前都是 WIP 啊,除了要判断队列是否为空,还要判断是否有任务是 WIP
#9 谢回复!代码中是在 while 中从队列获取任务直接注册到事件循环,然后就执行了 task_done。这么如果用 join 去判断的话应该也是一样的,我尝试用链接中的例子去改写上面的爬虫代码,但是好像也是行不通,会一直阻塞。不知何故。
#10 谢回复!判断任务状态,在官方文档中找到了 all_tasks 和 current_task 这两个方法,但是好像不好使,即使任务全部完成也不为 None,导致判断失败。。
<br> async def worker(self):<br> """<br> 任务调度<br> :return:<br> """<br> while True:<br> if not self.queue.empty():<br> next_data = await self.queue.get()<br> task_id = uuid.uuid1()<br> self.task_running_list.append(task_id)<br> if isinstance(next_data, Item):<br> asyncio.create_task(self.set_item(task_id, <a target="_blank" href="http://next_data.info" rel="nofollow noopener">next_data.info</a>))<br> else:<br> asyncio.create_task(self.fetch(task_id, <a target="_blank" href="http://next_data.info" rel="nofollow noopener">next_data.info</a>))<br> self.queue.task_done()<br> else:<br> await asyncio.sleep(0)<br> if self.queue.empty() and not self.task_running_list:<br> break<br>
根据 的提示,这么处理可解决该问题。
#12 处理完要把 task_id 删掉啊
嗯嗯,删除操作是在 fetch 方法里面执行的
怎么贴的代码?
建议 aio+mq

