Rxjava 简明教程

RxJava - Quick Guide

RxJava - Overview

RxJava 是 ReactiveX 基于 Java 的扩展。它在 Java 中实现或 ReactiveX 项目。以下是 RxJava 的主要特性。

RxJava is a Java based extension of ReactiveX. It provides implementation or ReactiveX project in Java. Following are the key characteristics of RxJava.

  1. Extends the observer pattern.

  2. Support sequences of data/events.

  3. Provides operators to compose sequences together declaratively.

  4. Handles threading, synchronization, thread-safety and concurrent data structures internally.

What is ReactiveX?

ReactiveX 是一个项目,其目标是将响应式编程概念应用于不同的编程语言。响应式编程是指在数据出现时程序随之做出反应的方案。这是一种基于事件的编程概念,事件可以传播到注册观察者。

ReactiveX is a project which aims to provide reactive programming concept to various programming languages. Reactive Programming refers to the scenario where program reacts as and when data appears. It is a event based programming concept and events can propagate to registers observers.

根据 Reactive ,它们将观测者模式、迭代器模式和函数式模式的优点结合在一起。

As per the Reactive, they have combined the best of Observer pattern, Iterator pattern and functional pattern.

做正确的事情的观测者模式。ReactiveX 是观测者模式、迭代器模式和函数式编程的最佳思想的结合。

The Observer pattern done right. ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming.

Functional Programming

函数式编程围绕使用纯函数构建软件。纯函数不依赖于以前的状态,并且对于传递的相同参数始终返回相同的结果。纯函数有助于避免在多线程环境中常见与共享对象、可变数据和副作用相关的各种问题。

Functional programming revolves around building the software using pure functions. A pure function do not depends upon previous state and always returns the same result for the same parameters passed. Pure functions helps avoiding problems associated with shared objects, mutable data and side effects often prevalent in multi-threading environments.

Reactive Programming

响应式编程是指在数据流以异步方式出现时,将数据流进行处理的事件驱动编程。

Reactive programming refers to event driven programming where data streams comes in asynchronous fashion and get processed when they are arrived.

Functional Reactive Programming

RxJava 将这两个概念结合在一起,其中流的数据随着时间的推移而改变,消费者函数做出相应的反应。

RxJava implements both the concepts together, where data of streams changes over time and consumer function reacts accordingly.

The Reactive Manifesto

Reactive Manifesto 是一份在线文档,规定了应用软件系统的崇高标准。根据宣言,下列是有反应的软件的关键属性:

Reactive Manifesto is an on-line document stating the high standard of application software systems. As per the manifesto, following are the key attributes of a reactive software −

  1. Responsive − Should always respond in a timely fashion.

  2. Message Driven − Should use asynchronous message-passing between components so that they maintain loose coupling.

  3. Elastic − Should stay responsive even under high load.

  4. Resilient − Should stay responsive even if any component(s) fail.

Key components of RxJava

RxJava 有两个关键组件:可观察序列和观察者。

RxJava have two key components: Observables and Observer.

  1. Observable − It represents an object similar to Stream which can emit zero or more data, can send error message, whose speed can be controlled while emitting a set of data, can send finite as well as infinite data.

  2. Observer − It subscribes to Observable’s data of sequence and reacts per item of the observables. Observers are notified whenever Observable emits a data. An Observer handles data one by one.

如果没有出现项目或没有为之前的项目返回回调,则永远不会通知观察者。

An observer is never notified if items are not present or a callback is not returned for a previous item.

RxJava - Environment Setup

Local Environment Setup

RxJava 是 Java 的一个库,因此最基本的要求是你的机器上已安装 JDK。

RxJava is a library for Java, so the very first requirement is to have JDK installed in your machine.

System Requirement

JDK

1.5 or above.

Memory

No minimum requirement.

Disk Space

No minimum requirement.

Operating System

No minimum requirement.

Step 1 - Verify Java Installation in Your Machine

首先,打开控制台并根据您正在使用的操作系统执行 java 命令。

First of all, open the console and execute a java command based on the operating system you are working on.

OS

Task

Command

Windows

Open Command Console

c:> java -version

Linux

Open Command Terminal

$ java -version

Mac

Open Terminal

machine:< joseph$ java -version

让我们验证所有操作系统的输出 −

Let’s verify the output for all the operating systems −

OS

Output

Windows

java version "1.8.0_101" Java™ SE Runtime Environment (build 1.8.0_101)

Linux

java version "1.8.0_101" Java™ SE Runtime Environment (build 1.8.0_101)

Mac

java version "1.8.0_101" Java™ SE Runtime Environment (build 1.8.0_101)

如果你的系统上没有安装 Java,则从以下链接下载 Java 软件开发工具包 (SDK) https://www.oracle.com 。本教程假设已安装 Java 1.8.0_101。

