Python学习之路37-使用asyncio包处理并发

《流畅的Python》笔记。

本篇主要讨论asyncio包,这个包使用事件循环驱动的协程实现并发。

1. 前言

本篇主要介绍如果使用asyncio包将上一篇中线程版的“国旗下载”程序改为协程版本,通过异步非阻塞来实现并发。

说实话,我在读这部分内容的时候是懵逼的,书中阻塞非阻塞、同步异步的概念和我之前的理解有很大差异。之前一直以为同步就意味着阻塞,异步就意味着非阻塞。但其实,阻塞非阻塞与同步异步并没有本质的联系。

同步(Synchronizing)异步(Asynchronizing)是对指令而言的,也就是程序(理解成“函数”会更好一些)。以含有I/O操作的函数为例(被调用方),如果这个函数要等到I/O操作结束,获取了数据,才返回到调用方,这就叫同步(绝大部分函数都同步);反之,不等I/O执行完毕就返回到调用方,获取的数据以其他方式转给调用方,这就叫异步。

阻塞(Blocking)非阻塞(Non-Blocking)是对进程线程而言(为了简洁,只以“线程”为例)。因为某些原因(比如I/O),线程被挂起(被移出CPU),这就叫阻塞;反之,即使因为这些原因,线程依然不被挂起(不被移出CPU),这就叫非阻塞。

可见,这两组概念一共可以组成四种不同情况:同步阻塞(常见),同步非阻塞(不常见),异步阻塞(不常见),异步非阻塞(常见)。

仍以上述I/O函数为例:

  • 如果这个函数的I/O请求已发出,只是单纯地在等服务器发回数据,线程也只是单纯地在等这个函数返回结果,CPU将会把这个线程挂起,这就叫做同步阻塞
  • 如果这个函数中调用的是一个执行复杂计算的子函数,此时,函数依然在等结果没有返回,但线程并不是没有运行,不会被CPU挂起,这就叫做同步非阻塞(“CPU以轮询的方式查看I/O是否结束”更能说明这种情况,但这已是很古老的方式了);
  • 如果这个函数在I/O请求没得到结果之前就返回了,但线程依然在等这个结果(在函数体之外等待使用这个数据),这就叫异步阻塞
  • 如果这个函数在没得到结果之前返回了,线程继续执行其他函数,这就叫做异步非阻塞。更具体一点,这种情况对应的是使用回调实现异步非阻塞的情况;而Python中还有一种情况,也是本篇要讲的,就是使用协程实现异步非阻塞:协程在得到结果前依然不返回,但线程并没有等待,而是去执行其他协程。协程看起来就像同步一样。

由于之前并没有遇到代码世界中的同步非阻塞异步阻塞这两种情况,所以我也不确定上述这两种情况的例子是否准确,欢迎大佬留言指导。但这四种情况在现实生活中就很常见了,下面举个在某处看到的例子:

  • 老张把一普通水壶接上水放火上,眼睛直勾勾盯着等水开,不干其他事,这叫同步阻塞
  • 老张依然用一普通水壶烧水,但把水壶放火上后去客厅看电视,时不时回来看水烧好了没有,这叫同步非阻塞
  • 老张用一能响的水壶烧水,没盯着看,但也没干其他事,只是在那儿发愣。水烧好后,壶可劲儿的响,老张一惊,取走水壶,这叫异步阻塞
  • 老张用一能响的水壶烧水,把壶放火上后去客厅看电视,等壶响了再去拿壶,这叫异步非阻塞

从这四个例子可以看出,阻不阻塞是对老张而言的,在计算机中对应的就是进程线程;同步异步是对水壶而言的,在计算机中对应的就是函数。

有了上述概念后,我们接下来将使用asyncio包,将之前下载国旗的程序改为协程版本。

2. 异步

