Python中gevent的AsyncResult结果不一致问题如何解决?

我有一个需求需要启动 100 个 coroutine 然后讲结果异步传输过去,我就用 AsyncResult 来接受结果,但是接受的结果却只有最后一次,代码如下:

# coding=utf8

from gevent.monkey import patch_all patch_all() import gevent from gevent.event import AsyncResult from gevent.pool import Pool from gevent import Greenlet

async = AsyncResult() pool = Pool()

def Recv(): async.wait() print ‘recv:{0}’.format(async.get())

class Send(Greenlet):

def __init__(self, v):
    self.v = v
    self.event = async
    self.pool = pool
    super(Send, self).__init__()

def __enter__(self):
    return self

def __exit__(self, exc_type, exc_val, exc_tb):
    self.event.set_exception(ValueError)

def _run(self):
    gevent.sleep(3)
    print 'send:{0}'.format(self.v)
    self.event.set(self.v)

def main():

for i in xrange(10):
    s = Send(i)
    pool.start(s)
    pool.spawn(Recv)

pool.join()

if name == ‘main’: main()

但是运行结果却是这样的:

send:0
send:1
send:2
send:3
send:4
send:5
send:6
send:7
send:8
send:9
recv:9
recv:9
recv:9
recv:9
recv:9
recv:9
recv:9
recv:9
recv:9
recv:9

怎么才能完整接收到 send 的结果而且不堵塞其它的协程呢?(顺序不重要)


Python中gevent的AsyncResult结果不一致问题如何解决?

3 回复

这个问题我遇到过。gevent.AsyncResult 的结果不一致,核心原因通常是你在多个 greenlet 里对同一个 AsyncResult 对象调用了多次 get()wait()

AsyncResult 的设计是“一次性”的。当一个 greenlet 调用 result.set(value) 后,第一个调用 result.get() 的协程会拿到这个值并“消耗”掉它。之后其他还在等待的协程再调用 get(),行为就不可预测了:可能抛出异常(如 AlreadyReadError),也可能永远阻塞,取决于你的 gevent 版本和场景。

解决思路就一个:确保一个 AsyncResult 只被一个消费者 get() 一次。 有几种常见做法:

  1. 为每个任务创建独立的 AsyncResult:这是最清晰、最推荐的做法。不要复用对象。
  2. 使用 GroupPool 管理多个异步结果:比如 gevent.pool.Poolapply_async 会返回独立的 AsyncResult 对象,天然隔离。
  3. 如果需要广播结果,用 Event 或自己封装:如果多个协程需要等待同一个结果,用 gevent.event.Event 更合适。或者用一个 AsyncResult 搭配一个共享的结果变量。

代码示例:错误用法 vs 正确用法

import gevent
from gevent.event import AsyncResult

# ---------- 错误用法:多个消费者争抢同一个 AsyncResult ----------
def consumer(name, ar):
    print(f"{name} 开始等待结果...")
    # 多个consumer调用同一个ar.get(),会导致不一致
    result = ar.get()
    print(f"{name} 拿到结果: {result}")

def producer_bad():
    ar = AsyncResult()
    # 启动两个消费者协程
    gevent.spawn(consumer, "消费者A", ar)
    gevent.spawn(consumer, "消费者B", ar)
    gevent.sleep(1)  # 模拟耗时
    ar.set("最终结果")
    gevent.sleep(0.1)

print("--- 错误示例输出(行为不确定)---")
producer_bad()
gevent.sleep(0.5)

# ---------- 正确用法1:每个任务独立的 AsyncResult ----------
def worker(task_id, ar):
    gevent.sleep(0.5)  # 模拟工作
    ar.set(f"任务{task_id}的完成结果")

def correct_individual():
    results = []
    for i in range(3):
        ar = AsyncResult()  # 关键:每个任务都有自己的AsyncResult
        gevent.spawn(worker, i, ar)
        results.append(ar)

    # 分别获取,互不干扰
    for i, ar in enumerate(results):
        print(f"获取任务{i}的结果: {ar.get()}")

print("\n--- 正确示例1:独立对象 ---")
correct_individual()

# ---------- 正确用法2:使用 Event 广播 ----------
from gevent.event import Event

def event_consumer(name, evt, shared_result):
    print(f"{name} 等待事件触发...")
    evt.wait()  # 所有消费者都在这里等待同一个事件
    print(f"{name} 事件已触发,读取共享结果: {shared_result[0]}")

def producer_with_event():
    shared_container = [None]  # 用容器共享结果
    evt = Event()
    # 启动消费者
    gevent.spawn(event_consumer, "消费者X", evt, shared_container)
    gevent.spawn(event_consumer, "消费者Y", evt, shared_container)
    gevent.sleep(1)
    shared_container[0] = "广播的结果"
    evt.set()  # 通知所有等待者
    gevent.sleep(0.1)

print("\n--- 正确示例2:使用 Event 广播 ---")
producer_with_event()

总结:别复用 AsyncResult 对象,一个结果对应一个消费者。


这种能看文档解决的问题就没必要写那么多文字来问了吧,浪费自己和大家的时间,

> A one-time event that stores a value or an exception.

回到顶部