If you do not have Java installed on your system, then download the Java Software Development Kit (SDK) from the following link https://www.oracle.com. We are assuming Java 1.8.0_101 as the installed version for this tutorial.

Step 2 - Set JAVA Environment

JAVA_HOME 环境变量设置为您计算机中安装 Java 的基本目录位置。例如。

Set the JAVA_HOME environment variable to point to the base directory location where Java is installed on your machine. For example.

OS

Output

Windows

Set the environment variable JAVA_HOME to C:\Program Files\Java\jdk1.8.0_101

Linux

export JAVA_HOME = /usr/local/java-current

Mac

export JAVA_HOME = /Library/Java/Home

在系统路径中追加 Java 编译器位置。

Append Java compiler location to the System Path.

OS

Output

Windows

Append the string C:\Program Files\Java\jdk1.8.0_101\bin at the end of the system variable, Path.

Linux

export PATH = $PATH:$JAVA_HOME/bin/

Mac

not required

如上所述,使用命令 java -version 验证 Java 安装。

Verify Java installation using the command java -version as explained above.

Step 3 - Download RxJava2 Archive

RxJava @ MVNRepository 下载 RxJava jar 文件的最新版本及其关联项 Reactive Streams @ MVNRepository 。在本教程的编写时,我们下载了 rxjava-2.2.4.jar 和 reactive-streams-1.0.2.jar,并将其复制到了 C:\>RxJava 文件夹中。

Download the latest version of RxJava jar file from RxJava @ MVNRepository and its dependency Reactive Streams @ MVNRepository . At the time of writing this tutorial, we have downloaded rxjava-2.2.4.jar, reactive-streams-1.0.2.jar and copied it into C:\>RxJava folder.

OS

Archive name

Windows

rxjava-2.2.4.jar, reactive-streams-1.0.2.jar

Linux

rxjava-2.2.4.jar, reactive-streams-1.0.2.jar

Mac

rxjava-2.2.4.jar, reactive-streams-1.0.2.jar

Step 4 - Set RxJava Environment

设置 RX_JAVA 环境变量以指向存储 RxJava jar 的你的机器上的基本目录位置。我们假设已将 rxjava-2.2.4.jar 和 reactive-streams-1.0.2.jar 存储在 RxJava 文件夹中。

Set the RX_JAVA environment variable to point to the base directory location where RxJava jar is stored on your machine. Let’s assuming we’ve stored rxjava-2.2.4.jar and reactive-streams-1.0.2.jar in the RxJava folder.

Sr.No

OS & Description

1

Windows Set the environment variable RX_JAVA to C:\RxJava

2

Linux export RX_JAVA = /usr/local/RxJava

3

Mac export RX_JAVA = /Library/RxJava

Step 5 - Set CLASSPATH Variable

设置 CLASSPATH 环境变量以指向 RxJava jar 位置。

Set the CLASSPATH environment variable to point to the RxJava jar location.

Sr.No

OS & Description

1

Windows Set the environment variable CLASSPATH to %CLASSPATH%;%RX_JAVA%\rxjava-2.2.4.jar;%RX_JAVA%\reactive-streams-1.0.2.jar;.;

2

Linux export CLASSPATH = $CLASSPATH:$RX_JAVA/rxjava-2.2.4.jar:reactive-streams-1.0.2.jar:.

3

Mac export CLASSPATH = $CLASSPATH:$RX_JAVA/rxjava-2.2.4.jar:reactive-streams-1.0.2.jar:.

Step 6 - Test RxJava Setup

像下面所示创建 TestRx.java 类 −

Create a class TestRx.java as shown below −

import io.reactivex.Flowable;
public class TestRx {
   public static void main(String[] args) {
      Flowable.just("Hello World!")
         .subscribe(System.out::println);
   }
}

Step 7 - Verify the Result

使用以下 javac 编译器编译类:

Compile the classes using javac compiler as follows −

C:\RxJava>javac Tester.java

验证输出。

Verify the output.

Hello World!

RxJava - How Observable works

Observables 表示数据源,而 Observers (Subscribers) 监听这些数据源。简而言之,Observable 发射项目,Subscriber 然后消耗这些项目。

Observables represents the sources of data where as Observers (Subscribers) listen to them. In nutshell, an Observable emits items and a Subscriber then consumes these items.

Observable

  1. Observable provides data once subscriber starts listening.

  2. Observable can emit any number of items.

  3. Observable can emit only signal of completion as well with no item.

  4. Observable can terminate successfully.

  5. Observable may never terminate. e.g. a button can be clicked any number of times.

  6. Observable may throw error at any point of time.

