Rxpy 简明教程
RxPY - Overview
本章解释了响应式编程、RxPY 以及它的运算符、特性、优点和缺点。
This chapter explains what is reactive programming, what is RxPY, its operators, features, advantages and disadvantage.
What is Reactive Programming?
响应式编程是一种编程范例,用于处理数据流和变更传播。这意味着,当一个数据流由一个组件发出时,变更将会由一个响应式编程的库传播到其他组件。变更会持续传播,直到它到达最终接收器。
Reactive programming is a programming paradigm, that deals with data flow and the propagation of change. It means that, when a data flow is emitted by one component, the change will be propagated to other components by a reactive programming library. The propagation of change will continue until it reaches the final receiver.
通过使用 RxPY,你可以很好地控制异步数据流,例如,可以利用可观察对象追踪对 URL 发出的请求,并使用观察器侦听请求完成以取得响应或错误信息。
By using RxPY, you have good control on the asynchronous data streams, for example, a request made to URL can be traced by using observable, and use the observer to listen to when the request is complete for response or error.
RxPY 可让你使用 Observables 处理异步数据流,使用 Operators 查询数据流,即筛选、求和、连接、映射,还可利用 Schedulers 对数据流实现并发。创建一个可观察对象会提供一个观察器对象,其中有 on_next(v)、on_error(e) 和 on_completed() 方法,这些方法需要 subscribed ,以便我们在事件发生时获得通知。
RxPY offers you to handle asynchronous data streams using Observables, query the data streams using Operators i.e. filter, sum, concat, map and also make use of concurrency for the data streams using Schedulers. Creating an Observable, gives an observer object with on_next(v), on_error(e) and on_completed() methods, that needs to be subscribed so that we get a notification when an event occurs.

可观察对象可以使用管道运算符按链式格式利用多个运算符进行查询。
The Observable can be queried using multiple operators in a chain format by using the pipe operator.
RxPY 在不同类别中提供运算符,例如: −
RxPY offers operators in various categories like:−
-
Mathematical operators
-
Transformation operators
-
Filtering operators
-
Error handling operators
-
Utility operators
-
Conditional operators
-
Creation operators
-
Connectable operators
本教程将详细解释这些运算符。
These operators are explained in detail in this tutorial.
What is RxPy?
RxPy 定义为 使用Python 中的可观察对象集合和可管道查询运算符组合异步和事件驱动程序的库,这是 RxPy 官方网站中的定义,网址为 https://rxpy.readthedocs.io/en/latest/.
RxPY is defined as *a library for composing asynchronous and event-based programs using observable collections and pipable query operators in Python *as per the official website of RxPy, which is https://rxpy.readthedocs.io/en/latest/.
RxPY 是一个 Python 库,用于支持响应式编程。RxPy 代表 Reactive Extensions for Python 。它是一个库,使用可观察对象来处理响应式编程,用于处理异步数据调用、回调和事件驱动程序。
RxPY is a python library to support Reactive Programming. RxPy stands for Reactive Extensions for Python. It’s a library that uses observables to work with reactive programming that deals with asynchronous data calls, callbacks and event−based programs.
Features of RxPy
在 RxPy 中,以下概念用于处理异步任务 −
In RxPy, following concepts take care of handling the asynchronous task −
Observable
可观察对象是一个创建观察器并将其附加到预期的具有数据流的源的函数,例如推文、与计算机相关事件等。
An observable is a function that creates an observer and attaches it to the source having data streams that are expected from, for example, Tweets, computer−related events, etc.
Observer
它是一个对象,具有 on_next()、on_error() 和 on_completed() 方法,当可观察对象与可观察对象发生交互时,这些方法将被调用,即源与可观察对象进行交互,例如输入的推文等。
It is an object with on_next(), on_error() and on_completed() methods, that will get called when there is interaction with the observable i.e. the source interacts for an example incoming Tweets, etc.
Subscription
当创建可观察对象时,我们需要订阅它以执行可观察对象。
When the observable is created, to execute the observable we need to subscribe to it.
Operators
运算符是一个纯函数,它获取可观察对象作为输入,并且输出也是一个可观察对象。你可以使用管道运算符对可观察数据对象使用多个运算符。
An operator is a pure function that takes in observable as input and the output is also an observable. You can use multiple operators on an observable data by using the pipe operator.
Subject
一个主题是一个可观察序列,同时也是一个观察器,它可以多播,即可以与订阅它的多个观察器进行通信。主题是一个冷可观察对象,这意味着值将在已订阅的观察器之间共享。
A subject is an observable sequence as well as an observer that can multicast, i.e. talk to many observers that have subscribed. The subject is a cold observable, i.e. the values will be shared between the observers that have been subscribed.
Schedulers
RxPy 的一个重要特性是并发,即允许任务并行执行。为了实现这一点,RxPy 有两个运算符 subscribe_on() 和 observe_on(),它们与调度程序一起工作,并将决定已订阅任务的执行。
One important feature of RxPy is concurrency i.e. to allow the task to execute in parallel. To make that happen RxPy has two operators subscribe_on() and observe_on() that works with schedulers and will decide the execution of the subscribed task.
Advantages of using RxPY
以下是 RxPy 的优点 −
The following are the advantages of RxPy −
-
RxPY is an awesome library when it comes to the handling of async data streams and events. RxPY uses observables to work with reactive programming that deals with asynchronous data calls, callbacks and event-based programs.
-
RxPY offers a huge collection of operators in mathematical, transformation, filtering, utility, conditional, error handling, join categories that makes life easy when used with reactive programming.
-
Concurrency i.e. working of multiple tasks together is achieved using schedulers in RxPY.
-
The performance is improved using RxPY as handling of async task and parallel processing is made easy.
RxPY - Environment Setup
在本教程中,我们将安装 RxPy。要开始使用 RxPY,首先需要安装 Python。所以,我们准备执行以下操作:
In this chapter, we will work on the installation of RxPy. To start working with RxPY, we need to install Python first. So, we are going to work on the following −
-
Install Python
-
Install RxPy
Installing Python
转到 Python 官方网站: https://www.python.org/downloads/. ,如下图所示,并单击适用于 Windows、Linux/Unix 和 mac 操作系统的最新版本。根据您可用的 64 位或 32 位操作系统下载 Python。
Go to the Python official site: https://www.python.org/downloads/. as shown below, and click on the latest version available for Windows, Linux/Unix, and mac os. Download Python as per your 64 or 32-bit OS available with you.

