Rxjava 简明教程
RxJava - Windowing
窗口操作符的工作方式类似于缓冲操作符,但是它允许将 Observable 发出的项目收集到另一个 Observable,而不是收集到集合中,并发出这些 Observable,而不是 collection。在下面的示例中,我们创建了一个 Observable 来发出 9 个项目,并使用窗口操作符,一起发出 3 个 Observable。
Windowing operator works similar to buffer operator but it allows to gather items emitted by an Observable into another observable instead of collection and emit those Observables instead of collections. In the example below, we’ve created an Observable to emit 9 items and using window operator, 3 Observable will be emitted together.
Windowing Example
在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。
Create the following Java program using any editor of your choice in, say, C:\> RxJava.
ObservableTester.java
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable<Integer> observable = Observable.just(1, 2, 3, 4,
5, 6, 7, 8, 9);
observable.subscribeOn(Schedulers.io())
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.window(3)
.subscribe(new Observer<Observable<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("Subscribed");
}
@Override
public void onNext(Observable<Integer> integers) {
System.out.println("onNext: ");
integers.subscribe(value -> System.out.println(value));
}
@Override
public void onError(Throwable e) {
System.out.println("Error");
}
@Override
public void onComplete() {
System.out.println("Done! ");
}
});
Thread.sleep(3000);
}
}
Verify the Result
按照如下方式使用 javac 编译器编译类 −
Compile the class using javac compiler as follows −
C:\RxJava>javac ObservableTester.java
现在按照如下所示运行 ObservableTester:
Now run the ObservableTester as follows −
C:\RxJava>java ObservableTester
它应该产生以下输出 −
It should produce the following output −
Subscribed
onNext:
1
2
3
onNext:
4
5
6
onNext:
7
8
9
Done!