Concurrency In Python 简明教程

Threads Intercommunication

在现实生活中,如果一个团队正在完成一项共同的任务,那么他们之间应该沟通以适当地完成任务。同样的类比也适用于线程。在编程中,为了减少处理器的理想时间,我们创建了多个线程,并将不同的子任务分配给每个线程。因此,必须有通信设施,并且它们应该相互交互以同步方式完成工作。

考虑与线程交互交流相关的重要要点如下−

  1. No performance gain – 如果我们在线程和进程之间无法实现适当的通信,那么从并发和并行性中获得的性能提升毫无用处。

  2. Accomplish task properly – 如果没有线程之间适当的交互交流机制,则无法正确完成分配的任务。

  3. More efficient than inter-process communication - 线程间通信比进程间通信更有效且易用,因为进程内的所有线程共享相同的地址空间,而不必使用共享内存。

Python data structures for thread-safe communication

多线程代码出现了将信息从一个线程传递到另一个线程的问题。标准通信原语无法解决此问题。因此,我们需要实现自己的复合对象,以便在线程之间共享对象,以使通信线程安全。以下是几个数据结构,在其中进行一些更改后提供线程安全通信 -

Sets

以下列扩展集数据结构,实现线程安全的方式,我们需要扩展集类来实现自己的锁定机制。

Example

以下是如何扩展类的 Python 示例 -

class extend_class(set):
   def __init__(self, *args, **kwargs):
      self._lock = Lock()
      super(extend_class, self).__init__(*args, **kwargs)

   def add(self, elem):
      self._lock.acquire()
	  try:
      super(extend_class, self).add(elem)
      finally:
      self._lock.release()

   def delete(self, elem):
      self._lock.acquire()
      try:
      super(extend_class, self).delete(elem)
      finally:
      self._lock.release()

在上面的示例中,定义了一个名为 extend_class 的类对象,它进一步继承自 Python set class 。在这个类的构造函数中创建了一个锁对象。现在,有 add()delete() 两个函数。定义这些函数,并且它们是线程安全的。它们都依赖于 super 类功能,但有一个关键例外。

Decorator

这是用于线程安全通信的另一个关键方法,即使用装饰器。

Example

考虑一个显示如何使用装饰器 -− 的 Python 示例 -

def lock_decorator(method):

   def new_deco_method(self, *args, **kwargs):
      with self._lock:
         return method(self, *args, **kwargs)
return new_deco_method

class Decorator_class(set):
   def __init__(self, *args, **kwargs):
      self._lock = Lock()
      super(Decorator_class, self).__init__(*args, **kwargs)

   @lock_decorator
   def add(self, *args, **kwargs):
      return super(Decorator_class, self).add(elem)
   @lock_decorator
   def delete(self, *args, **kwargs):
      return super(Decorator_class, self).delete(elem)

在上面的示例中,定义了一个名为 lock_decorator 的装饰器方法,它进一步继承自 Python 方法类。然后,在这个类的构造函数中创建了一个锁对象。现在,有 add() 和 delete() 两个函数。定义这些函数,并且它们是线程安全的。它们都依赖于超类功能,但有一个关键例外。

Lists

列表数据结构是线程安全、快速且易于用于临时内存中存储的结构。在 Cpython 中,GIL 会防止对其并发访问。当我们了解到列表是线程安全的,但它们中的数据如何呢?实际上,列表的数据不受保护。例如,如果另一个线程正尝试执行相同操作,则 L.append(x) 无法保证返回预期结果。这是因为虽然 append() 是原子操作并且是线程安全的,但另一个线程正尝试以并发方式修改列表的数据,因此我们可以看到竞态条件对输出的副作用。

为了解决此类问题并安全地修改数据,我们必须实现适当的锁定机制,这进一步确保多个线程不会遇到竞态条件。为了实现适当的锁定机制,我们可以像前一个示例中那样扩展类。

列表上的一些其他原子操作如下 -

L.append(x)
L1.extend(L2)
x = L[i]
x = L.pop()
L1[i:j] = L2
L.sort()
x = y
x.field = y
D[x] = y
D1.update(D2)
D.keys()

