Concurrency in Python: A Concise Tutorial

Concurrency in Python - Pool of Threads

Suppose we had to create a large number of threads for our multithreaded tasks. Doing so would be computationally expensive, because too many threads cause performance problems of their own. A major one is limited throughput. We can solve this problem by creating a pool of threads. A thread pool may be defined as a group of pre-instantiated, idle threads that stand ready to be given work. When we need to run a large number of tasks, creating a thread pool is preferred over instantiating a new thread for every task. A thread pool can manage the concurrent execution of a large number of tasks as follows −

  1. If a thread in a thread pool completes its execution then that thread can be reused.

  2. If a thread is terminated, another thread will be created to replace that thread.
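The reuse described in the first point can be observed with a small experiment. The following is a minimal sketch, not part of the original tutorial: the helper names which_thread() and run_tasks() are made up for illustration, and it uses the ThreadPoolExecutor class from the standard library. It runs six tasks on a pool of two workers and records which thread handled each task.

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import time


def which_thread(task_id):
    """Return the task id and the name of the worker thread that ran it."""
    time.sleep(0.05)  # simulate a little work so that tasks overlap
    return task_id, threading.current_thread().name


def run_tasks(num_tasks=6, num_workers=2):
    """Run num_tasks tasks on a pool that holds at most num_workers threads."""
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        return list(pool.map(which_thread, range(num_tasks)))


if __name__ == "__main__":
    results = run_tasks()
    for task_id, name in results:
        print(f"task {task_id} ran on {name}")
    # Six tasks were executed, but by no more than two distinct threads.
    print(len({name for _, name in results}), "distinct threads")
```

Because the pool never holds more than two threads, each worker has to pick up several tasks in turn instead of a new thread being started per task.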

Python Module – concurrent.futures

The Python standard library includes the concurrent.futures module. It was added in Python 3.2 to give developers a high-level interface for launching asynchronous tasks. It is an abstraction layer on top of Python's threading and multiprocessing modules that provides an interface for running tasks using a pool of threads or processes.

In the following sections, we will learn about the different classes of the concurrent.futures module.

Executor Class

Executor is an abstract class of the concurrent.futures Python module. It cannot be used directly; we need to use one of the following concrete subclasses −

  1. ThreadPoolExecutor

  2. ProcessPoolExecutor
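Since both subclasses derive from Executor, code written against the shared interface, such as submit(), does not need to know which kind of pool it is given. The sketch below illustrates this under simple assumptions: cube() and run_on() are hypothetical helper names, and only the thread-based pool is actually exercised.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor


def cube(n):
    return n ** 3


def run_on(executor):
    """Submit work through the interface that every Executor subclass shares."""
    future = executor.submit(cube, 3)
    return future.result()


if __name__ == "__main__":
    # Both concrete classes are subclasses of the abstract Executor.
    print(issubclass(ThreadPoolExecutor, Executor))   # True
    print(issubclass(ProcessPoolExecutor, Executor))  # True

    with ThreadPoolExecutor(max_workers=2) as executor:
        print(run_on(executor))                       # 27
```

The same run_on() helper should also accept a ProcessPoolExecutor, provided the submitted function and its arguments can be pickled.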

ThreadPoolExecutor – A Concrete Subclass

It is one of the concrete subclasses of the Executor class. The subclass uses multithreading and gives us a pool of threads to which we can submit tasks. The pool assigns tasks to the available threads and schedules them to run.

How to create a ThreadPoolExecutor?

With the help of the concurrent.futures module and ThreadPoolExecutor, a concrete subclass of Executor, we can easily create a pool of threads. To do so, we construct a ThreadPoolExecutor with the number of threads we want in the pool. If the number is omitted (which is allowed since Python 3.5), the default is not a fixed value but is derived from the machine's CPU count; in Python 3.8, for example, it is min(32, os.cpu_count() + 4). Then we can submit a task to the thread pool. When we submit() a task, we get back a Future. The Future object has a method called done(), which tells whether the future has resolved, that is, whether a result (or an exception) has been set on that particular future object. When a task finishes, the thread pool executor sets the result on the future object.

Example

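from concurrent.futures import ThreadPoolExecutor
from time import sleep

def task(message):
    # Simulate 2 seconds of work, then hand the message back.
    sleep(2)
    return message

def main():
    executor = ThreadPoolExecutor(5)
    future = executor.submit(task, "Completed")
    print(future.done())    # the task is still sleeping
    sleep(3)                # wait a little longer than the task takes, to avoid a race
    print(future.done())
    print(future.result())
    executor.shutdown()     # release the worker threads

if __name__ == '__main__':
    main()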

Output

False
True
Completed

In the above example, a ThreadPoolExecutor has been constructed with 5 threads. Then a task, which waits for 2 seconds before returning its message, is submitted to the thread pool executor. As the output shows, the task takes 2 seconds to complete, so the first call to done() returns False. Once the task has finished, done() returns True and we get the result of the future by calling the result() method on it.
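A Future carries not only a return value but also any exception raised by the task. The following sketch, which is an illustration rather than part of the original example, uses a hypothetical divide() task to show the result() and exception() methods of Future; the timeout of 5 seconds is an arbitrary choice.

```python
from concurrent.futures import ThreadPoolExecutor


def divide(a, b):
    return a / b


def collect_outcomes():
    """Run one succeeding and one failing task and record what the futures report."""
    outcomes = []
    with ThreadPoolExecutor(max_workers=2) as executor:
        ok = executor.submit(divide, 10, 2)
        bad = executor.submit(divide, 1, 0)

        # result() blocks until the task is done and returns its value.
        outcomes.append(ok.result(timeout=5))

        # exception() returns the exception raised by the task without raising it.
        outcomes.append(type(bad.exception(timeout=5)).__name__)

        # result() on a failed future re-raises the task's exception in the caller.
        try:
            bad.result()
        except ZeroDivisionError:
            outcomes.append("re-raised")
    return outcomes


if __name__ == "__main__":
    print(collect_outcomes())  # [5.0, 'ZeroDivisionError', 're-raised']
```

Calling result() is therefore the point at which errors from worker threads surface, which is why it is usually wrapped in try/except.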

Instantiating ThreadPoolExecutor – Context Manager

Another way to instantiate ThreadPoolExecutor is with the help of a context manager. It works like the method used in the above example. Besides reading more cleanly, the context manager shuts the executor down automatically when the with block exits, after waiting for the pending tasks to finish. The instantiation can be done with the help of the following code −

with ThreadPoolExecutor(max_workers=5) as executor:
    ...  # submit tasks to the executor here
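To make the effect of the context manager concrete, the sketch below compares it with an explicit call to shutdown(wait=True), which is what leaving the with block does. The function names slow_double(), with_context_manager() and with_explicit_shutdown() are assumptions made for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep


def slow_double(n):
    sleep(0.1)  # simulate some work
    return 2 * n


def with_context_manager():
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(slow_double, n) for n in range(4)]
    # Leaving the with block waited for all tasks, so every future is done here.
    return [f.done() for f in futures], [f.result() for f in futures]


def with_explicit_shutdown():
    executor = ThreadPoolExecutor(max_workers=2)
    try:
        futures = [executor.submit(slow_double, n) for n in range(4)]
    finally:
        # The manual equivalent of what the with statement does on exit.
        executor.shutdown(wait=True)
    return [f.done() for f in futures], [f.result() for f in futures]


if __name__ == "__main__":
    print(with_context_manager())    # ([True, True, True, True], [0, 2, 4, 6])
    print(with_explicit_shutdown())  # ([True, True, True, True], [0, 2, 4, 6])
```

Both variants behave the same; the with form simply makes it harder to forget the shutdown.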

Example

The following example is borrowed from the Python docs. First, the concurrent.futures module is imported. Then a function named load_url() is defined, which loads the requested URL. The script then creates a ThreadPoolExecutor with 5 threads in the pool and uses it as a context manager. The as_completed() function yields each future as it finishes, and we get the result of a future by calling the result() method on it.

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and return its contents.
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and map each future to its URL.
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

Output

The above Python script produces output similar to the following; the order of the lines, the page sizes and the error text vary between runs and platforms −

'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229313 bytes
'http://www.cnn.com/' page is 168933 bytes
'http://www.bbc.co.uk/' page is 283893 bytes
'http://europe.wsj.com/' page is 938109 bytes
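Note that the lines in the output are not in the order of the URLS list, because as_completed() yields futures in the order in which they finish. The following offline sketch reproduces this behaviour without network access; the functions wait_then_return() and completion_order() and the chosen delays are assumptions made for illustration.

```python
import concurrent.futures
import time


def wait_then_return(delay):
    """Stand-in for a download that takes `delay` seconds."""
    time.sleep(delay)
    return delay


def completion_order(delays):
    """Return the delays in the order in which their tasks completed."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(delays)) as executor:
        # Map each future back to the input it was created from.
        future_to_delay = {executor.submit(wait_then_return, d): d for d in delays}
        return [future_to_delay[future]
                for future in concurrent.futures.as_completed(future_to_delay)]


if __name__ == "__main__":
    # Submitted as 0.6, 0.2, 0.4; the shortest task is expected to come back first.
    print(completion_order([0.6, 0.2, 0.4]))
```

The dictionary that maps each future to its input plays the same role as future_to_url in the example above: it lets us tell which task a completed future belongs to.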

Use of Executor.map() function

The Python map() function is widely used in a number of tasks. One such task is applying a certain function to every element of an iterable. Similarly, we can map all the elements of an iterable to a function and submit them as independent jobs to our ThreadPoolExecutor. Consider the following Python script to understand how the function works.

Example

In the example below, the map() function is used to apply the square() function to every value in the values list.

from concurrent.futures import ThreadPoolExecutor

values = [2, 3, 4, 5]

def square(n):
    return n * n

def main():
    with ThreadPoolExecutor(max_workers=3) as executor:
        # map() returns the results in the same order as the inputs.
        results = executor.map(square, values)
    for result in results:
        print(result)

if __name__ == '__main__':
    main()

Output

The above Python script generates the following output −

4
9
16
25
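The results appear in the order of the input values, and this holds even when the tasks finish in a different order. The sketch below makes that visible with a hypothetical slow_square() function whose artificial delay is shorter for larger inputs, so the last value is the first to complete.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep


def slow_square(n):
    # Larger inputs sleep for less time, so they finish earlier.
    sleep((5 - n) * 0.05)
    return n * n


def squares_in_input_order(values):
    with ThreadPoolExecutor(max_workers=len(values)) as executor:
        # Executor.map() yields results in input order, not completion order.
        return list(executor.map(slow_square, values))


if __name__ == "__main__":
    print(squares_in_input_order([2, 3, 4, 5]))  # [4, 9, 16, 25]
```

When results should instead be handled as soon as each one is ready, submit() combined with as_completed() is the usual alternative to map().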