Rxjava 简明教程
RxJava - Overview
RxJava 是 ReactiveX 基于 Java 的扩展。它在 Java 中实现或 ReactiveX 项目。以下是 RxJava 的主要特性。
-
Extends the observer pattern.
-
Support sequences of data/events.
-
提供操作符组合序列,声明性地组合在一起。
-
在内部处理线程、同步、线程安全性及并行数据结构。
What is ReactiveX?
ReactiveX 是一个项目,其目标是将响应式编程概念应用于不同的编程语言。响应式编程是指在数据出现时程序随之做出反应的方案。这是一种基于事件的编程概念,事件可以传播到注册观察者。
根据 Reactive ,它们将观测者模式、迭代器模式和函数式模式的优点结合在一起。
做正确的事情的观测者模式。ReactiveX 是观测者模式、迭代器模式和函数式编程的最佳思想的结合。
Functional Programming
函数式编程围绕使用纯函数构建软件。纯函数不依赖于以前的状态,并且对于传递的相同参数始终返回相同的结果。纯函数有助于避免在多线程环境中常见与共享对象、可变数据和副作用相关的各种问题。
The Reactive Manifesto
Reactive Manifesto 是一份在线文档,规定了应用软件系统的崇高标准。根据宣言,下列是有反应的软件的关键属性:
-
Responsive ——应当始终及时响应。
-
Message Driven ——应当在组件之间采用异步消息传递,以便它们维持松散的耦合。
-
Elastic ——应当即使在高负载下也能保持响应。
-
Resilient ——应当即使任何组件失败也能保持响应。
RxJava - Environment Setup
Step 1 - Verify Java Installation in Your Machine
首先,打开控制台并根据您正在使用的操作系统执行 java 命令。
OS |
Task |
Command |
Windows |
Open Command Console |
c:> java -version |
Linux |
Open Command Terminal |
$ java -version |
Mac |
Open Terminal |
machine:< joseph$ java -version |
让我们验证所有操作系统的输出 −
OS |
Output |
Windows |
Java 1.8.0_101 版本JavaTM SE Runtime Environment (编译版本 1.8.0_101) |
Linux |
Java 1.8.0_101 版本JavaTM SE Runtime Environment (编译版本 1.8.0_101) |
Mac |
Java 1.8.0_101 版本JavaTM SE Runtime Environment (编译版本 1.8.0_101) |
如果你的系统上没有安装 Java,则从以下链接下载 Java 软件开发工具包 (SDK) https://www.oracle.com 。本教程假设已安装 Java 1.8.0_101。
Step 2 - Set JAVA Environment
将 JAVA_HOME 环境变量设置为您计算机中安装 Java 的基本目录位置。例如。
OS |
Output |
Windows |
将环境变量 JAVA_HOME 设置为 C:\Program Files\Java\jdk1.8.0_101 |
Linux |
export JAVA_HOME = /usr/local/java-current |
Mac |
export JAVA_HOME = /Library/Java/Home |
在系统路径中追加 Java 编译器位置。
OS |
Output |
Windows |
在系统变量 Path 的结尾追加字符串 C:\Program Files\Java\jdk1.8.0_101\bin 。 |
Linux |
export PATH = $PATH:$JAVA_HOME/bin/ |
Mac |
not required |
如上所述,使用命令 java -version 验证 Java 安装。
Step 3 - Download RxJava2 Archive
从 RxJava @ MVNRepository 下载 RxJava jar 文件的最新版本及其关联项 Reactive Streams @ MVNRepository 。在本教程的编写时,我们下载了 rxjava-2.2.4.jar 和 reactive-streams-1.0.2.jar,并将其复制到了 C:\>RxJava 文件夹中。
OS |
Archive name |
Windows |
rxjava-2.2.4.jar, reactive-streams-1.0.2.jar |
Linux |
rxjava-2.2.4.jar, reactive-streams-1.0.2.jar |
Mac |
rxjava-2.2.4.jar, reactive-streams-1.0.2.jar |
Step 4 - Set RxJava Environment
设置 RX_JAVA 环境变量以指向存储 RxJava jar 的你的机器上的基本目录位置。我们假设已将 rxjava-2.2.4.jar 和 reactive-streams-1.0.2.jar 存储在 RxJava 文件夹中。
Sr.No |
OS & Description |
1 |
Windows 将环境变量 RX_JAVA 设置为 C:\RxJava |
2 |
Linux export RX_JAVA = /usr/local/RxJava |
3 |
Mac export RX_JAVA = /Library/RxJava |
Step 5 - Set CLASSPATH Variable
设置 CLASSPATH 环境变量以指向 RxJava jar 位置。
Sr.No |
OS & Description |
1 |
Windows 将环境变量 CLASSPATH 设置为 %CLASSPATH%;%RX_JAVA%\rxjava-2.2.4.jar;%RX_JAVA%\reactive-streams-1.0.2.jar;.; |
2 |
Linux export CLASSPATH = $CLASSPATH:$RX_JAVA/rxjava-2.2.4.jar:reactive-streams-1.0.2.jar:. |
3 |
Mac export CLASSPATH = $CLASSPATH:$RX_JAVA/rxjava-2.2.4.jar:reactive-streams-1.0.2.jar:. |
RxJava - How Observable works
RxJava - Creating Observables
以下是可以创建可观察序列的基类。
-
Flowable − 0..N 次流,发出 0 个或 n 个项目。支持 Reactive-Streams 和背压。
-
Observable − 0..N 次流,但无背压。
-
Single − 1 个项目或错误。可以视为方法调用的响应版本。
-
Completable − 不发出任何项目。用作完成或错误的信号。可以视为 Runnable 的响应版本。
-
MayBe − 不发出任何项目或发出 1 个项目。可以视为 Optional 的响应版本。
以下是 Observable 类中创建可观察序列的便利方法。
-
just(T item) − 返回一个 Observable,它对给定的(常量引用)项目发出信号,然后完成。
-
fromIterable(Iterable source) − 将可迭代序列转换为 ObservableSource,它发出序列中的项目。
-
fromArray(T…​ items) − 将数组转换为 ObservableSource,它发出数组中的项目。
-
fromCallable(Callable supplier) − 返回一个 Observable,当观察者订阅它时,它会调用你指定的一个函数,然后发出该函数返回的值。
-
fromFuture(Future future) − 将 Future 转换为 ObservableSource。
-
interval(long initialDelay, long period, TimeUnit unit) − 返回一个 Observable,它在初始延迟后发出 0L,并在此之后,每隔一段时间就发出递增的数字。
RxJava - Single Observable
Single 类表示单值响应。Single observable 只能发出单个成功值或一个错误。它不发出 onComplete 事件。
Class Declaration
以下为 io.reactivex.Single<T> 类的声明:
public abstract class Single<T>
extends Object
implements SingleSource<T>
Single Example
在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。
ObservableTester.java
import java.util.concurrent.TimeUnit;
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
//Create the observable
Single<String> testSingle = Single.just("Hello World");
//Create an observer
Disposable disposable = testSingle
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(
new DisposableSingleObserver<String>() {
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onSuccess(String value) {
System.out.println(value);
}
});
Thread.sleep(3000);
//start observing
disposable.dispose();
}
}
RxJava - MayBe Observable
MayBe 类表示延迟响应。MayBe 可观察对象可以发出一个成功的单个值或不发出值。
Class Declaration
以下为 io.reactivex.Single<T> 类的声明:
public abstract class Maybe<T>
extends Object
implements MaybeSource<T>
MayBe Example
在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。
ObservableTester.java
import java.util.concurrent.TimeUnit;
import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
//Create an observer
Disposable disposable = Maybe.just("Hello World")
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(new DisposableMaybeObserver<String>() {
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onSuccess(String value) {
System.out.println(value);
}
@Override
public void onComplete() {
System.out.println("Done!");
}
});
Thread.sleep(3000);
//start observing
disposable.dispose();
}
}
RxJava - Completable Observable
Completable 类表示延迟响应。Completable 可观察序列可以指示完成成功或错误。
Class Declaration
以下是 io.reactivex.Completable 类的声明 −
public abstract class Completable
extends Object
implements CompletableSource
Completable Example
在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。
ObservableTester.java
import java.util.concurrent.TimeUnit;
import io.reactivex.Completable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableCompletableObserver;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
//Create an observer
Disposable disposable = Completable.complete()
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(new DisposableCompletableObserver() {
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onStart() {
System.out.println("Started!");
}
@Override
public void onComplete() {
System.out.println("Done!");
}
});
Thread.sleep(3000);
//start observing
disposable.dispose();
}
}
RxJava - Using CompositeDisposable
CompositeDisposable 类表示一个容器,可以容纳多个可释放对象,并提供 O(1) 的复杂度来添加和删除可释放对象。
Class Declaration
以下是 io.reactivex.disposables.CompositeDisposable 类的声明 −
public final class CompositeDisposable
extends Object
implements Disposable, io.reactivex.internal.disposables.DisposableContainer
CompositeDisposable Example
在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Maybe;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
CompositeDisposable compositeDisposable = new CompositeDisposable();
//Create an Single observer
Disposable disposableSingle = Single.just("Hello World")
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(
new DisposableSingleObserver<String>() {
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onSuccess(String value) {
System.out.println(value);
}
});
//Create an observer
Disposable disposableMayBe = Maybe.just("Hi")
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(new DisposableMaybeObserver<String>() {
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onSuccess(String value) {
System.out.println(value);
}
@Override
public void onComplete() {
System.out.println("Done!");
}
});
Thread.sleep(3000);
compositeDisposable.add(disposableSingle);
compositeDisposable.add(disposableMayBe);
//start observing
compositeDisposable.dispose();
}
}
RxJava - Creating Operators
以下是用于创建可观察内容的操作符。
Sr.No. |
Operator & Description |
1 |
Create 从头开始创建可观察内容,并允许观察者方法以编程方式调用。 |
2 |
Defer 在观察者订阅之前不要创建一个 Observable。为每个观察者创建新的 observable。 |
3 |
Empty/Never/Throw 创建一个行为受限的 Observable。 |
4 |
From 将对象/数据结构转换成 Observable。 |
5 |
Interval 创建一个 Observable 按照指定的时间间隔顺序发出整数值。 |
6 |
Just 将一个对象/数据结构转换成 Observable 以发出相同或相同类型的对象。 |
7 |
Range 创建一个 Observable 按照给定范围顺序发出整数值。 |
8 |
Repeat 创建一个 Observable 顺序重复发出整数值。 |
9 |
Start 创建一个 Observable 以发出一个函数的返回值。 |
10 |
Timer 创建一个 Observable 以在指定延迟后发出一个单一项目。 |
Creating Operator Example
在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Observable;
//Using fromArray operator to create an Observable
public class ObservableTester {
public static void main(String[] args) {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
Observable<String> observable = Observable.fromArray(letters);
observable
.map(String::toUpperCase)
.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}
RxJava - Transforming Operators
以下是用于转换从 Observable 发出的项的操作符。
Sr.No. |
Operator & Description |
1 |
Buffer 以一定频率从 Observable 收集项到包中,然后发出包而不是项。 |
2 |
FlatMap 用于嵌套的 Observable 中。将项转换为 Observable。然后将项压平到单个 Observable 中。 |
3 |
GroupBy 将 Observable 拆分为按键组织的一组 Observable,以发出不同组的项。 |
4 |
Map 向每个发出的项应用一个函数来对其进行转换。 |
5 |
Scan 向每个发出的项应用一个函数,按照顺序依次发出后续值。 |
6 |
Window 以一定频率从 Observable 收集项到 Observable 窗口,然后发出窗口而不是项。 |
Transforming Operator Example
在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Observable;
//Using map operator to transform an Observable
public class ObservableTester {
public static void main(String[] args) {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
Observable<String> observable = Observable.fromArray(letters);
observable
.map(String::toUpperCase)
.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}
RxJava - Filtering Operators
以下是用于有选择地从 Observable 发出项目的操作符。
Sr.No. |
Operator & Description |
1 |
Debounce 仅在没有发出其他项目的情况下出现超时时才发出项目。 |
2 |
Distinct Emits only unique items. |
3 |
ElementAt 仅发出 Observable 发出的第 n 个索引的项目。 |
4 |
Filter 只发出通过给定谓词函数的那些项目。 |
5 |
First 发出第一个项目,或通过给定准则的第一个项目。 |
6 |
IgnoreElements 不从 Observable 发出任何项目,但会标记为完成。 |
7 |
Last 从 Observable 发出最后一个元素。 |
8 |
Sample 使用给定的时间间隔发出最新的项目。 |
9 |
Skip 从 Observable 跳过前 n 个项目。 |
10 |
SkipLast 从 Observable 跳过最后 n 个项目。 |
11 |
Take 从 Observable 取前 n 个项目。 |
12 |
TakeLast 从 Observable 取最后 n 个项目。 |
Filtering Operator Example
在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Observable;
//Using take operator to filter an Observable
public class ObservableTester {
public static void main(String[] args) {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
Observable<String> observable = Observable.fromArray(letters);
observable
.take(2)
.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}
RxJava - Combining Operators
以下是用于从多个 Observable 中创建单个 Observable 的操作符。
Sr.No. |
Operator & Description |
1 |
*And/Then/When*使用模式和计划中介结合项集。 |
2 |
*CombineLatest*通过指定函数结合每个 Observable 发出的最新项,并发出结果项。 |
3 |
*Join*结合两个 Observable 发出的项,如果是在第二个 Observable 发出项的时间范围内发出的。 |
4 |
*Merge*结合 Observable 发出的项。 |
5 |
*StartWith*在开始发出源 Observable 项之前,先发出一个指定项序列 |
6 |
*Switch*发出 Observable 发出的最新项。 |
7 |
*Zip*根据函数结合 Observable 的项,并发出结果项。 |
Combining Operator Example
在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Observable;
//Using combineLatest operator to combine Observables
public class ObservableTester {
public static void main(String[] args) {
Integer[] numbers = { 1, 2, 3, 4, 5, 6};
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
Observable<String> observable1 = Observable.fromArray(letters);
Observable<Integer> observable2 = Observable.fromArray(numbers);
Observable.combineLatest(observable1, observable2, (a,b) -> a + b)
.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}
RxJava - Utility Operators
以下是通常与 Observable 一起使用的操作符。
Sr.No. |
Operator & Description |
1 |
Delay 注册操作以处理 Observable 生命周期事件。 |
2 |
Materialize/Dematerialize 表示已发出的项和发送的通知。 |
3 |
ObserveOn 指定要观察的调度程序。 |
4 |
Serialize 强制 Observable 执行序列化调用。 |
5 |
Subscribe 对 Observable 的项目和通知(如完成)的发出进行操作 |
6 |
SubscribeOn 订阅 Observable 时,指定要使用的调度程序。 |
7 |
TimeInterval 将 Observable 转换为发出两次发出之间经过的时间量指示。 |
8 |
Timeout 如果没有发出任何项目,在指定的时间发生时发出错误通知。 |
9 |
Timestamp 为每个发出的项目附加时间戳。 |
9 |
Using 创建一个与 Observable 具有相同生命周期的可处置资源。 |
Utility Operator Example
在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Observable;
//Using subscribe operator to subscribe to an Observable
public class ObservableTester {
public static void main(String[] args) {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
Observable<String> observable = Observable.fromArray(letters);
observable.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}
RxJava - Conditional Operators
以下是用于计算一个或多个 Observable 或已发出项的运算符。
Sr.No. |
Operator & Description |
1 |
All 评估所有已发出的项,以满足给定条件。 |
2 |
Amb 仅发出多个 Observable 的第一个 Observable 的所有项。 |
3 |
Contains 检查 Observable 是否发出了特定项。 |
4 |
DefaultIfEmpty 如果 Observable 什么都没有发出,则发出默认项。 |
5 |
SequenceEqual 检查两个 Observable 是否发出相同的项序列。 |
6 |
SkipUntil 丢弃第一个 Observable 发出的项,直至第二个 Observable 发出一个项。 |
7 |
SkipWhile 在给定条件变为假之前,丢弃 Observable 发出的项。 |
8 |
TakeUntil 在第二个 Observable 发出一个项或终止后,丢弃 Observable 发出的项。 |
9 |
TakeWhile 在指定条件变为假后,丢弃 Observable 发出的项。 |
Conditional Operator Example
在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Observable;
//Using defaultIfEmpty operator to operate on an Observable
public class ObservableTester {
public static void main(String[] args) {
final StringBuilder result = new StringBuilder();
Observable.empty()
.defaultIfEmpty("No Data")
.subscribe(s -> result.append(s));
System.out.println(result);
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result1 = new StringBuilder();
Observable.fromArray(letters)
.firstElement()
.defaultIfEmpty("No data")
.subscribe(s -> result1.append(s));
System.out.println(result1);
}
}
RxJava - Mathematical Operators
以下是操作 Observable 发出的整个项目的运算符。
Sr.No. |
Operator & Description |
1 |
Average 评估所有项目的平均值并发出结果。 |
2 |
Concat 从多个 Observable 中发出所有项目,不交错。 |
3 |
Count 计算所有项目并发出结果。 |
4 |
Max 评估所有项目的最大值项目并发出结果。 |
5 |
Min 评估所有项目的最小值项目并发出结果。 |
6 |
Reduce 对每个项目应用一个函数并返回结果。 |
7 |
Sum 评估所有项目的总和并发出结果。 |
Mathematical Operator Example
在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Observable;
//Using concat operator to operate on multiple Observables
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Integer[] numbers = { 1, 2, 3, 4, 5, 6};
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
Observable<String> observable1 = Observable.fromArray(letters);
Observable<Integer> observable2 = Observable.fromArray(numbers);
Observable.concat(observable1, observable2)
.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}
RxJava - Connectable Operators
以下是更精确控制订阅的运算符。
Sr.No. |
Operator & Description |
1 |
Connect 指示可连接的 Observable 向其订阅者发出项目。 |
2 |
Publish 将 Observable 转换为可连接的 Observable。 |
3 |
RefCount 将可连接 Observable 转换为普通 Observable。 |
4 |
Replay 即使 Observable 已开始发出项且订阅者稍后才订阅,也要确保每个订阅者都看到相同的已发出项序列。 |
Connectable Operator Example
在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
//Using connect operator on a ConnectableObservable
public class ObservableTester {
public static void main(String[] args) {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
ConnectableObservable<String> connectable = Observable.fromArray(letters).publish();
connectable.subscribe(letter -> result.append(letter));
System.out.println(result.length());
connectable.connect();
System.out.println(result.length());
System.out.println(result);
}
}
RxJava - Subjects
根据 Reactive ,一个 Subject 既可以充当 Observable,又可以充当 Observer。
有四种类型的 Subject −
Sr.No. |
Subject & Description |
1 |
Publish Subject 仅发出订阅时间之后发出的那些项。 |
2 |
*重播 Subject*发出源 Observable 发出的所有项,而不管它何时订阅 Observable。 |
3 |
Behavior Subject 在订阅后,发出最晚的项,然后继续发出源 Observable 发出的项。 |
4 |
Async Subject 在发出完最后一个项目后,Observable 会发出此项目。 |
RxJava - PublishSubject
PublishSubject 向当前已订阅的 Observer 发出项,并向当前或迟到的 Observer 发出终结事件。
Class Declaration
以下是 io.reactivex.subjects.PublishSubject<T> 类的声明 −
public final class PublishSubject<T>
extends Subject<T>
PublishSubject Example
在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。
ObservableTester.java
import io.reactivex.subjects.PublishSubject;
public class ObservableTester {
public static void main(String[] args) {
final StringBuilder result1 = new StringBuilder();
final StringBuilder result2 = new StringBuilder();
PublishSubject<String> subject = PublishSubject.create();
subject.subscribe(value -> result1.append(value) );
subject.onNext("a");
subject.onNext("b");
subject.onNext("c");
subject.subscribe(value -> result2.append(value));
subject.onNext("d");
subject.onComplete();
//Output will be abcd
System.out.println(result1);
//Output will be d only
//as subscribed after c item emitted.
System.out.println(result2);
}
}
RxJava - BehaviorSubject
BehaviorSubject 向每个订阅的观察者发出它已观察到的最新项,然后发出所有后续观察到的项。
Class Declaration
以下是对 io.reactivex.subjects.BehaviorSubject<T> 类的声明:
public final class BehaviorSubject<T>
extends Subject<T>
BehaviorSubject Example
在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。
ObservableTester.java
import io.reactivex.subjects.BehaviorSubject;
public class ObservableTester {
public static void main(String[] args) {
final StringBuilder result1 = new StringBuilder();
final StringBuilder result2 = new StringBuilder();
BehaviorSubject<String> subject = BehaviorSubject.create();
subject.subscribe(value -> result1.append(value) );
subject.onNext("a");
subject.onNext("b");
subject.onNext("c");
subject.subscribe(value -> result2.append(value));
subject.onNext("d");
subject.onComplete();
//Output will be abcd
System.out.println(result1);
//Output will be cd being BehaviorSubject
//(c is last item emitted before subscribe)
System.out.println(result2);
}
}
RxJava - ReplaySubject
ReplaySubject 会将事件/项目重放给当前和后来的观察者。
Class Declaration
下面是 io.reactivex.subjects.ReplaySubject<T> 类声明——
public final class ReplaySubject<T>
extends Subject<T>
ReplaySubject Example
在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。
ObservableTester.java
import io.reactivex.subjects.ReplaySubject;
public class ObservableTester {
public static void main(String[] args) {
final StringBuilder result1 = new StringBuilder();
final StringBuilder result2 = new StringBuilder();
ReplaySubject<String> subject = ReplaySubject.create();
subject.subscribe(value -> result1.append(value) );
subject.onNext("a");
subject.onNext("b");
subject.onNext("c");
subject.subscribe(value -> result2.append(value));
subject.onNext("d");
subject.onComplete();
//Output will be abcd
System.out.println(result1);
//Output will be abcd being ReplaySubject
//as ReplaySubject emits all the items
System.out.println(result2);
}
}
RxJava - AsyncSubject
AsyncSubject 仅发出最后的值,然后向观察者发出完成事件或接收到的错误。
Class Declaration
io.reactivex.subjects.AsyncSubject<T> 类的声明如下:
public final class AsyncSubject<T>
extends Subject<T>
AsyncSubject Example
在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。
ObservableTester.java
import io.reactivex.subjects. AsyncSubject;
public class ObservableTester {
public static void main(String[] args) {
final StringBuilder result1 = new StringBuilder();
final StringBuilder result2 = new StringBuilder();
AsyncSubject<String> subject = AsyncSubject.create();
subject.subscribe(value -> result1.append(value) );
subject.onNext("a");
subject.onNext("b");
subject.onNext("c");
subject.subscribe(value -> result2.append(value));
subject.onNext("d");
subject.onComplete();
//Output will be d being the last item emitted
System.out.println(result1);
//Output will be d being the last item emitted
System.out.println(result2);
}
}
RxJava - Schedulers
在多线程环境中使用计划程序来使用 Observable 运算符。
根据 Reactive ,计划程序用于计划运算符链应用到不同的线程的方式。
默认情况下,一个 Observable 及其应用到它的运算符链将在调用其 Subscribe 方法的同一线程上执行其工作,并通知其观察者。SubscribeOn 运算符通过指定 Observable 应该在它上操作的不同计划程序来更改此行为。ObserveOn 运算符指定 Observable 将使用它的观察者来发送通知的不同计划程序。
RxJava 中有以下类型的计划程序可用:
Sr.No. |
Scheduler & Description |
1 |
Schedulers.computation() 创建并返回一个用于计算工作的计划程序。要计划的线程数取决于系统中存在的 CPU。允许每个 CPU 一个线程。最适合事件循环或回调操作。 |
2 |
Schedulers.io() 创建并返回一个用于 IO 绑定工作的计划程序。线程池可能根据需要扩展。 |
3 |
Schedulers.newThread() 创建并返回一个为每个工作单元创建一个新线程的计划程序。 |
4 |
Schedulers.trampoline() 创建并返回一个将工作排队到当前线程在当前工作完成后执行的计划程序。 |
4 |
Schedulers.from(java.util.concurrent.Executor executor) 将 Executor 转换为一个新的计划程序实例。 |
RxJava - Trampoline Scheduler
Schedulers.trampoline() 方法创建并返回一个 Scheduler,该 Scheduler 将队列中的工作置于当前线程上,以便在当前工作完成后执行。
Schedulers.trampoline() Example
在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。
ObservableTester.java
import java.util.Random;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable.just("A", "AB", "ABC")
.flatMap(v -> getLengthWithDelay(v)
.doOnNext(s -> System.out.println("Processing Thread "
+ Thread.currentThread().getName()))
.subscribeOn(Schedulers.trampoline()))
.subscribe(length -> System.out.println("Receiver Thread "
+ Thread.currentThread().getName()
+ ", Item length " + length));
Thread.sleep(10000);
}
protected static Observable<Integer> getLengthWithDelay(String v) {
Random random = new Random();
try {
Thread.sleep(random.nextInt(3) * 1000);
return Observable.just(v.length());
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
Verify the Result
按照如下方式使用 javac 编译器编译类 −
C:\RxJava>javac ObservableTester.java
现在按照如下所示运行 ObservableTester:
C:\RxJava>java ObservableTester
它应该产生以下输出 −
Processing Thread main
Receiver Thread main, Item length 1
Processing Thread main
Receiver Thread main, Item length 2
Processing Thread main
Receiver Thread main, Item length 3
RxJava - NewThread Scheduler
Schedulers.newThread() 方法创建一个调度器,并为每个工作单元创建一个新线程,然后返回该调度器。
Schedulers.newThread() Example
在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。
ObservableTester.java
import java.util.Random;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable.just("A", "AB", "ABC")
.flatMap(v -> getLengthWithDelay(v)
.doOnNext(s -> System.out.println("Processing Thread "
+ Thread.currentThread().getName()))
.subscribeOn(Schedulers.newThread()))
.subscribe(length -> System.out.println("Receiver Thread "
+ Thread.currentThread().getName()
+ ", Item length " + length));
Thread.sleep(10000);
}
protected static Observable<Integer> getLengthWithDelay(String v) {
Random random = new Random();
try {
Thread.sleep(random.nextInt(3) * 1000);
return Observable.just(v.length());
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
Verify the Result
按照如下方式使用 javac 编译器编译类 −
C:\RxJava>javac ObservableTester.java
现在按照如下所示运行 ObservableTester:
C:\RxJava>java ObservableTester
它应该产生以下输出 −
Processing Thread RxNewThreadScheduler-1
Receiver Thread RxNewThreadScheduler-1, Item length 1
Processing Thread RxNewThreadScheduler-2
Receiver Thread RxNewThreadScheduler-2, Item length 2
Processing Thread RxNewThreadScheduler-3
Receiver Thread RxNewThreadScheduler-3, Item length 3
RxJava - Computation Scheduler
Schedulers.computation() 方法创建并返回一个专用于计算工作的 Scheduler。要调度的线程数取决于系统中存在的 CPU。每个 CPU 允许一个线程。最适合事件循环或回叫操作。
Schedulers.computation() Example
在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。
ObservableTester.java
import java.util.Random;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable.just("A", "AB", "ABC")
.flatMap(v -> getLengthWithDelay(v)
.doOnNext(s -> System.out.println("Processing Thread "
+ Thread.currentThread().getName()))
.subscribeOn(Schedulers.computation()))
.subscribe(length -> System.out.println("Receiver Thread "
+ Thread.currentThread().getName()
+ ", Item length " + length));
Thread.sleep(10000);
}
protected static Observable<Integer> getLengthWithDelay(String v) {
Random random = new Random();
try {
Thread.sleep(random.nextInt(3) * 1000);
return Observable.just(v.length());
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
Verify the Result
按照如下方式使用 javac 编译器编译类 −
C:\RxJava>javac ObservableTester.java
现在按照如下所示运行 ObservableTester:
C:\RxJava>java ObservableTester
它应该产生以下输出 −
Processing Thread RxComputationThreadPool-1
Receiver Thread RxComputationThreadPool-1, Item length 1
Processing Thread RxComputationThreadPool-2
Receiver Thread RxComputationThreadPool-2, Item length 2
Processing Thread RxComputationThreadPool-3
Receiver Thread RxComputationThreadPool-3, Item length 3
RxJava - IO Scheduler
Schedulers.io() 方法创建一个调度器,并为 IO 绑定工作返回该调度器。线程池可能会根据需要进行扩展。最适用于 I/O 密集型操作。
Schedulers.io() Example
在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。
ObservableTester.java
import java.util.Random;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable.just("A", "AB", "ABC")
.flatMap(v -> getLengthWithDelay(v)
.doOnNext(s -> System.out.println("Processing Thread "
+ Thread.currentThread().getName()))
.subscribeOn(Schedulers.io()))
.subscribe(length -> System.out.println("Receiver Thread "
+ Thread.currentThread().getName()
+ ", Item length " + length));
Thread.sleep(10000);
}
protected static Observable<Integer> getLengthWithDelay(String v) {
Random random = new Random();
try {
Thread.sleep(random.nextInt(3) * 1000);
return Observable.just(v.length());
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
Verify the Result
按照如下方式使用 javac 编译器编译类 −
C:\RxJava>javac ObservableTester.java
现在按照如下所示运行 ObservableTester:
C:\RxJava>java ObservableTester
它应该产生以下输出 −
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 1
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 2
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 3
RxJava - From Scheduler
Schedulers.from(Executor) 方法将 Executor 转换为新的 Scheduler 实例。
Schedulers.from(Executor) Example
在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。
ObservableTester.java
import java.util.Random;
import java.util.concurrent.Executors;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable.just("A", "AB", "ABC")
.flatMap(v -> getLengthWithDelay(v)
.doOnNext(s -> System.out.println("Processing Thread "
+ Thread.currentThread().getName()))
.subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3))))
.subscribe(length -> System.out.println("Receiver Thread "
+ Thread.currentThread().getName()
+ ", Item length " + length));
Thread.sleep(10000);
}
protected static Observable<Integer> getLengthWithDelay(String v) {
Random random = new Random();
try {
Thread.sleep(random.nextInt(3) * 1000);
return Observable.just(v.length());
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
Verify the Result
按照如下方式使用 javac 编译器编译类 −
C:\RxJava>javac ObservableTester.java
现在按照如下所示运行 ObservableTester:
C:\RxJava>java ObservableTester
它应该产生以下输出 −
Processing Thread pool-1-thread-1
Processing Thread pool-3-thread-1
Receiver Thread pool-1-thread-1, Item length 1
Processing Thread pool-4-thread-1
Receiver Thread pool-4-thread-1, Item length 3
Receiver Thread pool-3-thread-1, Item length 2
RxJava - Buffering
缓存操作符允许将 Observable 发出的项目收集到列表或捆绑包中,并发出这些捆绑包,而不是项目。在下面的示例中,我们创建了一个 Observable 来发出 9 个项目,并使用缓存,一起发出 3 个项目。
Buffering Example
在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable<Integer> observable = Observable.just(1, 2, 3, 4,
5, 6, 7, 8, 9);
observable.subscribeOn(Schedulers.io())
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.buffer(3)
.subscribe(new Observer<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("Subscribed");
}
@Override
public void onNext(List<Integer> integers) {
System.out.println("onNext: ");
for (Integer value : integers) {
System.out.println(value);
}
}
@Override
public void onError(Throwable e) {
System.out.println("Error");
}
@Override
public void onComplete() {
System.out.println("Done! ");
}
});
Thread.sleep(3000);
}
}
RxJava - Windowing
窗口操作符的工作方式类似于缓冲操作符,但是它允许将 Observable 发出的项目收集到另一个 Observable,而不是收集到集合中,并发出这些 Observable,而不是 collection。在下面的示例中,我们创建了一个 Observable 来发出 9 个项目,并使用窗口操作符,一起发出 3 个 Observable。
Windowing Example
在任意编辑器中使用任何您选择的语言(如 C:\>RxJava) 创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable<Integer> observable = Observable.just(1, 2, 3, 4,
5, 6, 7, 8, 9);
observable.subscribeOn(Schedulers.io())
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.window(3)
.subscribe(new Observer<Observable<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("Subscribed");
}
@Override
public void onNext(Observable<Integer> integers) {
System.out.println("onNext: ");
integers.subscribe(value -> System.out.println(value));
}
@Override
public void onError(Throwable e) {
System.out.println("Error");
}
@Override
public void onComplete() {
System.out.println("Done! ");
}
});
Thread.sleep(3000);
}
}