Rxpy 简明教程

RxPY - Quick Guide

RxPY - Overview

本章解释了响应式编程、RxPY 以及它的运算符、特性、优点和缺点。

What is Reactive Programming?

响应式编程是一种编程范例,用于处理数据流和变更传播。这意味着,当一个数据流由一个组件发出时,变更将会由一个响应式编程的库传播到其他组件。变更会持续传播,直到它到达最终接收器。

通过使用 RxPY,你可以很好地控制异步数据流,例如,可以利用可观察对象追踪对 URL 发出的请求,并使用观察器侦听请求完成以取得响应或错误信息。

RxPY 可让你使用 Observables 处理异步数据流,使用 Operators 查询数据流,即筛选、求和、连接、映射,还可利用 Schedulers 对数据流实现并发。创建一个可观察对象会提供一个观察器对象,其中有 on_next(v)、on_error(e) 和 on_completed() 方法,这些方法需要 subscribed ,以便我们在事件发生时获得通知。

observable

可观察对象可以使用管道运算符按链式格式利用多个运算符进行查询。

RxPY 在不同类别中提供运算符,例如: −

  1. Mathematical operators

  2. Transformation operators

  3. Filtering operators

  4. Error handling operators

  5. Utility operators

  6. Conditional operators

  7. Creation operators

  8. Connectable operators

本教程将详细解释这些运算符。

What is RxPy?

RxPy 定义为 使用Python 中的可观察对象集合和可管道查询运算符组合异步和事件驱动程序的库,这是 RxPy 官方网站中的定义,网址为 https://rxpy.readthedocs.io/en/latest/.

RxPY 是一个 Python 库,用于支持响应式编程。RxPy 代表 Reactive Extensions for Python 。它是一个库,使用可观察对象来处理响应式编程,用于处理异步数据调用、回调和事件驱动程序。

Features of RxPy

在 RxPy 中,以下概念用于处理异步任务 −

Observable

可观察对象是一个创建观察器并将其附加到预期的具有数据流的源的函数,例如推文、与计算机相关事件等。

Observer

它是一个对象,具有 on_next()、on_error() 和 on_completed() 方法,当可观察对象与可观察对象发生交互时,这些方法将被调用,即源与可观察对象进行交互,例如输入的推文等。

Subscription

当创建可观察对象时,我们需要订阅它以执行可观察对象。

Operators

运算符是一个纯函数,它获取可观察对象作为输入,并且输出也是一个可观察对象。你可以使用管道运算符对可观察数据对象使用多个运算符。

Subject

一个主题是一个可观察序列,同时也是一个观察器,它可以多播,即可以与订阅它的多个观察器进行通信。主题是一个冷可观察对象,这意味着值将在已订阅的观察器之间共享。

Schedulers

RxPy 的一个重要特性是并发,即允许任务并行执行。为了实现这一点,RxPy 有两个运算符 subscribe_on() 和 observe_on(),它们与调度程序一起工作,并将决定已订阅任务的执行。

Advantages of using RxPY

以下是 RxPy 的优点 −

  1. 在处理异步数据流和事件方面,RxPY 是一款非常棒的库。RxPY 使用可观察对象来处理响应式编程,用于处理异步数据调用、回调和事件驱动程序。

  2. RxPY 提供了大量数学、转换、过滤、实用程序、条件、错误处理、连接类别中的运算符,与响应式编程一起使用时可简化生活。

  3. 并发(即同时处理多项任务)是使用 RxPY 中的调度程序实现的。

  4. 使用 RxPY 改进了性能,因为这简化了异步任务和并行处理。

Disadvantage of using RxPY

  1. 使用可观察对象调试代码有点困难。

RxPY - Environment Setup

在本教程中,我们将安装 RxPy。要开始使用 RxPY,首先需要安装 Python。所以,我们准备执行以下操作:

  1. Install Python

  2. Install RxPy

Installing Python

转到 Python 官方网站: https://www.python.org/downloads/. ,如下图所示,并单击适用于 Windows、Linux/Unix 和 mac 操作系统的最新版本。根据您可用的 64 位或 32 位操作系统下载 Python。

python

下载后,单击 .exe file 并按照步骤在系统上安装 Python。

