Python学习之路35-协程

《流畅的Python》笔记。

本篇主要讨论一个与生成器看似无关,但实际非常相关的概念:协程。

1. 前言

说到协程(Coroutine),如果是刚接触Python不久的新手,估计第一个反应是:懵逼,这是个什么玩意儿?有一点基础的小伙伴可能会想到进程和线程。

其实,和子程序(或者说函数)一样,协程也是一种程序组件。Donald Knuth曾经说过,子程序是协程的特例。我们都知道,一个子程序就是一次函数调用,它只有一个入口和一个出口:调用者调用子程序,子程序运行完毕,将结果返回给调用者。而协程则是多入口和多出口的子程序:调用者可以不止一个,执行过程中可以暂停,输出结果也可以不止一个。

协程和进程、线程也是有关系的:为了实现并发,高效利用系统资源,于是有了进程;为了实现更高的并发,以及减小进程切换时的上下文开销,于是有了线程;但即便线程切换时的开销小了,如果线程数量一多(比如10K个),这时的上下文切换也不可小觑,于是在线程中加入了协程(这里之所以是“加入”,是因为协程的概念出现得比线程要早)。协程运行在一个线程当中,不会发生线程的切换,并且,它的启停可以由用户自行控制。由于协程在一个线程中运行,所以在共享资源时不需要加锁。

补充:以后有机会单独出一篇详细介绍进程、线程和协程的文章。

2. 迭代器、生成器和协程

这三者本不应该放在一起,之所以放在一起,是因为生成器将迭代器和协程联系了起来,或者说yield关键字将这三者联系了起来:生成器可以作为迭代器,生成器又是协程比不可少的组成部分。但千万不要把迭代器用作协程,也别把协程用作迭代器!这两者并不应该存在关系。

yield关键字背后的机制很强大,它不仅能向用户提供数据,还能从用户那里获取数据。而迭代器、生成器和协程这三个概念其实是对yield关键字用法的取舍

  • 凡是含有关键字yield或者yield from的函数都是生成器,不管你是用来干啥;
  • 如果只是用yield来生成数据,或者说向用户提供数据,那么这个生成器可以看做迭代器(用作迭代器的生成器);
  • 如果还想yield来获取外部的数据,实现双向数据交换,那么这个生成器可看做协程(用作协程的生成器)。

这里先列举出迭代器和协程在代码上最直观的区别:

1
2
3
4
5
6
7
8
# 代码2.1
def my_iter(): # 用作迭代器的生成器
yield 1; # 作为迭代器,yield关键字后面会跟一个数据
yield 2; # 且不关心yield的返回值,没有赋值语句

def my_co(): # 用作协程的生成器
x = yield # 这种写法表示希望从用户处获取数据,而不向用户提供数据(其实提供的是None)
y = yield 1 # 这种写法表示既向用户提供数据,也希望得到用户的反馈

3. 协程

本节主要包括协程的运行过程,协程的4个状态,协程的预激,协程的终止和异常处理,协程的返回值。

3.1 协程的运行

协程本身有4个状态(其实就是生成器的4个状态),可以使用inspect.getgeneratorstate()函数来确定:

  • GEN_CREATED:等待开始执行;
  • GEN_RUNNING:解释器正在执行,多线程时能看到这个状态;
  • GEN_SUSPENDED:在yield表达式处暂停时的状态;
  • GEN_CLOSED:执行结束。

下面通过一个简单的例子来说明这四个状态以及协程的运行过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 代码3.1
>>> def simple_coro(a):
... print("Started a =", a)
... b = yield a
... print("Received b =", b)
... c = yield a + b
... print("End with c=", c)
...
>>> from inspect import getgeneratorstate
>>> my_coro = simple_coro(1)
>>> getgeneratorstate(my_coro)
'GEN_CREATED' # 刚创建的协程所处的状态,这时协程还没有被激活
>>> next(my_coro) ### 第一次调用next()叫做预激,这一步非常重要! ###
Started a = 1
1
>>> >>> getgeneratorstate(my_coro)
'GEN_SUSPENDED' # 在yield表达式处暂停时的状态
>>> my_coro.send(2) # 通过.send()方法将用户的数据传给协程
Received b = 2
3
>>> my_coro.send(3)
End with c= 3
Traceback (most recent call last):
File "<input>", line 1, in <module>
StopIteration # 协程(生成器)结束,抛出StopIteration
>>> getgeneratorstate(my_coro)
'GEN_CLOSED' # 协程结束后的状态