之前我们使用线程实现了并发下载数据,它是同步阻塞的,因为一到I/O操作,线程就被阻塞,然后调入新的线程。现在,我们将实现一个异步非阻塞版本。但从上述介绍知道,异步有两种方式:回调和协程。本文并不会实现回调版本的“下载国旗”,提出回调只是为了和协程进行比较。

2.1 回调

举个例子说明回调。在调用函数A时除了传入必要的参数外,还传入一个参数:函数B。A中有一些费时的操作,比如I/O,A在没得到结果之前就返回,而将等待结果以及进行后续处理的事情交给函数B。这个过程就是回调,函数B就称为回调函数

这种编程方式不太符合人的思维习惯,代码也不易于理解,情况一复杂,就很可能遇到“回调地狱”:多层嵌套回调。下面是一个JavaScript中使用回调的例子,它嵌套了3层:

1
2
3
4
5
6
7
8
9
10
// 代码2.1
api_call1(request1, function (response1){ // 多么痛的领悟
var request2 = step1(response1); // 第一步
api_call2(request2, function (response2){
var request3 = step2(response2); // 第二步
api_call3(request3, function (response3){
step(response3); // 第三步
})
})
})

api_call1api_call2api_call3都是库函数,用于异步获取结果。JavaScript中常用匿名函数作为回调函数。下面我们使用Python来实现上述代码,上述三个匿名函数分别命名为stage1stage2stage3

1
2
3
4
5
6
7
8
9
10
11
12
13
# 代码2.2
def stage1(response1):
request2 = step1(response1)
api_call2(request2, stage2)

def stage2(response2):
request3 = step2(response2)
api_call3(request3, stage3)

def stage3(response3):
step3(response3)

api_call1(request1, stage1) # 代码从这里开始执行

可见,即使用Python写,也不容易理解,这要是再多嵌套几层,不逼疯已经不错了。而且,如果要在stage2中使用request2,还得使用闭包,这就又变成了嵌套定义函数的情况。并且上述代码还没有考虑抛出异常的情况:在基于回调的API中,这个问题的解决办法是为每个异步调用注册两个回调,一个用于处理操作成功时返回的结果,一个用于处理错误。可以看出,一旦涉及错误处理,回调将更可怕。

2.2 协程

现在我们用协程来改写上述代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 代码2.3
import asyncio

@asyncio.coroutine
def three_stages(request1):
response1 = yield from api_call1(request1)
request2 = step1(response1)
response2 = yield from api_call2(request2)
request3 = step2(response2)
response3 = yield from api_call3(request3)
step3(response3)

loop = asyncio.get_event_loop()
loop.create_task(three_stages(request1))

与前面两个版本的回调相比,这个版本的代码将3个步骤依次写在同一函数中,易于理解,这样看起来是不是也更像同步函数?如果要处理异常,只需要相应的yield from语句处添加try/except即可。

但也别急着把这称为“协程天堂”,因为:

  • 不能使用常规函数,必须使用协程,而且要习惯yield from语句;
  • 不能直接调用协程。即,不能像直接调用api_call1(request1)那样直接调用three_stages(request1),必须使用事件循环(上面的loop)来驱动协程。

但不管怎样,代码读起来和写起来比回调简单多了,尤其是嵌套回调。

小技巧:读协程的代码时,为了便于理解代码的意思,可以直接将yield from关键字忽略掉。

2.3 下载国旗批量版

下面我们开始实现协程版本的“下载国旗”。

为了将其改为协程版本,我们不能使用之前的requests包,因为它会阻塞线程,改为使用aiohttp包。为了尽量保持代码的简洁,这里不处理异常。下方是完整的代码,代码中我们使用了新语法。以下代码的基本思路是:在一个单线程程序中使用主循环一次激活队列中的协程,各个协程向前执行几步,然后把控制权让给主循环,主循环再激活队列中的下一个协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# 代码2.4
import aiohttp, os, sys, time, asyncio # 代码中请勿这么写,这里只是为了减少行数

