Rxjava 简明教程

RxJava - Quick Guide

RxJava - Overview

RxJava 是 ReactiveX 基于 Java 的扩展。它在 Java 中实现或 ReactiveX 项目。以下是 RxJava 的主要特性。

  1. Extends the observer pattern.

  2. Support sequences of data/events.

  3. 提供操作符组合序列,声明性地组合在一起。

  4. 在内部处理线程、同步、线程安全性及并行数据结构。

What is ReactiveX?

ReactiveX 是一个项目,其目标是将响应式编程概念应用于不同的编程语言。响应式编程是指在数据出现时程序随之做出反应的方案。这是一种基于事件的编程概念,事件可以传播到注册观察者。

根据 Reactive ,它们将观测者模式、迭代器模式和函数式模式的优点结合在一起。

做正确的事情的观测者模式。ReactiveX 是观测者模式、迭代器模式和函数式编程的最佳思想的结合。

Functional Programming

函数式编程围绕使用纯函数构建软件。纯函数不依赖于以前的状态,并且对于传递的相同参数始终返回相同的结果。纯函数有助于避免在多线程环境中常见与共享对象、可变数据和副作用相关的各种问题。

Reactive Programming

响应式编程是指在数据流以异步方式出现时,将数据流进行处理的事件驱动编程。

Functional Reactive Programming

RxJava 将这两个概念结合在一起,其中流的数据随着时间的推移而改变,消费者函数做出相应的反应。

The Reactive Manifesto

Reactive Manifesto 是一份在线文档,规定了应用软件系统的崇高标准。根据宣言,下列是有反应的软件的关键属性:

  1. Responsive ——应当始终及时响应。

  2. Message Driven ——应当在组件之间采用异步消息传递,以便它们维持松散的耦合。

  3. Elastic ——应当即使在高负载下也能保持响应。

  4. Resilient ——应当即使任何组件失败也能保持响应。

Key components of RxJava

RxJava 有两个关键组件:可观察序列和观察者。

  1. Observable − 它表示一个类似于 Stream 的对象,它可以发出零个或多个数据,可以发送错误信息,可以在发出一定数据后控制其速度,可以发送有限和无限数据。

  2. Observer − 订阅可观察序列的数据序列,并对可观察序列的每个项目做出反应。无论何时可观察序列发出数据,观察者都会收到通知。观察者逐个处理数据。

如果没有出现项目或没有为之前的项目返回回调,则永远不会通知观察者。

RxJava - Environment Setup

Local Environment Setup

RxJava 是 Java 的一个库,因此最基本的要求是你的机器上已安装 JDK。

System Requirement

JDK

1.5 or above.

Memory

No minimum requirement.

Disk Space

No minimum requirement.

Operating System

No minimum requirement.

Step 1 - Verify Java Installation in Your Machine

首先,打开控制台并根据您正在使用的操作系统执行 java 命令。

OS

Task

Command

Windows

Open Command Console

c:> java -version

Linux

Open Command Terminal

$ java -version

Mac

Open Terminal

machine:< joseph$ java -version

让我们验证所有操作系统的输出 −

OS

Output

Windows

Java 1.8.0_101 版本JavaTM SE Runtime Environment (编译版本 1.8.0_101)

Linux

Java 1.8.0_101 版本JavaTM SE Runtime Environment (编译版本 1.8.0_101)

Mac

Java 1.8.0_101 版本JavaTM SE Runtime Environment (编译版本 1.8.0_101)

如果你的系统上没有安装 Java,则从以下链接下载 Java 软件开发工具包 (SDK) https://www.oracle.com 。本教程假设已安装 Java 1.8.0_101。

Step 2 - Set JAVA Environment

JAVA_HOME 环境变量设置为您计算机中安装 Java 的基本目录位置。例如。

OS

Output

Windows

将环境变量 JAVA_HOME 设置为 C:\Program Files\Java\jdk1.8.0_101

Linux

export JAVA_HOME = /usr/local/java-current

Mac

export JAVA_HOME = /Library/Java/Home

