Python中gevent的AsyncResult结果不一致问题如何解决?
我有一个需求需要启动 100 个 coroutine 然后讲结果异步传输过去,我就用 AsyncResult 来接受结果,但是接受的结果却只有最后一次,代码如下:
# coding=utf8
from gevent.monkey import patch_all
patch_all()
import gevent
from gevent.event import AsyncResult
from gevent.pool import Pool
from gevent import Greenlet
async = AsyncResult()
pool = Pool()
def Recv():
async.wait()
print ‘recv:{0}’.format(async.get())
class Send(Greenlet):
def __init__(self, v):
self.v = v
self.event = async
self.pool = pool
super(Send, self).__init__()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.event.set_exception(ValueError)
def _run(self):
gevent.sleep(3)
print 'send:{0}'.format(self.v)
self.event.set(self.v)
def main():
for i in xrange(10):
s = Send(i)
pool.start(s)
pool.spawn(Recv)
pool.join()
if name == ‘main’:
main()
但是运行结果却是这样的:
send:0
send:1
send:2
send:3
send:4
send:5
send:6
send:7
send:8
send:9
recv:9
recv:9
recv:9
recv:9
recv:9
recv:9
recv:9
recv:9
recv:9
recv:9
怎么才能完整接收到 send 的结果而且不堵塞其它的协程呢?(顺序不重要)
Python中gevent的AsyncResult结果不一致问题如何解决?
这个问题我遇到过。gevent.AsyncResult 的结果不一致,核心原因通常是你在多个 greenlet 里对同一个 AsyncResult 对象调用了多次 get() 或 wait()。
AsyncResult 的设计是“一次性”的。当一个 greenlet 调用 result.set(value) 后,第一个调用 result.get() 的协程会拿到这个值并“消耗”掉它。之后其他还在等待的协程再调用 get(),行为就不可预测了:可能抛出异常(如 AlreadyReadError),也可能永远阻塞,取决于你的 gevent 版本和场景。
解决思路就一个:确保一个 AsyncResult 只被一个消费者 get() 一次。 有几种常见做法:
- 为每个任务创建独立的
AsyncResult:这是最清晰、最推荐的做法。不要复用对象。 - 使用
Group或Pool管理多个异步结果:比如gevent.pool.Pool的apply_async会返回独立的AsyncResult对象,天然隔离。 - 如果需要广播结果,用
Event或自己封装:如果多个协程需要等待同一个结果,用gevent.event.Event更合适。或者用一个AsyncResult搭配一个共享的结果变量。
代码示例:错误用法 vs 正确用法
import gevent
from gevent.event import AsyncResult
# ---------- 错误用法:多个消费者争抢同一个 AsyncResult ----------
def consumer(name, ar):
print(f"{name} 开始等待结果...")
# 多个consumer调用同一个ar.get(),会导致不一致
result = ar.get()
print(f"{name} 拿到结果: {result}")
def producer_bad():
ar = AsyncResult()
# 启动两个消费者协程
gevent.spawn(consumer, "消费者A", ar)
gevent.spawn(consumer, "消费者B", ar)
gevent.sleep(1) # 模拟耗时
ar.set("最终结果")
gevent.sleep(0.1)
print("--- 错误示例输出(行为不确定)---")
producer_bad()
gevent.sleep(0.5)
# ---------- 正确用法1:每个任务独立的 AsyncResult ----------
def worker(task_id, ar):
gevent.sleep(0.5) # 模拟工作
ar.set(f"任务{task_id}的完成结果")
def correct_individual():
results = []
for i in range(3):
ar = AsyncResult() # 关键:每个任务都有自己的AsyncResult
gevent.spawn(worker, i, ar)
results.append(ar)
# 分别获取,互不干扰
for i, ar in enumerate(results):
print(f"获取任务{i}的结果: {ar.get()}")
print("\n--- 正确示例1:独立对象 ---")
correct_individual()
# ---------- 正确用法2:使用 Event 广播 ----------
from gevent.event import Event
def event_consumer(name, evt, shared_result):
print(f"{name} 等待事件触发...")
evt.wait() # 所有消费者都在这里等待同一个事件
print(f"{name} 事件已触发,读取共享结果: {shared_result[0]}")
def producer_with_event():
shared_container = [None] # 用容器共享结果
evt = Event()
# 启动消费者
gevent.spawn(event_consumer, "消费者X", evt, shared_container)
gevent.spawn(event_consumer, "消费者Y", evt, shared_container)
gevent.sleep(1)
shared_container[0] = "广播的结果"
evt.set() # 通知所有等待者
gevent.sleep(0.1)
print("\n--- 正确示例2:使用 Event 广播 ---")
producer_with_event()
总结:别复用 AsyncResult 对象,一个结果对应一个消费者。
这种能看文档解决的问题就没必要写那么多文字来问了吧,浪费自己和大家的时间,
> A one-time event that stores a value or an exception.