python install

Python 程序包管理器即 pip 也会随上述安装而默认安装。为了让它在您的系统上全局起作用,直接将 Python 的位置添加到 PATH 变量,可以在安装开始时看到这一点,要记住选中复选框,上面写着添加到 PATH。如果您忘记选中它,请按照下面给出的步骤添加到 PATH。

要添加到 PATH,请按照以下步骤操作:

右键单击您的计算机图标并单击“属性”→“高级系统设置”。

它会显示如下屏幕:

system properties

单击“环境变量”,如下图所示,它会显示如下屏幕:

environment variable

选择“路径”,单击“编辑”按钮,在末尾添加 python 的位置路径。现在,让我们检查一下 python 版本。

Checking for python version

E:\pyrx>python --version
Python 3.7.3

Install RxPY

现在,我们已经安装了 python,准备安装 RxPy。

一旦安装了 python,Python 程序包管理器即 pip 也会随之安装。以下命令用于检查 pip 版本:

E:\pyrx>pip --version
pip 19.1.1 from c:\users\xxxx\appdata\local\programs\python\python37\lib\site-
packages\pip (python 3.7)

我们已经安装了 pip,版本为 19.1.1. 。现在,我们要用 pip 安装 RxPy。

指令如下 −

pip install rx
pip install rx

RxPY - Latest Release Updates

在本教程中,我们使用 RxPY 3 版和 Python 3.7.3 版。RxPY 3 版的工作方式与早期版本(即 RxPY 1 版)略有不同。

在本章中,我们将讨论这两个版本之间的差异以及在更新 Python 和 RxPY 版本时需要进行的更改。

Observable in RxPY

在 RxPy 1 版中,Observable 是一个单独的类——

from rx import Observable

要使用 Observable,您必须按如下方式使用它——

Observable.of(1,2,3,4,5,6,7,8,9,10)

在 RxPy 3 版中,Observable 直接是 rx 包的一部分。

Example

import rx
rx.of(1,2,3,4,5,6,7,8,9,10)

Operators in RxPy

在 1 版中,运算符是 Observable 类的成员函数。例如,要使用运算符,我们必须导入 Observable,如下所示——

from rx import Observable

运算符用作 Observable.operator,例如如下所示——

Observable.of(1,2,3,4,5,6,7,8,9,10)\
   .filter(lambda i: i %2 == 0) \
   .sum() \
   .subscribe(lambda x: print("Value is {0}".format(x)))

对于 RxPY 3 版,运算符是函数,按如下方式导入和使用——

import rx
from rx import operators as ops
rx.of(1,2,3,4,5,6,7,8,9,10).pipe(
   ops.filter(lambda i: i %2 == 0),
   ops.sum()
).subscribe(lambda x: print("Value is {0}".format(x)))

Chaining Operators Using Pipe() method

在 RxPy 1 版中,如果您必须对一个可观察对象使用多个运算符,则必须按如下方式进行——

Example

from rx import Observable
Observable.of(1,2,3,4,5,6,7,8,9,10)\
   .filter(lambda i: i %2 == 0) \
   .sum() \
   .subscribe(lambda x: print("Value is {0}".format(x)))

但是,在 RxPY 3 版中,您可以使用 pipe() 方法和多个运算符,如下所示——

Example

import rx
from rx import operators as ops
rx.of(1,2,3,4,5,6,7,8,9,10).pipe(
   ops.filter(lambda i: i %2 == 0),
   ops.sum()
).subscribe(lambda x: print("Value is {0}".format(x)))

RxPY - Working With Observables

一个可观察对象是一个创建观察员并将其附加到预期值源(例如点击、DOM 元素上的鼠标事件等)的函数。

下面提及的主题将在本章中详细研究。

  1. Create Observables

  2. 订阅和执行可观察对象

Create observables

要创建一个可观察对象,我们将使用 {}s0 方法并向其传递一个包含以下项的函数。

  1. {}s1——当 Observable 发送一个项目时,将调用此函数。

  2. on_completed() − 此函数在 observable 完成时被调用。

  3. on_error() − 此函数在 observable 上出现错误时被调用。