在系统路径中追加 Java 编译器位置。

OS

Output

Windows

在系统变量 Path 的结尾追加字符串 C:\Program Files\Java\jdk1.8.0_101\bin

Linux

export PATH = $PATH:$JAVA_HOME/bin/

Mac

not required

如上所述,使用命令 java -version 验证 Java 安装。

Step 3 - Download RxJava2 Archive

RxJava @ MVNRepository 下载 RxJava jar 文件的最新版本及其关联项 Reactive Streams @ MVNRepository 。在本教程的编写时,我们下载了 rxjava-2.2.4.jar 和 reactive-streams-1.0.2.jar,并将其复制到了 C:\>RxJava 文件夹中。

OS

Archive name

Windows

rxjava-2.2.4.jar, reactive-streams-1.0.2.jar

Linux

rxjava-2.2.4.jar, reactive-streams-1.0.2.jar

Mac

rxjava-2.2.4.jar, reactive-streams-1.0.2.jar

Step 4 - Set RxJava Environment

设置 RX_JAVA 环境变量以指向存储 RxJava jar 的你的机器上的基本目录位置。我们假设已将 rxjava-2.2.4.jar 和 reactive-streams-1.0.2.jar 存储在 RxJava 文件夹中。

Sr.No

OS & Description

1

Windows 将环境变量 RX_JAVA 设置为 C:\RxJava

2

Linux export RX_JAVA = /usr/local/RxJava

3

Mac export RX_JAVA = /Library/RxJava

Step 5 - Set CLASSPATH Variable

设置 CLASSPATH 环境变量以指向 RxJava jar 位置。

Sr.No

OS & Description

1

Windows 将环境变量 CLASSPATH 设置为 %CLASSPATH%;%RX_JAVA%\rxjava-2.2.4.jar;%RX_JAVA%\reactive-streams-1.0.2.jar;.;

2

Linux export CLASSPATH = $CLASSPATH:$RX_JAVA/rxjava-2.2.4.jar:reactive-streams-1.0.2.jar:.

3

Mac export CLASSPATH = $CLASSPATH:$RX_JAVA/rxjava-2.2.4.jar:reactive-streams-1.0.2.jar:.

Step 6 - Test RxJava Setup

像下面所示创建 TestRx.java 类 −

import io.reactivex.Flowable;
public class TestRx {
   public static void main(String[] args) {
      Flowable.just("Hello World!")
         .subscribe(System.out::println);
   }
}

Step 7 - Verify the Result

使用以下 javac 编译器编译类:

C:\RxJava>javac Tester.java

验证输出。

Hello World!

RxJava - How Observable works

Observables 表示数据源,而 Observers (Subscribers) 监听这些数据源。简而言之,Observable 发射项目,Subscriber 然后消耗这些项目。

Observable

  1. 订阅者开始监听后,Observable 提供数据。

  2. Observable 可以发出任意数量的项目。

  3. Observable 也可以只发出完成信号,而不出发任何项目。

  4. Observable can terminate successfully.

  5. Observable 永远不会终止。例如按钮可以单击任意多次。

  6. Observable 可能会在任何时间点发出错误。

Subscriber

  1. Observable 可以拥有多个订阅者。

  2. 当 Observable 发出项目时,每个订阅者的 onNext() 方法都会被调用。

  3. 当 Observable 完成发出项目后,每个订阅者的 onComplete() 方法都会被调用。

  4. 如果 Observable 发出错误,每个订阅者的 onError() 方法都会被调用。

RxJava - Creating Observables

以下是可以创建可观察序列的基类。

  1. Flowable − 0..N 次流,发出 0 个或 n 个项目。支持 Reactive-Streams 和背压。

  2. Observable − 0..N 次流,但无背压。

  3. Single − 1 个项目或错误。可以视为方法调用的响应版本。

  4. Completable − 不发出任何项目。用作完成或错误的信号。可以视为 Runnable 的响应版本。

  5. MayBe − 不发出任何项目或发出 1 个项目。可以视为 Optional 的响应版本。

