摘要:

本文主要是对 Python3 标准库 concurrent.futures 文档的翻译

concurrent.futures 模块为异步执行可调用的对象提供了一个高级的接口。

异步执行可以通过线程来实现,使用 ThreadPoolExecutor 模块,或者使用 ProcessPoolExecutor 模块通过分离进程来实现。两种实现都有同样的接口,他们都是通过抽象类 Executor 来定义的。

Executor 对象

class concurrent.futures.Executor

这是一个抽象类,用来提供方法去支持异步地执行调用,它不应该被直接调用,而是应该通过具体的子类来使用。

submit(fn, *args, **kwargs) > 可调用对象的调度器,fn参数将会以fn(*args, **kwargs)的形式来调用,同时返回一个 Future 对象代表了可调用对象的执行情况。

1
2
3
with ThreadPoolExecutor(max_workers=1) as executor:
     future = executor.submit(pow, 323, 1235)
     print(future.result())

map(func, *iterables, timeout=None, chunksize=1)

map(func, *iterables)函数的作用基本相同,除了func是被异步执行的,而且几个对于func调用可能是同时执行的。这个函数返回的迭代器调用__next__()方法的时候,如果在timeout秒内结果不可用,那么迭代器将会从原始调用的函数向Executor.map()抛出一个concurrent.futures.TimeoutError的异常。timeout既能是一个整数,也能是一个浮点数。如果timeout没有指定的话或者等于 None 的话,那么等待时间就没有限制。如果调用函数抛出了一个异常,那么当迭代器取到这个函数的时候,异常将会被抛出。 当使用ProcessPoolExecutor的时候,这个方法将iterables切成许多块,然后将这些内容作为分离的任务提交到进程池中。每个块的大概的尺寸能够通过chunksize(大于0的正整数)的参数来指定。当iterables非常大的时候,和chunksize默认等于1相比,将chunksize设置为一个很大的值,将会显著地提升性能。在使用ThreadPoolExecutor的情况下,chunksize的大小没有影响。

Python 3.5新增功能:添加了chunksize参数

shutdown(wait=True)

告诉执行器,当当前阻塞的 futures 执行完了以后,它应该释放所有它使用的资源。在shutdown函数之后再来调用Executor.submit()Executor.map()将会抛出RuntimeError

如果wait等于 True 的话,这个方法不会立即返回,而直到所有阻塞的 futures 都返回,而且和这个执行器所有相关的资源都被释放以后,这个函数才会返回。 如果wait设置为 False ,那么这个方法会立刻返回,而和这个执行器所有相关的资源只有等到所有阻塞的 futures 都执行完以后才会被释放。而无论wait参数的值是什么,整个 Python 程序都会等到所有阻塞的 futures 执行完毕以后才会退出。

通过with语句,可以避免明确地来调用这个方法,它在执行完以后将会自动关闭Executor。(调用 Executor.shutdown() 时wait会被设置为True,这将会等待所有 future 执行完毕)

1
2
3
4
5
6
import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

ThreadPoolExecutor

ThreadPoolExecutorExecutor的子类,使用一个线程池去异步地执行调用。

当一个 Future 关联的调用等待另外一个 Future 的执行结果的时候,死锁就有可能发生,例如下面的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import time

def wait_on_b():
    time.sleep(5)
    print(b.result())  # b 永远不会完成,因为它等待着 a 的结果
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result()) # a 永远不会完成,因为它等待着 b 的结果
    return 6

executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

和这个例子:

1
2
3
4
5
6
7
def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # 这个也永远不会完成,因为线程池里面最多只能有一个线程,而它现在正在执行着这个函数。
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)

class concurrent.futures.ThreadPoolExecutor(max_workers=None)

