Python学习之路36-使用future处理并发

《流畅的Python》笔记。

本篇主要讨论concurrent.futures模块,并用它实现一个简单的并发操作。

1. 前言

我们都知道,如果有大量数据要处理,或者要处理大量链接,异步操作会比顺序操作快很多。Python中,concurrentasyncio则是标准库中进行了高度封装的两个异步操作包。它们在底层使用了Python提供的更基础的两个模块,分别是multiprocessingthreading

future(全小写)并不具体指某个类的实例,而且笔者查了老多资料也没看到哪个类叫做future,它泛指用于异步操作的对象。concurrent.futuresasyncio这两个模块中有两个名为Future的类:concurrent.futures.Futureasyncio.Future。这两个类的作用相同,都表示可能已经完成或尚未完成的延迟计算。这两个Future的实例并不应该由我们手动创建,而应交由并发框架(也就是前面那两个模块)来实例化。

本篇主要介绍concurrent.futures模块的简单使用,并会将其和顺序计算进行对比,其中还会涉及GIL和阻塞型I/O的概念。asyncio将在下一篇进行介绍。

2. 顺序执行

首先实现一个下载各国国旗的程序,随后再将它与并发版本进行对比。以下是顺序执行的版本,它下载人口前20的国家的国旗,并保存到本地:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# 代码2.1,flags.py
import os, time, sys # 这么引用只是为了节省篇幅,并不提倡
import requests # 第三方库

POP20_CC = ("CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR").split()
# 如果想测试自己的并发程序,为了避免被误认为是DOS攻击,请自建http服务
BASE_URL = "http://flupy.org/data/flags"
DEST_DIR = "downloads/"

def save_flag(img, filename): # 保存图片到本地
path = os.path.join(DEST_DIR, filename)
with open(path, "wb") as fp:
fp.write(img)

def get_flag(cc): # 请求图片
url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
resp = requests.get(url)
return resp.content

def show(text): # 每获取一张图片就给出一个提示
print(text, end=" ")
sys.stdout.flush()

def download_one(cc): # 下载一张图片
image = get_flag(cc)
show(cc)
save_flag(image, cc.lower() + ".gif")
return cc # 这个return主要是给后面的并发程序用的,此处不要这行代码也可以

def download_many(cc_list): # 下载多张图片
for cc in sorted(cc_list):
download_one(cc)
return len(cc_list)

def main(download_many): # 主程序,接收一个函数为参数
t0 = time.time() # 开始时间
count = download_many(POP20_CC)
elapsed = time.time() - t0 # 结束时间
msg = "\n{} flags downloaded in {:.2f}s"
print(msg.format(count, elapsed))

if __name__ == "__main__":
main(download_many)

# 结果
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN
20 flags downloaded in 14.83s # 耗时,只做了一次

3. concurrent.futures

现在我们用concurrent.futures模块将上述代码改写为线程版本,使其异步执行,其中有大部分函数延用上述代码。

3.1 futures.as_completed

首先实现一个更具有细节的版本,我们手动提交线程,然后再运行。这个版本只是为了讲述细节,所以并没有全部下载,最大线程数也没有设置得很高:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 代码3.1,flags_threadpool.py
from concurrent import futures
from flags import save_flag, get_flag, download_one, show, main

def download_many_ac(cc_list):
cc_list = cc_list[:5] # 只下载前五个用于测试
with futures.ThreadPoolExecutor(len(cc_list) / 2) as executor:
to_do = {} # 有意写出字典,其实也可以是列表或集合,但这是个惯用方法
for cc in sorted(cc_list):
future = executor.submit(download_one, cc)
to_do[future] = cc
msg = "Scheduled for {}: {}"
print(msg.format(cc, future))
results = []
for future in futures.as_completed(to_do):
res = future.result()
msg = "{} result: {!r}"
print(msg.format(future, res))
results.append(res)
return len(results)

if __name__ == "__main__":
main(download_many_ac)

# 结果:
Scheduled for BR: <Future at 0x1cbca5ab0f0 state=running>
Scheduled for CN: <Future at 0x1cbcb339b00 state=running>
Scheduled for ID: <Future at 0x1cbcb3490f0 state=running>
Scheduled for IN: <Future at 0x1cbcb349748 state=pending>
Scheduled for US: <Future at 0x1cbcb3497f0 state=pending>
CN <Future at 0x1cbcb339b00 state=finished returned str> result: 'CN'
BR <Future at 0x1cbca5ab0f0 state=finished returned str> result: 'BR'
IN <Future at 0x1cbcb349748 state=finished returned str> result: 'IN'
US <Future at 0x1cbcb3497f0 state=finished returned str> result: 'US'
ID <Future at 0x1cbcb3490f0 state=finished returned str> result: 'ID'

5 flags downloaded in 2.39s # 20个一起下载只需要1.6s左右

