Rxpy 简明教程
RxPY - Working with Subject
主题是一个可观察序列,同样也是一个观察者,它可以进行多播,即与多个已订阅的观察者对话。
A subject is an observable sequence, as well as, an observer that can multicast, i.e. talk to many observers that have subscribed.
我们将对主题讨论以下主题−
We are going to discuss the following topics on subject −
-
Create a subject
-
Subscribe to a subject
-
Passing data to subject
-
BehaviorSubject
-
ReplaySubject
-
AsyncSubject
Create a subject
要使用一个主题,我们需要按下方所示导入主题−
To work with a subject, we need to import Subject as shown below −
from rx.subject import Subject
您可以按如下方式创建一个主题对象−
You can create a subject-object as follows −
subject_test = Subject()
对象是一个具有三个方法的观察者−
The object is an observer that has three methods −
-
on_next(value)
-
on_error(error) and
-
on_completed()
Subscribe to a Subject
您可以在主题上创建多个订阅,如下所示−
You can create multiple subscription on the subject as shown below −
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
Passing Data to Subject
您可以使用 on_next(value) 方法将数据传递到所创建的主题,如下所示−
You can pass data to the subject created using the on_next(value) method as shown below −
subject_test.on_next("A")
subject_test.on_next("B")
数据将传递到所有订阅,然后添加到主题中。
The data will be passed to all the subscription, added on the subject.
这里有一个主题的工作示例。
Here, is a working example of the subject.
Example
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")
subject_test 对象是通过调用 Subject() 来创建的。subject_test 对象引用 on_next(value)、on_error(error) 和 on_completed() 方法。上述示例的输出如下所示 −
The subject_test object is created by calling a Subject(). The subject_test object has reference to on_next(value), on_error(error) and on_completed() methods. The output of the above example is shown below −
Output
E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B
我们可以使用 on_completed() 方法来停止主题执行,如下所示。
We can use the on_completed() method, to stop the subject execution as shown below.
Example
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")
一旦我们调用 complete,稍后调用的下一个方法就不再被调用。
Once we call complete, the next method called later is not invoked.
Output
E:\pyrx>python testrx.py
The value is A
The value is A
现在让我们看看如何调用 on_error(error) 方法。
Let us now see, how to call on_error(error) method.
BehaviorSubject
BehaviorSubject 在调用时将给你最新值。你可以创建行为主题,如下所示 −
BehaviorSubject will give you the latest value when called. You can create behavior subject as shown below −
from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject
这是一个使用行为主题的工作示例
Here, is a working example to use Behaviour Subject
Example
from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")
Replay Subject
replaysubject 与行为主题类似,它可以缓冲值并将其重新播放给新订阅者。这是一个回复主题的工作示例。
A replaysubject is similar to behavior subject, wherein, it can buffer the values and replay the same to the new subscribers. Here, is a working example of replay subject.
Example
from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)
在回放主题上使用的缓冲区值为 2。因此,最后两个值将被缓冲并用于所调用的新订阅者。
The buffer value used is 2 on the replay subject. So, the last two values will be buffered and used for the new subscribers called.
AsyncSubject
对于 AsyncSubject,传递给订阅者的最后一个值称为,并且仅在调用 complete() 方法之后才会执行。
In the case of AsyncSubject, the last value called is passed to the subscriber, and it will be done only after the complete() method is called.
Example
from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.