Concurrency In Python 简明教程

Concurrency in Python - Multiprocessing

在本章中,我们将重点关注多进程和多线程之间的比较。

Multiprocessing

这是在单台计算机系统内使用两个或更多 CPU 单元。这是通过利用计算机系统中所有可用的 CPU 内核数量,从硬件获得充分潜能的最佳方法。

Multithreading

这是 CPU 通过同时执行多个线程管理操作系统使用的能力。多线程概念的主要目的是通过将一个进程分成多个线程,来实现并行性。

以下表格展示了它们之间的一些重要差异:

Multiprocessing

Multiprogramming

多处理指的是同时使用多个 CPU 处理多个进程。

多编程在主内存中同时保留一些程序,并使用单 CPU 同时执行这些程序。

It utilizes multiple CPUs.

It utilizes single CPU.

It permits parallel processing.

Context switching takes place.

花费更少的时间处理作业。

花费更多的时间处理作业。

它促进了计算机设备的高效利用。

Less efficient than multiprocessing.

Usually more expensive.

此类系统成本更低。

Eliminating impact of global interpreter lock (GIL)

在处理并发应用程序时,Python 中有一个名为 GIL (Global Interpreter Lock) 的限制。GIL 从不允许我们利用 CPU 的多个内核,因此可以说 Python 中没有真正的线程。GIL 是互斥锁,可以确保线程安全性。换句话说,我们可以说 GIL 可以防止多个线程并行执行 Python 代码。一次只能一个线程持有锁,如果我们希望执行一个线程,则其必须首先获取锁。

通过使用多进程,我们可以有效绕过由 GIL 造成的限制:

  1. 通过使用多进程,我们正在利用多进程的能力,因此我们正在利用 GIL 的多个实例。

  2. 由于这一点,不再限制同时执行程序中一个线程的字节码。

Starting Processes in Python

可以使用以下三种方法在多进程模块内启动 Python 中的一个进程:

  1. Fork

  2. Spawn

  3. Forkserver

Creating a process with Fork

Fork 命令是 UNIX 中找到的一个标准命令。它用来创建称作子进程的新进程。此子进程与称作父进程的进程并发运行。这些子进程与其父进程也是相同的,并且会继承父进程所有可用的资源。在使用 Fork 创建进程时,则会用到以下系统调用:

  1. fork() :这是一般在内核中实现的系统调用。它用来创建进程的副本。p>

  2. getpid() :此系统调用返回调用进程的进程 ID(PID)。

Example

以下 Python 脚本示例将帮助您理解如何创建一个子进程,并获取子父进程的 PID:

import os

def child():
   n = os.fork()

   if n > 0:
      print("PID of Parent process is : ", os.getpid())

   else:
      print("PID of Child process is : ", os.getpid())
child()

Output

PID of Parent process is : 25989
PID of Child process is : 25990

Creating a process with Spawn

生成表示以新事物开始。因此,生成一个进程意味着由父进程创建一个新进程。父进程会异步继续执行,或者等到子进程执行完毕。按照以下步骤生成一个进程:

  1. Importing multiprocessing module.

  2. Creating the object process.

  3. 通过调用 start() 方法启动流程活动。

  4. 通过调用 join() 方法,等待流程完成其工作并退出。

Example

下列示例 Python 脚本有助于衍生三个流程

import multiprocessing

def spawn_process(i):
   print ('This is process: %s' %i)
   return

if __name__ == '__main__':
   Process_jobs = []
   for i in range(3):
   p = multiprocessing.Process(target = spawn_process, args = (i,))
      Process_jobs.append(p)
   p.start()
   p.join()

Output

This is process: 0
This is process: 1
This is process: 2

Creating a process with Forkserver

Forkserver 机制仅可在那些支持通过 Unix 管道传递文件描述符的特定 UNIX 平台上使用。考虑以下几点以了解 Forkserver 机制的运作方式 -

  1. 在启动新流程时,使用 Forkserver 机制实例化一个服务器。

  2. 然后,服务器接收命令并处理为创建新流程做出的所有请求。

  3. 为了创建新流程,Python 程序会向 Forkserver 发送请求,它会为我们创建一个流程。

  4. 最后,可以在程序中使用新创建的流程。

Daemon processes in Python

Python multiprocessing 模块允许我们通过其守护进程选项来拥有守护进程。后台运行的守护进程或流程遵循与守护线程类似的概念。为了在后台执行流程,我们需要将守护进程标记设为“真”。只要主流程正在执行,守护进程便会继续运行,它将在执行完毕后或当主程序被终止时终止。

Example

这里我们使用的是与守护线程示例中相同的示例。唯一的区别是将模块从 multithreading 更改为 multiprocessing ,并将守护进程标记设为“真”。不过,输出会有所改变,如下所示 -

