Concurrency In Python 简明教程
Threads Intercommunication
在现实生活中,如果一个团队正在完成一项共同的任务,那么他们之间应该沟通以适当地完成任务。同样的类比也适用于线程。在编程中,为了减少处理器的理想时间,我们创建了多个线程,并将不同的子任务分配给每个线程。因此,必须有通信设施,并且它们应该相互交互以同步方式完成工作。
In real life, if a team of people is working on a common task then there should be communication between them for finishing the task properly. The same analogy is applicable to threads also. In programming, to reduce the ideal time of the processor we create multiple threads and assign different sub tasks to every thread. Hence, there must be a communication facility and they should interact with each other to finish the job in a synchronized manner.
考虑与线程交互交流相关的重要要点如下−
Consider the following important points related to thread intercommunication −
-
No performance gain − If we cannot achieve proper communication between threads and processes then the performance gains from concurrency and parallelism is of no use.
-
Accomplish task properly − Without proper intercommunication mechanism between threads, the assigned task cannot be completed properly.
-
More efficient than inter-process communication − Inter-thread communication is more efficient and easy to use than inter-process communication because all threads within a process share same address space and they need not use shared memory.
Python data structures for thread-safe communication
多线程代码出现了将信息从一个线程传递到另一个线程的问题。标准通信原语无法解决此问题。因此,我们需要实现自己的复合对象,以便在线程之间共享对象,以使通信线程安全。以下是几个数据结构,在其中进行一些更改后提供线程安全通信 -
Multithreaded code comes up with a problem of passing information from one thread to another thread. The standard communication primitives do not solve this issue. Hence, we need to implement our own composite object in order to share objects between threads to make the communication thread-safe. Following are a few data structures, which provide thread-safe communication after making some changes in them −
Sets
以下列扩展集数据结构,实现线程安全的方式,我们需要扩展集类来实现自己的锁定机制。
For using set data structure in a thread-safe manner, we need to extend the set class to implement our own locking mechanism.
Example
以下是如何扩展类的 Python 示例 -
Here is a Python example of extending the class −
class extend_class(set):
def __init__(self, *args, **kwargs):
self._lock = Lock()
super(extend_class, self).__init__(*args, **kwargs)
def add(self, elem):
self._lock.acquire()
try:
super(extend_class, self).add(elem)
finally:
self._lock.release()
def delete(self, elem):
self._lock.acquire()
try:
super(extend_class, self).delete(elem)
finally:
self._lock.release()
在上面的示例中,定义了一个名为 extend_class 的类对象,它进一步继承自 Python set class 。在这个类的构造函数中创建了一个锁对象。现在,有 add() 和 delete() 两个函数。定义这些函数,并且它们是线程安全的。它们都依赖于 super 类功能,但有一个关键例外。
In the above example, a class object named extend_class has been defined which is further inherited from the Python set class. A lock object is created within the constructor of this class. Now, there are two functions - add() and delete(). These functions are defined and are thread-safe. They both rely on the super class functionality with one key exception.
Decorator
这是用于线程安全通信的另一个关键方法,即使用装饰器。
This is another key method for thread-safe communication is the use of decorators.
Example
考虑一个显示如何使用装饰器 -− 的 Python 示例 -
Consider a Python example that shows how to use decorators &mminus;
def lock_decorator(method):
def new_deco_method(self, *args, **kwargs):
with self._lock:
return method(self, *args, **kwargs)
return new_deco_method
class Decorator_class(set):
def __init__(self, *args, **kwargs):
self._lock = Lock()
super(Decorator_class, self).__init__(*args, **kwargs)
@lock_decorator
def add(self, *args, **kwargs):
return super(Decorator_class, self).add(elem)
@lock_decorator
def delete(self, *args, **kwargs):
return super(Decorator_class, self).delete(elem)
在上面的示例中,定义了一个名为 lock_decorator 的装饰器方法,它进一步继承自 Python 方法类。然后,在这个类的构造函数中创建了一个锁对象。现在,有 add() 和 delete() 两个函数。定义这些函数,并且它们是线程安全的。它们都依赖于超类功能,但有一个关键例外。
In the above example, a decorator method named lock_decorator has been defined which is further inherited from the Python method class. Then a lock object is created within the constructor of this class. Now, there are two functions - add() and delete(). These functions are defined and are thread-safe. They both rely on super class functionality with one key exception.
Lists
列表数据结构是线程安全、快速且易于用于临时内存中存储的结构。在 Cpython 中,GIL 会防止对其并发访问。当我们了解到列表是线程安全的,但它们中的数据如何呢?实际上,列表的数据不受保护。例如,如果另一个线程正尝试执行相同操作,则 L.append(x) 无法保证返回预期结果。这是因为虽然 append() 是原子操作并且是线程安全的,但另一个线程正尝试以并发方式修改列表的数据,因此我们可以看到竞态条件对输出的副作用。
The list data structure is thread-safe, quick as well as easy structure for temporary, in-memory storage. In Cpython, the GIL protects against concurrent access to them. As we came to know that lists are thread-safe but what about the data lying in them. Actually, the list’s data is not protected. For example, L.append(x) is not guarantee to return the expected result if another thread is trying to do the same thing. This is because, although append() is an atomic operation and thread-safe but the other thread is trying to modify the list’s data in concurrent fashion hence we can see the side effects of race conditions on the output.
为了解决此类问题并安全地修改数据,我们必须实现适当的锁定机制,这进一步确保多个线程不会遇到竞态条件。为了实现适当的锁定机制,我们可以像前一个示例中那样扩展类。
To resolve this kind of issue and safely modify the data, we must implement a proper locking mechanism, which further ensures that multiple threads cannot potentially run into race conditions. To implement proper locking mechanism, we can extend the class as we did in the previous examples.
列表上的一些其他原子操作如下 -
Some other atomic operations on lists are as follows −
L.append(x)
L1.extend(L2)
x = L[i]
x = L.pop()
L1[i:j] = L2
L.sort()
x = y
x.field = y
D[x] = y
D1.update(D2)
D.keys()
这里 -
Here −
-
L,L1,L2 all are lists
-
D,D1,D2 are dicts
-
x,y are objects
-
i, j are ints
Queues
如果列表的数据不受保护,我们可能必须面对后果。我们可能获得或删除错误的数据项或竞态条件。这就是建议使用队列数据结构的原因。队列的实际示例可以是单车道单行道,车辆先进入,先退出。可以在售票窗口和公共汽车站看到更多队列的实际示例。
If the list’s data is not protected, we might have to face the consequences. We may get or delete wrong data item, of race conditions. That is why it is recommended to use the queue data structure. A real-world example of queue can be a single-lane one-way road, where the vehicle enters first, exits first. More real-world examples can be seen of the queues at the ticket windows and bus-stops.

