RxJava Concise Tutorial

RxJava - Windowing

The window operator works like the buffer operator, but instead of gathering the items emitted by an Observable into collections and emitting those collections, it gathers them into other Observables and emits those Observables. In the example below, we create an Observable that emits 9 items and apply window(3), so 3 Observables are emitted, each carrying 3 consecutive items.
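
To make the contrast with buffering concrete, here is a minimal plain-Java sketch that does not use RxJava at all. It only mimics the order of events for count-based grouping: the buffer-like version reports a group once it is complete, while the window-like version announces a group when its first item arrives and then forwards each item individually. The class name WindowVsBufferSketch and the methods bufferEvents and windowEvents are hypothetical names chosen for this illustration, not part of any library.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowVsBufferSketch {

    // Buffer-like grouping: a group is reported only once it is full
    // (or the input ends), as one finished list.
    public static List<String> bufferEvents(List<Integer> items, int count) {
        List<String> events = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        for (int item : items) {
            current.add(item);
            if (current.size() == count) {
                events.add("buffer " + current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            events.add("buffer " + current);
        }
        return events;
    }

    // Window-like grouping: the group is announced when its first item
    // arrives, and each item is then forwarded individually.
    public static List<String> windowEvents(List<Integer> items, int count) {
        List<String> events = new ArrayList<>();
        int inWindow = 0; // number of items already placed in the open window
        for (int item : items) {
            if (inWindow == 0) {
                events.add("window opened");
            }
            events.add("item " + item);
            inWindow = (inWindow + 1) % count;
        }
        return events;
    }

    public static void main(String[] args) {
        List<Integer> items = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9);
        bufferEvents(items, 3).forEach(System.out::println);
        windowEvents(items, 3).forEach(System.out::println);
    }
}
```

For the nine items 1 to 9 with a count of 3, the buffer-like version yields three events (one per finished list), whereas the window-like version yields twelve: three "window opened" events, each followed by its three items. This sketch needs Java 9 or later because of List.of.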

Windowing Example

Create the following Java program using any editor of your choice in, say, C:\RxJava.

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4,
         5, 6, 7, 8, 9);

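      // Subscribe on an IO thread, delay the emissions by 2 seconds,
      // then split the stream into windows of 3 consecutive items.
      observable.subscribeOn(Schedulers.io())
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .window(3)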
         .subscribe(new Observer<Observable<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("Subscribed");
            }
            @Override
            public void onNext(Observable<Integer> integers) {
               System.out.println("onNext: ");
               // Each window is itself an Observable; subscribe to receive its items.
               integers.subscribe(value -> System.out.println(value));
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("Error");
            }

            @Override
            public void onComplete() {
               System.out.println("Done! ");
            }
         });
      // Keep the main thread alive until the delayed emissions have finished.
      Thread.sleep(3000);
   }
}

Verify the Result

Compile the class using the javac compiler as follows:

C:\RxJava>javac ObservableTester.java

Now run ObservableTester as follows:

C:\RxJava>java ObservableTester

It should produce the following output:

Subscribed
onNext:
1
2
3
onNext:
4
5
6
onNext:
7
8
9
Done!
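
As a variation on the program above, each window can be processed as a stream in its own right instead of just printing its items. The following sketch assumes RxJava 2.x on the classpath (the same io.reactivex package used above) and sums the items of every window. The class name WindowSumSketch and the method windowSums are hypothetical names for this illustration. The source here is synchronous, so no delay or Thread.sleep is needed.

```java
import io.reactivex.Observable;

import java.util.List;

public class WindowSumSketch {

    // Splits the source into windows of `count` items and reduces each
    // window to the sum of its items.
    public static List<Integer> windowSums(Observable<Integer> source, int count) {
        return source
            .window(count)
            // flatMapSingle subscribes to each window as soon as it is emitted,
            // so no items of the window are missed.
            .flatMapSingle(w -> w.reduce(0, Integer::sum))
            .toList()
            .blockingGet(); // block until the synchronous source completes
    }

    public static void main(String[] args) {
        // Windows are [1, 2, 3], [4, 5, 6], [7, 8, 9].
        System.out.println(windowSums(Observable.range(1, 9), 3)); // [6, 15, 24]
    }
}
```

Because each window completes before the next one opens on this synchronous source, the sums arrive in window order. With an asynchronous source, flatMapSingle does not guarantee that ordering.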