这里 -

  1. L,L1,L2 all are lists

  2. D,D1,D2 are dicts

  3. x,y are objects

  4. i, j are ints

Queues

如果列表的数据不受保护,我们可能必须面对后果。我们可能获得或删除错误的数据项或竞态条件。这就是建议使用队列数据结构的原因。队列的实际示例可以是单车道单行道,车辆先进入,先退出。可以在售票窗口和公共汽车站看到更多队列的实际示例。

queues

队列默认情况下是线程安全的数据结构,我们不必担心实现复杂的锁定机制。Python 为我们提供了模块,在我们的应用程序中使用不同类型的队列。

Types of Queues

在本节中,我们将学习不同类型的队列。Python 从 <queue> 模块中提供三种队列选项供使用 -

  1. 普通队列(FIFO、先进先出)

  2. LIFO,后进先出

  3. Priority

我们将在后续部分了解不同的队列。

Normal Queues (FIFO, First in First out)

这是 Python 提供的最常用的队列实现。在此排队机制中,谁先来,谁先获得服务。FIFO 也称为普通队列。FIFO 队列可以表示如下 -

fifo

Python Implementation of FIFO Queue

在 python 中,可使用单线程和多线程实现 FIFO 队列。

FIFO queue with single thread

对于使用单线程实现 FIFO 队列, Queue 类将实现一个基本的先进先出容器。元素将使用 put() 添加到序列的“一端”,并使用 get() 从另一端移除。

Example

以下是使用单线程实现 FIFO 队列的 Python 程序 −

import queue

q = queue.Queue()

for i in range(8):
   q.put("item-" + str(i))

while not q.empty():
   print (q.get(), end = " ")

Output

item-0 item-1 item-2 item-3 item-4 item-5 item-6 item-7

输出表明以上程序使用单线程来说明元素将按其被插入的顺序从队列中移除。

FIFO queue with multiple threads

对于使用多线程实现 FIFO,我们需要定义 myqueue() 函数,该函数从队列模块中扩展而来。get() 和 put() 方法的工作原理与上面在使用单线程实现 FIFO 队列时讨论的一样。然后,为了使其多线程化,我们需要声明和实例化线程。这些线程将以 FIFO 方式消耗队列。

Example

以下是使用多线程实现 FIFO 队列的 Python 程序

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
   item = queue.get()
   if item is None:
   break
   print("{} removed {} from the queue".format(threading.current_thread(), item))
   queue.task_done()
   time.sleep(2)
q = queue.Queue()
for i in range(5):
   q.put(i)
threads = []
for i in range(4):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

Output

<Thread(Thread-3654, started 5044)> removed 0 from the queue
<Thread(Thread-3655, started 3144)> removed 1 from the queue
<Thread(Thread-3656, started 6996)> removed 2 from the queue
<Thread(Thread-3657, started 2672)> removed 3 from the queue
<Thread(Thread-3654, started 5044)> removed 4 from the queue

LIFO, Last in First Out queue

此队列使用的类比与 FIFO(先进先出)队列完全相反。在此排队机制中,最后来的人将首先获得服务。这类似于实现栈数据结构。在实现深度优先搜索等人工智能算法时,LIFO 队列被证明是有用的。

Python implementation of LIFO queue

在 python 中,可使用单线程和多线程实现 LIFO 队列。

LIFO queue with single thread

对于使用单线程实现 LIFO 队列, Queue 类将通过使用结构 Queue .LifoQueue 来实现一个基本的先进先出容器。现在,在调用 put() 时,元素将添加到容器的头部,并在使用 get() 时也从头部移除。

Example

以下是使用单线程实现 LIFO 队列的 Python 程序 −

import queue

q = queue.LifoQueue()

for i in range(8):
   q.put("item-" + str(i))

while not q.empty():
   print (q.get(), end=" ")
Output:
item-7 item-6 item-5 item-4 item-3 item-2 item-1 item-0

输出表明以上程序使用单线程来说明元素将按与它们被插入相反的顺序从队列中移除。