下载后,单击 .exe file 并按照步骤在系统上安装 Python。
Once you have downloaded, click on the .exe file and follow the steps to install python on your system.

Python 程序包管理器即 pip 也会随上述安装而默认安装。为了让它在您的系统上全局起作用,直接将 Python 的位置添加到 PATH 变量,可以在安装开始时看到这一点,要记住选中复选框,上面写着添加到 PATH。如果您忘记选中它,请按照下面给出的步骤添加到 PATH。
The python package manager, i.e. pip will also get installed by default with the above installation. To make it work globally on your system, directly add the location of python to the PATH variable, the same is shown at the start of the installation, to remember to check the checkbox, which says ADD to PATH. In case, you forget to check it, please follow the below given steps to add to PATH.
要添加到 PATH,请按照以下步骤操作:
To add to PATH follow the below steps −
右键单击您的计算机图标并单击“属性”→“高级系统设置”。
Right-click on your Computer icon and click on properties → Advanced System Settings.
它会显示如下屏幕:
It will display the screen as shown below −

单击“环境变量”,如下图所示,它会显示如下屏幕:
Click on Environment Variables as shown above. It will display the screen as shown below −

选择“路径”,单击“编辑”按钮,在末尾添加 python 的位置路径。现在,让我们检查一下 python 版本。
Select Path and click on Edit button, add the location path of your python at the end. Now, let’s check the python version.
Install RxPY
现在,我们已经安装了 python,准备安装 RxPy。
Now, that we have python installed, we are going to install RxPy.
一旦安装了 python,Python 程序包管理器即 pip 也会随之安装。以下命令用于检查 pip 版本:
Once python is installed, python package manager, i.e. pip will also get installed. Following is the command to check pip version −
E:\pyrx>pip --version
pip 19.1.1 from c:\users\xxxx\appdata\local\programs\python\python37\lib\site-
packages\pip (python 3.7)
我们已经安装了 pip,版本为 19.1.1. 。现在,我们要用 pip 安装 RxPy。
We have pip installed and the version is 19.1.1. Now, we will use pip to install RxPy
指令如下 −
The command is as follows −
pip install rx

