Concurrency in Python: A Concise Tutorial
Concurrency in Python - Pool of Processes
A pool of processes can be created and used in the same way as the pool of threads we created earlier. A process pool is a group of pre-instantiated, idle processes that stand ready to be given work. When we need to run a large number of tasks, creating a process pool is preferable to starting a new process for every task. The cost of starting the processes is paid once, and the workers are then reused.
Python Module – concurrent.futures
The Python standard library has a module called concurrent.futures. It was added in Python 3.2 to give developers a high-level interface for launching asynchronous tasks. It is an abstraction layer on top of Python's threading and multiprocessing modules, and it provides a common interface for running tasks with either a pool of threads or a pool of processes.
In the following sections, we will look at the different classes of the concurrent.futures module.
Executor Class
Executor is an abstract class of the concurrent.futures module. It is not used directly. Instead, we use one of the following concrete subclasses:
- ThreadPoolExecutor
- ProcessPoolExecutor
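Both classes share the Executor interface, so code written against that interface can run on either pool. The following minimal sketch is not part of the original tutorial, and the helper names `square` and `squares_with` are hypothetical. It checks that both classes derive from Executor and runs the same map() call on each.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor


def square(n):
    # Hypothetical task; module-level so ProcessPoolExecutor can pickle it by name
    return n * n


def squares_with(executor: Executor, numbers):
    # Hypothetical helper: relies only on the shared Executor interface,
    # so it works with either concrete pool
    return list(executor.map(square, numbers))


def main():
    for executor_class in (ThreadPoolExecutor, ProcessPoolExecutor):
        # Both concrete classes derive from the abstract Executor class
        print(executor_class.__name__, 'is an Executor:',
              issubclass(executor_class, Executor))
        with executor_class(max_workers=2) as executor:
            print(squares_with(executor, [1, 2, 3]))  # [1, 4, 9]


if __name__ == '__main__':
    main()
```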
ProcessPoolExecutor – A concrete subclass
ProcessPoolExecutor is one of the concrete subclasses of the Executor class. It uses multiprocessing to give us a pool of processes to which we can submit tasks. The pool assigns each task to an available worker process and schedules it to run.
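The sketch below shows that the pool reuses its workers instead of starting one process per task. It records which process ID ran each task. It assumes the pool's other options are left at their defaults, and the names `worker_pid` and `distinct_workers` are hypothetical. With max_workers=2, ten tasks are served by at most two worker processes. A thread pool, by contrast, runs everything inside the single parent process.

```python
import os
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def worker_pid(_task_number):
    # Hypothetical task: report the ID of the OS process that ran it
    return os.getpid()


def distinct_workers(executor, n_tasks):
    # Hypothetical helper: run n_tasks tasks and count the distinct processes used
    return len(set(executor.map(worker_pid, range(n_tasks))))


def main():
    for executor_class in (ThreadPoolExecutor, ProcessPoolExecutor):
        with executor_class(max_workers=2) as executor:
            used = distinct_workers(executor, 10)
        # Threads share this one process, so the thread pool reports 1;
        # the process pool reuses its workers, so it reports at most 2
        print('{}: 10 tasks ran in {} process(es)'.format(
            executor_class.__name__, used))


if __name__ == '__main__':
    main()
```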
How to create a ProcessPoolExecutor?
The concurrent.futures module and its concrete subclass ProcessPoolExecutor make it easy to create a pool of processes. To do this, we construct a ProcessPoolExecutor and pass the number of worker processes we want in the pool as the max_workers argument. If max_workers is omitted or None, it defaults to the number of processors on the machine. We then submit tasks to the pool.
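The following minimal sketch shows these two steps. It uses a toy `cube()` task that is a hypothetical stand-in, not part of the original tutorial. The code creates a pool with the default number of workers, submits several tasks with submit(), and collects each result from its Future. It also shows that a pool needs at least one worker: passing max_workers=0 raises ValueError.

```python
import os
from concurrent.futures import ProcessPoolExecutor


def cube(n):
    # Hypothetical task used only for illustration
    return n ** 3


def main():
    # max_workers is omitted, so the pool size is derived from the CPU count
    print('os.cpu_count() reports', os.cpu_count(), 'CPUs')
    with ProcessPoolExecutor() as executor:
        # submit() schedules each call and immediately returns a Future
        futures = [executor.submit(cube, n) for n in range(5)]
        # result() waits for each task; the list keeps submission order
        print([future.result() for future in futures])  # [0, 1, 8, 27, 64]

    # A pool needs at least one worker process
    try:
        ProcessPoolExecutor(max_workers=0)
    except ValueError as exc:
        print('ValueError:', exc)


if __name__ == '__main__':
    main()
```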
Example
We will now use the same example as in the thread pool section. The only difference is that we now use ProcessPoolExecutor instead of ThreadPoolExecutor.
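from concurrent.futures import ProcessPoolExecutor
from time import sleep

def task(message):
    # Simulate 2 seconds of work, then return the message
    sleep(2)
    return message

def main():
    executor = ProcessPoolExecutor(max_workers=5)
    future = executor.submit(task, "Completed")
    print(future.done())    # the task has only just been submitted
    sleep(2)
    print(future.done())    # may still be False: starting the worker takes time
    print(future.result())  # blocks until the task has finished
    executor.shutdown()     # release the worker processes

if __name__ == '__main__':
    main()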
Output
False
False
Completed
In the above example, a ProcessPoolExecutor is constructed with 5 worker processes. A task that waits 2 seconds before returning its message is then submitted to the executor. The first call to done() returns False because the task has only just been submitted. In this run, the second call also returns False. Starting the worker process adds overhead, so the task had not finished when the 2-second wait in main() ended. On a different machine this call may return True. Finally, calling result() on the future blocks until the task completes and then returns its result.
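Instead of polling done(), we can also pass a timeout to result(). The sketch below is an illustration with hypothetical helpers, `slow_task` and `result_or_none`. result(timeout=...) raises concurrent.futures.TimeoutError if the task has not finished in time. Called with no timeout, result() blocks until the task is done.

```python
import time
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor


def slow_task(message, delay):
    # Hypothetical task: wait `delay` seconds, then return the message
    time.sleep(delay)
    return message


def result_or_none(future, timeout):
    # Hypothetical helper: wait at most `timeout` seconds; None on timeout
    try:
        return future.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        return None


def main():
    with ProcessPoolExecutor(max_workers=1) as executor:
        future = executor.submit(slow_task, 'Completed', 2)
        print(result_or_none(future, 0.5))  # None: still running after 0.5 s
        print(future.result())              # blocks, then prints Completed
        print(future.done())                # True once the result is available


if __name__ == '__main__':
    main()
```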
Instantiating ProcessPoolExecutor – Context Manager
Another way to instantiate ProcessPoolExecutor is to use it as a context manager. This works like the method in the above example, with one addition: the executor is shut down automatically when the with block exits. At that point shutdown(wait=True) is called, so the program waits for all submitted tasks to finish and the worker processes are released. The instantiation can be done with the following code:
with ProcessPoolExecutor(max_workers=5) as executor:
    ...  # submit tasks to the executor here
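The with statement is roughly equivalent to calling shutdown(wait=True) in a finally block. The minimal sketch below uses the hypothetical helpers `double`, `run_with_explicit_shutdown` and `run_with_context_manager`. It shows that both forms give the same result. It also shows that an executor that has been shut down refuses new work with RuntimeError.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def double(n):
    # Hypothetical task
    return 2 * n


def run_with_explicit_shutdown(executor_class, numbers):
    # Hypothetical helper, manual form: always shut down, even if map() raises
    executor = executor_class(max_workers=5)
    try:
        return list(executor.map(double, numbers))
    finally:
        executor.shutdown(wait=True)  # wait for pending tasks, release workers


def run_with_context_manager(executor_class, numbers):
    # Hypothetical helper: leaving the with block calls shutdown(wait=True)
    with executor_class(max_workers=5) as executor:
        return list(executor.map(double, numbers))


def main():
    for executor_class in (ThreadPoolExecutor, ProcessPoolExecutor):
        print(executor_class.__name__,
              run_with_explicit_shutdown(executor_class, [1, 2, 3]),
              run_with_context_manager(executor_class, [1, 2, 3]))

    with ProcessPoolExecutor(max_workers=1) as executor:
        pass
    # The executor has been shut down, so it rejects new tasks
    try:
        executor.submit(double, 1)
    except RuntimeError as exc:
        print('RuntimeError:', exc)


if __name__ == '__main__':
    main()
```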
Example
For better understanding, we use the same example as in the thread pool section. We start by importing the concurrent.futures module. Then we define a function named load_url() that loads the requested URL. Next, we create a ProcessPoolExecutor with 5 worker processes and use it as a context manager. Each URL is submitted to the pool, and concurrent.futures.as_completed() yields the futures as they finish. We get the result of each future by calling its result() method.
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def load_url(url, timeout):
    # Download the page and return its content as bytes
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

def main():
    with ProcessPoolExecutor(max_workers=5) as executor:
        # Map each future back to the URL it is loading
        future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
        # as_completed() yields futures in the order they finish
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
            else:
                print('%r page is %d bytes' % (url, len(data)))

if __name__ == '__main__':
    main()
Output
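Running the script produces output similar to the following. The page sizes change as the sites change. The error text for the made-up domain depends on the platform; "[Errno 11004] getaddrinfo failed" is the Windows message.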
'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229476 bytes
'http://www.cnn.com/' page is 165323 bytes
'http://www.bbc.co.uk/' page is 284981 bytes
'http://europe.wsj.com/' page is 967575 bytes
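The output lines appear in the order in which the downloads finished, not in the order of URLS, because as_completed() yields each future as soon as it is done. The following sketch makes this ordering visible without depending on any website. It replaces network access with time.sleep(), and the `sleep_for` and `completion_order` helpers are hypothetical.

```python
import time
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor


def sleep_for(seconds):
    # Hypothetical stand-in for a download that takes `seconds` to finish
    time.sleep(seconds)
    return seconds


def completion_order(executor, delays):
    # Hypothetical helper: submit all tasks, then collect results as each finishes
    futures = [executor.submit(sleep_for, d) for d in delays]
    return [future.result() for future in concurrent.futures.as_completed(futures)]


def main():
    delays = [1.5, 0.5, 1.0]
    with ProcessPoolExecutor(max_workers=3) as executor:
        # Typically prints [0.5, 1.0, 1.5]: finishing order, not submission order
        print(completion_order(executor, delays))


if __name__ == '__main__':
    main()
```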
Use of the Executor.map() function
The built-in map() function applies a function to every element of an iterable. Executor.map() works the same way, except that each call is submitted to the ProcessPoolExecutor as an independent job. The results are still returned in the order of the input. Consider the following Python script to understand this.
Example
We will use the same example as in the thread pool section on Executor.map(). In the example below, map() applies the square() function to every value in the values list.
from concurrent.futures import ProcessPoolExecutor

values = [2, 3, 4, 5]

def square(n):
    return n * n

def main():
    with ProcessPoolExecutor(max_workers=3) as executor:
        # Results come back in the same order as values
        results = executor.map(square, values)
        for result in results:
            print(result)

if __name__ == '__main__':
    main()
Output
4
9
16
25
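Executor.map() accepts one iterable per argument of the mapped function, just like the built-in map(). ProcessPoolExecutor also honours a chunksize argument, which sends items to the workers in batches and can reduce overhead for long inputs. ThreadPoolExecutor accepts chunksize but ignores it. The following minimal sketch uses the hypothetical helpers `power` and `powers`.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def power(base, exponent):
    # Hypothetical two-argument task
    return base ** exponent


def powers(executor, bases, exponents, chunksize=1):
    # Hypothetical helper: one iterable per argument of power(); results keep
    # the input order. chunksize batches items per round trip to a worker in
    # ProcessPoolExecutor; ThreadPoolExecutor accepts it but ignores it.
    return list(executor.map(power, bases, exponents, chunksize=chunksize))


def main():
    for executor_class in (ThreadPoolExecutor, ProcessPoolExecutor):
        with executor_class(max_workers=3) as executor:
            # Both pools print [4, 9, 64, 125]
            print(executor_class.__name__,
                  powers(executor, [2, 3, 4, 5], [2, 2, 3, 3], chunksize=2))


if __name__ == '__main__':
    main()
```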
When to use ProcessPoolExecutor and ThreadPoolExecutor?
Now that we have studied both Executor classes, ThreadPoolExecutor and ProcessPoolExecutor, we need to know when to use which. Choose ProcessPoolExecutor for CPU-bound workloads and ThreadPoolExecutor for I/O-bound workloads. In CPython, the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time, so threads do not speed up CPU-bound Python code. They still help with I/O-bound work, because a thread that is waiting on I/O releases the GIL and lets other threads run.
With ProcessPoolExecutor we do not need to worry about the GIL, because each worker is a separate process with its own interpreter and its own GIL. CPU-bound tasks can therefore run in parallel on multiple cores. The execution time is usually lower than with ThreadPoolExecutor, although starting the processes and pickling arguments and results between them adds some overhead. Consider the following Python script examples to understand this.
Example
import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
    # CPU-bound pure-Python loop; returns how long it took
    start = time.time()
    while n > 0:
        n -= 1
    return time.time() - start

def main():
    start = time.time()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, time_taken in zip(value, executor.map(counting, value)):
            print('Start: {} Time taken: {}'.format(number, time_taken))
    print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
    main()
Output
Start: 8000000 Time taken: 1.5509998798370361
Start: 7000000 Time taken: 1.3259999752044678
Total time taken: 2.0840001106262207
Example – Python script with ThreadPoolExecutor
import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
    # CPU-bound pure-Python loop; returns how long it took
    start = time.time()
    while n > 0:
        n -= 1
    return time.time() - start

def main():
    start = time.time()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        for number, time_taken in zip(value, executor.map(counting, value)):
            print('Start: {} Time taken: {}'.format(number, time_taken))
    print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
    main()
Output
Start: 8000000 Time taken: 3.8420000076293945
Start: 7000000 Time taken: 3.6010000705718994
Total time taken: 3.8480000495910645
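The outputs of the two programs show the difference in execution time. With ProcessPoolExecutor, the two counts ran in parallel in separate processes, so the whole run took about 2.1 seconds. With ThreadPoolExecutor, the GIL allowed only one thread to run Python code at a time. The two loops therefore took turns, and the run took about 3.8 seconds. Exact timings depend on the machine.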
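The two programs above only cover the CPU-bound side of the guideline. The sketch below covers the I/O-bound side by simulating I/O with time.sleep(), which releases the GIL in the same way a blocking network or disk call does. The `fake_io` and `elapsed` helpers are hypothetical. In a thread pool, four 1-second waits overlap, so the run takes roughly 1 second instead of 4, and no processes need to be started.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(seconds):
    # Hypothetical stand-in for a network or disk wait; CPython releases
    # the GIL during time.sleep(), as it does during blocking I/O
    time.sleep(seconds)
    return seconds


def elapsed(executor_class, delays):
    # Hypothetical helper: one worker per delay, time the whole batch
    start = time.perf_counter()
    with executor_class(max_workers=len(delays)) as executor:
        list(executor.map(fake_io, delays))
    return time.perf_counter() - start


if __name__ == '__main__':
    delays = [1.0] * 4
    print('Sequential waiting would take {:.1f} s'.format(sum(delays)))
    print('ThreadPoolExecutor took {:.2f} s'.format(
        elapsed(ThreadPoolExecutor, delays)))
```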