一个Executor的子类,使用线程池中最多max_workers个线程去异步地执行回调。 Python 3.5中的改变:如果max_workers参数为None或者没有给定,那么它将会被默认设置成为机器的CPU核数乘5。这里假设ThreadPoolExecutor经常被用来执行IO密集型的工作而不是CPU密集型的工作,工作者的个数应该比ProcessPoolExecutor的工作者的个数要多。

ThreadPoolExecutor 例子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# 获取一个单页,同时报告URL和内容
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# 我们可以通过with语句来确保线程能够被及时地清理
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

ProcessPoolExecutor 类是Executor的一个子类,它使用一个进程池来异步地执行调用。ProcessPoolExecutor使用multiprocessing模块,它允许去避免全局解释器锁,但同时也意味着仅仅只有pickable(TODO译者注:这里这个单词的含义我还没理解)对象能够被执行和返回。

__main__模块必须能够被作为工作者的子模块导入。这意味着ProcessPoolExecutor将不会在交互式的解释器中工作。

从一个已添加到Executor中的可调用对象中调用Executor或者Future的方法将会导致死锁(TODO译者注:有待实践操作)。

class concurrent.futures.ProcessPoolExecutor(max_workers=None)

一个Executor的子类来异步地执行调用,最多将会使用进程池中max_workers个工作进程。如果max_workersNone或者没有给出的话。它默认将会使用机器CPU的个数来作为最大进程数的值。如果max_workers小于或者等于0,那么将会抛出一个ValueError 3.3版本中的改变:当一个工作进程被突然终止了后,将会抛出一个BrokenProcessPool的错误。以前的情况是,行为是未定义的但是 Executor 上的操作或者他自己的 future 将会被冻结或者导致死锁。

ProcessPoolExecutor 的例子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    """(译者注) 判断素数的程序

    这里对 sqrt_n 做一点解释:

    假设 n 不是素数,那么有 n = x * y( x != 1 and y != 1)
    因为 n = x * y, 所以 x <= sqrt(n) or y <= sqrt(n)
    所以 i in [2, sqrt(n)]; i表示能够被 n 整除的数
    所以如果 i not in [2, sqrt(n)]; 那么n是素数

    在这个程序中我们在一开始就判断过2,所以循环从3开始,且跳过所有偶数
    """
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

Future 对象

Future 类封装了一个可调用对象的异步执行过程,Future 对象是通过Executor.submit()函数来创建的。

class concurrent.futures.Future

封装了一个可调用对象的异步执行过程。Future 实例是通过Executor.submit()方法来创建的,而且不应该被直接创建,除非用来测试。

cancel() > 尝试去取消相关回调,如果这个回调正在被执行,而且不能被取消,那么这个方法将会返回False,否则这个方法将会取消相应的回调并且返回True

cancelled() > 如果相关回调被成功取消了,那么这个方法将会返回True

running() > 如果相关回调当前正在被执行而且无法取消,那么将会返回True

done() > 如果相关的回调被成功地取消或者已经运行完毕那么将返回True

result(timeout=None) > 返回由相关回调产生的结果。如果这个回调还没有被完成那么这个方法将会等待timeout秒。如果这个回调在timeout秒内还没有返回,一个concurrent.futures.TimeoutError的异常将会被抛出。timeout能够被设置成一个整数或者一个浮点数。如果timeout没有被设置或者其值为None,那么等待时间将没有限制。

如果这个 future 在完成之前被取消了,那么将会抛出一个CancelledError的异常。 如果相关的回调抛出了一个异常,那么这个方法也会相应地抛出这个异常。

exception(timeout=None) > 返回由相关回调抛出的异常。如果相关回调还没有被完成那么这个方法将会等待timeout秒。如果相关回调在timeout秒内还没有被完成,那么将会抛出一个concurrent.futures.TimeoutError的异常。timeout能够被设置成一个整数或者一个浮点数。如果timeout没有被设置或者其值为None,那么等待时间将没有限制。

如果这个 future 在完成之前被取消了,那么将会抛出一个CancelledError的异常。 如果相关回调被完成了且没有抛出异常,None将会被返回。

