Concurrency In Python 简明教程

Concurrency in Python - Pool of Processes

进程池可以以与创建和使用线程池相同的方式创建和使用。进程池可以定义为预先实例化和空闲进程的组,这些进程随时准备接受工作。当我们需要完成大量任务时,相对于为每个任务实例化新进程,创建进程池是更优选的。

Python Module – Concurrent.futures

Python 标准库有一个名为 concurrent.futures 的模块。该模块在 Python 3.2 中添加,为开发人员提供了启动异步任务的高级接口。它是 Python 线程和多处理模块之上的一个抽象层,用于提供使用线程或进程池运行任务的接口。

在我们的后续部分中,我们将研究并发 futures 模块的不同子类。

Executor Class

Executorconcurrent.futures Python 模块的抽象类。它不能直接使用,我们需要使用以下具体子类之一:

  1. ThreadPoolExecutor

  2. ProcessPoolExecutor

ProcessPoolExecutor – A concrete subclass

它是 Executor 类的具体子类之一。它使用多处理,我们得到一个进程池来提交任务。该池将任务分配给可用的进程,并安排它们运行。

How to create a ProcessPoolExecutor?

借助 concurrent.futures 模块及其具体子类 Executor ,我们可以轻松地创建一个进程池。为此,我们需要使用池中所需的进程数量构造一个 ProcessPoolExecutor 。默认情况下,数字为 5。接下来向进程池提交任务。

Example

现在我们将考虑在创建线程池时使用的相同示例,唯一区别在于现在我们将使用 ProcessPoolExecutor 而非 ThreadPoolExecutor

from concurrent.futures import ProcessPoolExecutor
from time import sleep
def task(message):
   sleep(2)
   return message

def main():
   executor = ProcessPoolExecutor(5)
   future = executor.submit(task, ("Completed"))
   print(future.done())
   sleep(2)
   print(future.done())
   print(future.result())
if __name__ == '__main__':
main()

Output

False
False
Completed

在上述示例中,已经构造了一个带有 5 个线程的 Process*PoolExecutor*。然后,一个任务(它将在给出消息之前等待 2 秒)被提交给进程池执行器。从输出中可以看到,任务在 2 秒内不会完成,因此对 done() 的第一次调用将返回 False。2 秒后,任务完成,我们通过调用它的 result() 方法来获得未来的结果。

Instantiating ProcessPoolExecutor – Context Manager

实例化 ProcessPoolExecutor 的另一种方法是借助上下文管理器。它的工作方式类似于上述示例中使用的该方法。可以使用以下代码进行实例化:

with ProcessPoolExecutor(max_workers = 5) as executor

Example

为了更好地理解,我们采用在创建线程池时使用的相同示例。在此示例中,我们需要首先导入 concurrent.futures 模块。然后创建一个名为 load_url() 的函数,该函数将加载请求的 URL。然后使用池中的 5 个线程数创建 ProcessPoolExecutor 。Process*PoolExecutor* 已用作上下文管理器。我们可以通过调用它的 result() 方法来获得未来的结果。

import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import urllib.request

URLS = ['http://www.foxnews.com/',
   'http://www.cnn.com/',
   'http://europe.wsj.com/',
   'http://www.bbc.co.uk/',
   'http://some-made-up-domain.com/']

def load_url(url, timeout):
   with urllib.request.urlopen(url, timeout = timeout) as conn:
      return conn.read()

def main():
   with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
      future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
      for future in concurrent.futures.as_completed(future_to_url):
      url = future_to_url[future]
      try:
         data = future.result()
      except Exception as exc:
         print('%r generated an exception: %s' % (url, exc))
      else:
         print('%r page is %d bytes' % (url, len(data)))

if __name__ == '__main__':
   main()

Output

上述 Python 脚本将生成以下输出:

'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229476 bytes
'http://www.cnn.com/' page is 165323 bytes
'http://www.bbc.co.uk/' page is 284981 bytes
'http://europe.wsj.com/' page is 967575 bytes

Use of the Executor.map() function

Python map() 函数广泛用于执行多项任务。此类任务之一是对 iterable 中的每个元素应用某个函数。同样,我们可以将迭代器的所有元素映射到一个函数,并将它们作为独立作业提交给 ProcessPoolExecutor 。考虑以下 Python 脚本示例以了解这一点。

Example

我们将考虑在使用 Executor.map() 函数创建线程池时使用的相同示例。在下面给出的示例中,map 函数用于对 values 数组中的每个值应用 square() 函数。

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
   return n * n
def main():
   with ProcessPoolExecutor(max_workers = 3) as executor:
      results = executor.map(square, values)
   for result in results:
      print(result)
if __name__ == '__main__':
   main()

Output

上述 Python 脚本将生成以下输出:

4
9
16
25

When to use ProcessPoolExecutor and ThreadPoolExecutor?

现在我们已经研究了这两个执行器类(ThreadPoolExecutor 和 ProcessPoolExecutor),我们需要知道在什么时候使用哪个执行器。在 CPU 绑定工作负载的情况下,我们需要选择 ProcessPoolExecutor,在 I/O 绑定工作负载的情况下,我们需要选择 ThreadPoolExecutor。

如果我们使用 ProcessPoolExecutor ,那么我们不需要担心 GIL 因为它使用多处理。此外,与 ThreadPoolExecution 相比,执行时间会更少。考虑以下 Python 脚本示例来理解这一点。

Example

import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.ProcessPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
   print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

Output

Start: 8000000 Time taken: 1.5509998798370361
Start: 7000000 Time taken: 1.3259999752044678
Total time taken: 2.0840001106262207

Example- Python script with ThreadPoolExecutor:
import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.ThreadPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
      print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

Output

Start: 8000000 Time taken: 3.8420000076293945
Start: 7000000 Time taken: 3.6010000705718994
Total time taken: 3.8480000495910645

从以上两个程序的输出中,我们可以看到使用 ProcessPoolExecutorThreadPoolExecutor 时执行时间的差异。