Python中关于asyncio的问题,再次请教

有两段代码,是关于 asyncio 的。
代码段一:

import asyncio

async def worker_1(): print(‘worker_1 start’) await asyncio.sleep(1) print(‘worker_1 done’)

async def worker_2(): print(‘worker_2 start’) await asyncio.sleep(2) print(‘worker_2 done’)

async def main(): print(‘before await’) await worker_1() print(‘awaited worker_1’) await worker_2() print(‘awaited worker_2’)

%time asyncio.run(main())

########## 输出 ##########

before await worker_1 start worker_1 done awaited worker_1 worker_2 start worker_2 done awaited worker_2 Wall time: 3 s

代码段二:

import asyncio

async def worker_1(): print(‘worker_1 start’) await asyncio.sleep(1) print(‘worker_1 done’)

async def worker_2(): print(‘worker_2 start’) await asyncio.sleep(2) print(‘worker_2 done’)

async def main(): task1 = asyncio.create_task(worker_1()) task2 = asyncio.create_task(worker_2()) print(‘before await’) await task1 print(‘awaited worker_1’) await task2 print(‘awaited worker_2’)

%time asyncio.run(main())

########## 输出 ##########

before await worker_1 start worker_2 start worker_1 done awaited worker_1 worker_2 done awaited worker_2 Wall time: 2.01 s

问题:代码段一里面的协程(coroutine)换成代码段二的任务(task)后,为什么执行顺序就变了?这个过程中发生了什么事情?

说说我的猜想:
发现调用 asyncio.run(main()) 或者 [asyncio.gather()->asyncio.get_event_loop()->loop.run_until_complete()]都会将一个 coroutine 转化成 task/future 再放 event loop 里面去, 交由 event loop 去管理这些 task/future。代码段一只将 main()这个 coroutine 封装成了 task 加入到 event loop 中,所以整个 event loop 中只有一个 task 在走,在这个 task 中代码是顺序执行的,所以最后呈现出同步执行的结果;
但是代码段二调用了两次 asyncio.create_task(),这个方法会将一个 coroutine 转换成一个 task 并且放到 event loop 中,所以整个 event loop 其实有三个 task ( main,task1,task2 ),之后程序就交给 event loop 来调度,执行顺序就变不同了。 这个假设目前来看好像能解释得通

最后,希望各位能指点一下~


Python中关于asyncio的问题,再次请教

88 回复

好奇,先收藏

你代码 1 是顺序执行的,
代码 2 加入 create_task 实际上已经把 worker1 和 worker2 这两个函数创建成 task 加入到了 eventLoop 里,你 worker1 函数里有 await sleep(1) , 让出 cpu 后 eventLoop 会继续调度下一个,这里基本就是 worker2. 所以第二个看起来像并行的, 这里的并行是指你 worker1 里主动让出了 cpu, 所以 worker2 在你 worker1 sleep 的过程中可以运行了。

大概应该就是这个道理

create_task 创建后就加入到了调度器,就会被执行, 不是说等你 await 他的时候他才执行,await 只是等待他执行完成。

这是单线程吧…应该没有让出 CPU 的操作吧…

你以为协程是为什么被发明出来的?就是为了单线程里面,应用程序层面上,可以最大效率地执行多个可能阻塞(比如 io 和 sleep )的任务,避免 cpu 空置呀。所以让出 cpu 这个说法很准确

你说的话不就是我的猜想么?另外你也是跟我一样的猜想,实际上我稍微去找过源码,我只看到了函数 create_task 里面的注释说将协程加入 event loop 调度并返回一个 task 对象,但是我并没有找到将协程加入 event loop 的代码在哪里。
另外你可以解释一下为什么代码 1 是顺序执行的吗?跟代码 2 对比起来不同的原因是什么?

我都没看你的猜想 =_=

你代码 1 里明显是一个一个创建的 task。你又没有显示的加入到 eventloop, 那就只能是在 await 的时候才加进去执行


我是说这个 task 让出 cpu,让其他的 task 执行。。

协程就是非阻塞,处理 io 时会通知协程,协程休眠,不占用 cpu 等到 io 结束协程在占用 cpu,这一步可以说让出 cpu

所以你可以解释一下吗~为什么将协程换成 task 之后执行顺序就不一样了?

代码 2 执行顺序不一样是因为 create_task 的两个 task 运行不相互阻塞。
https://github.com/python/asyncio/blob/master/asyncio/base_events.py
create_task:227
call_soon: 562

首先代码一里面只有 main()这个主异步函数会被转换成 task,其他的还是 coroutine,另外 create_task 调用之后,转换出来的 task 也没有明显加入 event loop,不过你能找到加进去 event loop 的代码的话,更能支持你的说法…