import multiprocessing
import time

def nondaemonProcess():
   print("starting my Process")
   time.sleep(8)
   print("ending my Process")
def daemonProcess():
   while True:
   print("Hello")
   time.sleep(2)
if __name__ == '__main__':
   nondaemonProcess = multiprocessing.Process(target = nondaemonProcess)
   daemonProcess = multiprocessing.Process(target = daemonProcess)
   daemonProcess.daemon = True
   nondaemonProcess.daemon = False
   daemonProcess.start()
   nondaemonProcess.start()

Output

starting my Process
ending my Process

与守护线程生成的结果不同,因为非守护模式下的进程没有输出。因此,为了避免运行进程持续存在,守护进程在主程序结束后自动结束。

Terminating processes in Python

我们可以使用 terminate() 方法立即杀死或终止进程。我们将在子进程执行完成之前使用此方法终止已通过函数创建的子进程。

Example

import multiprocessing
import time
def Child_process():
   print ('Starting function')
   time.sleep(5)
   print ('Finished function')
P = multiprocessing.Process(target = Child_process)
P.start()
print("My Process has terminated, terminating main thread")
print("Terminating Child Process")
P.terminate()
print("Child Process successfully terminated")

Output

My Process has terminated, terminating main thread
Terminating Child Process
Child Process successfully terminated

输出显示程序在通过 Child_process() 函数创建的子进程执行之前终止。这意味着已成功终止子进程。

Identifying the current process in Python

操作系统中的每个进程都具有称为 PID 的进程标识。在 Python 中,我们可以使用以下命令查找当前进程的 PID:

import multiprocessing
print(multiprocessing.current_process().pid)

Example

Python 脚本的以下示例有助于找出主进程的 PID 以及子进程的 PID:

import multiprocessing
import time
def Child_process():
   print("PID of Child Process is: {}".format(multiprocessing.current_process().pid))
print("PID of Main process is: {}".format(multiprocessing.current_process().pid))
P = multiprocessing.Process(target=Child_process)
P.start()
P.join()

Output

PID of Main process is: 9401
PID of Child Process is: 9402

Using a process in subclass

我们可以通过为 threading.Thread 类创建子类来创建线程。此外,我们还可以通过为 multiprocessing.Process 类创建子类来创建进程。要使用子类中的进程,我们需要考虑以下几点:

  1. 我们需要定义 Process 类的某个新子类。

  2. 我们需要覆盖 init(self [,args] ) 类。

  3. 我们需要覆盖 run(self [,args] ) 方法以实施 Process

  4. 我们通过调用*start()*方法来启动该流程。

Example

import multiprocessing
class MyProcess(multiprocessing.Process):
   def run(self):
   print ('called run method in process: %s' %self.name)
   return
if __name__ == '__main__':
   jobs = []
   for i in range(5):
   P = MyProcess()
   jobs.append(P)
   P.start()
   P.join()

Output

called run method in process: MyProcess-1
called run method in process: MyProcess-2
called run method in process: MyProcess-3
called run method in process: MyProcess-4
called run method in process: MyProcess-5

Python Multiprocessing Module – Pool Class

如果我们谈论 Python 应用程序中的简单并行 processing 任务,那么 multiprocessing 模块可以向我们提供 Pool 类。可以在我们的主程序中使用 Pool 类的下列方法来启动子进程的数量

apply() method

此方法类似于 .ThreadPoolExecutor 的.submit()*方法。它会一直处于 blocked 状态,直至获得结果。

apply_async() method

当我们需要并行执行我们的任务时,那么需要使用*apply_async()*方法来将任务提交到 pool。它是一种异步操作,不会一直锁定主线程,直到所有子进程执行完。

map() method

apply() 方法类似,它也会一直处于 blocked 状态,直至获得结果。它等同于内置 map() 函数,后者将可迭代数据分成多个块,并将它们作为独立的任务提交到进程池。

map_async() method

它是 map() 方法的一种变体,就像 apply_async()apply() 方法的变体一样。它返回一个结果对象。当结果准备好后,会对他应用一个可调用函数。可调用函数必须立即完成;否则,处理结果的线程会一直处于 blocked 状态。

Example

以下示例将帮助你实现进程池,以便执行并行执行。通过 multiprocessing.Pool 方法应用 square() 函数,已执行一个简单的平方数计算。然后已使用 pool.map() 提交 5,因为输入是从 0 到 4 的整数列表。结果将存储在 p_outputs 中并打印出来。

def square(n):
   result = n*n
   return result
if __name__ == '__main__':
   inputs = list(range(5))
   p = multiprocessing.Pool(processes = 4)
   p_outputs = pool.map(function_square, inputs)
   p.close()
   p.join()
   print ('Pool :', p_outputs)

Output

Pool : [0, 1, 4, 9, 16]