队列默认情况下是线程安全的数据结构,我们不必担心实现复杂的锁定机制。Python 为我们提供了模块,在我们的应用程序中使用不同类型的队列。
Queues are by default, thread-safe data structure and we need not worry about implementing complex locking mechanism. Python provides us the module to use different types of queues in our application.
Types of Queues
在本节中,我们将学习不同类型的队列。Python 从 <queue> 模块中提供三种队列选项供使用 -
In this section, we will earn about the different types of queues. Python provides three options of queues to use from the <queue> module −
-
Normal Queues (FIFO, First in First out)
-
LIFO, Last in First Out
-
Priority
我们将在后续部分了解不同的队列。
We will learn about the different queues in the subsequent sections.
Normal Queues (FIFO, First in First out)
这是 Python 提供的最常用的队列实现。在此排队机制中,谁先来,谁先获得服务。FIFO 也称为普通队列。FIFO 队列可以表示如下 -
It is most commonly used queue implementations offered by Python. In this queuing mechanism whosoever will come first, will get the service first. FIFO is also called normal queues. FIFO queues can be represented as follows −

Python Implementation of FIFO Queue
在 python 中,可使用单线程和多线程实现 FIFO 队列。
In python, FIFO queue can be implemented with single thread as well as multithreads.
FIFO queue with single thread
对于使用单线程实现 FIFO 队列, Queue 类将实现一个基本的先进先出容器。元素将使用 put() 添加到序列的“一端”,并使用 get() 从另一端移除。
For implementing FIFO queue with single thread, the Queue class will implement a basic first-in, first-out container. Elements will be added to one “end” of the sequence using put(), and removed from the other end using get().
Example
以下是使用单线程实现 FIFO 队列的 Python 程序 −
Following is a Python program for implementation of FIFO queue with single thread −
import queue
q = queue.Queue()
for i in range(8):
q.put("item-" + str(i))
while not q.empty():
print (q.get(), end = " ")
Output
item-0 item-1 item-2 item-3 item-4 item-5 item-6 item-7
输出表明以上程序使用单线程来说明元素将按其被插入的顺序从队列中移除。
The output shows that above program uses a single thread to illustrate that the elements are removed from the queue in the same order they are inserted.
FIFO queue with multiple threads
对于使用多线程实现 FIFO,我们需要定义 myqueue() 函数,该函数从队列模块中扩展而来。get() 和 put() 方法的工作原理与上面在使用单线程实现 FIFO 队列时讨论的一样。然后,为了使其多线程化,我们需要声明和实例化线程。这些线程将以 FIFO 方式消耗队列。
For implementing FIFO with multiple threads, we need to define the myqueue() function, which is extended from the queue module. The working of get() and put() methods are same as discussed above while implementing FIFO queue with single thread. Then to make it multithreaded, we need to declare and instantiate the threads. These threads will consume the queue in FIFO manner.
Example
以下是使用多线程实现 FIFO 队列的 Python 程序
Following is a Python program for implementation of FIFO queue with multiple threads
import threading
import queue
import random
import time
def myqueue(queue):
while not queue.empty():
item = queue.get()
if item is None:
break
print("{} removed {} from the queue".format(threading.current_thread(), item))
queue.task_done()
time.sleep(2)
q = queue.Queue()
for i in range(5):
q.put(i)
threads = []
for i in range(4):
thread = threading.Thread(target=myqueue, args=(q,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
Output
<Thread(Thread-3654, started 5044)> removed 0 from the queue
<Thread(Thread-3655, started 3144)> removed 1 from the queue
<Thread(Thread-3656, started 6996)> removed 2 from the queue
<Thread(Thread-3657, started 2672)> removed 3 from the queue
<Thread(Thread-3654, started 5044)> removed 4 from the queue
LIFO, Last in First Out queue
此队列使用的类比与 FIFO(先进先出)队列完全相反。在此排队机制中,最后来的人将首先获得服务。这类似于实现栈数据结构。在实现深度优先搜索等人工智能算法时,LIFO 队列被证明是有用的。
This queue uses totally opposite analogy than FIFO(First in First Out) queues. In this queuing mechanism, the one who comes last, will get the service first. This is similar to implement stack data structure. LIFO queues prove useful while implementing Depth-first search like algorithms of artificial intelligence.
Python implementation of LIFO queue
在 python 中,可使用单线程和多线程实现 LIFO 队列。
In python, LIFO queue can be implemented with single thread as well as multithreads.
LIFO queue with single thread
对于使用单线程实现 LIFO 队列, Queue 类将通过使用结构 Queue .LifoQueue 来实现一个基本的先进先出容器。现在,在调用 put() 时,元素将添加到容器的头部,并在使用 get() 时也从头部移除。
For implementing LIFO queue with single thread, the Queue class will implement a basic last-in, first-out container by using the structure Queue.LifoQueue. Now, on calling put(), the elements are added in the head of the container and removed from the head also on using get().
Example
以下是使用单线程实现 LIFO 队列的 Python 程序 −
Following is a Python program for implementation of the LIFO queue with single thread −
import queue
q = queue.LifoQueue()
for i in range(8):
q.put("item-" + str(i))
while not q.empty():
print (q.get(), end=" ")
Output:
item-7 item-6 item-5 item-4 item-3 item-2 item-1 item-0
输出表明以上程序使用单线程来说明元素将按与它们被插入相反的顺序从队列中移除。
The output shows that the above program uses a single thread to illustrate that elements are removed from the queue in the opposite order they are inserted.
LIFO queue with multiple threads
实现与我们使用多线程实现 FIFO 队列的实现类似。唯一的区别在于我们需要使用 Queue 类,该类将通过使用结构 Queue.LifoQueue 来实现一个基本的后进先出容器。
The implementation is similar as we have done the implementation of FIFO queues with multiple threads. The only difference is that we need to use the Queue class that will implement a basic last-in, first-out container by using the structure Queue.LifoQueue.
Example
以下是使用多线程实现 LIFO 队列的 Python 程序 −
Following is a Python program for implementation of LIFO queue with multiple threads −
import threading
import queue
import random
import time
def myqueue(queue):
while not queue.empty():
item = queue.get()
if item is None:
break
print("{} removed {} from the queue".format(threading.current_thread(), item))
queue.task_done()
time.sleep(2)
q = queue.LifoQueue()
for i in range(5):
q.put(i)
threads = []
for i in range(4):
thread = threading.Thread(target=myqueue, args=(q,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
Output
<Thread(Thread-3882, started 4928)> removed 4 from the queue
<Thread(Thread-3883, started 4364)> removed 3 from the queue
<Thread(Thread-3884, started 6908)> removed 2 from the queue
<Thread(Thread-3885, started 3584)> removed 1 from the queue
<Thread(Thread-3882, started 4928)> removed 0 from the queue
Priority queue
在 FIFO 和 LIFO 队列中,项目的顺序与插入顺序相关。但是,在许多情况下,优先级比插入顺序更重要。让我们考虑一个现实世界的例子。假设机场的安检人员正在检查不同类别的乘客。VVIP、航空公司工作人员、海关官员这些类别的乘客可能会优先接受检查,而不是像普通人那样根据到达顺序进行检查。
In FIFO and LIFO queues, the order of items are related to the order of insertion. However, there are many cases when the priority is more important than the order of insertion. Let us consider a real world example. Suppose the security at the airport is checking people of different categories. People of the VVIP, airline staff, custom officer, categories may be checked on priority instead of being checked on the basis of arrival like it is for the commoners.
优先级队列需要考虑的另一个重要方面是如何开发任务调度程序。一种常见的设计是在队列中以优先级为基础为最重要的代理任务提供服务。此数据结构可用于根据其优先级值从队列中提取项目。
Another important aspect that needs to be considered for priority queue is how to develop a task scheduler. One common design is to serve the most agent task on priority basis in the queue. This data structure can be used to pick up the items from the queue based on their priority value.
Python Implementation of Priority Queue
在 python 中,可使用单线程和多线程实现优先级队列。
In python, priority queue can be implemented with single thread as well as multithreads.
Priority queue with single thread
对于使用单线程实现优先级队列, Queue 类将使用结构 Queue .PriorityQueue 来实现优先级容器上的任务。现在,在调用 put() 时,将添加具有值的元素,其中值最低的元素将具有最高的优先级,因此使用 get() 最先检索到。
For implementing priority queue with single thread, the Queue class will implement a task on priority container by using the structure Queue.PriorityQueue. Now, on calling put(), the elements are added with a value where the lowest value will have the highest priority and hence retrieved first by using get().
Example
考虑以下用于使用单线程实现 Priority 队列的 Python 程序 −
Consider the following Python program for implementation of Priority queue with single thread −
import queue as Q
p_queue = Q.PriorityQueue()
p_queue.put((2, 'Urgent'))
p_queue.put((1, 'Most Urgent'))
p_queue.put((10, 'Nothing important'))
prio_queue.put((5, 'Important'))
while not p_queue.empty():
item = p_queue.get()
print('%s - %s' % item)
Output
1 – Most Urgent
2 - Urgent
5 - Important
10 – Nothing important
在以上输出中,我们可以看到队列是根据优先级存储项目的,较低的值具有较高的优先级。
In the above output, we can see that the queue has stored the items based on priority – less value is having high priority.
Priority queue with multi threads
其实现与多线程的 FIFO 和 LIFO 队列实现方式类似。唯一的区别是,我们需要使用 Queue 类来使用结构 Queue.PriorityQueue 来初始化优先权。另一个区别是生成队列的方式。在下面示例中,它将由两个相同的数据集生成。
The implementation is similar to the implementation of FIFO and LIFO queues with multiple threads. The only difference is that we need to use the Queue class for initializing the priority by using the structure Queue.PriorityQueue. Another difference is with the way the queue would be generated. In the example given below, it will be generated with two identical data sets.
Example
以下 Python 程序帮助使用多线程实现优先级队列 −
The following Python program helps in the implementation of priority queue with multiple threads −
import threading
import queue
import random
import time
def myqueue(queue):
while not queue.empty():
item = queue.get()
if item is None:
break
print("{} removed {} from the queue".format(threading.current_thread(), item))
queue.task_done()
time.sleep(1)
q = queue.PriorityQueue()
for i in range(5):
q.put(i,1)
for i in range(5):
q.put(i,1)
threads = []
for i in range(2):
thread = threading.Thread(target=myqueue, args=(q,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
Output
<Thread(Thread-4939, started 2420)> removed 0 from the queue
<Thread(Thread-4940, started 3284)> removed 0 from the queue
<Thread(Thread-4939, started 2420)> removed 1 from the queue
<Thread(Thread-4940, started 3284)> removed 1 from the queue
<Thread(Thread-4939, started 2420)> removed 2 from the queue
<Thread(Thread-4940, started 3284)> removed 2 from the queue
<Thread(Thread-4939, started 2420)> removed 3 from the queue
<Thread(Thread-4940, started 3284)> removed 3 from the queue
<Thread(Thread-4939, started 2420)> removed 4 from the queue
<Thread(Thread-4940, started 3284)> removed 4 from the queue