要使用 create() 方法,首先按下方所示导入该方法−

from rx import create

以下是创建一个 observable 的工作示例−

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_error("Error")
   observer.on_completed()
source = create(test_observable).

Subscribe and Execute an Observable

要订阅一个 observable,我们需要使用 subscribe() 函数,并传递回调函数 on_next、on_error 和 on_completed。

以下是工作示例−

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_completed()
source = create(test_observable)
source.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

subscribe() 方法负责执行 observable。回调函数 on_nexton_erroron_completed 必须传递给 subscribe 方法。随后对 subscribe 方法的调用将执行 test_observable() 函数。

不必将所有三个回调函数都传递给 subscribe() 方法。您可以根据自己的需要传递 on_next()、on_error() 和 on_completed()。

lambda 函数用于 on_next、on_error 和 on_completed。它将采用参数并执行给定的表达式。

以下是创建的 observable 的输出−

E:\pyrx>python testrx.py
Got - Hello
Job Done!

RxPY - Operators

本章详细介绍了 RxPY 中的运算符。这些运算符包括 −

  1. Working with Operators

  2. Mathematical operators

  3. Transformation operators

  4. Filtering operators

  5. Error handling operators

  6. Utility operators

  7. Conditional operators

  8. Creation operators

  9. Connectable operators

  10. Combining operators

Reactive (Rx) python 几乎有大量的运算符,使用 python 编程可以轻松处理。你可以一起使用这些多个运算符,例如,在处理字符串时,你可以使用 map、filter、merge 运算符。

Working with Operators

你可以使用 pipe() 方法一起使用多个运算符。此方法允许将多个运算符链接在一起。

以下是用运算符的工作示例 −

test = of(1,2,3) // an observable
subscriber = test.pipe(
   op1(),
   op2(),
   op3()
)

在上述示例中,我们使用 of() 方法创建了一个可观察对象,其中包含值 1、2 和 3。现在,在该可观察对象上,你可以使用任意数量的运算符,使用上述 pipe() 方法执行不同的操作。将对给定可观察对象按顺序执行运算符。

要使用运算符,首先按如下所示进行导入 −

from rx import of, operators as op

以下是用例 −

testrx.py

from rx import of, operators as op
test = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
sub1 = test.pipe(
   op.filter(lambda s: s%2==0),
   op.reduce(lambda acc, x: acc + x)
)
sub1.subscribe(lambda x: print("Sum of Even numbers is {0}".format(x)))

在上述示例中,有一个数字列表,我们使用一个 filter 运算符从中过滤偶数,然后使用一个 reduce 运算符添加它。

Output

E:\pyrx>python testrx.py
Sum of Even numbers is 30

以下是我们将要讨论的运算符列表 −

  1. Creating Observables

  2. Mathematical operators

  3. Transformation operators

  4. Filtering operators

  5. Error handling operators

  6. Utility operators

  7. Conditional

  8. Connectable

  9. Combining operators

Creating Observables

以下是我们在“创建”类别中将讨论的可观察对象

Observable

Description

create

此方法用于创建可观察对象。

empty

此可观察对象不会输出任何内容,并直接发出完成状态。

never

此方法创建一个永远不会达到完成状态的可观察对象。

throw

此方法将创建将抛出错误的可观察对象。

from_

此方法将给定的数组或对象转换为可观察对象。

interval

此方法将在超时后提供一系列生成的值。

just

此方法将给定的值转换为可观察对象。

range

这种方法将基于给定的输入给出一个范围的整数。

repeat_value

此方法将创建一个可观察对象,该对象将根据给定的计数重复给定值。

start

此方法采用一个函数作为输入,并返回一个可观察对象,该对象将返回输入函数中的值。

timer

此方法将在超时完成之后按顺序发出值。

Mathematical operators

我们将在数学运算符类别中讨论的运算符如下:−

Operator

Description

average

此运算符将计算给定源可观察对象的平均值,并输出一个具有平均值的可观察对象。

concat

此运算符将采用两个或更多个可观察对象,并给出具有序列中所有值的单个可观察对象。

count

此运算符采用具有值的 Observable,并将其转换为具有单个值的 Observable。计数函数采用谓词函数作为可选参数。该函数为布尔类型,仅当满足条件时才将值添加到输出。

