Python 简明教程

Python - Inter-Thread Communication

线程间通信是指在Python多线程程序内启用线程之间的通信和同步的过程。

通常,Python中的线程在一个进程内共享相同的内存空间,这允许它们通过 threading 模块提供的共享变量、对象和专门的同步机制来交换数据并协调其活动。

为了促进线程间通信,threading模块提供了各种同步原语,如锁、事件、条件和信号量对象。在本教程中,您将学习如何使用事件和条件对象在多线程程序中提供线程之间的通信。

The Event Object

事件对象管理内部标志的状态,以便线程可以等待或设置。 Event 对象提供了控制此标志状态的方法,允许线程根据共享条件同步它们的活动。

该标志最初为false并在set()方法下变为true,并通过clear()方法重置为false。wait()方法会阻塞,直到标志为true为止。

以下是 Event 对象的关键方法-

  1. is_set() : 仅当内部标志为true时才返回True。

  2. set() : 将内部标志设为true。所有等待它变为true的线程都将被唤醒。一旦标志为true,调用wait()的线程将根本不会被阻塞。

  3. clear() :将内部标志重置为false。随后,调用wait()的线程将阻塞,直到再次调用set()将内部标志设为true。

  4. wait(timeout=None) :阻塞,直到内部标志为真。如果内部标志在进入时为真,则立即返回。否则,阻塞,直到另一个线程调用 set() 将标志设为真,或者直到发生可选超时。当超时参数存在且不为 None 时,它应为一个浮点数,指定操作的超时时间(以秒为单位)。

Example

以下代码尝试模拟由 GREEN 或 RED 交通信号状态控制的交通流量。

程序中有两个线程,分别针对两个不同的函数。 signal_state() 函数定期设置和重置事件,表示信号从绿色变为红色。

traffic_flow() 函数等待事件被设置,并运行一个循环,直到它保持设置状态。

from threading import Event, Thread
import time

terminate = False

def signal_state():
    global terminate
    while not terminate:
        time.sleep(0.5)
        print("Traffic Police Giving GREEN Signal")
        event.set()
        time.sleep(1)
        print("Traffic Police Giving RED Signal")
        event.clear()

def traffic_flow():
    global terminate
    num = 0
    while num < 10 and not terminate:
        print("Waiting for GREEN Signal")
        event.wait()
        print("GREEN Signal ... Traffic can move")
        while event.is_set() and not terminate:
            num += 1
            print("Vehicle No:", num," Crossing the Signal")
            time.sleep(1)
        print("RED Signal ... Traffic has to wait")

event = Event()
t1 = Thread(target=signal_state)
t2 = Thread(target=traffic_flow)
t1.start()
t2.start()

# Terminate the threads after some time
time.sleep(5)
terminate = True

# join all threads to complete
t1.join()
t2.join()

print("Exiting Main Thread")

Output

执行上述代码时,您将获得以下输出 -

Waiting for GREEN Signal
Traffic Police Giving GREEN Signal
GREEN Signal ... Traffic can move
Vehicle No: 1  Crossing the Signal
Traffic Police Giving RED Signal
RED Signal ... Traffic has to wait
Waiting for GREEN Signal
Traffic Police Giving GREEN Signal
GREEN Signal ... Traffic can move
Vehicle No: 2  Crossing the Signal
Vehicle No: 3  Crossing the Signal
Traffic Police Giving RED Signal
Traffic Police Giving GREEN Signal
Vehicle No: 4  Crossing the Signal
Traffic Police Giving RED Signal
RED Signal ... Traffic has to wait
Traffic Police Giving GREEN Signal
Traffic Police Giving RED Signal
Exiting Main Thread

The Condition Object

Python 的 threading 模块中的 Condition 对象提供了更高级的同步机制。它允许线程在继续之前等待来自另一个线程的通知。Condition 对象始终与锁相关联,并提供用于线程之间信令的机制。

以下是 threading.Condition() 类的语法 -

threading.Condition(lock=None)

以下是 Condition 对象的关键方法 -

  1. acquire(*args) :获取底层锁。此方法调用底层锁上的对应方法;返回值是该方法返回的任何值。

  2. release() :释放底层锁。此方法调用底层锁上的对应方法;没有返回值。

  3. wait(timeout=None) :此方法释放底层锁,然后阻塞,直到同一条件变量在另一个线程中被 notify() 或 notify_all() 调用唤醒或者发生可选超时。一旦唤醒或超时,它会重新获取锁并返回。

  4. wait_for(predicate, timeout=None) :此实用方法可能会重复调用 wait(),直到满足谓词或发生超时。返回值是谓词的最后一个返回值,如果方法超时,则评估为 False。

  5. notify(n=1) :此方法最多唤醒 n 个等待条件变量的线程;如果没有线程在等待,则它将不执行任何操作。

  6. notify_all() :唤醒所有等待此条件的线程。此方法的作用类似于 notify(),但会唤醒所有等待线程,而不是一个线程。如果调用此方法时调用线程未获取锁,则会引发 RuntimeError。

Example

此示例演示了使用 Python 的 threading 模块的 Condition 对象进行线程间通信的简单形式。此处, thread_athread_b 使用 Condition 对象进行通信, thread_a 等待,直到它从 thread_b 接收通知。 thread_b 在通知 thread_a 之前休眠 2 秒,然后完成。

from threading import Condition, Thread
import time

c = Condition()

def thread_a():
    print("Thread A started")
    with c:
        print("Thread A waiting for permission...")
        c.wait()
        print("Thread A got permission!")
    print("Thread A finished")

def thread_b():
    print("Thread B started")
    with c:
        time.sleep(2)
        print("Notifying Thread A...")
        c.notify()
    print("Thread B finished")

Thread(target=thread_a).start()
Thread(target=thread_b).start()

Output

执行上述代码时,您将获得以下输出 -

Thread A started
Thread A waiting for permission...
Thread B started
Notifying Thread A...
Thread B finished
Thread A got permission!
Thread A finished

Example

以下代码演示了如何使用 Condition 对象在线程之间提供通信。其中,线程 t2 运行 taskB() 函数,线程 t1 运行 taskA() 函数。t1 线程获取条件并通知它。

到那时,t2 线程处于等待状态。释放条件后,等待线程会继续使用通知函数产生的随机数。

from threading import Condition, Thread
import time
import random

numbers = []

def taskA(c):
    for _ in range(5):
        with c:
            num = random.randint(1, 10)
            print("Generated random number:", num)
            numbers.append(num)
            print("Notification issued")
            c.notify()
        time.sleep(0.3)

def taskB(c):
    for i in range(5):
        with c:
            print("waiting for update")
            while not numbers:
                c.wait()
            print("Obtained random number", numbers.pop())
        time.sleep(0.3)

c = Condition()
t1 = Thread(target=taskB, args=(c,))
t2 = Thread(target=taskA, args=(c,))
t1.start()
t2.start()
t1.join()
t2.join()
print("Done")

执行此代码时,它将生成以下 output -

waiting for update
Generated random number: 2
Notification issued
Obtained random number 2
Generated random number: 5
Notification issued
waiting for update
Obtained random number 5
Generated random number: 1
Notification issued
waiting for update
Obtained random number 1
Generated random number: 9
Notification issued
waiting for update
Obtained random number 9
Generated random number: 2
Notification issued
waiting for update
Obtained random number 2
Done