RxPY - Latest Release Updates
在本教程中,我们使用 RxPY 3 版和 Python 3.7.3 版。RxPY 3 版的工作方式与早期版本(即 RxPY 1 版)略有不同。
In this tutorial, we are using RxPY version 3 and python version 3.7.3. The working of RxPY version 3 differs a little bit with the earlier version, i.e. RxPY version 1.
在本章中,我们将讨论这两个版本之间的差异以及在更新 Python 和 RxPY 版本时需要进行的更改。
In this chapter, we are going to discuss the differences between the 2 versions and changes that need to be done in case you are updating Python and RxPY versions.
Observable in RxPY
在 RxPy 1 版中,Observable 是一个单独的类——
In RxPy version 1, Observable was a separate class −
from rx import Observable
要使用 Observable,您必须按如下方式使用它——
To use the Observable, you have to use it as follows −
Observable.of(1,2,3,4,5,6,7,8,9,10)
在 RxPy 3 版中,Observable 直接是 rx 包的一部分。
In RxPy version 3, Observable is directly a part of the rx package.
Example
import rx
rx.of(1,2,3,4,5,6,7,8,9,10)
Operators in RxPy
在 1 版中,运算符是 Observable 类的成员函数。例如,要使用运算符,我们必须导入 Observable,如下所示——
In version 1, the operator was methods in the Observable class. For example, to make use of operators we have to import Observable as shown below −
from rx import Observable
运算符用作 Observable.operator,例如如下所示——
The operators are used as Observable.operator, for example, as shown below −
Observable.of(1,2,3,4,5,6,7,8,9,10)\
.filter(lambda i: i %2 == 0) \
.sum() \
.subscribe(lambda x: print("Value is {0}".format(x)))
对于 RxPY 3 版,运算符是函数,按如下方式导入和使用——
In the case of RxPY version 3, operators are function and are imported and used as follows −
import rx
from rx import operators as ops
rx.of(1,2,3,4,5,6,7,8,9,10).pipe(
ops.filter(lambda i: i %2 == 0),
ops.sum()
).subscribe(lambda x: print("Value is {0}".format(x)))
Chaining Operators Using Pipe() method
在 RxPy 1 版中,如果您必须对一个可观察对象使用多个运算符,则必须按如下方式进行——
In RxPy version 1, in case you had to use multiple operators on an observable, it had to be done as follows −
Example
from rx import Observable
Observable.of(1,2,3,4,5,6,7,8,9,10)\
.filter(lambda i: i %2 == 0) \
.sum() \
.subscribe(lambda x: print("Value is {0}".format(x)))
但是,在 RxPY 3 版中,您可以使用 pipe() 方法和多个运算符,如下所示——
But, in case of RxPY version 3, you can use pipe() method and multiple operators as shown below −
Example
import rx
from rx import operators as ops
rx.of(1,2,3,4,5,6,7,8,9,10).pipe(
ops.filter(lambda i: i %2 == 0),
ops.sum()
).subscribe(lambda x: print("Value is {0}".format(x)))
RxPY - Working With Observables
一个可观察对象是一个创建观察员并将其附加到预期值源(例如点击、DOM 元素上的鼠标事件等)的函数。
An observable, is a function that creates an observer and attaches it to the source where values are expected, for example, clicks, mouse events from a dom element, etc.
下面提及的主题将在本章中详细研究。
The topics mentioned below will be studied in detail in this chapter.
-
Create Observables
-
Subscribe and Execute an Observable
Create observables
要创建一个可观察对象,我们将使用 {}s0 方法并向其传递一个包含以下项的函数。
To create an observable we will use create() method and pass the function to it that has the following items.
-
on_next() − This function gets called when the Observable emits an item.
-
on_completed() − This function gets called when the Observable is complete.
-
on_error() − This function gets called when an error occurs on the Observable.
要使用 create() 方法,首先按下方所示导入该方法−
To work with create() method first import the method as shown below −
from rx import create
以下是创建一个 observable 的工作示例−
Here is a working example, to create an observable −
testrx.py
testrx.py
from rx import create
deftest_observable(observer, scheduler):
observer.on_next("Hello")
observer.on_error("Error")
observer.on_completed()
source = create(test_observable).
Subscribe and Execute an Observable
要订阅一个 observable,我们需要使用 subscribe() 函数,并传递回调函数 on_next、on_error 和 on_completed。
To subscribe to an observable, we need to use subscribe() function and pass the callback function on_next, on_error and on_completed.
以下是工作示例−
Here is a working example −
testrx.py
testrx.py
from rx import create
deftest_observable(observer, scheduler):
observer.on_next("Hello")
observer.on_completed()
source = create(test_observable)
source.subscribe(
on_next = lambda i: print("Got - {0}".format(i)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!"),
)
subscribe() 方法负责执行 observable。回调函数 on_next 、 on_error 和 on_completed 必须传递给 subscribe 方法。随后对 subscribe 方法的调用将执行 test_observable() 函数。
The subscribe() method takes care of executing the observable. The callback function on_next, on_error and on_completed has to be passed to the subscribe method. Call to subscribe method, in turn, executes the test_observable() function.
不必将所有三个回调函数都传递给 subscribe() 方法。您可以根据自己的需要传递 on_next()、on_error() 和 on_completed()。
It is not mandatory to pass all three callback functions to the subscribe() method. You can pass as per your requirements the on_next(), on_error() and on_completed().
lambda 函数用于 on_next、on_error 和 on_completed。它将采用参数并执行给定的表达式。
The lambda function is used for on_next, on_error and on_completed. It will take in the arguments and execute the expression given.
以下是创建的 observable 的输出−
Here is the output, of the observable created −
E:\pyrx>python testrx.py
Got - Hello
Job Done!
RxPY - Operators
本章详细介绍了 RxPY 中的运算符。这些运算符包括 −
This chapter explains about the operators in RxPY in detail. These operators include −
-
Working with Operators
-
Mathematical operators
-
Transformation operators
-
Filtering operators
-
Error handling operators
-
Utility operators
-
Conditional operators
-
Creation operators
-
Connectable operators
-
Combining operators
Reactive (Rx) python 几乎有大量的运算符,使用 python 编程可以轻松处理。你可以一起使用这些多个运算符,例如,在处理字符串时,你可以使用 map、filter、merge 运算符。
Reactive (Rx) python has almost lots of operators, that make life easy with python coding. You can use these multiple operators together, for example, while working with strings you can use map, filter, merge operators.
Working with Operators
你可以使用 pipe() 方法一起使用多个运算符。此方法允许将多个运算符链接在一起。
You can work with multiple operators together using pipe() method. This method allows chaining multiple operators together.
以下是用运算符的工作示例 −
Here, is a working example of using operators −
test = of(1,2,3) // an observable
subscriber = test.pipe(
op1(),
op2(),
op3()
)
在上述示例中,我们使用 of() 方法创建了一个可观察对象,其中包含值 1、2 和 3。现在,在该可观察对象上,你可以使用任意数量的运算符,使用上述 pipe() 方法执行不同的操作。将对给定可观察对象按顺序执行运算符。
In the above example, we have created an observable using of() method that takes in values 1, 2 and 3. Now, on this observable, you can perform a different operation, using any numbers of operators using pipe() method as shown above. The execution of operators will go on sequentially on the observable given.
要使用运算符,首先按如下所示进行导入 −
To work with operators, first import it as shown below −
from rx import of, operators as op
以下是用例 −
Here, is a working example −
testrx.py
testrx.py
from rx import of, operators as op
test = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
sub1 = test.pipe(
op.filter(lambda s: s%2==0),
op.reduce(lambda acc, x: acc + x)
)
sub1.subscribe(lambda x: print("Sum of Even numbers is {0}".format(x)))
在上述示例中,有一个数字列表,我们使用一个 filter 运算符从中过滤偶数,然后使用一个 reduce 运算符添加它。
In the above example, there is a list of numbers, from which we are filtering even numbers using a filter operator and later adding it using a reduce operator.
Output
E:\pyrx>python testrx.py
Sum of Even numbers is 30
以下是我们将要讨论的运算符列表 −
Here is a list of Operators, that we are going to discuss −
-
Creating Observables
-
Mathematical operators
-
Transformation operators
-
Filtering operators
-
Error handling operators
-
Utility operators
-
Conditional
-
Connectable
-
Combining operators
Creating Observables
以下是我们在“创建”类别中将讨论的可观察对象
Following are the observables, we are going to discuss in Creation category
Observable |
Description |
create |
This method is used to create an observable. |
empty |
This observable will not output anything and directly emit the complete state. |
never |
This method creates an observable that will never reach the complete state. |
throw |
This method will create an observable that will throw an error. |
from_ |
This method will convert the given array or object into an observable. |
interval |
This method will give a series of values produced after a timeout. |
just |
This method will convert given value into an observable. |
range |
This method will give a range of integers based on the input given. |
repeat_value |
This method will create an observable that will repeat the given value as per the count is given. |
start |
This method takes in a function as an input and returns an observable that will return value from the input function. |
timer |
This method will emit the values in sequence after the timeout is done. |
Mathematical operators
我们将在数学运算符类别中讨论的运算符如下:−
The operators we are going to discuss in Mathematical operator category are as follows: −
Operator |
Description |
average |
This operator will calculate the average from the source observable given and output an observable that will have the average value. |
concat |
This operator will take in two or more observables and given a single observable with all the values in the sequence. |
count |
This operator takes in an Observable with values and converts it into an Observable that will have a single value. The count function takes in predicate function as an optional argument. The function is of type boolean and will add value to the output only if it satisfies the condition. |
max |
This operator will give an observable with max value from the source observable. |
min |
This operator will give an observable with min value from the source observable. |
reduce |
This operator takes in a function called accumulator function that is used on the values coming from the source observable, and it returns the accumulated values in the form of an observable, with an optional seed value passed to the accumulator function. |
sum |
This operator will return an observable with the sum of all the values from source observables. |
Transformation operators
我们在变换运算符类别中要讨论的运算符如下所示 −
The operators we are going to discuss in the Transformation operator category are mentioned below −
Operator |
Category |
buffer |
This operator will collect all the values from the source observable, and emit them at regular intervals once the given boundary condition is satisfied. |
ground_by |
This operator will group the values coming from the source observable based on the key_mapper function given. |
map |
This operator will change each value from the source observable into a new value based on the output of the mapper_func given. |
scan |
This operator will apply an accumulator function to the values coming from the source observable and return an observable with new values. |
Filtering operators
我们将讨论的过滤操作符类别中的操作符如下:
The operators we are going to discuss in Filtering operator category are given below −
Operator |
Category |
debounce |
This operator will give the values from the source observable, until the timespan given and ignore the rest of the time passes. |
distinct |
This operator will give all the values that are distinct from the source observable. |
element_at |
This operator will give an element from the source observable for the index given. |
filter |
This operator will filter values from the source observable based on the predicate function given. |
first |
This operator will give the first element from the source observable. |
ignore_elements |
This operator will ignore all the values from the source observable and only execute calls to complete or error callback functions. |
last |
This operator will give the last element from the source observable. |
skip |
This operator will give back an observable that will skip the first occurrence of count items taken as input. |
skip_last |
This operator will give back an observable that will skip the last occurrence of count items taken as input. |
take |
This operator will give a list of source values in continuous order based on the count given. |
take_last |
This operator will give a list of source values in continuous order from last based on the count given. |
Error handling operators
我们将讨论的错误处理操作符类别中的操作符包括:-
The operators we are going to discuss in the Error handling operator category are: -
Operator |
Description |
catch |
This operator will terminate the source observable when there is an exception. |
retry |
This operator will retry on the source observable when there is an error and once the retry count is done it will terminate. |
Utility operators
以下是我们将讨论的实用程序操作符类别中的操作符。
The following are the operators we are going to discuss in the Utility operator category.
Operator |
Description |
delay |
This operator will delay the source observable emission as per the time or date is given. |
materialize |
This operator will convert the values from the source observable with the values emitted in the form of explicit notification values. |
time_interval |
This operator will give the time elapsed between the values from the source observable. |
timeout |
This operator will give all the values from the source observable after the elapsed time or else will trigger an error. |
timestamp |
This operator will attach a timestamp to all the values from the source observable. |
Conditional and Boolean operators
我们将在条件和布尔操作符类别中讨论的操作符如下所列:
The operators we are going to discuss in Conditional and Boolean Operator category are as given below −
Operator |
Description |
all |
This operator will check if all the values from the source observable satisfy the condition given. |
contains |
This operator will return an observable with the value true or false if the given value is present and if it is the value of the source observable. |
default_if_empty |
This operator will return a default value if the source observable is empty. |
sequence_equal |
This operator will compare two sequences of observables or an array of values and return an observable with the value true or false. |
skip_until |
This operator will discard values from the source observable until the second observable emits a value. |
skip_while |
This operator will return an observable with values from the source observable that satisfies the condition passed. |
take_until |
This operator will discard values from the source observable after the second observable emits a value or is terminated. |
take_while |
This operator will discard values from the source observable when the condition fails. |
Connectable Operators
我们将在可连接运算符类别中讨论的操作符是:
The operators we are going to discuss in Connectable Operator category are −
Operator |
Description |
publish |
This method will convert the observable into a connectable observable. |
ref_count |
This operator will make the observable a normal observable. |
replay |
This method works similar to the replaySubject. This method will return the same values, even if the observable has already emitted and some of the subscribers are late in subscribing. |
Combining Operators
以下是在组合操作程序类别中我们要讨论的操作程序。
The following are the operators we are going to discuss in the Combining operator category.
Operator |
Description |
combine_latest |
This operator will create a tuple for the observable given as input. |
merge |
This operator will merge given observables. |
start_with |
This operator will take in the given values and add at the start of the source observable return back the full sequence. |
zip |
This operator returns an observable with values in a tuple form which is formed by taking the first value of the given observable and so on. |
RxPY - Working With Subject
主题是一个可观察序列,同样也是一个观察者,它可以进行多播,即与多个已订阅的观察者对话。
A subject is an observable sequence, as well as, an observer that can multicast, i.e. talk to many observers that have subscribed.
我们将对主题讨论以下主题−
We are going to discuss the following topics on subject −
-
Create a subject
-
Subscribe to a subject
-
Passing data to subject
-
BehaviorSubject
-
ReplaySubject
-
AsyncSubject
Create a subject
要使用一个主题,我们需要按下方所示导入主题−
To work with a subject, we need to import Subject as shown below −
from rx.subject import Subject
您可以按如下方式创建一个主题对象−
You can create a subject-object as follows −
subject_test = Subject()
对象是一个具有三个方法的观察者−
The object is an observer that has three methods −
-
on_next(value)
-
on_error(error) and
-
on_completed()
Subscribe to a Subject
您可以在主题上创建多个订阅,如下所示−
You can create multiple subscription on the subject as shown below −
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
Passing Data to Subject
您可以使用 on_next(value) 方法将数据传递到所创建的主题,如下所示−
You can pass data to the subject created using the on_next(value) method as shown below −
subject_test.on_next("A")
subject_test.on_next("B")
数据将传递到所有订阅,然后添加到主题中。
The data will be passed to all the subscription, added on the subject.
这里有一个主题的工作示例。
Here, is a working example of the subject.
Example
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")
subject_test 对象是通过调用 Subject() 来创建的。subject_test 对象引用 on_next(value)、on_error(error) 和 on_completed() 方法。上述示例的输出如下所示 −
The subject_test object is created by calling a Subject(). The subject_test object has reference to on_next(value), on_error(error) and on_completed() methods. The output of the above example is shown below −
Output
E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B
我们可以使用 on_completed() 方法来停止主题执行,如下所示。
We can use the on_completed() method, to stop the subject execution as shown below.
Example
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")
一旦我们调用 complete,稍后调用的下一个方法就不再被调用。
Once we call complete, the next method called later is not invoked.
Output
E:\pyrx>python testrx.py
The value is A
The value is A
现在让我们看看如何调用 on_error(error) 方法。
Let us now see, how to call on_error(error) method.
BehaviorSubject
BehaviorSubject 在调用时将给你最新值。你可以创建行为主题,如下所示 −
BehaviorSubject will give you the latest value when called. You can create behavior subject as shown below −
from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject
这是一个使用行为主题的工作示例
Here, is a working example to use Behaviour Subject
Example
from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")
Replay Subject
replaysubject 与行为主题类似,它可以缓冲值并将其重新播放给新订阅者。这是一个回复主题的工作示例。
A replaysubject is similar to behavior subject, wherein, it can buffer the values and replay the same to the new subscribers. Here, is a working example of replay subject.
Example
from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)
在回放主题上使用的缓冲区值为 2。因此,最后两个值将被缓冲并用于所调用的新订阅者。
The buffer value used is 2 on the replay subject. So, the last two values will be buffered and used for the new subscribers called.
AsyncSubject
对于 AsyncSubject,传递给订阅者的最后一个值称为,并且仅在调用 complete() 方法之后才会执行。
In the case of AsyncSubject, the last value called is passed to the subscriber, and it will be done only after the complete() method is called.
Example
from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.
RxPY - Concurrency Using Scheduler
RxPy 的一个重要特性是并发性,即允许任务并行执行。为了实现这一点,我们有两个运算符 subscribe_on() 和 observe_on(),它们将与调度程序一起工作,调度程序将决定已订阅任务的执行。
One important feature of RxPy is concurrency, i.e. to allow the task to execute in parallel. To make that happen, we have two operators subscribe_on() and observe_on() that will work with a scheduler, that will decide the execution of the subscribed task.
这是一个工作示例,显示了 subscibe_on()、observe_on() 和调度程序的需要。
Here, is a working example, that shows the need for subscibe_on(), observe_on() and scheduler.
Example
import random
import time
import rx
from rx import operators as ops
def adding_delay(value):
time.sleep(random.randint(5, 20) * 0.1)
return value
# Task 1
rx.of(1,2,3,4,5).pipe(
ops.map(lambda a: adding_delay(a))
).subscribe(
lambda s: print("From Task 1: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a))
).subscribe(
lambda s: print("From Task 2: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 2 complete")
)
input("Press any key to exit\n")
在上面的示例中,我有 2 个任务:任务 1 和任务 2。任务的执行是按顺序的。只有在第一个任务完成时,第二个任务才会开始。
In the above example, I have 2 tasks: Task 1 and Task 2. The execution of the task is in sequence. The second task starts only, when the first task is done.
Output
E:\pyrx>python testrx.py
From Task 1: 1
From Task 1: 2
From Task 1: 3
From Task 1: 4
From Task 1: 5
Task 1 complete
From Task 2: 1
From Task 2: 2
From Task 2: 3
From Task 2: 4
Task 2 complete
RxPy 支持许多调度程序,在这里,我们将使用 ThreadPoolScheduler。ThreadPoolScheduler 主要将尝试管理可用 CPU 线程。
RxPy supports many Scheduler, and here, we are going to make use of ThreadPoolScheduler. ThreadPoolScheduler mainly will try to manage with the CPU threads available.
在示例中,我们之前已经看到,我们将使用一个多处理模块来提供 cpu_count。计数将提供给 ThreadPoolScheduler,ThreadPoolScheduler 会根据可用的线程设法使任务并行工作。
In the example, we have seen earlier, we are going to make use of a multiprocessing module that will give us the cpu_count. The count will be given to the ThreadPoolScheduler that will manage to get the task working in parallel based on the threads available.
以下是用例 −
Here, is a working example −
import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
time.sleep(random.randint(5, 20) * 0.1)
return value
# Task 1
rx.of(1,2,3,4,5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 1: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 2: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 2 complete")
)
input("Press any key to exit\n")
在上面的示例中,我有 2 个任务,cpu_count 为 4。由于任务为 2,并且我们可用的线程为 4,因此这两个任务可以并行开始。
In the above example, I have 2 tasks and the cpu_count is 4. Since, the task is 2 and threads available with us are 4, both the task can start in parallel.
Output
E:\pyrx>python testrx.py
Cpu count is : 4
Press any key to exit
From Task 1: 1
From Task 2: 1
From Task 1: 2
From Task 2: 2
From Task 2: 3
From Task 1: 3
From Task 2: 4
Task 2 complete
From Task 1: 4
From Task 1: 5
Task 1 complete
如果你查看输出,这两个任务都会并行启动。
If you see the output, both the task has started in parallel.
现在,考虑一种情况,任务多于 CPU 计数,即 CPU 计数为 4,任务为 5。在这种情况下,我们需要检查在任务完成后是否有任何线程已释放,以便将其分配给队列中可用的新任务。
Now, consider a scenario, where the task is more than the CPU count i.e. CPU count is 4 and tasks are 5. In this case, we would need to check if any thread has got free after task completion, so that, it can be assigned to the new task available in the queue.
为此,我们可以使用 observe_on() 运算符,它将观察调度程序是否有任何线程是空闲的。这是一个使用 observe_on() 的工作示例
For this purpose, we can use the observe_on() operator which will observe the scheduler if any threads are free. Here, is a working example using observe_on()
Example
import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
time.sleep(random.randint(5, 20) * 0.1)
return value
# Task 1
rx.of(1,2,3,4,5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 1: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 2: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 2 complete")
)
#Task 3
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 3: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 3 complete")
)
#Task 4
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.subscribe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 4: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 4 complete")
)
#Task 5
rx.range(1, 5).pipe(
ops.map(lambda a: adding_delay(a)),
ops.observe_on(thread_pool_scheduler)
).subscribe(
lambda s: print("From Task 5: {0}".format(s)),
lambda e: print(e),
lambda: print("Task 5 complete")
)
input("Press any key to exit\n")
Output
E:\pyrx>python testrx.py
Cpu count is : 4
From Task 4: 1
From Task 4: 2
From Task 1: 1
From Task 2: 1
From Task 3: 1
From Task 1: 2
From Task 3: 2
From Task 4: 3
From Task 3: 3
From Task 2: 2
From Task 1: 3
From Task 4: 4
Task 4 complete
From Task 5: 1
From Task 5: 2
From Task 5: 3
From Task 3: 4
Task 3 complete
From Task 2: 3
Press any key to exit
From Task 5: 4
Task 5 complete
From Task 1: 4
From Task 2: 4
Task 2 complete
From Task 1: 5
Task 1 complete
如果你查看输出,任务 4 完成后,线程将提供给下一个任务,即任务 5,并且任务 5 开始执行。
If you see the output, the moment task 4 is complete, the thread is given to the next task i.e., task 5 and the same starts executing.
RxPY - Examples
在本章中,我们将详细讨论以下主题 −
In this chapter, we will discuss the following topics in detail −
-
Basic Example showing the working of observable, operators, and subscribing to the observer.
-
Difference between observable and subject.
-
Understanding cold and hot observables.
下面给出了一个基础示例,展示了可观察、操作符的工作原理,以及订阅观察者。
Given below is a basic example showing the working of observable, operators, and subscribing to the observer.
Example
test.py
test.py
import requests
import rx
import json
from rx import operators as ops
def filternames(x):
if (x["name"].startswith("C")):
return x["name"]
else :
return ""
content = requests.get('https://jsonplaceholder.typicode.com/users')
y = json.loads(content.text)
source = rx.from_(y)
case1 = source.pipe(
ops.filter(lambda c: filternames(c)),
ops.map(lambda a:a["name"])
)
case1.subscribe(
on_next = lambda i: print("Got - {0}".format(i)), 8. RxPy — Examples
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!"),
)
这是一个非常简单的示例,其中,我从这个 URL 获取用户数据
Here, is a very simple example, wherein, I am getting user data from this URL −
过滤数据,按“C”开头的名字进行过滤,然后使用 map 只返回名字。以下是相同数据的输出 −
Filtering the data, to give the names starting with "C", and later using the map to return the names only. Here is the output for the same −
E:\pyrx\examples>python test.py
Got - Clementine Bauch
Got - Chelsey Dietrich
Got - Clementina DuBuque
Job Done!
Difference between observable and subject
在这个示例中,我们将查看可观察与主题之间的差异。
In this example, we will see the difference between an observable and a subject.
from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))
Output
E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821
在以上示例中,每次订阅可观察,它都会给你新的值。
In the above example, every time you subscribe to the observable, it will give you new values.
Subject Example
from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)
Output
E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065
如果你看到这些值是在两个使用主题的订阅者之间共享的。
If you see the values are shared, between both subscribers using the subject.
Understanding Cold and Hot Observables
可观察被分类为
An observable is classified as
-
Cold Observables
-
Hot Observables
当多个订阅者都订阅的时候就会发现可观测的差异。
The difference in observables will be noticed when multiple subscribers are subscribing.
Cold Observables
冷可观测是指可执行且在每次订阅时都会呈现数据的可观测。当它被订阅时,可观测会被执行,并给出新值。
Cold observables, are observable that are executed, and renders data each time it is subscribed. When it is subscribed, the observable is executed and the fresh values are given.
以下示例给出了对冷可观察的理解。
The following example gives the understanding of cold observable.
from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))
Output
E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821
在以上示例中,每次订阅可观察,它都会执行可观察并发出值。这些值也可以像在以上示例中展示的那样在不同的订阅者之间不同。
In the above example, every time you subscribe to the observable, it will execute the observable and emit values. The values can also differ from subscriber to subscriber as shown in the example above.
Hot Observables
在热可观察的情况下,当它们准备好时会发出值,而且不会总是等待订阅。当值被发出时,所有的订阅者都将得到相同的值。
In the case of hot observable, they will emit the values when they are ready and will not always wait for a subscription. When the values are emitted, all the subscribers will get the same value.
当你希望在可观察准备好时发出值,或者希望向所有订阅者共享相同的值时,你可以利用热可观察。
You can make use of hot observable when you want values to emitted when the observable is ready, or you want to share the same values to all your subscribers.
热可观察的示例包括主题和可连接操作符。
An example of hot observable is Subject and connectable operators.
from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)
Output
E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065
如果你看到,相同的值在订阅者之间被共享。你可以使用 publish() 可连接可观察操作符实现相同的目的。
If you see, the same value is shared between the subscribers. You can achieve the same using publish () connectable observable operator.