max

该运算符将给出源可观察对象中的最大值的可观察对象。

min

该运算符将给出源可观察对象中的最小值的可观察对象。

reduce

此运算符采用一个名为累加器函数的函数,该函数用于源可观察对象中的值,它以可观察对象的格式返回累加值,同时将可选的种子值传递给累加器函数。

sum

此运算符将返回源可观察对象中所有值的总和的可观察对象。

Transformation operators

我们在变换运算符类别中要讨论的运算符如下所示 −

Operator

Category

buffer

该运算符将收集源可观察对象中的所有值,并在给定的边界条件满足后以规则的时间间隔发出它们。

ground_by

此运算符将根据给定的 key_mapper 函数对源可观察对象中的值进行分组。

map

此运算符将根据给定映射函数的输出,将源可观察对象中的每个值更改为一个新值。

scan

此运算符将累加器函数应用于来自源可观察对象的值,并返回具有新值的可观察对象。

Filtering operators

我们将讨论的过滤操作符类别中的操作符如下:

Operator

Category

debounce

此操作符将给出来自源可观察对象的值,直到指定时间段为止,并忽略其余时间。

distinct

此操作符将给出与源可观察对象不同的所有值。

element_at

此操作符将根据给定的索引从源可观察对象提供一个元素。

filter

此操作符将根据给定的谓词函数从源可观察对象筛选值。

first

此操作符将给出源可观察对象的第一个元素。

ignore_elements

此操作符将忽略来自源可观察对象的所有值,而仅执行对完成或错误回调函数的调用。

last

此操作符将给出源可观察对象的最后一个元素。

skip

此操作符将返回一个可观察对象,该对象将跳过作为输入获取的计数项的首次出现。

skip_last

此操作符将返回一个可观察对象,该对象将跳过作为输入获取的计数项的最后一次出现。

take

此操作符将根据给定的计数连续有序地给出一系列源值。

take_last

此操作符将根据给定的计数从后往前连续有序地给出一系列源值。

Error handling operators

我们将讨论的错误处理操作符类别中的操作符包括:-

Operator

Description

catch

当出现异常时,此操作符将终止源可观察对象。

retry

当出现错误时,此操作符将在源可观察对象上重试,一旦重试次数完成,它将终止。

Utility operators

以下是我们将讨论的实用程序操作符类别中的操作符。

Operator

Description

delay

该操作符将根据给定的时间或日期延迟源可观察的排放。

materialize

该操作符将以显式通知值的形式转换来自源可观察的值。

time_interval

该操作符将给出来自源可观察的值之间经过的时间。

timeout

该操作符将在经过时间后给出来自源可观察的所有值,否则将触发错误。

timestamp

该操作符将给源可观察的所有值附加一个时间戳。

Conditional and Boolean operators

我们将在条件和布尔操作符类别中讨论的操作符如下所列:

Operator

Description

all

该操作符将检查源可观察的所有值是否满足给定的条件。

contains

该操作符将返回一个可观察的 true 或 false 值,如果存在给定值并且它是源可观察的值。

default_if_empty

如果源可观察为空,该操作符将返回一个默认值。

sequence_equal

该操作符将比较两个序列的可观察或一个值数组,并返回一个具有值 true 或 false 的可观察。

skip_until

该操作符将丢弃源可观察的值,直到第二个可观察发出一个值。

skip_while

该操作符将返回一个具有符合通过条件的源可观察的值的可观察。

take_until

该操作符将丢弃源可观察的值,直到第二个可观察发出一个值或终止。

take_while

当条件失败时,该操作符将丢弃源可观察的值。

Connectable Operators

我们将在可连接运算符类别中讨论的操作符是:

Operator

Description

publish

该方法将可观察转换为可连接可观察。

ref_count

该运算符将使可观察成为一个正常的可观察。

replay

此方法与 replaySubject 的工作方式类似。此方法将返回相同的值,即使 observable 已经播发,并且部分订阅者订阅较晚。

Combining Operators

以下是在组合操作程序类别中我们要讨论的操作程序。

Operator

Description

combine_latest

