Python 简明教程

Python - Thread Pools

线程池是一种机制,它可以自动高效地管理多个线程,从而允许并发执行任务。Python 不通过 threading 模块直接提供线程池。

相反,它通过 multiprocessing.dummy 模块和 * concurrent.futures* 模块提供基于线程的池化。这些模块为创建和管理线程池提供了便捷的接口,从而更轻松地执行并发任务执行。

What is a Thread Pool?

线程池是由池管理的一组线程。池中的每个线程称为工作线程或工作者线程。这些线程可以被重用来执行多个任务,这减少了重复创建和销毁线程的负担。

线程池控制线程的创建及其生命周期,使其更有效地处理大量的任务。

我们可以在 Python 中使用以下类实现线程池:

  1. Python ThreadPool Class

  2. Python ThreadPoolExecutor Class

Using Python ThreadPool Class

multiprocessing.pool.ThreadPool 类在 multiprocessing 模块中提供了线程池接口。它管理一个工作线程池,可以向其中提交作业以进行并发执行。

ThreadPool 对象通过处理任务在工作线程之间的创建和分配,简化了多个线程的管理。它与 Pool 类共享一个接口,该类最初设计用于进程,但已被调整为也可用于线程。

ThreadPool 实例与 Pool 实例完全接口兼容,并且应该作为上下文管理器管理,或手动调用 close() 和 terminate()。

Example

此示例演示了使用 Python thread pool 在数字列表上并行执行方和立方函数,其中每个函数都并发地应用到数字上,最多使用 3 个线程,每个线程在执行之间延迟 1 秒。

from multiprocessing.dummy import Pool as ThreadPool
import time

def square(number):
   sqr = number * number
   time.sleep(1)
   print("Number:{} Square:{}".format(number, sqr))

def cube(number):
   cub = number*number*number
   time.sleep(1)
   print("Number:{} Cube:{}".format(number, cub))

numbers = [1, 2, 3, 4, 5]
pool = ThreadPool(3)
pool.map(square, numbers)
pool.map(cube, numbers)

pool.close()

Output

执行上述代码时,您将获得以下输出 -

Number:2 Square:4
Number:1 Square:1
Number:3 Square:9
Number:4 Square:16
Number:5 Square:25
Number:1 Cube:1
Number:2 Cube:8
Number:3 Cube:27
Number:4 Cube:64
Number:5 Cube:125

Using Python ThreadPoolExecutor Class

Python 的 ThreadPoolExecutor 模块中的 * concurrent.futures* 类为使用线程异步执行函数提供了高级界面。concurrent.futures 模块包括 Future 类和两个 Executor 类 - ThreadPoolExecutorProcessPoolExecutor

The Future Class

concurrent.futures.Future 类负责处理任何可调用的异步执行,比如函数。要获取 Future 对象,您应在任何 Executor 对象上调用 submit() 方法。不应通过其构造函数直接创建它。

Future 类中的重要方法是:

  1. result(timeout=None) :此方法返回调用返回的值。如果调用尚未完成,则此方法将最多等待 timeout 秒。如果调用在 timeout 秒内尚未完成,则会引发 TimeoutError。如果未指定超时,则等待时间没有限制。

  2. cancel() :此方法尝试取消调用。如果该调用当前正在执行或已完成运行并且无法取消,则该方法将返回一个布尔值 False 。否则,该调用将被取消,并且该方法返回 True。

  3. cancelled() :如果成功取消调用,则返回 True。

  4. running() :如果调用当前正在执行并且无法取消,则返回 True。

  5. done() :如果成功取消了调用或已完成运行,则返回 True。

The ThreadPoolExecutor Class

此类代表一个指定数量的最大工作线程池,以异步执行调用。

concurrent.futures.ThreadPoolExecutor(max_threads)

Example

以下是一个使用 concurrent.futures.ThreadPoolExecutor 类在 Python 中管理和异步执行任务的示例。具体来说,它展示了如何向线程池提交多个任务以及如何检查其执行状态。

from concurrent.futures import ThreadPoolExecutor
from time import sleep
def square(numbers):
   for val in numbers:
      ret = val*val
      sleep(1)
      print("Number:{} Square:{}".format(val, ret))
def cube(numbers):
   for val in numbers:
      ret = val*val*val
      sleep(1)
      print("Number:{} Cube:{}".format(val, ret))
if __name__ == '__main__':
   numbers = [1,2,3,4,5]
   executor = ThreadPoolExecutor(4)
   thread1 = executor.submit(square, (numbers))
   thread2 = executor.submit(cube, (numbers))
   print("Thread 1 executed ? :",thread1.done())
   print("Thread 2 executed ? :",thread2.done())
   sleep(2)
   print("Thread 1 executed ? :",thread1.done())
   print("Thread 2 executed ? :",thread2.done())

它将生成以下 output

Thread 1 executed ? : False
Thread 2 executed ? : False
Number:1 Square:1
Number:1 Cube:1
Thread 1 executed ? : False
Thread 2 executed ? : False
Number:2 Square:4
Number:2 Cube:8
Number:3 Square:9
Number:3 Cube:27
Number:4 Square:16
Number:4 Cube:64
Number:5 Square:25
Number:5 Cube:125