解释

  • 刚创建的协程并没有激活,对协程的第一次next()调用就是预激,这一步非常重要,它将运行到第一yield表达式处并暂停。对于没有预激的协程,在调用.send(value)时,如果value不是None,解释器将抛出异常。对于预激,既可以调用next()函数,也可以.send(None)(此时会被特殊处理)。但对于yield from来说则不用预激,它会自动预激。
  • .send()方法实现了用户和协程的交互。yield是一个表达式(上述代码中等号的右边),它的默认返回值是None,如果用户通过.send(value)传入了参数value,那么这个值将作为协程暂停处yield表达式的返回值。
  • 协程的运行过程:也可以叫做生成器的运行过程。从上一篇中我们知道,调用next()函数或.send()方法时,协程会运行到下一个yield表达式处并暂停。具体来说,比如上述代码中的b = yield a,代码其实是停在等号的右边,yield a这个表达式还没有返回,只是把a传给了用户,但还没有计算出yield a表达式的返回值,b因此也没有被赋值。当代码再次运行时,等号右边的yield a表达式才返回值,并将这个值赋给b。如果通过next()函数让协程继续运行,则上一个暂停处的yield表达式将返回默认值None(b = None);如果通过.send(value)让协程继续运行,则上一个yield表达式将返回value(b = value)。这也解释了为什么要预激协程:如果没有预激,也就没有yield表达式与传入的value相对应,自然也就抛出异常。

3.2 终止协程和异常处理

协程中没处理的异常会向上冒泡,传给next()函数或.send()方法的调用方。不过,我们也可以通过.throw()方法手动抛出异常,还可以通过.close()方法手动结束协程:

  • generator.throw(exc_type[, exc_value[, traceback]]):让生成器在暂停的yield表达式处抛出指定的异常。如果生成器处理了这个异常,代码会向前执行到下一个yield表达式yield a,并将生成的a作为generator.throw()的返回值。如果生成器没有处理抛出的异常,则会向上冒泡,并且生成器会终止,状态转换成GEN_CLOSED
  • generator.close():使生成器在暂停处的yield表达式处抛出GeneratorExit异常。如果生成器没有处理这个异常,或者处理时抛出了StopIteration异常,.close()方法直接返回,且不报错;如果处理GeneratorExit时抛出了非StopIteration异常,则向上冒泡。

3.3 返回值

从上一篇和本篇的代码中,不知道大家发现了一个现象没有:所有的生成器最后都没有写return语句。这其实是有原因的,因为在Python3.3之前,如果生成器返回值,解释器会报语法错误。现在则不会报错了,但返回的值并不是像普通函数那样可以直接接收:Python解释器会把这个返回值绑定到生成器最后抛出的StopIteration异常对象的value属性中。示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 代码3.2
>>> def test():
... yield 1
... return "This is a test"
...
>>> t = test()
>>> next(t)
1
>>> next(t)
Traceback (most recent call last):
File "<input>", line 1, in <module>
StopIteration: This is a test # StopIteration有了附加信息
>>> t = test()
>>> next(t)
1
>>> try:
... next(t)
... except StopIteration as si:
... print(si.value) # 获取返回的值
...
This is a test

3.4 预激协程的装饰器

从前文我们知道,如果要使用协程,必须要预激。可以手动通过调用next()函数或者.send(None)方法。但有时我们会忘记手动预激,此时,我们可以使用装饰器来自动预激协程,这个装饰器如下:

1
2
3
4
5
6
7
8
9
10
# 代码3.3
from functools import wraps

