Rxpy 简明教程
RxPY - Concurrency using Scheduler
RxPy 的一个重要特性是并发性,即允许任务并行执行。为了实现这一点,我们有两个运算符 subscribe_on() 和 observe_on(),它们将与调度程序一起工作,调度程序将决定已订阅任务的执行。
One important feature of RxPy is concurrency, i.e. to allow the task to execute in parallel. To make that happen, we have two operators subscribe_on() and observe_on() that will work with a scheduler, that will decide the execution of the subscribed task.
这是一个工作示例,显示了 subscibe_on()、observe_on() 和调度程序的需要。
Here, is a working example, that shows the need for subscibe_on(), observe_on() and scheduler.
Example
import random
import time
import rx
from rx import operators as ops
def adding_delay(value):
time.sleep(random.randint(5, 20) * 0.1)
return value
# Task 1
rx.of(1,2,3,4,5).pipe(
ops.map(lambda a: adding_delay(a))
).subscribe(
lambda s: print("From Task 1: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a))
).subscribe(
lambda s: print("From Task 2: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 2 complete")
)
input("Press any key to exit\n")
在上面的示例中,我有 2 个任务:任务 1 和任务 2。任务的执行是按顺序的。只有在第一个任务完成时,第二个任务才会开始。
In the above example, I have 2 tasks: Task 1 and Task 2. The execution of the task is in sequence. The second task starts only, when the first task is done.
Output
E:\pyrx>python testrx.py
From Task 1: 1
From Task 1: 2
From Task 1: 3
From Task 1: 4
From Task 1: 5
Task 1 complete
From Task 2: 1
From Task 2: 2
From Task 2: 3
From Task 2: 4
Task 2 complete
RxPy 支持许多调度程序,在这里,我们将使用 ThreadPoolScheduler。ThreadPoolScheduler 主要将尝试管理可用 CPU 线程。
RxPy supports many Scheduler, and here, we are going to make use of ThreadPoolScheduler. ThreadPoolScheduler mainly will try to manage with the CPU threads available.
在示例中,我们之前已经看到,我们将使用一个多处理模块来提供 cpu_count。计数将提供给 ThreadPoolScheduler,ThreadPoolScheduler 会根据可用的线程设法使任务并行工作。
In the example, we have seen earlier, we are going to make use of a multiprocessing module that will give us the cpu_count. The count will be given to the ThreadPoolScheduler that will manage to get the task working in parallel based on the threads available.
以下是用例 −
Here, is a working example −
import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
time.sleep(random.randint(5, 20) * 0.1)
return value
# Task 1
rx.of(1,2,3,4,5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 1: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 2: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 2 complete")
)
input("Press any key to exit\n")
在上面的示例中,我有 2 个任务,cpu_count 为 4。由于任务为 2,并且我们可用的线程为 4,因此这两个任务可以并行开始。
In the above example, I have 2 tasks and the cpu_count is 4. Since, the task is 2 and threads available with us are 4, both the task can start in parallel.
Output
E:\pyrx>python testrx.py
Cpu count is : 4
Press any key to exit
From Task 1: 1
From Task 2: 1
From Task 1: 2
From Task 2: 2
From Task 2: 3
From Task 1: 3
From Task 2: 4
Task 2 complete
From Task 1: 4
From Task 1: 5
Task 1 complete
如果你查看输出,这两个任务都会并行启动。
If you see the output, both the task has started in parallel.
现在,考虑一种情况,任务多于 CPU 计数,即 CPU 计数为 4,任务为 5。在这种情况下,我们需要检查在任务完成后是否有任何线程已释放,以便将其分配给队列中可用的新任务。
Now, consider a scenario, where the task is more than the CPU count i.e. CPU count is 4 and tasks are 5. In this case, we would need to check if any thread has got free after task completion, so that, it can be assigned to the new task available in the queue.
为此,我们可以使用 observe_on() 运算符,它将观察调度程序是否有任何线程是空闲的。这是一个使用 observe_on() 的工作示例
For this purpose, we can use the observe_on() operator which will observe the scheduler if any threads are free. Here, is a working example using observe_on()
Example
import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
time.sleep(random.randint(5, 20) * 0.1)
return value
# Task 1
rx.of(1,2,3,4,5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 1: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 2: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 2 complete")
)
#Task 3
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 3: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 3 complete")
)
#Task 4
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 4: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 4 complete")
)
#Task 5
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.observe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 5: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 5 complete")
)
input("Press any key to exit\n")
Output
E:\pyrx>python testrx.py
Cpu count is : 4
From Task 4: 1
From Task 4: 2
From Task 1: 1
From Task 2: 1
From Task 3: 1
From Task 1: 2
From Task 3: 2
From Task 4: 3
From Task 3: 3
From Task 2: 2
From Task 1: 3
From Task 4: 4
Task 4 complete
From Task 5: 1
From Task 5: 2
From Task 5: 3
From Task 3: 4
Task 3 complete
From Task 2: 3
Press any key to exit
From Task 5: 4
Task 5 complete
From Task 1: 4
From Task 2: 4
Task 2 complete
From Task 1: 5
Task 1 complete
如果你查看输出,任务 4 完成后,线程将提供给下一个任务,即任务 5,并且任务 5 开始执行。
If you see the output, the moment task 4 is complete, the thread is given to the next task i.e., task 5 and the same starts executing.