create_task 相当于之前的 ensure_future,实际上就是把任务放入 event_loop,并安排他执行。asynico 3.4 之前是 python 编写的,3.7 应该都是 c 了

我翻源码的时候也看到了这里,但是 create_task 这个函数看上去只是返回了创建的 task 对象,这个函数的注释是"""Schedule a coroutine object.
Return a task object.
“”"
请问 Schedule a coroutine object.怎么体现出来?在哪里 schedule 的?

create_task 返回的是已经被加入到调度器的 task, 你不用 await, 也可以直接用 task.done(), task.result()等方法来获取该 task 执行的结果。

而且你对代码 1 里只有 main()这个函数会被转换成 task 这个理解也不正确,async 函数 也会被创建成 task 加入到 eventloop 中。 eventloop 是不会直接执行 async 函数的

同时 3.7 支持 asyncio.all_tasks 来返回事件循环所运行的未完成的 Task 对象的集合 和 asyncio.current_task 返回当前运行的 Task 实例你可以自己调试下

你要再深入 tasks.py 那个文件去看,然后又会回到这个 baseEvent 下的 call_soon 函数,中点不是返回了什么,而是 Task 在初始化的时候做了什么。

我还是同样的问题,我能看到注释说 Schedule the execution of a coroutine object in a spawn task.
Return a Task object.
问题是 Schedule the execution of a coroutine object in a spawn task.这个 Schedule 在哪里体现的?

await 协程时, 协程才会开始执行, 执行返成结果返回给 await
create_task 时会先直接 await 协程, 相当于 task = await 协程; await task, 由于你两个 create_task 是连在一起的, 它们就没有顺序关系, 直接开始交错执行了。

看官方文档, https://docs.python.org/zh-cn/3/library/asyncio-task.html
asyncio.create_task(coro)
将 coro 协程 打包为一个 Task 排入日程准备执行。返回 Task 对象。

该任务会在 get_running_loop() 返回的循环中执行,如果当前线程没有在运行的循环则会引发 RuntimeError。

所以再 task 初始化的时候,那个__init__函数你看了吗?就几行代码…


<br> self._loop.call_soon(self._step)<br> self.__class__._all_tasks.add(self)<br>

按照你的说法,async 函数也会被转换成 task 而放到 eventloop 中,那其实代码一和代码二的 async 函数最后都会变成 task 才被加进 event loop,那为什么代码一是顺序的,代码二不是?

可以把你的查看路径发一下给我吗?从哪个函数跳到哪个函数这样的路径

因为你代码 1 里调用 await 一个协程函数的时候他才加进去啊,又不是你写了 await, 在开始运行的时候就加进去的。。。你 await worker2 的时候, 你 await 的 worker1 早就执行完了。毕竟你 await worker2 的顺序在 await worker1 之后。

协程的启动
async def print_msg(msg):
print(msg)

coro = print_msg()
coro.send(None) 执行至等待处(await)
coro.send(xxx) 恢复执行, xx 作为函数里面 await 的结果
一直 send, 一直执行, 直到 StopIteration,协程执行完毕。

create_task 时直接 send(None) 启动了协程执行, 导致两个 task 没有顺序关系。

建议你先仔细看看官方文档。不然你理解起来很费劲。我们解释起来也费劲。

我已经看了不少资料了…你看看我上个问题,你肯定又没仔细看,当然会费劲…

第一个 await 过程因为是阻塞等待结果的,其实等同于不包含 async/await 的同步写法

create_task 创建了一个 task , task 构造函数里, 使用了 call_soon, call_soon 的意思 event loop 在下一批调度时立刻执行这个 task, 不再等待.

看不到你的图片…

pycharm 跳进去的不一定是正确的, 特别是当这个函数是用实现时,pycharm 只会留下一个文档和一句 pass

换个思路来看,你 await worker_1() 的时候,Python 解释器根本不知道你待会儿还要 await worker_2(),就更无从谈起两个并行执行了。只有等你 await worker_1() 结束后,才能发现还要 await worker_2()。

还是看不到的

看上图,call_soon 把 task 加入 loop._ready, loop.run_once 把_ready 的东西 popleft 出来, 开始执行。

直接贴应该可以吧? github 的都可以发?

base_events create_task
tasks init
base_events call_soon
base_events _call_soon
_call_soon 的时候创建好 handle events

图床是 imgur 可能被墙掉了

老哥我真的看不到图…

v2 这个判断逻辑有问题,只是文件名字,都没有 http 就说看起来像垃圾链接。。。

def call_soon(self, callback, *args, context=None):
“”“Arrange for a callback to be called as soon as possible.

This operates as a FIFO queue: callbacks are called in the
order in which they are registered. Each callback will be
called exactly once.

Any positional arguments after the callback will be passed to
the callback when it is called.
“””
self._check_closed()
if self._debug:
self._check_thread()
self._check_callback(callback, ‘call_soon’)
handle = self._call_soon(callback, args, context)
if handle._source_traceback:
del handle._source_traceback[-1]
return handle
在事件循环里
while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._ready.append(handle)

# This is the only place where callbacks are actually called.
# All other places just add them to ready.
# Note: We run all currently scheduled callbacks, but not any
# callbacks scheduled by callbacks run this time around –
# they will be run the next time (after another I/O poll).
# Use an idiom that is thread-safe without using locks.
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
continue
if self._debug:
try:
self._current_handle = handle
t0 = self.time()
handle._run()
dt = self.time() - t0
if dt >= self.slow_callback_duration:
logger.warning(‘Executing %s took %.3f seconds’,
_format_handle(handle), dt)
finally:
self._current_handle = None
else:
handle._run()
handle = None # Needed to break cycles when an exception occurs.

创建好 handle events 之后是要等到下一次循环的时候才开始执行吧?什么时候会进入到事件循环的下一轮?

好好理解理解 create_task…看的我有点尴尬

按照你的意思,结合代码二,什么时候会进入下一批调度?

嗯嗯,我会努力的。同时也祝愿您下次评论的时候除了这种没有营养的话也多能说说有建设性的东西呢~
可以接受批评,但拒绝无实际帮助的评论呢,亲

谢谢你啦~

task 部分上面说了,如果你还要再深入的话,事件循环部分细节需要去看 _asynciomodule.c 里面的内容了

首先对刚刚评论的不礼貌向你道歉,这个 creat_task 跟 tornado 中的 add_callback 有些相似,这个 creat_task 再被调用时会直接执行传入的协程对象,并且默认不等待协程结果的执行完毕,你在调用 await worker 的时候在会开始等待 Task 对象执行完成,下面这段代码应该会对理解有些帮助,可以单步走下:
import asyncio


async def worker_1():
print(‘worker_1 start’)
print(‘worker_1 done’)
return '123’


async def worker_2():
print(‘worker_2 start’)
# await asyncio.sleep(0)
print(‘worker_2 done’)
return '234’


async def main():
task1 = asyncio.create_task(worker_1())
task2 = asyncio.create_task(worker_2())
print(‘before await’)
print(‘awaited worker_1’)
print(‘awaited worker_2’)
print(task1)
print(task2)

asyncio.run(main())

说说我的理解,这两段代码都创建了 3 个 task,但是对于代码 1,这三个 task 的执行顺序是 main-> work1->work2,对于代码 2,这三个 task 的执行顺序是 main-> work1,main->work2,并且可以认为 work1 和 work2 是同时执行的。

相关注释我也看到了,但是由于没有 C 基础,就先不勉强自己,谢谢你啦

好的谢谢你,我试一下

不知道你是看了源码还是看了程序输出得出来的结论,我也不能确定你说的就是对的

讲道理,我在你上一个问题的#14 和#16 已经分析过 task 在 init 是怎么把自己放进 eventloop 了,核心点还是在于 call_soon

另外你提到的 pycharm 跳转到 pyi 文件的问题,那个 pyi 只是用来做类型标记的,并没有具体的实现,如果你想看到实现可以临时去 pycharm 的用户目录把包含 pyi 的目录暂时移走就能进入真正的.py 文件了。
上面提到了有一部分 eventloop 会涉及到 c 代码,其实那大部分是涉及到怎么 pull 一个 fd 的,并不影响你阅读了解 asyncio 模块的工作原理。对于一个 SelectorEventLoop 是可以纯 python 代码实现的( python3.5 的就是纯 py 代码实现),c 代码只是为了优化性能,大部分情况下源码中还有有纯 python 的替代实现,你可以仔细看看

说实话,发现了更核心点的代码应该是在 call_soon 中传进去的 self.__step 这个函数中,有一段:
try:
if exc is None:
# We use the send method directly, because coroutines
# don’t have __iter__ and __next__ methods.
result = coro.send(None)
else:
result = coro.throw(exc)
except StopIteration as exc:
if self._must_cancel:
# Task is cancelled right before coro stops.
self._must_cancel = False
super().set_exception(futures.CancelledError())
else:
super().set_result(exc.value)

里面的 coro.send(None)是启动协程的关键,#29 有提到了
另外你之前的答案我也有看,但是一下子没理解下来,现在打算重新梳理一下,无论如何,十分感谢你对我的帮助~

关于__step 这个地方可能看一下 Python 的生成器那块会有更深的了解,最初的 asyncio 就是用生成器完成的,只是在后续版本中才逐步把 coroutines 和 generators 分离了,后续版本还有了异步生成器的概念,这些内容你可以看一下相关 PEP 文档,那里有解释为什么会采用这样的设计。
https://www.python.org/dev/peps/pep-0492/
https://www.python.org/dev/peps/pep-0525/

遇到 await 就阻塞了,只有等 await 那个变量执行完才会继续往下运行

上次没跟你讲清楚我觉得很失败

你还是拿起编辑器抄一个协程的实现吧,也就几百行,本质状态机

不要有这样的想法呀,可能只是我反应比较慢一点。其实大家对我的帮助很多了,真的谢谢你们

我个人的理解
代码一
event_loop 里面只有一个任务 main,CPU 执行代码顺序就是
main()
main: print(‘before await’)
main: await worker_1()
worker_1()
worker_1: await asyncio.sleep(1)到这里的时候有 IO 事件,让出 CPU 给 event_loop,但是 event_loop 没有其他的任务,所说 CPU 会空置等待 asyncio.sleep(1)完成再切换到 worker_1 中继续执行,接下来都是这样,所以这整个过程是同步执行的

代码二
一开始有一个 main 任务
main()
main: task1 = asyncio.create_task(worker_1())
main: task2 = asyncio.create_task(worker_2())
main: print(‘before await’) 这里 main 并没有让出 CPU,所以先打印 before await
main: await task1 这时 CPU 直接切换到了 task1,接下来
worker_1: print(‘worker_1 start’)
worker_1: await asyncio.sleep(1)直到这里出现 IO 事件才会把 CPU 让出给 event_loop,event_loop 中 main 和 task1 都是 await 状态,CPU 切换到 task2 任务
worker_2: print(‘worker_2 start’)
worker_2: await asyncio.sleep(2)出现 IO 时间切回 event_loop,三个任务都在 await,所以 CPU 会空置 1S 等待 asyncio.sleep(1),然后
worker_1: print(‘worker_1 done’)
main: print(‘awaited worker_1’)
main: await task2 等待 asyncio.sleep(2)结束
worker_2: print(‘worker_2 done’)
main: print(‘awaited worker_2’)

代码二你把 await task1 注释了再跑一次就会发现自己理解错了,await 不是你所理解的 cpu 切换或者启动任务,是主进程等待任务完成





讲真的,我试着整理了一下过程:

create_task->tasks.Task()的__init__->self._loop.call_soon[self.__step 函数作为参数传进去,里面的 result = coro.send(None)是启动协程的关键]
->_call_soon[self._ready.append(handle),这表示准备执行的任务队列]
在事件循环里:
base_events.py 里 run_forever->_run_once[handle = self._ready.popleft()取出一个任务(task/future),然后 handle._run()执行任务]

像之前说的 create_task 在将一个 coroutine 转化成 task 之后,将自己放进去了 event loop 中准备在下一轮循环中执行,那问题是是什么时候会进入下一轮循环的呢?

是不是这样的过程:
asyncio.run(main())之后,首先将 main()这个协程 coroutine 转化成 task 并加入 event loop,事件循环的第一轮执行 main 这个 task,执行期间将在 main 中创建的
两个 task(worker_1,worker_2)加入到 event loop 中,运行到 await task1 后让出控制权并检查事件循环里有没有其他任务,发现有刚刚新添加的两个任务,就转而去执行其他任务,
在其他任务中遇到了 await asyncio.sleep()再跳出来去执行另外的任务…直到所有任务执行完毕。

但如果是这样的话,好像只是在一轮循环里面就执行完了呀…总感觉哪里不对,是我理解错了吗?

作为事件循环,当你 await task1 把控制权交还的时候,这一轮循环已经结束,随后立即开始下一轮的事件循环,检查有没有其他任务这已经是下一轮循环干的事了

根据 python 的协程实现原理,只有在一个 coroutine 主动调用 await 或者 return 的时候才会发生 task 切换,虽然这里的 await task1 的目的是为了等待 task1 结束,但如果你在主 task 中永远不 await 任何一个 future 或者 return 的话(比如跑个死循环),其他 task 是永远不会被执行的。这点和线程可以剥夺式调度完全不同。( ps: python3.6 开始是可以检测一个 coroutine 执行了太长时间而发出警告的,但仍然不可主动剥夺)

有个建议,你可以尝试自己实现一下如何在单线程下完成“多任务”,基本上就是楼上所说有限状态机,很多单片机程序就是一个大的 while 循环套几组 switch,一组 switch 就是一个 task,每个 switch 的 flag 是一个标志量,内部执行到需要等待的时候就把 flag 值改成下一个代码块,随后主动退出,去让下一组 task 执行,这样实现“伪并发”。当然由于单片机的程序单一,硬件资源极为有限(很多单片机 ram 只有 1kb 甚至更小),这里的 task 是写死的。而 python 中相当于每次 while 循环的时候从队列头取出一组 switch(task)来执行,执行完后再进行下一轮 while (还有些 io 相关的操作暂时省略)

十分感谢!!

测试了一下,确实没有切换或启动任务的意思

赞楼主刨根问底的精神,有点像我当初为了搞清楚 asyncio 工作原理莽着源码看的样子。当看到 Task.__step 之后就明白整个链条是怎么串起来的了,有种醍醐灌顶的感觉,后来用 asyncio 写异步就得心应手了。

我自己学习的时候是用 yield 和 yield from 手写了一个协程的实现(在 py3.4 的时候,那是 asyncio 还未加入标准库),不过理解的根源还是写“多任务”的单片机程序😂

我看到 Task.__step 的时候也是跟你一样的感觉啊!!



其实最近提问的主题都被不少人收藏了,可能大家都会对这个感兴趣但是没人问出来并且刨进去,所以真的很感谢大家的帮忙,在这里的解答不仅是帮助到了我,还能照顾到其他收藏了或者点击进来的人,所以才希望有多点有建设性的评论,现在真的是莽着来看源码的,希望看着看着就没那么莽了吧。

借楼感谢上面这么多的评论的帮助!

async def main():
task1 = asyncio.create_task(worker_1())
task2 = asyncio.create_task(worker_2())
print(‘before await’)
await asyncio.sleep(2)
print(‘awaited worker_1’)
await asyncio.sleep(1)
print(‘awaited worker_2’)
结果:
before await
worker_1 start
worker_2 start
worker_1 done
awaited worker_1
worker_2 done
awaited worker_2


我把 main 里面的稍微改了下,得到跟你一样的结果,我感觉是 create_task 时就已经把两个任务添加到协程的任务列表里面去了,然后后面遇到阻塞就切换,然后就会得到跟 1 不一样的结果

而你代码二中 await task 造成了一定的误解,造成主动 await 才会执行 task 的假象
个人见解

你只改了 main 没有改 worker 里面的?

import asyncio


async def worker_1():
print(‘worker_1 start’)
await asyncio.sleep(1)
print(‘worker_1 done’)

async def worker_2():
print(‘worker_2 start’)
await asyncio.sleep(2)
print(‘worker_2 done’)

async def main():
task1 = asyncio.create_task(worker_1())
task2 = asyncio.create_task(worker_2())
print(‘before await’)
await asyncio.sleep(2)
print(‘awaited worker_1’)
await asyncio.sleep(1)
print(‘awaited worker_2’)


if name == ‘main’:
asyncio.run(main())

源码是这样的,就修改了下 main()里面的

其实可以用一个比较简单的方式解释,await 导致了 task 切换,但是 create_task 之后这个 task 已经放进等待列表,所以在 main task 中 await 任何 future 都会导致 worker 被执行,至于 worker_1 和 worker_2 的执行循序,这个就要看 EventLoop 的具体实现了

明白你的意思了,其实只要一个协程里面 await 了,就会进行切换,至于切换到哪一个就看 event loop 的调度了

啊相差了一分钟,其实想表达的是同样的意思,我等下整理一篇博客出来你可以帮忙看看吗

如果你仔细看过 EventLoop 的实现代码,你会发现其实 asyncio 内部很喜欢用在一个函数结尾调用 call_soon 的方式实现循环,比如
def _run(a,loop):
____if a == XXX:
________干一些事
____else:
________干另一些事
____loop.call_soon(a,loop)
____return
这样实现的代码虽然看起来复杂,可能能让你更明白事件循环的执行过程

修正一下,是
____loop.call_soon(_run,a,loop)
这里会发现 call_soon 传入的参数是个普通函数而不是一个 coroutine,为什么这么设计就很有意思了


文章写好啦 https://blog.csdn.net/qq_29785317/article/details/97054589
大家看看有什么需要修改的地方~~多多指正噢

我看到了 _run_once 这一部分,不知道是不是你说的 EventLoop 的实现,我觉得是其中一部分

回到顶部