Rxpy 简明教程

RxPY - Working with Subject

主题是一个可观察序列,同样也是一个观察者,它可以进行多播,即与多个已订阅的观察者对话。

A subject is an observable sequence, as well as, an observer that can multicast, i.e. talk to many observers that have subscribed.

我们将对主题讨论以下主题−

We are going to discuss the following topics on subject −

  1. Create a subject

  2. Subscribe to a subject

  3. Passing data to subject

  4. BehaviorSubject

  5. ReplaySubject

  6. AsyncSubject

Create a subject

要使用一个主题,我们需要按下方所示导入主题−

To work with a subject, we need to import Subject as shown below −

from rx.subject import Subject

您可以按如下方式创建一个主题对象−

You can create a subject-object as follows −

subject_test = Subject()

对象是一个具有三个方法的观察者−

The object is an observer that has three methods −

  1. on_next(value)

  2. on_error(error) and

  3. on_completed()

Subscribe to a Subject

您可以在主题上创建多个订阅,如下所示−

You can create multiple subscription on the subject as shown below −

subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)

Passing Data to Subject

您可以使用 on_next(value) 方法将数据传递到所创建的主题,如下所示−

You can pass data to the subject created using the on_next(value) method as shown below −

subject_test.on_next("A")
subject_test.on_next("B")

数据将传递到所有订阅,然后添加到主题中。

The data will be passed to all the subscription, added on the subject.

这里有一个主题的工作示例。

Here, is a working example of the subject.

Example

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")

subject_test 对象是通过调用 Subject() 来创建的。subject_test 对象引用 on_next(value)、on_error(error) 和 on_completed() 方法。上述示例的输出如下所示 −

The subject_test object is created by calling a Subject(). The subject_test object has reference to on_next(value), on_error(error) and on_completed() methods. The output of the above example is shown below −

Output

E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B

我们可以使用 on_completed() 方法来停止主题执行,如下所示。

We can use the on_completed() method, to stop the subject execution as shown below.

Example

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")

一旦我们调用 complete,稍后调用的下一个方法就不再被调用。

Once we call complete, the next method called later is not invoked.

Output

E:\pyrx>python testrx.py
The value is A
The value is A

现在让我们看看如何调用 on_error(error) 方法。

Let us now see, how to call on_error(error) method.

Example

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))

Output

E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!

BehaviorSubject

BehaviorSubject 在调用时将给你最新值。你可以创建行为主题,如下所示 −

BehaviorSubject will give you the latest value when called. You can create behavior subject as shown below −

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject

这是一个使用行为主题的工作示例

Here, is a working example to use Behaviour Subject

Example

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
   lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
   lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")

Output

E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject

Replay Subject

replaysubject 与行为主题类似,它可以缓冲值并将其重新播放给新订阅者。这是一个回复主题的工作示例。

A replaysubject is similar to behavior subject, wherein, it can buffer the values and replay the same to the new subscribers. Here, is a working example of replay subject.

Example

from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)

在回放主题上使用的缓冲区值为 2。因此,最后两个值将被缓冲并用于所调用的新订阅者。

The buffer value used is 2 on the replay subject. So, the last two values will be buffered and used for the new subscribers called.

Output

E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5

AsyncSubject

对于 AsyncSubject,传递给订阅者的最后一个值称为,并且仅在调用 complete() 方法之后才会执行。

In the case of AsyncSubject, the last value called is passed to the subscriber, and it will be done only after the complete() method is called.

Example

from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.

Output

E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2