def coroutine(func):
@wraps(func)
def primer(*args, **kwargs):
gen = func(*args, **kwargs)
next(gen)
return gen
return primer

提前预激的生成器只能和yield兼容,不能和yield from兼容,因为yield from会自动预激。所以请确定你的生成器要不要被放在yield from之后。

4. yield from

上一篇文章说到,对于嵌套生成器,使用yield from能减少很多代码,比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 代码4.1
def y2():
def y1(): # y1只要是个可迭代对象就行
yield 1
yield 2
# 第一种写法
for y in y1():
yield y
# 第二种写法
# yield from y1()

if __name__ == "__main__":
for y in y2():
print(y)

第二种写法明显比第一种简洁。这是yield from的一个作用:简化嵌套循环。yield from后面还可以跟任意可迭代对象,并不是只能跟生成器

yield from最重要的作用是起到了类似通道的作用它能让客户端代码和子生成器之间进行数据交换

这里有几个术语需要先解释一下:

  • 委派生成器:包含yield from <iterable>表达式的生成器函数。

  • 子生成器:上述的<iterable>部分就是子生成器。<iterable>也可以是委派生成器,以此类推下去,形成一个链条,但这个链条最终以一个只使用yield表达式的简单生成器结束。

  • 调用方:调用委派生成器的代码或对象叫做调用方。为了避免歧异,我们把最外层的代码,也就是调用第一层委派生成器的代码叫做客户端代码

比如上述代码,按照没有yield from语句的写法,如果客户端代码想通过y2.send(value)y1传值,value只能传到y2这一层,如果想再传入y1,将要写大量复杂的代码。下面是yield from的说明图:

结合上图,可做如下总结:

  • yield fromyield在使用上并无太大区别;
  • 委派生成器也是生成器。当第一次对委派生成器调用next().send(None)时,委派生成器会执行到第一个yield from表达式并暂停。当客户端继续调用委派生成器的.send().throw().close()等方法时,会“直接”作用到最内层的子生成器上,而不是让委派生成器的代码继续向前执行。只有当子生成器抛出StopIteration异常后,委派生成器中的代码才继续执行,并将StopIteration.value的值作为yield from表达式的返回值。

补充(可跳过)

这一小节是yield from的逻辑伪代码实现,代码较为复杂,看不懂也没什么关系,可以跳过,也可直接看最后的总结,并不影响yield from的使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 代码4.2 ### "RESULT = yield from EXPR"语句的等效代码 ###
_i = iter(EXPR) # 得到EXPR的迭代器
try:
_y = next(_i) # 预激!还没有向客户端生成值
except StopIteration as _e: # 如果_i抛出了StopIteration异常
_r = _e.value # _i的最后的返回值。这不是最后的生成值!
else: # 如果调用next(_i)一切正常
while 1: # 这是一个无限循环
try:
_s = yield _y # 向客户端发送子生成器生成的值,然后暂停
except GeneratorExit as _e: # 如果客户端调用.throw(GeneratorExit),或者调用close方法
try: # 首先尝试获取_i的close方法,因为_i不一定是生成器,普通迭代器不会实现close方法
_m = _i.close
except AttributeError:
pass # 没有获取到close方法,什么也不做
else:
_m() # 如果获取到了close方法,则调用子生成器的close方法
raise _e # 最后不管怎样,都向上抛出GeneratorExit异常
except BaseException as _e: # 如果客户端通过throw()传入其它异常
_x = sys.exc_info() # 获取错误信息
try: # 尝试获取_i的throw方法,理由和上面的情况一样
_m = _i.throw
except AttributeError: # 如果没有这个方法
raise _e # 则向上抛出用户传入的异常
else: # 如果_i有throw方法,即它是一个子生成器
try:
_y = _m(*_x) # 尝试调用子生成器的throw方法
except StopIteration as _e:
_r = _e.value # 如果子生成器抛出StopIteration,获取返回的值
break # 并且跳出循环
else: # 如果在生成器生成值时没有异常发生
try: # 试验证用户通过.send()方法传入的值
if _s is None: # 如果传入的是None
_y = next(_i) # 则尝试调用next(),向前继续执行
else: # 如果传入的不是None,则尝试调用子生成器的send方法
_y = _i.send(_s)
# 如果子生成器没有send方法,则向上报AttributeError
except StopIteration as _e: # 如果子生成器抛出了StopIteration
_r = _e.value # 获取子生成器返回的值
break # 并跳出循环,回复委派生成器的运行
RESULT = _r # _r就是yield from EXPR最终的返回值,将其赋予RESULT