此操作程序会为给定的 observable 输入创建元组。

merge

此操作程序会合并给定的 observables。

start_with

此操作程序会取给定的值并添加到源 observable 返回的完整序列的开头。

zip

此操作程序返回一个 observable,其中的值以元组形式出现,该元组是通过取给定 observable 的第一个值等方式形成的。

RxPY - Working With Subject

主题是一个可观察序列,同样也是一个观察者,它可以进行多播,即与多个已订阅的观察者对话。

我们将对主题讨论以下主题−

  1. Create a subject

  2. Subscribe to a subject

  3. Passing data to subject

  4. BehaviorSubject

  5. ReplaySubject

  6. AsyncSubject

Create a subject

要使用一个主题,我们需要按下方所示导入主题−

from rx.subject import Subject

您可以按如下方式创建一个主题对象−

subject_test = Subject()

对象是一个具有三个方法的观察者−

  1. on_next(value)

  2. on_error(error) and

  3. on_completed()

Subscribe to a Subject

您可以在主题上创建多个订阅,如下所示−

subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)

Passing Data to Subject

您可以使用 on_next(value) 方法将数据传递到所创建的主题,如下所示−

subject_test.on_next("A")
subject_test.on_next("B")

数据将传递到所有订阅,然后添加到主题中。

这里有一个主题的工作示例。

Example

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")

subject_test 对象是通过调用 Subject() 来创建的。subject_test 对象引用 on_next(value)、on_error(error) 和 on_completed() 方法。上述示例的输出如下所示 −

Output

E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B

我们可以使用 on_completed() 方法来停止主题执行,如下所示。

Example

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")

一旦我们调用 complete,稍后调用的下一个方法就不再被调用。

Output

E:\pyrx>python testrx.py
The value is A
The value is A

现在让我们看看如何调用 on_error(error) 方法。

Example

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))

Output

E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!

BehaviorSubject

BehaviorSubject 在调用时将给你最新值。你可以创建行为主题,如下所示 −

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject

这是一个使用行为主题的工作示例

Example

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
   lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
   lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")

Output

E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject

Replay Subject

replaysubject 与行为主题类似,它可以缓冲值并将其重新播放给新订阅者。这是一个回复主题的工作示例。

Example

from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)

在回放主题上使用的缓冲区值为 2。因此,最后两个值将被缓冲并用于所调用的新订阅者。

Output

E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5

AsyncSubject

对于 AsyncSubject,传递给订阅者的最后一个值称为,并且仅在调用 complete() 方法之后才会执行。

Example

from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.

Output

E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2

RxPY - Concurrency Using Scheduler

RxPy 的一个重要特性是并发性,即允许任务并行执行。为了实现这一点,我们有两个运算符 subscribe_on() 和 observe_on(),它们将与调度程序一起工作,调度程序将决定已订阅任务的执行。

这是一个工作示例,显示了 subscibe_on()、observe_on() 和调度程序的需要。

Example

import random
import time
import rx
from rx import operators as ops
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a))
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a))
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
input("Press any key to exit\n")

在上面的示例中,我有 2 个任务:任务 1 和任务 2。任务的执行是按顺序的。只有在第一个任务完成时,第二个任务才会开始。

Output

E:\pyrx>python testrx.py
From Task 1: 1
From Task 1: 2
From Task 1: 3
From Task 1: 4
From Task 1: 5
Task 1 complete
From Task 2: 1
From Task 2: 2
From Task 2: 3
From Task 2: 4
Task 2 complete

RxPy 支持许多调度程序,在这里,我们将使用 ThreadPoolScheduler。ThreadPoolScheduler 主要将尝试管理可用 CPU 线程。

在示例中,我们之前已经看到,我们将使用一个多处理模块来提供 cpu_count。计数将提供给 ThreadPoolScheduler,ThreadPoolScheduler 会根据可用的线程设法使任务并行工作。

以下是用例 −

import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
input("Press any key to exit\n")

在上面的示例中,我有 2 个任务,cpu_count 为 4。由于任务为 2,并且我们可用的线程为 4,因此这两个任务可以并行开始。

Output

