Concurrency In Python 简明教程

Synchronizing Threads

线程同步可以定义为一种方法,借助这种方法我们可以确信两个或更多个并发线程不会同时访问被称为临界区的程序段。另一方面,我们知道临界区是程序中访问共享资源的部分。因此我们可以说,同步就是确保两个或更多个线程在同一时间不会通过访问资源彼此交互的过程。下图展示了四个线程同时尝试访问程序的临界区。

synchronizing

更确切地说,假设两个或更多个线程尝试同时向列表中添加对象。这种行为无法导致成功的结果,因为它只会丢弃一个或所有对象,或者它会完全损坏列表的状态。此处,同步的作用是只允许一个线程一次访问列表。

Issues in thread synchronization

我们在实现并发编程或应用同步原语时可能会遇到一些问题。在本部分,我们将讨论两个主要问题。问题包括 −

  1. Deadlock

  2. Race condition

Race condition

这是并发编程中的一个主要问题。并发访问共享资源会导致竞争条件。当两个或更多个线程可以访问共享数据并尝试在同一时间更改其值时,竞争条件可能会被定义为一个条件的发生。由于此原因,变量的值可能是不可预测的,并根据进程的上下文切换时间而变化。

Example

考虑此示例以理解竞争条件的概念−

Step 1 − 在此步骤中,我们需要导入 threading 模块 −

import threading

Step 2 − 现在,定义一个全局变量,如 x,并将其值设置为 0 −

x = 0

Step 3 - 接下来,我们需要定义 increment_global() 函数,它将在这个全局函数 x 中增加 1 −

def increment_global():

   global x
   x += 1

Step 4 - 在这个步骤中,我们将定义 taskofThread() 函数,该函数将多次调用 increment_global() 函数;在我们的例子中,是 50000 次 −

def taskofThread():

   for _ in range(50000):
      increment_global()

Step 5 - 接下来,定义在其中创建了线程 t1 和 t2 的 main() 函数。两者都将在 start() 函数的帮助下启动,并在 join() 函数的帮助下等待它们完成自己的工作。

def main():
   global x
   x = 0

   t1 = threading.Thread(target= taskofThread)
   t2 = threading.Thread(target= taskofThread)

   t1.start()
   t2.start()

   t1.join()
   t2.join()

Step 6 - 接下来,我们需要给出范围,例如我们希望调用 main() 函数的迭代次数。在这里,我们调用它 5 次。

if __name__ == "__main__":
   for i in range(5):
      main()
      print("x = {1} after Iteration {0}".format(i,x))

在下面显示的输出中,我们可以看到竞争条件的影响,因为每次迭代后 x 的值都有望为 100000。但是,值存在很大变化。这是由于线程并行访问共享全局变量 x。

Output

x = 100000 after Iteration 0
x = 54034 after Iteration 1
x = 80230 after Iteration 2
x = 93602 after Iteration 3
x = 93289 after Iteration 4

Dealing with race condition using locks

由于我们已经看到了竞争条件在上述程序中的影响,因此我们需要一个同步工具,该工具可以处理多线程之间的竞争条件。在 Python 中, <threading> 模块提供 Lock 类来处理竞争条件。此外, Lock 类提供了不同的方法,我们可以借助这些方法处理多线程之间的竞争条件。这些方法的说明如下 −

acquire() method

此方法用于获取,即锁定锁。锁可以是阻塞的或非阻塞的,具体取决于以下真或假值 −

  1. With value set to True − 如果使用 True(默认参数)调用 acquire() 方法,那么线程执行将被阻塞,直到锁解锁。

  2. With value set to False − 如果使用 False(非默认参数)调用 acquire() 方法,那么线程执行将不会被阻塞,直到将其设置为 true,即直到其被锁定。

release() method

此方法用于释放锁。以下与此方法相关的几个重要任务 −

  1. 如果锁被锁定,那么 release() 方法将对其解锁。它的任务是仅允许恰好一个线程继续进行,如果多个线程被阻塞并等待锁解锁。

  2. 如果锁已经解锁,它将引发 ThreadError

现在,我们可以使用 Lock 类及其方法重写上述程序以避免竞争条件。我们需要使用 lock 参数定义 taskofThread() 方法,然后需要使用 acquire() 和 release() 方法进行锁的阻塞和非阻塞处理,以避免竞争条件。

Example

以下是了解有关锁处理竞争条件的概念的 Python 程序示例 −

import threading

x = 0

def increment_global():

   global x
   x += 1

def taskofThread(lock):

   for _ in range(50000):
      lock.acquire()
      increment_global()
      lock.release()

def main():
   global x
   x = 0

   lock = threading.Lock()
   t1 = threading.Thread(target = taskofThread, args = (lock,))
   t2 = threading.Thread(target = taskofThread, args = (lock,))

   t1.start()
   t2.start()

   t1.join()
   t2.join()