POP20_CC = ("CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR").split()
BASE_URL = "http://flupy.org/data/flags"
DEST_DIR = "downloads/"

def save_flag(img, filename):
path = os.path.join(DEST_DIR, filename)
with open(path, "wb") as fp:
fp.write(img)

def show(text):
print(text, end=" ")
sys.stdout.flush()

async def get_flag(cc): # aiohttp只支持TCP和UDP请求
url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
async with aiohttp.ClientSession() as session: # <1> 开启一个会话
async with session.get(url) as resp: # 发送请求
image = await resp.read() # 读取请求
return image

async def download_one(cc):
image = await get_flag(cc)
show(cc)
save_flag(image, cc.lower() + ".gif")
return cc

def download_many(cc_list):
loop = asyncio.get_event_loop() # 获取事件循环
to_do = [download_one(cc) for cc in sorted(cc_list)] # 生成协程列表
wait_coro = asyncio.wait(to_do) # 将协程包装成Task类,wait_coro并不是运行结果!而是协程!
res, _ = loop.run_until_complete(wait_coro) # 驱动每个协程运行
loop.close() # 循环结束
return len(res)

def main(download_many):
t0 = time.time()
count = download_many(POP20_CC)
elapsed = time.time() - t0
msg = "\n{} flags downloaded in {:.2f}s"
print(msg.format(count, elapsed))

if __name__ == "__main__":
main(download_many)

# 结果:
VN TR FR DE IN ID RU NG CN EG BR MX PH CD IR PK ET JP BD US
20 flags downloaded in 1.27s

解释:

①这里使用了新的语法async/await。再Python3.5之前,如果想定义一个协程只能延用函数的定义方式def,然后在定义体里面使用yieldyield from。如果想把一个函数更明确地声明为协程(或者说异步函数),还可以使用asyncio中的coroutine装饰器,但这么做是不是挺麻烦的?从Python3.5起,可以明确使用async来定义协程(异步函数)和异步生成器。使用async则可以省略掉`@asyncio.coroutine装饰器;在用async修饰的协程的定义体中可以使用yield关键字,但不能使用yield from,它必须被替换为await,即使yield from后面只是一个普通的生成器;从由async修饰的协程或生成器中获取数据时,必须使用await`。

②如果要使用`@asyncio.coroutine装饰器明确声明协程,那么在协程定义体内部只能使用yield from,不能使用yield,因为使用到yield的地方已经在asyncio中全部封装成了函数或者方法。最新版的@asyncio.coroutine也可以装饰async修饰的协程,这种情况下coroutine`不做任何事,只是原封不动的返回被装饰的协程。

③ \<1>处的代码之所以改用async with(异步上下文管理器),是因为新版asyncio并不支持书中的旧语法yield from aiohttp.request("GET", url)。关于async/awaitasync with/async for的相关内容将在后续文章中介绍,这里只需要知道async对应于`@asyncio.coroutineawait对应于yield from`即可。

④我们将get_flag改成了协程版本,并使用aiohttp来实现异步请求;download_one函数也随之变成了协程版本。

download_many只是一个普通函数,它要驱动协程运行。在这个函数中,我们通过asyncio.get_event_loop()创建事件循环(实质就是一个线程)来驱动协程的运行。接着生成含20个download_one协程的协程列表to_do,随后再调用asyncio.wait(to_do)将这个协程列表包装成一个wait协程,取名为wait_corowait协程会将to_do中所有的协程包装成Task对象(Future的子类),再形成列表。最后,我们通过loop.run_until_complete(wait_coro)驱动协程wait_coro运行。整个的驱动链是这样的:loop.run_until_complete驱动协程wait_corowait_coro再在内部驱动20个协程。

wait协程最后会返回一个元组,第一个元素是完成的协程数,第二个是未完成的协程数loop.run_until_complete返回传入的协程的返回值(实际代码是Future.result())。有点绕,其实就是wait_coro最后返回一个元组给run_until_completerun_until_complete再把这个值返回给调用方。