LIFO queue with multiple threads

实现与我们使用多线程实现 FIFO 队列的实现类似。唯一的区别在于我们需要使用 Queue 类,该类将通过使用结构 Queue.LifoQueue 来实现一个基本的后进先出容器。

Example

以下是使用多线程实现 LIFO 队列的 Python 程序 −

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
      item = queue.get()
      if item is None:
      break
	  print("{} removed {} from the queue".format(threading.current_thread(), item))
      queue.task_done()
      time.sleep(2)
q = queue.LifoQueue()
for i in range(5):
   q.put(i)
threads = []
for i in range(4):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

Output

<Thread(Thread-3882, started 4928)> removed 4 from the queue
<Thread(Thread-3883, started 4364)> removed 3 from the queue
<Thread(Thread-3884, started 6908)> removed 2 from the queue
<Thread(Thread-3885, started 3584)> removed 1 from the queue
<Thread(Thread-3882, started 4928)> removed 0 from the queue

Priority queue

在 FIFO 和 LIFO 队列中,项目的顺序与插入顺序相关。但是,在许多情况下,优先级比插入顺序更重要。让我们考虑一个现实世界的例子。假设机场的安检人员正在检查不同类别的乘客。VVIP、航空公司工作人员、海关官员这些类别的乘客可能会优先接受检查,而不是像普通人那样根据到达顺序进行检查。

优先级队列需要考虑的另一个重要方面是如何开发任务调度程序。一种常见的设计是在队列中以优先级为基础为最重要的代理任务提供服务。此数据结构可用于根据其优先级值从队列中提取项目。

Python Implementation of Priority Queue

在 python 中,可使用单线程和多线程实现优先级队列。

Priority queue with single thread

对于使用单线程实现优先级队列, Queue 类将使用结构 Queue .PriorityQueue 来实现优先级容器上的任务。现在,在调用 put() 时,将添加具有值的元素,其中值最低的元素将具有最高的优先级,因此使用 get() 最先检索到。

Example

考虑以下用于使用单线程实现 Priority 队列的 Python 程序 −

import queue as Q
p_queue = Q.PriorityQueue()

p_queue.put((2, 'Urgent'))
p_queue.put((1, 'Most Urgent'))
p_queue.put((10, 'Nothing important'))
prio_queue.put((5, 'Important'))

while not p_queue.empty():
   item = p_queue.get()
   print('%s - %s' % item)

Output

1 – Most Urgent
2 - Urgent
5 - Important
10 – Nothing important

在以上输出中,我们可以看到队列是根据优先级存储项目的,较低的值具有较高的优先级。

Priority queue with multi threads

其实现与多线程的 FIFO 和 LIFO 队列实现方式类似。唯一的区别是,我们需要使用 Queue 类来使用结构 Queue.PriorityQueue 来初始化优先权。另一个区别是生成队列的方式。在下面示例中,它将由两个相同的数据集生成。

Example

以下 Python 程序帮助使用多线程实现优先级队列 −

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
      item = queue.get()
      if item is None:
      break
      print("{} removed {} from the queue".format(threading.current_thread(), item))
      queue.task_done()
      time.sleep(1)
q = queue.PriorityQueue()
for i in range(5):
   q.put(i,1)

for i in range(5):
   q.put(i,1)

threads = []
for i in range(2):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

Output

<Thread(Thread-4939, started 2420)> removed 0 from the queue
<Thread(Thread-4940, started 3284)> removed 0 from the queue
<Thread(Thread-4939, started 2420)> removed 1 from the queue
<Thread(Thread-4940, started 3284)> removed 1 from the queue
<Thread(Thread-4939, started 2420)> removed 2 from the queue
<Thread(Thread-4940, started 3284)> removed 2 from the queue
<Thread(Thread-4939, started 2420)> removed 3 from the queue
<Thread(Thread-4940, started 3284)> removed 3 from the queue
<Thread(Thread-4939, started 2420)> removed 4 from the queue
<Thread(Thread-4940, started 3284)> removed 4 from the queue