if __name__ == "__main__":
   for i in range(5):
      main()
      print("x = {1} after Iteration {0}".format(i,x))

以下输出表明排除了竞争条件的影响;因为每次迭代后的 x 值现在为 100000,这符合此程序的预期。

Output

x = 100000 after Iteration 0
x = 100000 after Iteration 1
x = 100000 after Iteration 2
x = 100000 after Iteration 3
x = 100000 after Iteration 4

Deadlocks − The Dining Philosophers problem

死锁是一个令人头疼的问题,人们在设计并发系统时可能会遇到。我们可以借助用餐哲学家问题来说明这个问题,如下所示 −

艾兹格·迪杰斯特拉最初提出了用餐哲学家问题,这是并发系统最严重的问题之一(死锁)的著名说明之一。

在这个问题中,有五位著名的哲学家围坐在圆桌旁,从他们的碗里吃东西。有五把叉子,五位哲学家可以使用它们吃东西。然而,哲学家们决定同时使用两把叉子吃东西。

现在,哲学家有两个主要条件。首先,每位哲学家都可以处于进餐或思考状态,其次,他们必须首先获得两把叉子,即左手和右手。当每位哲学家同时设法拿起左叉时,就会出现问题。现在,他们都在等待右手叉子空闲,但他们会一直拿着叉子直到吃完东西,而右手叉子永远不会空闲。因此,在餐桌上会出现死锁状态。

Deadlock in concurrent system

现在,如果我们看到,并发系统中也可能出现同样的问题。上述示例中的叉子将是系统资源,每个哲学家都可以表示正在竞争获取资源的进程。

Solution with Python program

可以通过将哲学家分为两类来找到这个问题的解决方案,即 greedy philosophersgenerous philosophers 。主要是贪婪的哲学家会尝试拿起左叉并一直等到叉子在那里。然后,他会等待右叉,拿起它,吃饭,然后放下它。另一方面,慷慨的哲学家会尝试拿起左叉,如果叉子不存在,他会等待一段时间后再尝试。如果他们拿到了左叉,他们将尝试拿到右叉。如果他们也将拿到右叉,他们将吃饭并放下两把叉子。但是,如果他们拿不到右叉,他们将放下左叉。

Example

以下 Python 程序将帮助我们找到就餐哲学家问题的解决方案:

import threading
import random
import time

class DiningPhilosopher(threading.Thread):

   running = True

   def __init__(self, xname, Leftfork, Rightfork):
   threading.Thread.__init__(self)
   self.name = xname
   self.Leftfork = Leftfork
   self.Rightfork = Rightfork

   def run(self):
   while(self.running):
      time.sleep( random.uniform(3,13))
      print ('%s is hungry.' % self.name)
      self.dine()

   def dine(self):
   fork1, fork2 = self.Leftfork, self.Rightfork

   while self.running:
      fork1.acquire(True)
      locked = fork2.acquire(False)
	  if locked: break
      fork1.release()
      print ('%s swaps forks' % self.name)
      fork1, fork2 = fork2, fork1
   else:
      return

   self.dining()
   fork2.release()
   fork1.release()

   def dining(self):
   print ('%s starts eating '% self.name)
   time.sleep(random.uniform(1,10))
   print ('%s finishes eating and now thinking.' % self.name)

def Dining_Philosophers():
   forks = [threading.Lock() for n in range(5)]
   philosopherNames = ('1st','2nd','3rd','4th', '5th')

   philosophers= [DiningPhilosopher(philosopherNames[i], forks[i%5], forks[(i+1)%5]) \
      for i in range(5)]

   random.seed()
   DiningPhilosopher.running = True
   for p in philosophers: p.start()
   time.sleep(30)
   DiningPhilosopher.running = False
   print (" It is finishing.")

Dining_Philosophers()

上述程序使用了贪婪和慷慨哲学家的概念。该程序还使用了 Lock 类中的 <threading> 模块的 acquire()release() 方法。我们可以在以下输出中看到解决方案:

Output

4th is hungry.
4th starts eating
1st is hungry.
1st starts eating
2nd is hungry.
5th is hungry.
3rd is hungry.
1st finishes eating and now thinking.3rd swaps forks
2nd starts eating
4th finishes eating and now thinking.
3rd swaps forks5th starts eating
5th finishes eating and now thinking.
4th is hungry.
4th starts eating
2nd finishes eating and now thinking.
3rd swaps forks
1st is hungry.
1st starts eating
4th finishes eating and now thinking.
3rd starts eating
5th is hungry.
5th swaps forks
1st finishes eating and now thinking.
5th starts eating
2nd is hungry.
2nd swaps forks
4th is hungry.
5th finishes eating and now thinking.
3rd finishes eating and now thinking.
2nd starts eating 4th starts eating
It is finishing.