Subscriber

  1. Observable can have multiple subscribers.

  2. When an Observable emits an item, each subscriber onNext() method gets invoked.

  3. When an Observable finished emitting items, each subscriber onComplete() method gets invoked.

  4. If an Observable emits error, each subscriber onError() method gets invoked.

RxJava - Creating Observables

以下是可以创建可观察序列的基类。

Following are the base classes to create observables.

  1. Flowable − 0..N flows, Emits 0 or n items. Supports Reactive-Streams and back-pressure.

  2. Observable − 0..N flows ,but no back-pressure.

  3. Single − 1 item or error. Can be treated as a reactive version of method call.

  4. Completable − No item emitted. Used as a signal for completion or error. Can be treated as a reactive version of Runnable.

  5. MayBe − Either No item or 1 item emitted. Can be treated as a reactive version of Optional.

以下是 Observable 类中创建可观察序列的便利方法。

Following are the convenient methods to create observables in Observable class.

  1. just(T item) − Returns an Observable that signals the given (constant reference) item and then completes.

  2. fromIterable(Iterable source) − Converts an Iterable sequence into an ObservableSource that emits the items in the sequence.

  3. fromArray(T…​ items) − Converts an Array into an ObservableSource that emits the items in the Array.

  4. fromCallable(Callable supplier) − Returns an Observable that, when an observer subscribes to it, invokes a function you specify and then emits the value returned from that function.

  5. fromFuture(Future future) − Converts a Future into an ObservableSource.

  6. interval(long initialDelay, long period, TimeUnit unit) − Returns an Observable that emits a 0L after the initialDelay and ever increasing numbers after each period of time thereafter.

RxJava - Single Observable

Single 类表示单值响应。Single observable 只能发出单个成功值或一个错误。它不发出 onComplete 事件。

The Single class represents the single value response. Single observable can only emit either a single successful value or an error. It does not emit onComplete event.

Class Declaration

以下为 io.reactivex.Single<T> 类的声明:

Following is the declaration for io.reactivex.Single<T> class −

public abstract class Single<T>
   extends Object
      implements SingleSource<T>

Protocol

以下为 Single Observable 采用的顺序协议:

Following is the sequential protocol that Single Observable operates −

onSubscribe (onSuccess | onError)?

Single Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {
      //Create the observable
      Single<String> testSingle = Single.just("Hello World");

      //Create an observer
      Disposable disposable = testSingle
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(
         new DisposableSingleObserver<String>() {

         @Override
         public void onError(Throwable e) {
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      });
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

Compile the class using javac compiler as follows −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

Now run the ObservableTester as follows −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

It should produce the following output −

Hello World

RxJava - MayBe Observable

MayBe 类表示延迟响应。MayBe 可观察对象可以发出一个成功的单个值或不发出值。

The MayBe class represents deferred response. MayBe observable can emit either a single successful value or no value.

Class Declaration

以下为 io.reactivex.Single<T> 类的声明:

Following is the declaration for io.reactivex.Single<T> class −

public abstract class Maybe<T>
   extends Object
      implements MaybeSource<T>

Protocol

下面是 MayBe 可观察对象的操作顺序——

Following is the sequential protocol that MayBe Observable operates −

onSubscribe (onSuccess | onError | OnComplete)?

MayBe Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      //Create an observer
      Disposable disposable = Maybe.just("Hello World")
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) {
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      });
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

Compile the class using javac compiler as follows −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

Now run the ObservableTester as follows −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

It should produce the following output −

Hello World

RxJava - Completable Observable

Completable 类表示延迟响应。Completable 可观察序列可以指示完成成功或错误。

The Completable class represents deferred response. Completable observable can either indicate a successful completion or error.

Class Declaration

以下是 io.reactivex.Completable 类的声明 −

Following is the declaration for io.reactivex.Completable class −

public abstract class Completable
extends Object
implements CompletableSource

Protocol

以下是 Completable Observable 运行的顺序协议 −

Following is the sequential protocol that Completable Observable operates −

onSubscribe (onError | onComplete)?

