Rxpy 简明教程
RxPY - Working with Subject
主题是一个可观察序列,同样也是一个观察者,它可以进行多播,即与多个已订阅的观察者对话。
我们将对主题讨论以下主题−
-
Create a subject
-
Subscribe to a subject
-
Passing data to subject
-
BehaviorSubject
-
ReplaySubject
-
AsyncSubject
Create a subject
要使用一个主题,我们需要按下方所示导入主题−
from rx.subject import Subject
您可以按如下方式创建一个主题对象−
subject_test = Subject()
对象是一个具有三个方法的观察者−
-
on_next(value)
-
on_error(error) and
-
on_completed()
Subscribe to a Subject
您可以在主题上创建多个订阅,如下所示−
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
Passing Data to Subject
您可以使用 on_next(value) 方法将数据传递到所创建的主题,如下所示−
subject_test.on_next("A")
subject_test.on_next("B")
数据将传递到所有订阅,然后添加到主题中。
这里有一个主题的工作示例。
Example
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")
subject_test 对象是通过调用 Subject() 来创建的。subject_test 对象引用 on_next(value)、on_error(error) 和 on_completed() 方法。上述示例的输出如下所示 −
Output
E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B
我们可以使用 on_completed() 方法来停止主题执行,如下所示。
Example
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")
一旦我们调用 complete,稍后调用的下一个方法就不再被调用。
BehaviorSubject
BehaviorSubject 在调用时将给你最新值。你可以创建行为主题,如下所示 −
from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject
这是一个使用行为主题的工作示例
Example
from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")
Replay Subject
replaysubject 与行为主题类似,它可以缓冲值并将其重新播放给新订阅者。这是一个回复主题的工作示例。
Example
from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)
在回放主题上使用的缓冲区值为 2。因此,最后两个值将被缓冲并用于所调用的新订阅者。
AsyncSubject
对于 AsyncSubject,传递给订阅者的最后一个值称为,并且仅在调用 complete() 方法之后才会执行。
Example
from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.