|
1 | 1 | 使用Python的 ``concurrent.futures`` 模块
|
2 | 2 | ========================================
|
3 | 3 |
|
| 4 | +Python3.2带来了 ``concurrent.future`` 模块,这个模块具有线程池和进程池、管理并行编程任务、处理非确定性的执行流程、进程/线程同步等功能。 |
| 5 | + |
| 6 | +此模块由以下部分组成: |
| 7 | + |
| 8 | +- ``concurrent.futures.Executor``: 这是一个虚拟基类,提供了一步执行的方法。 |
| 9 | +- ``submit(function, argument)``: 调度函数(可调用的对象)的执行,将 ``argument`` 作为参数传入。 |
| 10 | +- ``map(function, argument)``: 将 ``argument`` 作为参数执行函数,以 **异步** 的方式。 |
| 11 | +- ``shutdown(Wait=True)``: 发出让执行者释放所有资源的信号。 |
| 12 | +- ``concurrent.futures.Future``: 其中包括函数的异步执行。Future对象是submit任务(即带有参数的functions)到executor的实例。 |
| 13 | + |
| 14 | +Executor是抽象类,可以通过子类访问,即线程或进程的 ``ExecutorPools`` 。因为,线程或进程的实例是依赖于资源的任务,所以最好以“池”的形式将他们组织在一起,作为可以重用的launcher或executor。 |
| 15 | + |
| 16 | +使用线程池和进程池 |
| 17 | +------------------ |
| 18 | + |
| 19 | +线程池或进程池是用于在程序中优化和简化线程/进程的使用。通过池,你可以提交任务给executor。池由两部分组成,一部分是内部的队列,存放着待执行的任务;另一部分是一系列的进程或线程,用于执行这些任务。池的概念主要目的是为了重用:让线程或进程在生命周期内可以多次使用。它减少了创建创建线程和进程的开销,提高了程序性能。重用不是必须的规则,但它是程序员在应用中使用池的主要原因。 |
| 20 | + |
| 21 | +.. image:: ../images/pooling-management.png |
| 22 | + |
| 23 | +|ready| |
| 24 | +------- |
| 25 | + |
| 26 | +``current.Futures`` 模块提供了两种 ``Executor`` 的子类,各自独立操作一个线程池和一个进程池。这两个子类分别是: |
| 27 | + |
| 28 | +- ``concurrent.futures.ThreadPoolExecutor(max_workers)`` |
| 29 | +- ``concurrent.futures.ProcessPoolExecutor(max_workers)`` |
| 30 | + |
| 31 | +``max_workers`` 参数表示最多有多少个worker并行执行任务。 |
| 32 | + |
| 33 | +|how| |
| 34 | +----- |
| 35 | + |
| 36 | +下面的示例代码展示了线程池和进程池的功能。这里的任务是,给一个list ``number_list`` ,包含1到10。对list中的每一个数字,乘以1+2+3...+10000000的和(这个任务只是为了消耗时间)。 |
| 37 | + |
| 38 | +下面的代码分别测试了: |
| 39 | + |
| 40 | +- 顺序执行 |
| 41 | +- 通过有5个worker的线程池执行 |
| 42 | +- 通过有5个worker的进程池执行 |
| 43 | + |
| 44 | +代码如下::: |
| 45 | + |
| 46 | + # -*- coding: utf-8 -*- |
| 47 | + |
| 48 | + """ Concurrent.Futures Pooling - Chapter 4 Asynchronous Programming """ |
| 49 | + |
| 50 | + import concurrent.futures |
| 51 | + import time |
| 52 | + number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] |
| 53 | + |
| 54 | + def evaluate_item(x): |
| 55 | + # 计算总和,这里只是为了消耗时间 |
| 56 | + result_item = count(x) |
| 57 | + # 打印输入和输出结果 |
| 58 | + print ("item " + str(x) + " result " + str(result_item)) |
| 59 | + |
| 60 | + def count(number) : |
| 61 | + for i in range(0, 10000000): |
| 62 | + i=i+1 |
| 63 | + return i * number |
| 64 | + |
| 65 | + if __name__ == "__main__": |
| 66 | + # 顺序执行 |
| 67 | + start_time = time.clock() |
| 68 | + for item in number_list: |
| 69 | + evaluate_item(item) |
| 70 | + print("Sequential execution in " + str(time.clock() - start_time), "seconds") |
| 71 | + # 线程池执行 |
| 72 | + start_time_1 = time.clock() |
| 73 | + with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: |
| 74 | + for item in number_list: |
| 75 | + executor.submit(evaluate_item, item) |
| 76 | + print ("Thread pool execution in " + str(time.clock() - start_time_1), "seconds") |
| 77 | + # 进程池 |
| 78 | + start_time_2 = time.clock() |
| 79 | + with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor: |
| 80 | + for item in number_list: |
| 81 | + executor.submit(evaluate_item, item) |
| 82 | + print ("Process pool execution in " + str(time.clock() - start_time_2), "seconds") |
| 83 | + |
| 84 | +运行这个代码,我们可以看到运行时间的输出::: |
| 85 | + |
| 86 | + $ python3 pool.py |
| 87 | + item 1 result 10000000 |
| 88 | + item 2 result 20000000 |
| 89 | + item 3 result 30000000 |
| 90 | + item 4 result 40000000 |
| 91 | + item 5 result 50000000 |
| 92 | + item 6 result 60000000 |
| 93 | + item 7 result 70000000 |
| 94 | + item 8 result 80000000 |
| 95 | + item 9 result 90000000 |
| 96 | + item 10 result 100000000 |
| 97 | + Sequential execution in 7.495329 seconds |
| 98 | + item 1 result 10000000 |
| 99 | + item 2 result 20000000 |
| 100 | + item 4 result 40000000 |
| 101 | + item 3 result 30000000 |
| 102 | + item 5 result 50000000 |
| 103 | + item 8 result 80000000 |
| 104 | + item 7 result 70000000 |
| 105 | + item 9 result 90000000 |
| 106 | + item 6 result 60000000 |
| 107 | + item 10 result 100000000 |
| 108 | + Thread pool execution in 8.349609000000001 seconds |
| 109 | + item 1 result 10000000 |
| 110 | + item 2 result 20000000 |
| 111 | + item 3 result 30000000 |
| 112 | + item 4 result 40000000 |
| 113 | + item 5 result 50000000 |
| 114 | + item 7 result 70000000 |
| 115 | + item 8 result 80000000 |
| 116 | + item 6 result 60000000 |
| 117 | + item 9 result 90000000 |
| 118 | + item 10 result 100000000 |
| 119 | + Process pool execution in 0.02012900000000073 seconds |
| 120 | + |
| 121 | +|work| |
| 122 | +------ |
| 123 | + |
| 124 | +我们创建了一个list存放10个数字,然后使用一个循环计算从1加到10000000,打印出和与 ``number_list`` 的乘积。:: |
| 125 | + |
| 126 | + def evaluate_item(x): |
| 127 | + # 计算总和,这里只是为了消耗时间 |
| 128 | + result_item = count(x) |
| 129 | + # 打印输入和输出结果 |
| 130 | + print ("item " + str(x) + " result " + str(result_item)) |
| 131 | + |
| 132 | + def count(number) : |
| 133 | + for i in range(0, 10000000): |
| 134 | + i=i+1 |
| 135 | + return i * number |
| 136 | + |
| 137 | +在主要程序中,我们先使用顺序执行跑了一次程序::: |
| 138 | + |
| 139 | + |
| 140 | + if __name__ == "__main__": |
| 141 | + # 顺序执行 |
| 142 | + start_time = time.clock() |
| 143 | + for item in number_list: |
| 144 | + evaluate_item(item) |
| 145 | + print("Sequential execution in " + str(time.clock() - start_time), "seconds") |
| 146 | + |
| 147 | +然后,我们使用了 ``futures.ThreadPoolExecutor`` 模块的线程池跑了一次::: |
| 148 | + |
| 149 | + with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: |
| 150 | + for item in number_list: |
| 151 | + executor.submit(evaluate_item, item) |
| 152 | + print ("Thread pool execution in " + str(time.clock() - start_time_1), "seconds") |
| 153 | + |
| 154 | +``ThreadPoolExecutor`` 使用线程池中的一个线程执行给定的任务。池中一共有5个线程,每一个线程从池中取得一个任务然后执行它。当任务执行完成,再从池中拿到另一个任务。 |
| 155 | + |
| 156 | +当所有的任务执行完成后,打印出执行用的时间::: |
| 157 | + |
| 158 | + print ("Thread pool execution in " + str(time.clock() - start_time_1), "seconds") |
| 159 | + |
| 160 | +最后,我们又用 ``ProcessPoolExecutor`` 跑了一次程序::: |
| 161 | + |
| 162 | + with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor: |
| 163 | + for item in number_list: |
| 164 | + executor.submit(evaluate_item, item) |
| 165 | + |
| 166 | +如同 ``ThreadPoolExecutor`` 一样, ``ProcessPoolExecutor`` 是一个executor,使用一个线程池来并行执行任务。然而,和 ``ThreadPoolExecutor`` 不同的是, ``ProcessPoolExecutor`` 使用了多核处理的模块,让我们可以不受GIL的限制,大大缩短执行时间。 |
| 167 | + |
| 168 | +|more| |
| 169 | +------ |
| 170 | + |
| 171 | +几乎所有需要处理多个客户端请求的服务应用都会使用池。然而,也有一些应用要求任务需要立即执行,或者要求对任务的线程有更多的控制权,这种情况下,池不是一个最佳选择。 |
0 commit comments