Completable Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Completable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableCompletableObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {

      //Create an observer
      Disposable disposable = Completable.complete()
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(new DisposableCompletableObserver() {
         @Override
         public void onError(Throwable e) {
            e.printStackTrace();
         }
         @Override
         public void onStart() {
            System.out.println("Started!");
         }
         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      });
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

Compile the class using javac compiler as follows −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

Now run the ObservableTester as follows −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

It should produce the following output −

Started!
Done!

RxJava - Using CompositeDisposable

CompositeDisposable 类表示一个容器,可以容纳多个可释放对象,并提供 O(1) 的复杂度来添加和删除可释放对象。

The CompositeDisposable class represents a container which can hold multiple disposable and offers O(1) complexity of adding and removing disposables.

Class Declaration

以下是 io.reactivex.disposables.CompositeDisposable 类的声明 −

Following is the declaration for io.reactivex.disposables.CompositeDisposable class −

public final class CompositeDisposable
extends Object
implements Disposable, io.reactivex.internal.disposables.DisposableContainer

CompositeDisposable Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

import io.reactivex.Maybe;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {
      CompositeDisposable compositeDisposable = new CompositeDisposable();

      //Create an Single observer
      Disposable disposableSingle = Single.just("Hello World")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(
      new DisposableSingleObserver<String>() {
         @Override
         public void onError(Throwable e) {
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      });

      //Create an observer
      Disposable disposableMayBe = Maybe.just("Hi")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) {
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      });

      Thread.sleep(3000);

      compositeDisposable.add(disposableSingle);
      compositeDisposable.add(disposableMayBe);

      //start observing
      compositeDisposable.dispose();
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

Compile the class using javac compiler as follows −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

Now run the ObservableTester as follows −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

It should produce the following output −

Hello World
Hi

RxJava - Creating Operators

以下是用于创建可观察内容的操作符。

Following are the operators which are used to create an Observable.

Sr.No.

Operator & Description

1

Create Creates an Observable from scratch and allows observer method to call programmatically.

2

Defer Do not create an Observable until an observer subscribes. Creates a fresh observable for each observer.

3

Empty/Never/Throw Creates an Observable with limited behavior.

4

From Converts an object/data structure into an Observable.

5

Interval Creates an Observable emitting integers in sequence with a gap of specified time interval.

6

Just Converts an object/data structure into an Observable to emit the same or same type of objects.

7

Range Creates an Observable emitting integers in sequence of given range.

8

Repeat Creates an Observable emitting integers in sequence repeatedly.

9

Start Creates an Observable to emit the return value of a function.

10

Timer Creates an Observable to emit a single item after given delay.

Creating Operator Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using fromArray operator to create an Observable
public class ObservableTester  {
   public static void main(String[] args) {
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .map(String::toUpperCase)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

Compile the class using javac compiler as follows −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

Now run the ObservableTester as follows −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

It should produce the following output −

ABCDEFG

RxJava - Transforming Operators

以下是用于转换从 Observable 发出的项的操作符。

Following are the operators which are used to transform an item emitted from an Observable.

Sr.No.

Operator & Description

1

Buffer Gathers items from Observable into bundles periodically and then emit the bundles rather than items.

2

FlatMap Used in nested observables. Transforms items into Observables. Then flatten the items into single Observable.

3

GroupBy Divide an Observable into set of Observables organized by key to emit different group of items.

4

Map Apply a function to each emitted item to transform it.

5

Scan Apply a function to each emitted item, sequentially and then emit the successive value.

6

Window Gathers items from Observable into Observable windows periodically and then emit the windows rather than items.

Transforming Operator Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using map operator to transform an Observable
public class ObservableTester  {
   public static void main(String[] args) {
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .map(String::toUpperCase)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

Compile the class using javac compiler as follows −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

Now run the ObservableTester as follows −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

It should produce the following output −

ABCDEFG

RxJava - Filtering Operators

以下是用于有选择地从 Observable 发出项目的操作符。

Following are the operators which are used to selectively emit item(s) from an Observable.

Sr.No.

Operator & Description

1

Debounce Emits items only when timeout occurs without emiting another item.

2

Distinct Emits only unique items.

3

ElementAt emit only item at n index emitted by an Observable.

4

Filter Emits only those items which pass the given predicate function.

5

First Emits the first item or first item which passed the given criteria.

6

IgnoreElements Do not emits any items from Observable but marks completion.

7

Last Emits the last element from Observable.

8

Sample Emits the most recent item with given time interval.

9

Skip Skips the first n items from an Observable.

10

SkipLast Skips the last n items from an Observable.

11

Take takes the first n items from an Observable.

12

TakeLast takes the last n items from an Observable.

Filtering Operator Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using take operator to filter an Observable
public class ObservableTester  {
   public static void main(String[] args) {
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .take(2)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

Compile the class using javac compiler as follows −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

Now run the ObservableTester as follows −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

It should produce the following output −

ab

RxJava - Combining Operators

以下是用于从多个 Observable 中创建单个 Observable 的操作符。

Following are the operators which are used to create a single Observable from multiple Observables.

Sr.No.

Operator & Description

1

*And/Then/When*Combine item sets using Pattern and Plan intermediaries.

2

*CombineLatest*Combine the latest item emitted by each Observable via a specified function and emit resulted item.

3

*Join*Combine items emitted by two Observables if emitted during time-frame of second Observable emitted item.

4

*Merge*Combines the items emitted of Observables.

5

*StartWith*Emit a specified sequence of items before starting to emit the items from the source Observable

6

*Switch*Emits the most recent items emitted by Observables.

7

*Zip*Combines items of Observables based on function and emits the resulted items.

Combining Operator Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using combineLatest operator to combine Observables
public class ObservableTester {
   public static void main(String[] args) {
      Integer[] numbers = { 1, 2, 3, 4, 5, 6};
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable1 = Observable.fromArray(letters);
      Observable<Integer> observable2 = Observable.fromArray(numbers);
      Observable.combineLatest(observable1, observable2, (a,b) -> a + b)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

Compile the class using javac compiler as follows −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

Now run the ObservableTester as follows −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

It should produce the following output −

g1g2g3g4g5g6

RxJava - Utility Operators

以下是通常与 Observable 一起使用的操作符。

Following are the operators which are often useful with Observables.

Sr.No.

Operator & Description

1

Delay Register action to handle Observable life-cycle events.

2

Materialize/Dematerialize Represents item emitted and notification sent.

3

ObserveOn Specify the scheduler to be observed.

4

Serialize Force Observable to make serialized calls.

5

Subscribe Operate upon the emissions of items and notifications like complete from an Observable

6

SubscribeOn Specify the scheduler to be used by an Observable when it is subscribed to.

7

TimeInterval Convert an Observable to emit indications of the amount of time elapsed between emissions.

8

Timeout Issues error notification if specified time occurs without emitting any item.

9

Timestamp Attach timestamp to each item emitted.

9

Using Creates a disposable resource or same lifespan as that of Observable.

Utility Operator Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using subscribe operator to subscribe to an Observable
public class ObservableTester  {
   public static void main(String[] args) {
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable.subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

Compile the class using javac compiler as follows −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

Now run the ObservableTester as follows −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

It should produce the following output −

abcdefg

RxJava - Conditional Operators

以下是用于计算一个或多个 Observable 或已发出项的运算符。

Following are the operators which evaluates one or multiple Observables or items emitted.

Sr.No.

Operator & Description

1

All Evaluates all items emitted to meet given criteria.

2

Amb Emits all items from the first Observable only given multiple Observables.

3

Contains Checks if an Observable emits a particular item or not.

4

DefaultIfEmpty Emits default item if Observable do not emit anything.

5

SequenceEqual Checks if two Observables emit the same sequence of items.

6

SkipUntil Discards items emitted by first Observable until a second Observable emits an item.

7

SkipWhile Discard items emitted by an Observable until a given condition becomes false.

8

TakeUntil Discards items emitted by an Observable after a second Observable emits an item or terminates.

9

TakeWhile Discard items emitted by an Observable after a specified condition becomes false.

Conditional Operator Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using defaultIfEmpty operator to operate on an Observable
public class ObservableTester  {
   public static void main(String[] args) {
      final StringBuilder result = new StringBuilder();
      Observable.empty()
      .defaultIfEmpty("No Data")
      .subscribe(s -> result.append(s));
      System.out.println(result);
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result1 = new StringBuilder();
      Observable.fromArray(letters)
      .firstElement()
      .defaultIfEmpty("No data")
      .subscribe(s -> result1.append(s));
      System.out.println(result1);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

Compile the class using javac compiler as follows −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

Now run the ObservableTester as follows −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

It should produce the following output −

No Data
a

RxJava - Mathematical Operators

以下是操作 Observable 发出的整个项目的运算符。

Following are the operators which operates on entire items emitted by an Observable.

Sr.No.

Operator & Description

1

Average Evaluates averages of all items and emit the result.

2

Concat Emits all items from multiple Observable without interleaving.

3

Count Counts all items and emit the result.

4

Max Evaluates max valued item of all items and emit the result.

5

Min Evaluates min valued item of all items and emit the result.

6

Reduce Apply a function on each item and return the result.

7

Sum Evaluates sum of all items and emit the result.

Mathematical Operator Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using concat operator to operate on multiple Observables
public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {
      Integer[] numbers = { 1, 2, 3, 4, 5, 6};
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable1 = Observable.fromArray(letters);
      Observable<Integer> observable2 = Observable.fromArray(numbers);
      Observable.concat(observable1, observable2)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

Compile the class using javac compiler as follows −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

Now run the ObservableTester as follows −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

It should produce the following output −

abcdefg123456

RxJava - Connectable Operators

以下是更精确控制订阅的运算符。

Following are the operators which has more precisely control over subscription.

Sr.No.

Operator & Description

1

Connect Instruct a connectable Observable to emit items to its subscribers.

2

Publish Converts an Observable to connectable Observable.

3

RefCount Converts a connectable Observable to ordinary Observable.

4

Replay Ensure same sequence of emitted items to be seen by each subscriber, even after the Observable has begun emitting items and subscribers subscribe later.

Connectable Operator Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
//Using connect operator on a ConnectableObservable
public class ObservableTester {
   public static void main(String[] args) {
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      ConnectableObservable<String> connectable = Observable.fromArray(letters).publish();
      connectable.subscribe(letter -> result.append(letter));
      System.out.println(result.length());
      connectable.connect();
      System.out.println(result.length());
      System.out.println(result);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

Compile the class using javac compiler as follows −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

Now run the ObservableTester as follows −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

It should produce the following output −

0
7
abcdefg

RxJava - Subjects

根据 Reactive ,一个 Subject 既可以充当 Observable,又可以充当 Observer。

As per the Reactive, a Subject can act as both Observable as well as Observer.

有四种类型的 Subject −

There are four types of Subjects −

Sr.No.

Subject & Description

1

Publish Subject Emits only those items which are emitted after time of subscription.

2

*Replay Subject*Emits all the items emitted by source Observable regardless of when it has subscribed the Observable.

3

Behavior Subject Upon subscription, emits the most recent item then continue to emit item emitted by the source Observable.

4

Async Subject Emits the last item emitted by the source Observable after it’s completes emission.

RxJava - PublishSubject

PublishSubject 向当前已订阅的 Observer 发出项,并向当前或迟到的 Observer 发出终结事件。

PublishSubject emits items to currently subscribed Observers and terminal events to current or late Observers.

Class Declaration

以下是 io.reactivex.subjects.PublishSubject<T> 类的声明 −

Following is the declaration for io.reactivex.subjects.PublishSubject<T> class −

public final class PublishSubject<T>
extends Subject<T>

PublishSubject Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

import io.reactivex.subjects.PublishSubject;
public class ObservableTester  {
   public static void main(String[] args) {
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();

      PublishSubject<String> subject = PublishSubject.create();
      subject.subscribe(value -> result1.append(value) );
      subject.onNext("a");
      subject.onNext("b");
      subject.onNext("c");
      subject.subscribe(value -> result2.append(value));
      subject.onNext("d");
      subject.onComplete();

      //Output will be abcd
      System.out.println(result1);
      //Output will be d only
      //as subscribed after c item emitted.
      System.out.println(result2);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

Compile the class using javac compiler as follows −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

Now run the ObservableTester as follows −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

It should produce the following output −

abcd
d

RxJava - BehaviorSubject

BehaviorSubject 向每个订阅的观察者发出它已观察到的最新项,然后发出所有后续观察到的项。

BehaviorSubject emits the most recent item it has observed and then all subsequent observed items to each subscribed Observer.

Class Declaration

以下是对 io.reactivex.subjects.BehaviorSubject<T> 类的声明:

Following is the declaration for io.reactivex.subjects.BehaviorSubject<T> class −

public final class BehaviorSubject<T>
extends Subject<T>

BehaviorSubject Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

import io.reactivex.subjects.BehaviorSubject;
public class ObservableTester  {
   public static void main(String[] args) {
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();
      BehaviorSubject<String> subject =  BehaviorSubject.create();
      subject.subscribe(value -> result1.append(value) );
      subject.onNext("a");
      subject.onNext("b");
      subject.onNext("c");
      subject.subscribe(value -> result2.append(value));
      subject.onNext("d");
      subject.onComplete();
      //Output will be abcd
      System.out.println(result1);
      //Output will be cd being BehaviorSubject
      //(c is last item emitted before subscribe)
      System.out.println(result2);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

Compile the class using javac compiler as follows −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

Now run the ObservableTester as follows −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

It should produce the following output −

abcd
cd

RxJava - ReplaySubject

ReplaySubject 会将事件/项目重放给当前和后来的观察者。

ReplaySubject replays events/items to current and late Observers.

Class Declaration

下面是 io.reactivex.subjects.ReplaySubject<T> 类声明——

Following is the declaration for io.reactivex.subjects.ReplaySubject<T> class −

public final class ReplaySubject<T>
extends Subject<T>

ReplaySubject Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

import io.reactivex.subjects.ReplaySubject;
public class ObservableTester  {
   public static void main(String[] args) {
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();

      ReplaySubject<String> subject = ReplaySubject.create();
      subject.subscribe(value -> result1.append(value) );
      subject.onNext("a");
      subject.onNext("b");
      subject.onNext("c");
      subject.subscribe(value -> result2.append(value));
      subject.onNext("d");
      subject.onComplete();

      //Output will be abcd
      System.out.println(result1);
      //Output will be abcd being ReplaySubject
      //as ReplaySubject emits all the items
      System.out.println(result2);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

Compile the class using javac compiler as follows −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

Now run the ObservableTester as follows −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

It should produce the following output −

abcd
abcd

RxJava - AsyncSubject

AsyncSubject 仅发出最后的值,然后向观察者发出完成事件或接收到的错误。

AsyncSubject emits the only last value followed by a completion event or the received error to Observers.

Class Declaration

io.reactivex.subjects.AsyncSubject<T> 类的声明如下:

Following is the declaration for io.reactivex.subjects.AsyncSubject<T> class −

public final class  AsyncSubject<T>
extends Subject<T>

AsyncSubject Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

import io.reactivex.subjects. AsyncSubject;
public class ObservableTester  {
   public static void main(String[] args) {
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();

      AsyncSubject<String> subject =  AsyncSubject.create();
      subject.subscribe(value -> result1.append(value) );
      subject.onNext("a");
      subject.onNext("b");
      subject.onNext("c");
      subject.subscribe(value -> result2.append(value));
      subject.onNext("d");
      subject.onComplete();

      //Output will be d being the last item emitted
      System.out.println(result1);
      //Output will be d being the last item emitted
      System.out.println(result2);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

Compile the class using javac compiler as follows −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

Now run the ObservableTester as follows −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

It should produce the following output −

d
d

RxJava - Schedulers

在多线程环境中使用计划程序来使用 Observable 运算符。

Schedulers are used in multi-threading environment to work with Observable operators.

根据 Reactive ,计划程序用于计划运算符链应用到不同的线程的方式。

As per the Reactive,Scheduler are used to schedule how chain of operators will apply to different threads.

默认情况下,一个 Observable 及其应用到它的运算符链将在调用其 Subscribe 方法的同一线程上执行其工作,并通知其观察者。SubscribeOn 运算符通过指定 Observable 应该在它上操作的不同计划程序来更改此行为。ObserveOn 运算符指定 Observable 将使用它的观察者来发送通知的不同计划程序。

By default, an Observable and the chain of operators that you apply to it will do its work, and will notify its observers, on the same thread on which its Subscribe method is called. The SubscribeOn operator changes this behavior by specifying a different Scheduler on which the Observable should operate. The ObserveOn operator specifies a different Scheduler that the Observable will use to send notifications to its observers.

RxJava 中有以下类型的计划程序可用:

There are following types of Schedulers available in RxJava −

Sr.No.

Scheduler & Description

1

Schedulers.computation() Creates and returns a Scheduler intended for computational work. Count of threads to be scheduled depends upon the CPUs present in the system. One thread is allowed per CPU. Best for event-loops or callback operations.

2

Schedulers.io() Creates and returns a Scheduler intended for IO-bound work. Thread pool may extend as needed.

3

Schedulers.newThread() Creates and returns a Scheduler that creates a new Thread for each unit of work.

4

Schedulers.trampoline() Creates and returns a Scheduler that queues work on the current thread to be executed after the current work completes.

4

Schedulers.from(java.util.concurrent.Executor executor) Converts an Executor into a new Scheduler instance.

RxJava - Trampoline Scheduler

Schedulers.trampoline() 方法创建并返回一个 Scheduler,该 Scheduler 将队列中的工作置于当前线程上,以便在当前工作完成后执行。

Schedulers.trampoline() method creates and returns a Scheduler that queues work on the current thread to be executed after the current work completes.

Schedulers.trampoline() Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread "
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.trampoline()))
         .subscribe(length -> System.out.println("Receiver Thread "
            + Thread.currentThread().getName()
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

Compile the class using javac compiler as follows −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

Now run the ObservableTester as follows −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

It should produce the following output −

Processing Thread main
Receiver Thread main, Item length 1
Processing Thread main
Receiver Thread main, Item length 2
Processing Thread main
Receiver Thread main, Item length 3

RxJava - NewThread Scheduler

Schedulers.newThread() 方法创建一个调度器,并为每个工作单元创建一个新线程,然后返回该调度器。

Schedulers.newThread() method creates and returns a Scheduler that creates a new Thread for each unit of work.

Schedulers.newThread() Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread "
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.newThread()))
         .subscribe(length -> System.out.println("Receiver Thread "
            + Thread.currentThread().getName()
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

Compile the class using javac compiler as follows −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

Now run the ObservableTester as follows −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

It should produce the following output −

Processing Thread RxNewThreadScheduler-1
Receiver Thread RxNewThreadScheduler-1, Item length 1
Processing Thread RxNewThreadScheduler-2
Receiver Thread RxNewThreadScheduler-2, Item length 2
Processing Thread RxNewThreadScheduler-3
Receiver Thread RxNewThreadScheduler-3, Item length 3

RxJava - Computation Scheduler

Schedulers.computation() 方法创建并返回一个专用于计算工作的 Scheduler。要调度的线程数取决于系统中存在的 CPU。每个 CPU 允许一个线程。最适合事件循环或回叫操作。

Schedulers.computation() method creates and returns a Scheduler intended for computational work. Count of threads to be scheduled depends upon the CPUs present in the system. One thread is allowed per CPU. Best for event-loops or callback operations.

Schedulers.computation() Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread "
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.computation()))
         .subscribe(length -> System.out.println("Receiver Thread "
            + Thread.currentThread().getName()
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

Compile the class using javac compiler as follows −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

Now run the ObservableTester as follows −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

It should produce the following output −

Processing Thread RxComputationThreadPool-1
Receiver Thread RxComputationThreadPool-1, Item length 1
Processing Thread RxComputationThreadPool-2
Receiver Thread RxComputationThreadPool-2, Item length 2
Processing Thread RxComputationThreadPool-3
Receiver Thread RxComputationThreadPool-3, Item length 3

RxJava - IO Scheduler

Schedulers.io() 方法创建一个调度器,并为 IO 绑定工作返回该调度器。线程池可能会根据需要进行扩展。最适用于 I/O 密集型操作。

Schedulers.io() method creates and returns a Scheduler intended for IO-bound work. Thread pool may extend as needed. Best for I/O intensive operations.

Schedulers.io() Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread "
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.io()))
         .subscribe(length -> System.out.println("Receiver Thread "
            + Thread.currentThread().getName()
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

Compile the class using javac compiler as follows −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

Now run the ObservableTester as follows −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

It should produce the following output −

Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 1
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 2
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 3

RxJava - From Scheduler

Schedulers.from(Executor) 方法将 Executor 转换为新的 Scheduler 实例。

Schedulers.from(Executor) method converts an Executor into a new Scheduler instance.

Schedulers.from(Executor) Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

import java.util.Random;
import java.util.concurrent.Executors;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread "
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3))))
         .subscribe(length -> System.out.println("Receiver Thread "
            + Thread.currentThread().getName()
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

Compile the class using javac compiler as follows −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

Now run the ObservableTester as follows −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

It should produce the following output −

Processing Thread pool-1-thread-1
Processing Thread pool-3-thread-1
Receiver Thread pool-1-thread-1, Item length 1
Processing Thread pool-4-thread-1
Receiver Thread pool-4-thread-1, Item length 3
Receiver Thread pool-3-thread-1, Item length 2

RxJava - Buffering

缓存操作符允许将 Observable 发出的项目收集到列表或捆绑包中,并发出这些捆绑包,而不是项目。在下面的示例中,我们创建了一个 Observable 来发出 9 个项目,并使用缓存,一起发出 3 个项目。

Buffering operator allows to gather items emitted by an Observable into a list or bundles and emit those bundles instead of items. In the example below, we’ve created an Observable to emit 9 items and using buffering, 3 items will be emitted together.

Buffering Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4,
         5, 6, 7, 8, 9);

      observable.subscribeOn(Schedulers.io())
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .buffer(3)
         .subscribe(new Observer<List<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("Subscribed");
            }
            @Override
            public void onNext(List<Integer> integers) {
               System.out.println("onNext: ");
               for (Integer value : integers) {
                  System.out.println(value);
               }
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("Error");
            }

            @Override
            public void onComplete() {
               System.out.println("Done! ");
            }
         });
      Thread.sleep(3000);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

Compile the class using javac compiler as follows −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

Now run the ObservableTester as follows −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

It should produce the following output −

Subscribed
onNext:
1
2
3
onNext:
4
5
6
onNext:
7
8
9
Done!

RxJava - Windowing

窗口操作符的工作方式类似于缓冲操作符,但是它允许将 Observable 发出的项目收集到另一个 Observable,而不是收集到集合中,并发出这些 Observable,而不是 collection。在下面的示例中,我们创建了一个 Observable 来发出 9 个项目,并使用窗口操作符,一起发出 3 个 Observable。

Windowing operator works similar to buffer operator but it allows to gather items emitted by an Observable into another observable instead of collection and emit those Observables instead of collections. In the example below, we’ve created an Observable to emit 9 items and using window operator, 3 Observable will be emitted together.

Windowing Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4,
         5, 6, 7, 8, 9);

      observable.subscribeOn(Schedulers.io())
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .window(3)
         .subscribe(new Observer<Observable<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("Subscribed");
            }
            @Override
            public void onNext(Observable<Integer> integers) {
               System.out.println("onNext: ");
               integers.subscribe(value -> System.out.println(value));
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("Error");
            }

            @Override
            public void onComplete() {
               System.out.println("Done! ");
            }
         });
      Thread.sleep(3000);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

Compile the class using javac compiler as follows −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

Now run the ObservableTester as follows −

C:\RxJava>java ObservableTester

它应该产生以下输出 −

It should produce the following output −

Subscribed
onNext:
1
2
3
onNext:
4
5
6
onNext:
7
8
9
Done!