Python异步编程中如何快速获取大量请求中的第一个有效返回值?

情景:
我的一个 list 中,有很多 future, 每个执行耗时不定。返回结果 None 或者 某个值。
future1…futuren 中,第一个不为 None 的值,就是我想要的。
如果用下面的语句
python <br>result = yield [future1(), future2(), ....] <br>
会把所有的都执行完。

但是有可能会有这种情况:
future1,future2 结果返回 None,future3 不为 None
我就可以直接取 future3 的结果了。


请教如何实现?
Python异步编程中如何快速获取大量请求中的第一个有效返回值?


17 回复

这种 polling 工作肯定都是库或者框架做的,然而你又没有具体说到底用哪个异步库或者框架。


import asyncio
import aiohttp
from typing import Any, Optional

async def fetch_first_valid(
    urls: list[str],
    session: aiohttp.ClientSession,
    timeout: float = 5.0
) -> Optional[Any]:
    """
    并发请求多个URL,返回第一个成功的响应内容
    """
    async def fetch_one(url: str) -> str:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=timeout)) as resp:
            resp.raise_for_status()
            return await resp.text()
    
    tasks = [asyncio.create_task(fetch_one(url)) for url in urls]
    
    # 使用asyncio.wait等待第一个完成的任务
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )
    
    # 取消剩余未完成的任务
    for task in pending:
        task.cancel()
    
    # 获取第一个完成的结果
    if done:
        first_task = next(iter(done))
        try:
            return first_task.result()
        except Exception:
            # 如果第一个任务失败,可以继续等待其他任务
            # 这里简单返回None,实际可根据需求调整
            pass
    
    # 等待所有任务结束(避免cancel警告)
    await asyncio.gather(*pending, return_exceptions=True)
    return None

async def main():
    urls = [
        "https://api1.example.com/data",
        "https://api2.example.com/endpoint",
        "https://backup.example.com/info"
    ]
    
    async with aiohttp.ClientSession() as session:
        result = await fetch_first_valid(urls, session)
        print(f"First valid response: {result}")

if __name__ == "__main__":
    asyncio.run(main())

核心思路就是用asyncio.wait()FIRST_COMPLETED参数,配合任务取消机制。这里有几个关键点:

  1. 并发创建所有请求任务:用列表推导式同时创建所有请求的Task对象
  2. 等待第一个完成asyncio.wait()设置return_when=asyncio.FIRST_COMPLETED,一旦有任务完成就返回
  3. 清理剩余任务:立即取消其他还在进行的请求,避免资源浪费
  4. 异常处理:第一个完成的请求可能失败,这时可以继续等待其他任务或直接返回

如果第一个请求失败想继续等下一个,可以改成循环处理。不过通常多个备用API的场景,第一个能成的概率就很高了。

FIRST_COMPLETED比挨个等快多了,特别是当备用接口响应时间差异大时。

(换 js,用 promise

额,tornado,我是因为不想太限制

结果入队列,然后轮训?


http://www.tornadoweb.org/en/stable/queues.html

4# 思路是对的,实际上就是把结果入队列,取第一个。只不过这个队列不需要轮询这么费力而已。

包装一下 返回前 callback

你可以选择不用 yield,直接 add_done_callback

结果套同一个 func,判断第一个非 none 的做处理,其它的 abort

所有 future 的结果都扔到一个 BlockingQueue 里面然后从这个里面取

感觉这个方案最便捷

我记得 pycon 2018 trio 的作者就讲了这个

不知道楼主的意思是不是这样的同时运行多个,然后获取返回的结果,如果是 None 忽略 不是就获取值并结束
task_list = []
done, pending = await asyncio.wait(task_list)
for task in done:
if task.result() != None:
result = task.result()
break

rxPY 这种需求还是用专门库吧,比自己琢磨方便多了

可能是我表述不清楚,其实是这样的:如果第一个返回的是 future3 且结果不为 None,那么我还得等 future1 和 future2。1…n 是有优先级顺序的。当 future i 返回,且有结果时,可以确定的是 i+1 到 n 是可以丢弃了,但是得等 future1 到 future (i-1)的结果

所以你获取的顺序不是按执行结束时间排序而是按原来排列好的队列?如果是这样的话,简单点直接先遍历队列 put 到 asynico.queue 里面,再依次 get 出来运行,如果运行结果非 none 就不用再 get 直接退出来

官方的 asyncio 已经提供你所需要的 API 了:
asyncio.as_completed(fs, *, loop=None, timeout=None)
Return an iterator whose values, when waited for, are Future instances.

上面提到的 asyncio.wait 会等待所有的执行完,asyncio.as_completed 只要有一个执行完就立即返回,例如你要的效果:
futures = [future1, future2, future3]
for next_completed in asyncio.as_completed(futures):
result = await next_completed
if result is not None:
break
这样在取结果的时候,其它的 future 还在执行,应该就是你要的效果。

回到顶部