⑦在上一篇中,我们知道concurrent.futures中有一个Future,且通过它的result方法获取最后运行的结果;在asyncio包中,不光有Future,还有它的子类Task,但获取结果通常并不是调用result方法,而是通过yield fromawait,即yield from future获取结果。asyncio.Future类的result方法没有参数,不能设置超时时间;如果调用resultfuture还未运行完毕,它并不会阻塞去等待结果,而是抛出asyncio.InvalidStateError异常。

2.4 下载国旗改进版

上一篇中,我们除了使用Executor.map()批量处理线程之外,我们还使用了concurrent.futures.as_completed()挨个迭代运行完的线程返回的结果。asyncio也实现了这个方法,我们将使用这个函数改写上方的代码。

还有一个问题:我们往往只关注了网络I/O请求,常常忽略本地的I/O操作。线程版本中的save_flag函数也是会阻塞线程的,因为它操作了磁盘。但由于图片太小,速度太快,我们感觉并不明显,如果换成更高像素的图片,这种速度差异就会很明显。我们将会以某种方式使其避免阻塞线程。下面是改写的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 代码2.5
import asyncio, os, sys, time, aiohttp

async def download_one(cc, semaphore):
async with semaphore:
image = await get_flag(cc)
loop = asyncio.get_event_loop()
loop.run_in_executor(None, save_flag, image, cc + ".gif")
return cc

async def download_coro(cc_list, concur_req):
semaphore = asyncio.Semaphore(concur_req) # 它是一个信号量,用于控制并发量
to_do = [download_one(cc, semaphore) for cc in sorted(cc_list)]
to_do_iter = asyncio.as_completed(to_do)
for future in to_do_iter:
res = await future
print("Downloaded", res)

def download_many(cc_list, concur_req): # 变化不大
loop = asyncio.get_event_loop()
coro = download_coro(cc_list, concur_req)
loop.run_until_complete(coro)
loop.close()

if __name__ == "__main__":
t0 = time.time()
download_many(POP20_CC, 1000) # 第二个参数表示最大并发数
print("\nDone! Time elapsed {:.2f}s.".format(time.time() - t0))

# 结果:
Downloaded BD
Downloaded CN
-- snip --
Downloaded US

Done! Time elapsed 1.21s.

上述代码有3个地方值得关注:

  • asyncio.as_completed()元素为协程的可迭代对象为参数,但自身并不是协程,只是一个生成器。它在内部将传入的协程包装成Task,然后返回一个生成器,产出协程的返回值。这个生成器按协程完成的顺序生成值(先完成先产出),而不是按协程在迭代器中的顺序生成值。
  • asyncio.Semaphore是个信号量类,内部维护这一个计数器,调用它的acquire方法(这个方法是个协程),计数器减一;对其调用release方法(这个方法不是协程),计数器加一;当计数器为0时,会阻塞调用这个方法的协程。
  • 我们将save_flag函数放到了其他线程中,loop.run_in_executor()的第一个参数是Executor实例,如果为None,则使用事件循环的默认ThreadPoolExecutor实例。余下的参数是可调用对象,以及可调用对象的位置参数。

3. 总结

本章开篇介绍了阻塞非阻塞、同步异步的概念,然后介绍了异步的两种实现方式:回调和协程。并通过代码比较了回调和协程的实现方式。然后我们使用asyncioaiohttp两个库,将之前线程版本的下载国旗程序改为了协程版本。可惜我也是刚接触协程不久,写的内容不一定准确,尤其是关于asyncio的内容,这个库之前是一点都没接触过。后面我会专门研究Python中的协程,以及asyncio的实现,争取把这部分内容彻底搞懂。

VPointer wechat
欢迎大家关注我的微信公众号"代码港"~~
您的慷慨将鼓励我继续创作~~