解释

  • concurrent.futures中有一个名为Executor的抽象基类,由它定义执行异步操作的接口。在这个模块中有它的两个具体类:的ThreadPoolExecutorProcessPoolExecutor,前者是线程,后者是进程。Executor的第一个参数指定最大运行线程数。
  • Executor.submit(func, *args, **kwargs)方法会在线程中执行func(*args, **kwargs),它将这个方法封装成Future对象并返回(假设这个实例叫做future)。submit方法会对future进行排期,如果运行的线程数没达到最大线程数,则future会被立即运行,并将其状态置为running;否则就等待,并将其状态置为pending这同时也表明,线程在submit方法中启动
  • futures.as_completed函数的第一个参数是一个future序列,在内部会被转换成set。它返回一个迭代器,在future运行结束后产出future。在使用这个函数时还有一个惯用方法:将future放到一个字典中。因为as_completed返回的future的顺序不一定是传入时的顺序,使用字典可以很轻松的做一些后续处理。
  • 上述代码中,从第31-35行的最开始两个字母是由show函数输出的。光看上述结果,会让人觉得线程是在as_completed中启动的,而之所以结果输出得这么整齐,是因为for循环里只是“提交”,实际运行是在线程中。如果在每次循环最后都执行sleep(2),你将会看到这样的结果:

    1
    2
    3
    4
    5
    # 代码3.2
    Scheduled for BR: <Future at 0x13e6b30b2b0 state=running>
    BR Scheduled for CN: <Future at 0x13e6b5820b8 state=running>
    CN Scheduled for ID: <Future at 0x13e6c099278 state=running>
    -- snip --
  • concurrent.futures.Future有一个result方法,它返回future中可调用对象运行完成后的结果,或者重新抛出可调用对象运行时的异常。如果future还未运行完成,调用future.result()阻塞调用方所在的线程,直到有结果可返回;它还可以接受一个timeout参数用于指定运行时间,如果在timeout时间内future没有运行完毕,将抛出TimeoutError异常。

3.2 Executor.map

代码3.1中,我们自行提交线程,其实,上述可改为更简洁的版本:使用Executor.map批量提交,只需要新建一个download_many函数,其余不变:

1
2
3
4
5
6
7
8
9
# 代码3.3
def download_many(cc_list):
with futures.ThreadPoolExecutor(len(cc_list)) as executor:
res = executor.map(download_one, sorted(cc_list))
return len(list(res))

# 结果:
JP RUBR EG CN VN BD TR FR ID NG DE IN PK ET PH IR US CD MX
20 flags downloaded in 1.69s

Executor.map()方法和内置的map函数类似,它将第一个参数(可调用对象)映射到第二个参数(可迭代对象)的每一个元素上以创建Future列表。Executor.map()方法内部也是通过调用Future.submit来创建Future对象。

3.3 比较

从上面代码可以看出,虽然使用Executor.map()的代码量比较少,但Executor.submit()futures.as_completed()的组合更灵活。

Executor.map()更适合于需要批量处理的情况,比如同一函数(或者可调用对象)不同参数。而Executor.submit()则更适合于零散的情况,比如不同函数同一参数,不同函数不同参数,甚至两个线程毫无关联。

4. 补充

本文主体部分已经结束,下面是一些补充。

4.1 I/O密集型和GIL

CPython本身并不是线程安全的,因此有全局解释器锁(Global Interpreter Lock, GIL),一次只允许使用一个线程执行Python字节码。

以这个为基础,按理说上述所有代码将都不能并行下载,因为一次只能运行一个线程,并且线程版本的运行时间应该比顺序版本的还要多才对(线程切换耗时)。但结果也表明,两个线程版本的耗时都大大降低了。

这是因为,Python标准库中所有执行阻塞型I/O操作的函数,在等待操作系统返回结果时都会释放GIL。这就意味着,GIL几乎对I/O密集型处理并没有什么影响,依然可以使用多线程。

4.2 CPU密集型

concurrent.futures中还有一个ProcessPoolExecutor类,它实现的是真正的并行计算。它和ThreadPoolExecutro一样,继承自Executor,两者实现了共同的接口,因此使用concurrent.futures编写的代码可以轻松地在线程版本与进程版本之间转换,比如要讲上述代码改为进程版本,只需更改download_many()中的一行代码:

1
2
3
4
# 代码3.4
with futures.ThreadPoolExecutor(len(cc_list)) as executor:
# 改为:
with futures.ProcessPoolExecutor() as executor:

也可以指定进程数,但默认是os.cpu_count()的返回值,即电脑的CPU核心数。

这个类非常适合于CPU密集型作业上。使用这个类实现的上述代码虽然比线程版本慢一些,但依然比顺序版本快很多。

4.3 进度条

如果你用最新版pip下载过第三方库,你会发现在下载时会有一个文字进度条。在Python中想要实现这种效果可以使用第三方库tqdm,以下是它的一个简单用法:

1
2
3
4
5
6
7
8
9
# 代码3.5
import tqdm
from time import sleep

for i in tqdm.tqdm(range(1000)):
sleep(0.01)

# 结果:
40%|████ | 400/1000 [00:10<00:00, 98.11it/s]
VPointer wechat
欢迎大家关注我的微信公众号"代码港"~~
您的慷慨将鼓励我继续创作~~