Rxpy 简明教程

RxPY - Working With Observables

一个可观察对象是一个创建观察员并将其附加到预期值源(例如点击、DOM 元素上的鼠标事件等)的函数。

下面提及的主题将在本章中详细研究。

  1. Create Observables

  2. 订阅和执行可观察对象

Create observables

要创建一个可观察对象,我们将使用 {}s0 方法并向其传递一个包含以下项的函数。

  1. {}s1——当 Observable 发送一个项目时,将调用此函数。

  2. on_completed() − 此函数在 observable 完成时被调用。

  3. on_error() − 此函数在 observable 上出现错误时被调用。

要使用 create() 方法,首先按下方所示导入该方法−

from rx import create

以下是创建一个 observable 的工作示例−

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_error("Error")
   observer.on_completed()
source = create(test_observable).

Subscribe and Execute an Observable

要订阅一个 observable,我们需要使用 subscribe() 函数,并传递回调函数 on_next、on_error 和 on_completed。

以下是工作示例−

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_completed()
source = create(test_observable)
source.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

subscribe() 方法负责执行 observable。回调函数 on_nexton_erroron_completed 必须传递给 subscribe 方法。随后对 subscribe 方法的调用将执行 test_observable() 函数。

不必将所有三个回调函数都传递给 subscribe() 方法。您可以根据自己的需要传递 on_next()、on_error() 和 on_completed()。

lambda 函数用于 on_next、on_error 和 on_completed。它将采用参数并执行给定的表达式。

以下是创建的 observable 的输出−

E:\pyrx>python testrx.py
Got - Hello
Job Done!