只用 Python 的 yield 关键字, 如何实现一个协程或用户态函数(生成器)调度器?

关于协程 sleep 实现想了好久, idea 来源于 Python 社区的一个大牛, 当时看过一遍, 现在想的比较清楚就写出来了–


https://gist.github.com/Petelin/1835fe063c38c01cf10a141736d14a9b <button onclick="lazyGist(this)"> 显示 Gist 代码 </button>

只用 Python 的 yield 关键字, 如何实现一个协程或用户态函数(生成器)调度器?


8 回复

最后有一个 tcp server 的实现, 类似于 gevent 的并发,在 handler 里写 sleep 不会被阻塞


yield 实现一个简单的协程调度器,核心就是维护一个任务队列,让生成器函数通过 yield 主动让出执行权,调度器再选择下一个任务继续执行。

下面是一个最基础、可运行的示例。它模拟了操作系统“协作式多任务”的核心思想:每个任务必须主动 yield,调度器才能切换到下一个。

# 首先,定义我们的“任务”。它就是一个生成器函数。
def task(name, n):
    for i in range(n):
        print(f'{name} 执行第 {i+1} 步')
        # 关键:通过 yield 主动让出 CPU 控制权,并返回一个状态(这里只是示意)
        yield f'{name}:step{i+1}'
    print(f'{name} 执行完毕!')

# 调度器类
class SimpleScheduler:
    def __init__(self):
        # 任务队列,里面存放的是生成器对象
        self._task_queue = []

    def add_task(self, gen):
        """ 添加一个生成器任务到队列 """
        self._task_queue.append(gen)

    def run(self):
        """ 启动调度,直到所有任务完成 """
        while self._task_queue:
            # 从队列头部取出一个任务
            current_task = self._task_queue.pop(0)
            try:
                # 执行一次 next(current_task),即运行到下一个 yield
                # 返回的值(message)这里可以用于任务间通信或状态判断,本例仅作演示
                message = next(current_task)
                # 如果任务还没执行完,把它重新放回队列尾部,等待下次调度
                self._task_queue.append(current_task)
                # 可以打印 yield 返回的信息
                # print(f'调度器收到消息: {message}')
            except StopIteration:
                # 任务执行完毕(生成器耗尽),不再放回队列
                print(f'调度器: 一个任务已完成并移除')
                pass

# 使用示例
if __name__ == '__main__':
    scheduler = SimpleScheduler()

    # 创建几个任务(生成器对象)
    t1 = task('任务A', 3)
    t2 = task('任务B', 4)
    t3 = task('任务C', 2)

    # 将任务添加到调度器
    scheduler.add_task(t1)
    scheduler.add_task(t2)
    scheduler.add_task(t3)

    # 启动调度
    print('--- 开始协作式调度 ---')
    scheduler.run()
    print('--- 所有任务执行完毕 ---')

代码解释:

  1. task 函数:是一个生成器。它执行循环,每次循环打印信息后 yield。这个 yield 做了两件事:一是暂停函数执行,二是返回一个值(这里是步骤信息)。
  2. SimpleScheduler:核心是 run 方法。它循环检查任务队列,每次取出一个任务,调用 next() 让它执行一步(直到遇到 yield)。如果任务没完 (StopIteration 异常没抛出),就把它塞回队列尾部,实现简单的“轮转”调度。
  3. 执行流程:三个任务会交替执行,输出顺序是 A1, B1, C1, A2, B2, C2, A3, B3, B4。因为每个任务执行一步就 yield,调度器让它们“协作”地交替运行。

一句话总结: 用一个队列管理所有生成器,循环调用 next() 驱动它们一步步执行,利用 yield 暂停和恢复的特性实现切换。

lz 应该是看了这个吧,代码思路最后一部分一样。
David Beazley - Python Concurrency From the Ground Up: LIVE! - PyCon 2015

<iframe src="https://www.youtube.com/embed/MCs5OvhV9S4" class="embedded_video" allowfullscreen="" type="text/html" id="ytplayer" frameborder="0"></iframe>

核心就是用 poll 做异步定时器~ 和 yield 没多大关系吧。lz 可以看看 tornado 的 sleep 实现

这种写法的初衷是解决多任务(concurrency),poll/select 控制异步调度,yield 实现多任务。

好的,我去看看,也不是异步定时器,而是异步通知,yield 正如 czheo 说的,是多任务,没有多任务,写出来的也只是 event loop 而已。

应该是有一个 PPT,从 yield 开始讲的。基本都是从他那学的。关于 sleep 部分实现是我想的。

粗看了下 eventlet 的做法差不多…用 yield 写起来太别扭了才有 greenlet

回到顶部