从一般概念上说, 协程是特殊的函数调用: 被调用的函数可以在可控的位置被中断,然后在下一次调用时,继续从上次中断的位置继续执行。
本文主要通过Python的协程来介绍协程, 这是我唯一熟悉的一种协程实现.
Classic Coroutine
下面的python代码很好的说明了协程的核心功能
def co_routine():
recv0 = yield 996 # hangs here after first coro.send
assert recv0 == "Second"
yield 711 # hangs here after second coro.send
return
def main():
coro = co_routine() # Create a new coroutine object
value = coro.send(None) # active the coroutine,get 996 after execution
assert value == 996
value = coro.send("Second") # active the coroutine again, and get 711
assert value == 711
main()
要点:
- 当创建了一个协程后,协程是以
coro
对象的形式存在的,这个协程对象负责持有运行所需的资源,主要是对应的代码段和函数调用栈。 coro
对象在创建后是未激活的,也就是说,我们接下来执行的指令仍然在当前的代码段中。- 通过
coro.send()
可以将控制流跳转到协程的代码段中去 - 可以通过
send
和yield
在两侧交换数据.
"协程"相关的核心功能。
yield_value = coro.send(send_to_prev_yiled)
, 激活协程, 并把send_to_prev_yiled
作为上一次yield
的返回值.received = yield value
: 先执行右侧指令,向callee
侧输出value
,并挂起,等待被激活. 下次callee
进行send时, send的值将绑定到received
result = yield from Foo()
: 激活Foo()
创建的子协程,并将该子协程yield
的值直接yield
出去。- 从实质上说,它是下面代码的语法糖。
- 从逻辑上说,
yield from
只作为一个bridge,把上层send进来的值继续发送下去,把下层yield的值直接yield出去。 yield from
只有在子协程完全退出之后才会返回一个result值,这个result值是子协程的返回值,子协程yield
出的值全都已经被再次yield出去了,是无法在这个位置访问到的。
- 所有
return v
语句都将抛出一个StopIteration(v)
异常
# result = yield from Foo()
tmp = Foo()
recv = None
try:
while True:
new_v = tmp.send(recv)
recv = yield new_v
except CoroutineExit as e: # Note CoroutineExit is an alias ,it could be StopIteration (or RuntimeError in some framework)
result = e.value
在通过yield from
进行嵌套时, 逻辑上和调用函数类似, 形成了一个调用栈:
- 只有最内层被调用的"函数"可以推进, 它将推进到下一次yield的位置.
- 只有最内层被调用的"函数"return之后, 上一层的"函数"才能继续推进.
Native Coroutine(即async def
)
我们之前所说的coroutine
可以被称为classisc coroutine
, Python现在称以async def
为核心的一套Coroutine为Native Coroutine. 它的核心目标是: 通过单线程内的分时复用, 按协程的风格模拟多线程.
它是在上述classic coroutine
的基础上, 添加了额外的约束和工具而形成的, 具体而言
- 只使用
async def
和await
两个关键字async def
仅用于标记这个函数应当按协程的形式创建, 也就是说,调用它是返回一个coro
对象, 而不是直接进入函数体.
async def
的body内不允许使用yield
,yield from
由于async def
的body内不允许使用 yield
,yield from
, 所以如果只使用await
的话, 那么相当于直接跳转到对应的函数,并完整的执行一遍, 因为不可能有yield
暂停执行.
async def async_bar():
return
coro = async_bar()
coro.send(None)
因此,只靠async def
和await
是不能模拟多线程的, 必须额外实现以下两个功能:
- 主动让渡当前"线程"资源的函数
- 调度"线程"的调度器, 当某个coro让渡资源时, 调度器应当驱动另一个coro继续执行.
下面的例子中, 简单的实现了一个CoroManager
, 它可以提供上述两个功能.
add_new_thread
用于将一个coro转换为逻辑上并发的新"线程", 它将返回一个新的coro对象, 用于辅助实现thread.join
的逻辑- 对普通的coro,
await coro
则仅仅相当于调用函数
from typing import List, Coroutine
import types
class CoroManager:
def __init__(self):
self._root_coro: List[Coroutine] = []
def run(self, coro: Coroutine):
self._root_coro.append(coro)
while len(self._root_coro) > 0:
self.advance_any_coro()
def advance_any_coro(self):
for coro in self._root_coro:
try:
coro.send(None)
except StopIteration:
self._root_coro.remove(coro)
def add_new_thread(self, coro: Coroutine):
self._root_coro.append(coro)
# A coro can not be waited twice , so we create a dummy coro to help wait for it
async def coro_waiter():
while coro in self._root_coro:
await self.sleep() # yield cpu to any other thread
return coro_waiter()
@types.coroutine
def sleep(self):
yield
coro_manager = CoroManager()
async def worker(i):
print(f'I am new thread {i}')
await coro_manager.sleep() # yield cpu to any other thread
print(f'New thread {i} is done')
async def main():
print('I am main thread')
tsk0 = coro_manager.add_new_thread(worker(0))
tsk1 = coro_manager.add_new_thread(worker(1))
await coro_manager.sleep() # yield cpu to any other thread
await tsk0
print('Main thread is active Again')
await tsk1
await worker(2) # call worker in main thread directly
print('Main thread is active Done')
coro_manager.run(main())
Python中称上述负责提供资源让渡/线程调度的工具为EventLoop, Python内置了asyncio
用于实现上述功能.
从多线程的角度看, Python的协程适用场景为: 多个线程的并发程度高, 但并行程度低, 也就是大部分时候只有一个"线程"可以有效推进.
适用协程的比较典型的例子就是一些低频网络服务, 我们可能需要同时服务很多用户, 但是大部分时候, 只有1个用户是活跃的.
不适用协程的典型例子就是高性能计算, 对于高性能计算, 基本每个线程都是可以独立推进的, 只使用协程进行模拟的话, 实际并不会带来性能收益.