以下是 Observable 类中创建可观察序列的便利方法。

  1. just(T item) − 返回一个 Observable,它对给定的(常量引用)项目发出信号,然后完成。

  2. fromIterable(Iterable source) − 将可迭代序列转换为 ObservableSource,它发出序列中的项目。

  3. fromArray(T&#8230;&#8203; items) − 将数组转换为 ObservableSource,它发出数组中的项目。

  4. fromCallable(Callable supplier) − 返回一个 Observable,当观察者订阅它时,它会调用你指定的一个函数,然后发出该函数返回的值。

  5. fromFuture(Future future) − 将 Future 转换为 ObservableSource。

  6. interval(long initialDelay, long period, TimeUnit unit) − 返回一个 Observable,它在初始延迟后发出 0L,并在此之后,每隔一段时间就发出递增的数字。

RxJava - Single Observable

Single 类表示单值响应。Single observable 只能发出单个成功值或一个错误。它不发出 onComplete 事件。

Class Declaration

以下为 io.reactivex.Single<T> 类的声明:

public abstract class Single<T>
   extends Object
      implements SingleSource<T>

Protocol

以下为 Single Observable 采用的顺序协议:

onSubscribe (onSuccess | onError)?

Single Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {
      //Create the observable
      Single<String> testSingle = Single.just("Hello World");

      //Create an observer
      Disposable disposable = testSingle
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(
         new DisposableSingleObserver<String>() {

         @Override
         public void onError(Throwable e) {
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      });
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

C:\RxJava>java ObservableTester

它应该产生以下输出 −

Hello World

RxJava - MayBe Observable

MayBe 类表示延迟响应。MayBe 可观察对象可以发出一个成功的单个值或不发出值。

Class Declaration

以下为 io.reactivex.Single<T> 类的声明:

public abstract class Maybe<T>
   extends Object
      implements MaybeSource<T>

Protocol

下面是 MayBe 可观察对象的操作顺序——

onSubscribe (onSuccess | onError | OnComplete)?

MayBe Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      //Create an observer
      Disposable disposable = Maybe.just("Hello World")
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) {
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      });
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

C:\RxJava>java ObservableTester

它应该产生以下输出 −

Hello World

RxJava - Completable Observable

Completable 类表示延迟响应。Completable 可观察序列可以指示完成成功或错误。

Class Declaration

以下是 io.reactivex.Completable 类的声明 −

public abstract class Completable
extends Object
implements CompletableSource

Protocol

以下是 Completable Observable 运行的顺序协议 −

onSubscribe (onError | onComplete)?

Completable Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Completable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableCompletableObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {

      //Create an observer
      Disposable disposable = Completable.complete()
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(new DisposableCompletableObserver() {
         @Override
         public void onError(Throwable e) {
            e.printStackTrace();
         }
         @Override
         public void onStart() {
            System.out.println("Started!");
         }
         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      });
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

C:\RxJava>java ObservableTester

它应该产生以下输出 −

Started!
Done!

RxJava - Using CompositeDisposable

CompositeDisposable 类表示一个容器,可以容纳多个可释放对象,并提供 O(1) 的复杂度来添加和删除可释放对象。

Class Declaration

以下是 io.reactivex.disposables.CompositeDisposable 类的声明 −

public final class CompositeDisposable
extends Object
implements Disposable, io.reactivex.internal.disposables.DisposableContainer

CompositeDisposable Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Maybe;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {
      CompositeDisposable compositeDisposable = new CompositeDisposable();

      //Create an Single observer
      Disposable disposableSingle = Single.just("Hello World")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(
      new DisposableSingleObserver<String>() {
         @Override
         public void onError(Throwable e) {
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      });

      //Create an observer
      Disposable disposableMayBe = Maybe.just("Hi")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) {
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      });

      Thread.sleep(3000);

      compositeDisposable.add(disposableSingle);
      compositeDisposable.add(disposableMayBe);

      //start observing
      compositeDisposable.dispose();
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

C:\RxJava>java ObservableTester

它应该产生以下输出 −

Hello World
Hi

RxJava - Creating Operators

以下是用于创建可观察内容的操作符。

Sr.No.

Operator & Description

1

Create 从头开始创建可观察内容,并允许观察者方法以编程方式调用。

2

Defer 在观察者订阅之前不要创建一个 Observable。为每个观察者创建新的 observable。

3

Empty/Never/Throw 创建一个行为受限的 Observable。

4

From 将对象/数据结构转换成 Observable。

5

Interval 创建一个 Observable 按照指定的时间间隔顺序发出整数值。

6

Just 将一个对象/数据结构转换成 Observable 以发出相同或相同类型的对象。

7

Range 创建一个 Observable 按照给定范围顺序发出整数值。

8

Repeat 创建一个 Observable 顺序重复发出整数值。

9

Start 创建一个 Observable 以发出一个函数的返回值。

10

Timer 创建一个 Observable 以在指定延迟后发出一个单一项目。

Creating Operator Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
//Using fromArray operator to create an Observable
public class ObservableTester  {
   public static void main(String[] args) {
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .map(String::toUpperCase)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

C:\RxJava>java ObservableTester

它应该产生以下输出 −

ABCDEFG

RxJava - Transforming Operators

以下是用于转换从 Observable 发出的项的操作符。

Sr.No.

Operator & Description

1

Buffer 以一定频率从 Observable 收集项到包中,然后发出包而不是项。

2

FlatMap 用于嵌套的 Observable 中。将项转换为 Observable。然后将项压平到单个 Observable 中。

3

GroupBy 将 Observable 拆分为按键组织的一组 Observable,以发出不同组的项。

4

Map 向每个发出的项应用一个函数来对其进行转换。

5

Scan 向每个发出的项应用一个函数,按照顺序依次发出后续值。

6

Window 以一定频率从 Observable 收集项到 Observable 窗口,然后发出窗口而不是项。

Transforming Operator Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
//Using map operator to transform an Observable
public class ObservableTester  {
   public static void main(String[] args) {
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .map(String::toUpperCase)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

C:\RxJava>java ObservableTester

它应该产生以下输出 −

ABCDEFG

RxJava - Filtering Operators

以下是用于有选择地从 Observable 发出项目的操作符。

Sr.No.

Operator & Description

1

Debounce 仅在没有发出其他项目的情况下出现超时时才发出项目。

2

Distinct Emits only unique items.

3

ElementAt 仅发出 Observable 发出的第 n 个索引的项目。

4

Filter 只发出通过给定谓词函数的那些项目。

5

First 发出第一个项目,或通过给定准则的第一个项目。

6

IgnoreElements 不从 Observable 发出任何项目,但会标记为完成。

7

Last 从 Observable 发出最后一个元素。

8

Sample 使用给定的时间间隔发出最新的项目。

9

Skip 从 Observable 跳过前 n 个项目。

10

SkipLast 从 Observable 跳过最后 n 个项目。

11

Take 从 Observable 取前 n 个项目。

12

TakeLast 从 Observable 取最后 n 个项目。

Filtering Operator Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
//Using take operator to filter an Observable
public class ObservableTester  {
   public static void main(String[] args) {
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .take(2)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

C:\RxJava>java ObservableTester

它应该产生以下输出 −

ab

RxJava - Combining Operators

以下是用于从多个 Observable 中创建单个 Observable 的操作符。

Sr.No.

Operator & Description

1

*And/Then/When*使用模式和计划中介结合项集。

2

*CombineLatest*通过指定函数结合每个 Observable 发出的最新项,并发出结果项。

3

*Join*结合两个 Observable 发出的项,如果是在第二个 Observable 发出项的时间范围内发出的。

4

*Merge*结合 Observable 发出的项。

5

*StartWith*在开始发出源 Observable 项之前,先发出一个指定项序列

6

*Switch*发出 Observable 发出的最新项。

7

*Zip*根据函数结合 Observable 的项,并发出结果项。

Combining Operator Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
//Using combineLatest operator to combine Observables
public class ObservableTester {
   public static void main(String[] args) {
      Integer[] numbers = { 1, 2, 3, 4, 5, 6};
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable1 = Observable.fromArray(letters);
      Observable<Integer> observable2 = Observable.fromArray(numbers);
      Observable.combineLatest(observable1, observable2, (a,b) -> a + b)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

C:\RxJava>java ObservableTester

它应该产生以下输出 −

g1g2g3g4g5g6

RxJava - Utility Operators

以下是通常与 Observable 一起使用的操作符。

Sr.No.

Operator & Description

1

Delay 注册操作以处理 Observable 生命周期事件。

2

Materialize/Dematerialize 表示已发出的项和发送的通知。

3

ObserveOn 指定要观察的调度程序。

4

Serialize 强制 Observable 执行序列化调用。

5

Subscribe 对 Observable 的项目和通知(如完成)的发出进行操作

6

SubscribeOn 订阅 Observable 时,指定要使用的调度程序。

7

TimeInterval 将 Observable 转换为发出两次发出之间经过的时间量指示。

8

Timeout 如果没有发出任何项目,在指定的时间发生时发出错误通知。

9

Timestamp 为每个发出的项目附加时间戳。

9

Using 创建一个与 Observable 具有相同生命周期的可处置资源。

Utility Operator Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
//Using subscribe operator to subscribe to an Observable
public class ObservableTester  {
   public static void main(String[] args) {
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable.subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

C:\RxJava>java ObservableTester

它应该产生以下输出 −

abcdefg

RxJava - Conditional Operators

以下是用于计算一个或多个 Observable 或已发出项的运算符。

Sr.No.

Operator & Description

1

All 评估所有已发出的项,以满足给定条件。

2

Amb 仅发出多个 Observable 的第一个 Observable 的所有项。

3

Contains 检查 Observable 是否发出了特定项。

4

DefaultIfEmpty 如果 Observable 什么都没有发出,则发出默认项。

5

SequenceEqual 检查两个 Observable 是否发出相同的项序列。

6

SkipUntil 丢弃第一个 Observable 发出的项,直至第二个 Observable 发出一个项。

7

SkipWhile 在给定条件变为假之前,丢弃 Observable 发出的项。

8

TakeUntil 在第二个 Observable 发出一个项或终止后,丢弃 Observable 发出的项。

9

TakeWhile 在指定条件变为假后,丢弃 Observable 发出的项。

Conditional Operator Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
//Using defaultIfEmpty operator to operate on an Observable
public class ObservableTester  {
   public static void main(String[] args) {
      final StringBuilder result = new StringBuilder();
      Observable.empty()
      .defaultIfEmpty("No Data")
      .subscribe(s -> result.append(s));
      System.out.println(result);
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result1 = new StringBuilder();
      Observable.fromArray(letters)
      .firstElement()
      .defaultIfEmpty("No data")
      .subscribe(s -> result1.append(s));
      System.out.println(result1);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

C:\RxJava>java ObservableTester

它应该产生以下输出 −

No Data
a

RxJava - Mathematical Operators

以下是操作 Observable 发出的整个项目的运算符。

Sr.No.

Operator & Description

1

Average 评估所有项目的平均值并发出结果。

2

Concat 从多个 Observable 中发出所有项目,不交错。

3

Count 计算所有项目并发出结果。

4

Max 评估所有项目的最大值项目并发出结果。

5

Min 评估所有项目的最小值项目并发出结果。

6

Reduce 对每个项目应用一个函数并返回结果。

7

Sum 评估所有项目的总和并发出结果。

Mathematical Operator Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
//Using concat operator to operate on multiple Observables
public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {
      Integer[] numbers = { 1, 2, 3, 4, 5, 6};
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable1 = Observable.fromArray(letters);
      Observable<Integer> observable2 = Observable.fromArray(numbers);
      Observable.concat(observable1, observable2)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

C:\RxJava>java ObservableTester

它应该产生以下输出 −

abcdefg123456

RxJava - Connectable Operators

以下是更精确控制订阅的运算符。

Sr.No.

Operator & Description

1

Connect 指示可连接的 Observable 向其订阅者发出项目。

2

Publish 将 Observable 转换为可连接的 Observable。

3

RefCount 将可连接 Observable 转换为普通 Observable。

4

Replay 即使 Observable 已开始发出项且订阅者稍后才订阅,也要确保每个订阅者都看到相同的已发出项序列。

Connectable Operator Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
//Using connect operator on a ConnectableObservable
public class ObservableTester {
   public static void main(String[] args) {
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      ConnectableObservable<String> connectable = Observable.fromArray(letters).publish();
      connectable.subscribe(letter -> result.append(letter));
      System.out.println(result.length());
      connectable.connect();
      System.out.println(result.length());
      System.out.println(result);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

C:\RxJava>java ObservableTester

它应该产生以下输出 −

0
7
abcdefg

RxJava - Subjects

根据 Reactive ,一个 Subject 既可以充当 Observable,又可以充当 Observer。

有四种类型的 Subject −

Sr.No.

Subject & Description

1

Publish Subject 仅发出订阅时间之后发出的那些项。

2

*重播 Subject*发出源 Observable 发出的所有项,而不管它何时订阅 Observable。

3

Behavior Subject 在订阅后,发出最晚的项,然后继续发出源 Observable 发出的项。

4

Async Subject 在发出完最后一个项目后,Observable 会发出此项目。

RxJava - PublishSubject

PublishSubject 向当前已订阅的 Observer 发出项,并向当前或迟到的 Observer 发出终结事件。

Class Declaration

以下是 io.reactivex.subjects.PublishSubject<T> 类的声明 −

public final class PublishSubject<T>
extends Subject<T>

PublishSubject Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

ObservableTester.java

import io.reactivex.subjects.PublishSubject;
public class ObservableTester  {
   public static void main(String[] args) {
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();

      PublishSubject<String> subject = PublishSubject.create();
      subject.subscribe(value -> result1.append(value) );
      subject.onNext("a");
      subject.onNext("b");
      subject.onNext("c");
      subject.subscribe(value -> result2.append(value));
      subject.onNext("d");
      subject.onComplete();

      //Output will be abcd
      System.out.println(result1);
      //Output will be d only
      //as subscribed after c item emitted.
      System.out.println(result2);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

C:\RxJava>java ObservableTester

它应该产生以下输出 −

abcd
d

RxJava - BehaviorSubject

BehaviorSubject 向每个订阅的观察者发出它已观察到的最新项,然后发出所有后续观察到的项。

Class Declaration

以下是对 io.reactivex.subjects.BehaviorSubject<T> 类的声明:

public final class BehaviorSubject<T>
extends Subject<T>

BehaviorSubject Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

ObservableTester.java

import io.reactivex.subjects.BehaviorSubject;
public class ObservableTester  {
   public static void main(String[] args) {
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();
      BehaviorSubject<String> subject =  BehaviorSubject.create();
      subject.subscribe(value -> result1.append(value) );
      subject.onNext("a");
      subject.onNext("b");
      subject.onNext("c");
      subject.subscribe(value -> result2.append(value));
      subject.onNext("d");
      subject.onComplete();
      //Output will be abcd
      System.out.println(result1);
      //Output will be cd being BehaviorSubject
      //(c is last item emitted before subscribe)
      System.out.println(result2);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

C:\RxJava>java ObservableTester

它应该产生以下输出 −

abcd
cd

RxJava - ReplaySubject

ReplaySubject 会将事件/项目重放给当前和后来的观察者。

Class Declaration

下面是 io.reactivex.subjects.ReplaySubject<T> 类声明——

public final class ReplaySubject<T>
extends Subject<T>

ReplaySubject Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

ObservableTester.java

import io.reactivex.subjects.ReplaySubject;
public class ObservableTester  {
   public static void main(String[] args) {
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();

      ReplaySubject<String> subject = ReplaySubject.create();
      subject.subscribe(value -> result1.append(value) );
      subject.onNext("a");
      subject.onNext("b");
      subject.onNext("c");
      subject.subscribe(value -> result2.append(value));
      subject.onNext("d");
      subject.onComplete();

      //Output will be abcd
      System.out.println(result1);
      //Output will be abcd being ReplaySubject
      //as ReplaySubject emits all the items
      System.out.println(result2);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

C:\RxJava>java ObservableTester

它应该产生以下输出 −

abcd
abcd

RxJava - AsyncSubject

AsyncSubject 仅发出最后的值,然后向观察者发出完成事件或接收到的错误。

Class Declaration

io.reactivex.subjects.AsyncSubject<T> 类的声明如下:

public final class  AsyncSubject<T>
extends Subject<T>

AsyncSubject Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

ObservableTester.java

import io.reactivex.subjects. AsyncSubject;
public class ObservableTester  {
   public static void main(String[] args) {
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();

      AsyncSubject<String> subject =  AsyncSubject.create();
      subject.subscribe(value -> result1.append(value) );
      subject.onNext("a");
      subject.onNext("b");
      subject.onNext("c");
      subject.subscribe(value -> result2.append(value));
      subject.onNext("d");
      subject.onComplete();

      //Output will be d being the last item emitted
      System.out.println(result1);
      //Output will be d being the last item emitted
      System.out.println(result2);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

C:\RxJava>java ObservableTester

它应该产生以下输出 −

d
d

RxJava - Schedulers

在多线程环境中使用计划程序来使用 Observable 运算符。

根据 Reactive ,计划程序用于计划运算符链应用到不同的线程的方式。

默认情况下,一个 Observable 及其应用到它的运算符链将在调用其 Subscribe 方法的同一线程上执行其工作,并通知其观察者。SubscribeOn 运算符通过指定 Observable 应该在它上操作的不同计划程序来更改此行为。ObserveOn 运算符指定 Observable 将使用它的观察者来发送通知的不同计划程序。

RxJava 中有以下类型的计划程序可用:

Sr.No.

Scheduler & Description

1

Schedulers.computation() 创建并返回一个用于计算工作的计划程序。要计划的线程数取决于系统中存在的 CPU。允许每个 CPU 一个线程。最适合事件循环或回调操作。

2

Schedulers.io() 创建并返回一个用于 IO 绑定工作的计划程序。线程池可能根据需要扩展。

3

Schedulers.newThread() 创建并返回一个为每个工作单元创建一个新线程的计划程序。

4

Schedulers.trampoline() 创建并返回一个将工作排队到当前线程在当前工作完成后执行的计划程序。

4

Schedulers.from(java.util.concurrent.Executor executor) 将 Executor 转换为一个新的计划程序实例。

RxJava - Trampoline Scheduler

Schedulers.trampoline() 方法创建并返回一个 Scheduler,该 Scheduler 将队列中的工作置于当前线程上,以便在当前工作完成后执行。

Schedulers.trampoline() Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread "
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.trampoline()))
         .subscribe(length -> System.out.println("Receiver Thread "
            + Thread.currentThread().getName()
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

C:\RxJava>java ObservableTester

它应该产生以下输出 −

Processing Thread main
Receiver Thread main, Item length 1
Processing Thread main
Receiver Thread main, Item length 2
Processing Thread main
Receiver Thread main, Item length 3

RxJava - NewThread Scheduler

Schedulers.newThread() 方法创建一个调度器,并为每个工作单元创建一个新线程,然后返回该调度器。

Schedulers.newThread() Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread "
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.newThread()))
         .subscribe(length -> System.out.println("Receiver Thread "
            + Thread.currentThread().getName()
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

C:\RxJava>java ObservableTester

它应该产生以下输出 −

Processing Thread RxNewThreadScheduler-1
Receiver Thread RxNewThreadScheduler-1, Item length 1
Processing Thread RxNewThreadScheduler-2
Receiver Thread RxNewThreadScheduler-2, Item length 2
Processing Thread RxNewThreadScheduler-3
Receiver Thread RxNewThreadScheduler-3, Item length 3

RxJava - Computation Scheduler

Schedulers.computation() 方法创建并返回一个专用于计算工作的 Scheduler。要调度的线程数取决于系统中存在的 CPU。每个 CPU 允许一个线程。最适合事件循环或回叫操作。

Schedulers.computation() Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread "
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.computation()))
         .subscribe(length -> System.out.println("Receiver Thread "
            + Thread.currentThread().getName()
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

C:\RxJava>java ObservableTester

它应该产生以下输出 −

Processing Thread RxComputationThreadPool-1
Receiver Thread RxComputationThreadPool-1, Item length 1
Processing Thread RxComputationThreadPool-2
Receiver Thread RxComputationThreadPool-2, Item length 2
Processing Thread RxComputationThreadPool-3
Receiver Thread RxComputationThreadPool-3, Item length 3

RxJava - IO Scheduler

Schedulers.io() 方法创建一个调度器,并为 IO 绑定工作返回该调度器。线程池可能会根据需要进行扩展。最适用于 I/O 密集型操作。

Schedulers.io() Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread "
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.io()))
         .subscribe(length -> System.out.println("Receiver Thread "
            + Thread.currentThread().getName()
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

C:\RxJava>java ObservableTester

它应该产生以下输出 −

Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 1
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 2
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 3

RxJava - From Scheduler

Schedulers.from(Executor) 方法将 Executor 转换为新的 Scheduler 实例。

Schedulers.from(Executor) Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

ObservableTester.java

import java.util.Random;
import java.util.concurrent.Executors;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread "
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3))))
         .subscribe(length -> System.out.println("Receiver Thread "
            + Thread.currentThread().getName()
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

C:\RxJava>java ObservableTester

它应该产生以下输出 −

Processing Thread pool-1-thread-1
Processing Thread pool-3-thread-1
Receiver Thread pool-1-thread-1, Item length 1
Processing Thread pool-4-thread-1
Receiver Thread pool-4-thread-1, Item length 3
Receiver Thread pool-3-thread-1, Item length 2

RxJava - Buffering

缓存操作符允许将 Observable 发出的项目收集到列表或捆绑包中,并发出这些捆绑包,而不是项目。在下面的示例中,我们创建了一个 Observable 来发出 9 个项目,并使用缓存,一起发出 3 个项目。

Buffering Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4,
         5, 6, 7, 8, 9);

      observable.subscribeOn(Schedulers.io())
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .buffer(3)
         .subscribe(new Observer<List<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("Subscribed");
            }
            @Override
            public void onNext(List<Integer> integers) {
               System.out.println("onNext: ");
               for (Integer value : integers) {
                  System.out.println(value);
               }
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("Error");
            }

            @Override
            public void onComplete() {
               System.out.println("Done! ");
            }
         });
      Thread.sleep(3000);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

C:\RxJava>java ObservableTester

它应该产生以下输出 −

Subscribed
onNext:
1
2
3
onNext:
4
5
6
onNext:
7
8
9
Done!

RxJava - Windowing

窗口操作符的工作方式类似于缓冲操作符,但是它允许将 Observable 发出的项目收集到另一个 Observable,而不是收集到集合中,并发出这些 Observable,而不是 collection。在下面的示例中,我们创建了一个 Observable 来发出 9 个项目,并使用窗口操作符,一起发出 3 个 Observable。

Windowing Example

在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4,
         5, 6, 7, 8, 9);

      observable.subscribeOn(Schedulers.io())
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .window(3)
         .subscribe(new Observer<Observable<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("Subscribed");
            }
            @Override
            public void onNext(Observable<Integer> integers) {
               System.out.println("onNext: ");
               integers.subscribe(value -> System.out.println(value));
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("Error");
            }

            @Override
            public void onComplete() {
               System.out.println("Done! ");
            }
         });
      Thread.sleep(3000);
   }
}

Verify the Result

按照如下方式使用 javac 编译器编译类 −

C:\RxJava>javac ObservableTester.java

现在按照如下所示运行 ObservableTester:

C:\RxJava>java ObservableTester

它应该产生以下输出 −

Subscribed
onNext:
1
2
3
onNext:
4
5
6
onNext:
7
8
9
Done!