Coroutine

从一般概念上说, 协程是特殊的函数调用: 被调用的函数可以在可控的位置被中断,然后在下一次调用时,继续从上次中断的位置继续执行。

本文主要通过Python的协程来介绍协程, 这是我唯一熟悉的一种协程实现.

Classic Coroutine

下面的python代码很好的说明了协程的核心功能

def co_routine():
    recv0 = yield 996  # hangs here after first coro.send
    assert recv0 == "Second"
    yield 711  # hangs here after second coro.send
    return

def main():
    coro = co_routine()  # Create a new coroutine object
    value = coro.send(None)  # active the coroutine,get 996 after execution
    assert value == 996
    value = coro.send("Second")  # active the coroutine again, and get 711
    assert value == 711

main()

要点:

  1. 当创建了一个协程后,协程是以coro对象的形式存在的,这个协程对象负责持有运行所需的资源,主要是对应的代码段和函数调用栈。
  2. coro对象在创建后是未激活的,也就是说,我们接下来执行的指令仍然在当前的代码段中。
  3. 通过coro.send()可以将控制流跳转到协程的代码段中去
  4. 可以通过sendyield在两侧交换数据.

"协程"相关的核心功能。

  1. yield_value = coro.send(send_to_prev_yiled), 激活协程, 并把send_to_prev_yiled作为上一次yield的返回值.
  2. received = yield value: 先执行右侧指令,向callee侧输出value,并挂起,等待被激活. 下次callee进行send时, send的值将绑定到received
  3. result = yield from Foo(): 激活Foo()创建的子协程,并将该子协程yield的值直接yield出去。
    1. 从实质上说,它是下面代码的语法糖。
    2. 从逻辑上说,yield from只作为一个bridge,把上层send进来的值继续发送下去,把下层yield的值直接yield出去。
    3. yield from 只有在子协程完全退出之后才会返回一个result值,这个result值是子协程的返回值,子协程yield出的值全都已经被再次yield出去了,是无法在这个位置访问到的
  4. 所有return v语句都将抛出一个StopIteration(v)异常
# result = yield from Foo()
tmp = Foo()
recv = None
try:
    while True:
        new_v = tmp.send(recv)
        recv = yield new_v
except CoroutineExit as e: # Note CoroutineExit is an alias ,it could be StopIteration (or RuntimeError in some framework)
    result = e.value

在通过yield from进行嵌套时, 逻辑上和调用函数类似, 形成了一个调用栈:

  1. 只有最内层被调用的"函数"可以推进, 它将推进到下一次yield的位置.
  2. 只有最内层被调用的"函数"return之后, 上一层的"函数"才能继续推进.

Native Coroutine(即async def

我们之前所说的coroutine可以被称为classisc coroutine, Python现在称以async def为核心的一套Coroutine为Native Coroutine. 它的核心目标是: 通过单线程内的分时复用, 按协程的风格模拟多线程.

它是在上述classic coroutine的基础上, 添加了额外的约束和工具而形成的, 具体而言

  1. 只使用async defawait两个关键字
    • async def仅用于标记这个函数应当按协程的形式创建, 也就是说,调用它是返回一个coro对象, 而不是直接进入函数体.
  2. async def的body内不允许使用 yield,yield from

由于async def的body内不允许使用 yield,yield from, 所以如果只使用await的话, 那么相当于直接跳转到对应的函数,并完整的执行一遍, 因为不可能有yield暂停执行.

async def async_bar():
    return

coro = async_bar()
coro.send(None)

因此,只靠async defawait是不能模拟多线程的, 必须额外实现以下两个功能:

  1. 主动让渡当前"线程"资源的函数
  2. 调度"线程"的调度器, 当某个coro让渡资源时, 调度器应当驱动另一个coro继续执行.

下面的例子中, 简单的实现了一个CoroManager, 它可以提供上述两个功能.

  1. add_new_thread用于将一个coro转换为逻辑上并发的新"线程", 它将返回一个新的coro对象, 用于辅助实现thread.join的逻辑
  2. 对普通的coro, await coro则仅仅相当于调用函数
from typing import List, Coroutine
import types

class CoroManager:
    def __init__(self):
        self._root_coro: List[Coroutine] = []

    def run(self, coro: Coroutine):
        self._root_coro.append(coro)
        while len(self._root_coro) > 0:
            self.advance_any_coro()

    def advance_any_coro(self):
        for coro in self._root_coro:
            try:
                coro.send(None)
            except StopIteration:
                self._root_coro.remove(coro)

    def add_new_thread(self, coro: Coroutine):
        self._root_coro.append(coro)

        # A coro can not be waited twice , so we create a dummy coro to help wait for it
        async def coro_waiter():
            while coro in self._root_coro:
                await self.sleep()  # yield cpu to any other thread

        return coro_waiter()

    @types.coroutine
    def sleep(self):
        yield

coro_manager = CoroManager()

async def worker(i):
    print(f'I am new thread {i}')
    await coro_manager.sleep()  # yield cpu to any other thread
    print(f'New thread {i} is done')

async def main():
    print('I am main thread')
    tsk0 = coro_manager.add_new_thread(worker(0))
    tsk1 = coro_manager.add_new_thread(worker(1))
    await coro_manager.sleep()  # yield cpu to any other thread
    await tsk0
    print('Main thread is active Again')
    await tsk1
    await worker(2)  # call worker in main thread directly
    print('Main thread is active Done')

coro_manager.run(main())

Python中称上述负责提供资源让渡/线程调度的工具为EventLoop, Python内置了asyncio用于实现上述功能.

从多线程的角度看, Python的协程适用场景为: 多个线程的并发程度高, 但并行程度低, 也就是大部分时候只有一个"线程"可以有效推进.

适用协程的比较典型的例子就是一些低频网络服务, 我们可能需要同时服务很多用户, 但是大部分时候, 只有1个用户是活跃的.

不适用协程的典型例子就是高性能计算, 对于高性能计算, 基本每个线程都是可以独立推进的, 只使用协程进行模拟的话, 实际并不会带来性能收益.