add_done_callback(fn) > 将可调用对象fn连接到这个 future 上,fn将会在 future 被取消或者结束运行时被调用,而且仅有相关 future 这一个参数。 > 添加的可调用对象将会以它们被添加的顺序来调用,而且总是在添加它们的那个进程的所属的线程中调用(译者注,可以参考这段代码)。如果相关调用fn抛出了一个Exception子类的异常,它将会被记录和忽略。如果相关调用fn抛出了一个BaseException子类的异常,那么行为是未定义的。 > 如果相关的 future 已经被完成了或者取消了,fn将会被立刻调用。

如下的Future方法意味着可以在单元测试或者Exectuor的实现中使用。

set_running_or_notify_cancel() > 这个方法应该仅能够被Exectuor的实现(在执行和Future相关的工作之前调用)和单元测试调用。 > 如果这个方法返回False,那么相关的Future被取消了。即Future.cancel()被调用了而且返回True。任何等待 Future 完成的线程(即通过as_completed或者wait()方法等待)都会被唤醒。 > 这个方法仅能被调用一次,而且不能在Future.set_result()或者Future.set_exception()被调用了之后调用。

set_result(result) > 将 future 相关的工作的结果设置成result > 这个方法仅仅应该被Exectuor实现的时候和单元测试来调用。

set_exception(exception) > 将 future 相关的工作的结果设置成异常exception > 这个方法仅仅应该被Exectuor实现的时候和单元测试来调用。

模块方法

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED) > 等待由fs(译者注:这里这个fs表示 futures 的意思)给出的 future 实例(可能由不同的 Exectuor 实例创建的)去完成。返回一个命名二元组的集合。在第一个集合元素中,命名为done,包含着那些在wait函数完成之前就已经完成(被完成或者被取消)的 future。第二个集合元素命名为not_done,包含那些未完成的 future。 > timeout用来控制wait函数在返回之前等待的最大秒数。timeout能够被设置成整数或者浮点数。如果timeout没有被设置或者值为None,那么等待时间将没有限制。 > return_when代表了函数应该在什么时候返回,它必须被设置成如下值中的一个:

Constant Description
FIRST_COMPLETED 当任何 future 结束或者被取消的时候,这个函数就会返回
FIRST_EXCEPTION 当任何 future 通过抛出异常来结束的时候,这个函数就会返回,如果没有任何函数抛出异常,那么它等价与ALL_COMPLETED
ALL_COMPLETED 当所有的 future 都已结束或者被取消的时候,这个函数就会返回。

concurrent.futures.as_completed(fs, timeout=None) > 返回一个在由fs给出的 future 实例(可能由不同的Executor创建的)上的迭代器,当 future 完成的时候(结束或者被取消),就把这个 future yield回去。如果fs中给出的 future 重复了,那么将仅会被返回一次。任何在as_completed函数调用之前就已经完成或者被取消的 future, 将会首先yield回来。如果__next__()被调用,在timeout秒后结果依然不可达,那么返回的迭代器将会从原始的调用向as_completed抛出一个concurrent.futures.TimeoutError异常。timeout可以被设置成整数或者浮点数。如果timeout没有被指定或者其值为None,那么等待的时间将没有限制。

See also: PEP 3148 - futures - 执行异步计算 这个提案描述了包含在 Python 标准库中的这个特性。

异常类

exception concurrent.futures.CancelledError > 当 future 被取消的时候抛出这个异常。

exception concurrent.futures.TimeoutError > 当 future 的操作超出给定的timeout时间的时候抛出这个异常。

exception concurrent.futures.process.BrokenProcessPool > 派生自RuntimeError,当ProcessPoolExecutor中的一个工作进程以不干净的方式(例如,它是从外部被杀死的)结束的时候,这个异常将会抛出。 > Python 3.3 后的新特性