从上面这么长一串代码可以看出,如果没有yield from,而我们又想向最内层的子生成器传值,这得多麻烦。下面总结出几点yield from的特性:

  • 所有的“直接”其实都是间接的,都是一层一层传下去,或者一层一层传上来的,只是我们感觉是直接的而已;

  • 调用.send(value)将值传给委派生成器时,如果valueNone,则调用子生成器的__next__方法;否则,调用子生成器的.send(value)

  • 当对委派生成器调用.throw(),委派生成器会先确定子生成器有没有.throw()方法,如果有,则调用,如果没有,则向上抛出AttributeError异常;

  • 当客户端调用委派生成器的.throw(GeneratorExit)或者.close()方法时,委派生成器也会先确定子生成器有没有.close()方法,如果有,则调用子生成器的.close()方法,由子生成器来抛出GeneratorExit异常,委派生成器将这个异常向上传递;如果子类没有.close()方法,则委派生成器直接抛出GeneratorExit异常。Python解释器会捕获这个异常,但不会显示异常信息。

  • 只要子生成器抛出StopIteration异常,不管是用户通过.throw方法传递的,还是子生成器运行结束时抛出的,都会导致委派生成器继续向前执行。

5. 协程计算均值

在《Python学习之路26》中,我们分别用类和闭包来实现了平均值的计算,现在,作为本章最后一个例子,我们使用协程来实现平均值的计算,其中还会用到yield from和生成器的返回值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 代码5.1
import inspect

def averager():
total = 0.0
count = 0
average = None
while True:
term = yield
if term is None:
break
total += term
count += 1
average = total / count
return average

def grouper(results, key):
while True: # 每个循环都会新建averager
results[key] = yield from averager()

def main(data):
results = {}
for key, values in data.items():
group = grouper(results, key) # 每个循环都会新建grouper
next(group) # 激活
for value in values:
group.send(value)
# 此句非常重要,否则不会执行到averager()中的return语句,也就得不到最终的返回值
group.send(None)

print(results)

data = {"list1": [1, 2, 3, 4, 5], "list2": [6, 7, 8, 9, 10]}

if __name__ == "__main__":
main(data)

# 结果:
{'list1': 3.0, 'list2': 8.0}

不知道大家看到这段代码的时候有没有什么疑问。当笔者看到grouper()委派生成器里的While True:时,非常疑惑:为啥要加个While循环呢?如果按这个版本,我们在main中的for循环后检测group的状态,会发现它是GEN_SUSPENDED,这笔者的强迫症就犯了,怎么能不是GEN_CLOSED呢?!而且这个版本每当执行完group.send(None)后,在grouper()中又会创建新的averager,然后当maingroup更新后,上一个grouper(也就是刚新建了averagergrouper)由于引用数为0,又被回收了。刚新建一个averager就被回收,这不多此一举吗?于是笔者将代码改成了如下形式:

1
2
3
4
5
6
7
8
9
10
11
12
# 代码5.2
def grouper(results, key): # 去掉了循环
results[key] = yield from averager()

def main(data):
results = {}
for key, values in data.items():
-- snip --
try: # 手动捕获异常
group.send(None)
except StopIteration:
continue

写出来后发现代码并没有之前的简洁,但至少group最后变成了GEN_CLOSED状态。至于最后怎么取舍就看各位了。

VPointer wechat
欢迎大家关注我的微信公众号"代码港"~~
您的慷慨将鼓励我继续创作~~