E:\pyrx>python testrx.py
Cpu count is : 4
Press any key to exit
From Task 1: 1
From Task 2: 1
From Task 1: 2
From Task 2: 2
From Task 2: 3
From Task 1: 3
From Task 2: 4
Task 2 complete
From Task 1: 4
From Task 1: 5
Task 1 complete

如果你查看输出,这两个任务都会并行启动。

现在,考虑一种情况,任务多于 CPU 计数,即 CPU 计数为 4,任务为 5。在这种情况下,我们需要检查在任务完成后是否有任何线程已释放,以便将其分配给队列中可用的新任务。

为此,我们可以使用 observe_on() 运算符,它将观察调度程序是否有任何线程是空闲的。这是一个使用 observe_on() 的工作示例

Example

import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
#Task 3
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 3: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 3 complete")
)
#Task 4
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 4: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 4 complete")
)
#Task 5
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.observe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 5: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 5 complete")
)
input("Press any key to exit\n")

Output

E:\pyrx>python testrx.py
Cpu count is : 4
From Task 4: 1
From Task 4: 2
From Task 1: 1
From Task 2: 1
From Task 3: 1
From Task 1: 2
From Task 3: 2
From Task 4: 3
From Task 3: 3
From Task 2: 2
From Task 1: 3
From Task 4: 4
Task 4 complete
From Task 5: 1
From Task 5: 2
From Task 5: 3
From Task 3: 4
Task 3 complete
From Task 2: 3
Press any key to exit
From Task 5: 4
Task 5 complete
From Task 1: 4
From Task 2: 4
Task 2 complete
From Task 1: 5
Task 1 complete

如果你查看输出,任务 4 完成后,线程将提供给下一个任务,即任务 5,并且任务 5 开始执行。

RxPY - Examples

在本章中,我们将详细讨论以下主题 −

  1. 基础示例展示了可观察、操作符的工作原理,以及订阅观察者。

  2. 可观察与主题之间的差异。

  3. 理解冷可观察和热可观察。

下面给出了一个基础示例,展示了可观察、操作符的工作原理,以及订阅观察者。

Example

test.py

import requests
import rx
import json
from rx import operators as ops
def filternames(x):
   if (x["name"].startswith("C")):
      return x["name"]
   else :
      return ""
content = requests.get('https://jsonplaceholder.typicode.com/users')
y = json.loads(content.text)
source = rx.from_(y)
case1 = source.pipe(
   ops.filter(lambda c: filternames(c)),
   ops.map(lambda a:a["name"])
)
case1.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)), 8. RxPy — Examples
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

这是一个非常简单的示例,其中,我从这个 URL 获取用户数据

过滤数据,按“C”开头的名字进行过滤,然后使用 map 只返回名字。以下是相同数据的输出 −

E:\pyrx\examples>python test.py
Got - Clementine Bauch
Got - Chelsey Dietrich
Got - Clementina DuBuque
Job Done!

Difference between observable and subject

在这个示例中,我们将查看可观察与主题之间的差异。

from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

Output

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821

在以上示例中,每次订阅可观察,它都会给你新的值。

Subject Example

from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

Output

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065

如果你看到这些值是在两个使用主题的订阅者之间共享的。

Understanding Cold and Hot Observables

可观察被分类为

  1. Cold Observables

  2. Hot Observables

当多个订阅者都订阅的时候就会发现可观测的差异。

Cold Observables

冷可观测是指可执行且在每次订阅时都会呈现数据的可观测。当它被订阅时,可观测会被执行,并给出新值。

以下示例给出了对冷可观察的理解。

from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

Output

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821

在以上示例中,每次订阅可观察,它都会执行可观察并发出值。这些值也可以像在以上示例中展示的那样在不同的订阅者之间不同。

Hot Observables

在热可观察的情况下,当它们准备好时会发出值,而且不会总是等待订阅。当值被发出时,所有的订阅者都将得到相同的值。

当你希望在可观察准备好时发出值,或者希望向所有订阅者共享相同的值时,你可以利用热可观察。

热可观察的示例包括主题和可连接操作符。

from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

Output

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065

如果你看到,相同的值在订阅者之间被共享。你可以使用 publish() 可连接可观察操作符实现相同的目的。