Mutiny - Async for mere mortals

An event-driven reactive programming API

Mutiny 与其他响应式编程库非常不同。它采用了不同的方法来设计你的程序。使用 Mutiny 时,所有内容都是事件驱动的:你收到事件,然后对此做出反应。此事件驱动方面融合了分布式系统的异步特性,并提供了一种优雅且精确的方法来表示延续。

Mutiny 提供了两种类型,它们既基于事件也基于惰性:

  • `Uni`发出一个单一事件(一个项目或一个故障)。Uni 便于表示返回 0 或 1 个结果的异步操作。一个好的示例是发送消息到消息代理程序队列的结果。

  • `Multi`发出多个事件(n 个项目、1 个故障或 1 个完成)。Multi 可以表示项目的流,可能无界。一个好的示例是从一个消息代理程序队列接收消息。

这两种类型允许表示任何类型的交互。它们就是事件源。您可以观察它们 (subscription),当它们发出项目、故障或有界 Multi 中的完成事件时,您将收到通知。当您(订阅者)收到事件时,您可以对其进行处理(例如,转换和筛选)。借助 Mutiny,您可以编写代码(如 onX().action()),该代码可读作“在项目 X 上执行操作”。

如果您想进一步了解 Mutiny 及其背后的概念,请查看 the Mutiny Reference documentation

Mutiny in Quarkus

Mutiny 是在处理来自 Quarkus 的响应式特性时的主要 API。这意味着大多数扩展支持 Mutiny,方法是以返回 Uni 和 Multi 的 API 暴露(如响应式数据源或 Rest 客户端)或理解您的方法何时返回 Uni 或 Multi(如 Quarkus REST(以前称为 RESTEasy Reactive)或响应式消息传递)。

这些集成使 Mutiny 成为使用 Quarkus 开发的每个响应式应用程序的突出且内聚的模型。此外,Mutiny 架构允许精细的死代码消除,从而改进了编译成本机时(例如使用 Quarkus 本机模式或 GraaIVM 本机映像编译器)的内存使用。

Why another reactive programming API?

经验丰富的响应式开发人员可能会疑惑,为什么 Quarkus 还会引入其他响应式编程 API,而现有的已有很多。Mutiny 采用了截然不同的角度:

Event-Driven-Mutiny 将事件置于其设计的核心。借助 Mutiny,您可以观察事件、对它们做出反应,并创建优雅且可读的处理管道。不必拥有函数式编程的博士学位。

Navigable-即使拥有智能代码补全,拥有数百种方法的类也很令人困惑。Mutiny 提供了一个可导航和显式的 API,引导您走向您需要的操作符。

Non-Blocking I/O-Mutiny 是应对带有非阻塞 I/O(为 Quarkus提供支持)的应用程序异步特性的理想伴侣。声明性地组合操作、转换数据、强制执行进度、从故障中恢复,等等。

Made for an asynchronous world-Mutiny 可以用于任何异步应用程序,如事件驱动的微服务、基于消息的应用程序、网络实用程序、数据流处理,当然还有…​ 响应式应用程序!

Reactive Streams and Converters Built-In-Mutiny 基于 Reactive Streams规范,因此它可以与任何其他响应式编程库集成。此外,它还提议转换器与其他流行库交互。

Mutiny and the I/O Threads

Quarkus 由 reactive engine提供支持,在开发响应式应用程序时,您的代码将在少数几个 I/O 线程之一上执行。请记住,您永远不能阻塞这些线程,如果您这样做,模型将会崩溃。因此,您不能使用阻塞 I/O。您需要安排 I/O 操作并传递一个延续。

reactive thread

Mutiny 事件驱动范式为此量身定制。当 I/O 操作成功完成时,表示它的 Uni 将发出一个项目事件。当它失败时,它将发出一个故障事件。使用事件驱动 API 可以简单而自然地表示延续。

Mutiny through Examples

许多 Quarkus 扩展都公开了 Mutiny API。在本节中,我们将使用 MongoDB 扩展来说明如何使用 Mutiny。

让我们设想一个表示 periodic table中元素的简单结构:

public class Element {

   public String name;
   public String symbol;
   public int position;

   public Element(String name, String symbol, int position) {
       this.name = name;
       this.symbol = symbol;
       this.position = position;
   }

   public Element() {
        // Use by JSON mappers
   }
}

此结构包含元素的名称、符号和位置。要检索和存储元素到一个 Mongo 集合,您可以使用以下代码:

@ApplicationScoped
public class ElementService {

   final ReactiveMongoCollection<Element> collection;

   @Inject
   ElementService(ReactiveMongoClient client) {
       collection = client.getDatabase("quarkus")
               .getCollection("elements", Element.class);
   }

   public void add(Element element) {
       Uni<InsertOneResult> insertion = collection.insertOne(element);
       insertion
           .onItem().transform(r -> r.getInsertedId().asString())
           .subscribe().with(
               result -> System.out.println("inserted " + result),
               failure -> System.out.println("D'oh" + failure));
   }

   public void getAll() {
       collection.find()
           .subscribe().with(
              element -> System.out.println("Element: " + element),
             failure -> System.out.println("D'oh! " + failure),
             () -> System.out.println("No more elements")
       );
   }

}

首先,注入 Mongo 客户端。请注意,它使用了响应式 varient (io.quarkus.mongodb.reactive.ReactiveMongoClient)。在 initialize 方法中,我们检索和存储将插入元素的集合。

add 方法在一个集合中插入元素。它接收元素作为一个参数并使用集合的响应式 API。与数据库交互涉及 I/O。响应式原则禁止在等待交互完成时进行阻塞。相反,我们调度操作并传递延续。insertOne 方法返回一个 Uni,即异步操作。经预定的 I/O 就是如此。我们现在需要表达延续,这使用 .onItem() 方法完成。.onItem() 允许配置在观察到的 Uni 发射项时需要发生的操作,在我们案例中,即在预定 I/O 完成时。在此示例中,我们提取插入的文档 ID。最后一步是订购。没有它,一切都不会发生。订购触发操作。订购方法还可以定义处理程序:id 成功时的值或插入失败时的失败。

现在我们来了解第二个方法。它会检索所有存储的元素。在此案例中,它会返回多个项(每个存储的元素一项),所以我们正在使用一个 Multi。就像插入一样,获取存储的元素涉及 I/O。find 是我们的操作。就像对于 Uni 一样,您需要订购以触发操作。订购者接收项事件、失败事件或所有元素全部接收后产生的完成事件。

订购 Uni 或 Multi 至关重要,因为如果没有订购,操作永远不会执行。在 Quarkus 中,某些扩展为您处理订购。例如,在 Quarkus REST 中,您的 HTTP 方法可以返回 Uni 或 Multi,Quarkus REST 处理订购。

Mutiny Patterns

上一部分中的示例故意简单。接下来我们来了解一些常见模式。

Observing events

你可以通过以下方法来观察各种类型的事件:

on{event}().invoke(ev → System.out.println(ev));

例如,对于项,使用:onItem().invoke(item → …​);

对于失败,使用:onFailure().invoke(failure → …​)

invoke 方法是同步的。有时候你需要执行异步操作。在这种情况下,使用 call,如:onItem().call(item → someAsyncAction(item))。请注意,call 不会更改项,它只调用异步操作,并且当该操作完成后,它会向下游发出原始项。

Transforming item

第一个仪表模式包括转换你收到的项事件。正如我们在上一节中看到的,它可以指示数据库中存储元素的成功插入。

转换项使用以下方法完成:onItem().transform(item → …​.)

可以在 Mutiny documentation 中找到有关转换的更多详细信息。

Sequential composition

顺序组合允许对依赖异步操作进行链接。这是使用 onItem().transformToUni(item → …​) 实现的。与 transform 不同,传递给 transformToUni 的函数返回一个 Uni。

Uni<String> uni1 = …
uni1
.onItem().transformToUni(item -> anotherAsynchronousAction(item));

可以在 Mutiny documentation 中找到有关异步转换的更多详细信息。

Failure handling

到目前为止,我们只处理项事件,但处理失败也是至关重要的。你可以使用 onFailure() 来处理失败。

例如,你可以使用 onFailure().recoverWithItem(fallback) 来恢复带有后备项:

Uni<String> uni1 = …
uni1
.onFailure().recoverWithItem(“my fallback value”);

你也可以重试操作,如下所示:

Uni<String> uni1 = …
uni1
.onFailure().retry().atMost(5);

可以在 the handling failure documentationthe retrying on failures documentation 上找到有关故障恢复的更多信息。

Events and Actions

下表列出了你可以为 Uni 和 Multi 接收的事件。它们中的每一个都与自己的组相关联(onX)。第二个表列出了你对事件可以执行的经典操作。请注意,有些组提供了更多可能。

Events from the upstream Events from the downstream

Uni

订阅 (1)、项 (0..1)、失败 (0..1)

Cancellation

Multi

订阅 (1)、项 (0..n)、失败 (0..1)、完成 (0..1)

Cancellation, Request

the event documentation上查看事件完整列表。

Action API Comment

transform

onItem().transform(Function<I, O> function);

使用同步函数将事件转换为另一个事件。下游会收到函数的结果(或在转换失败时收到一个失败)。

transformToUni

onItem().transformToUni(Function<I, Uni<O>> function);

使用异步函数将事件转换为另一个事件。下游会收到由生成的 Uni 发出的项(或在转换失败时收到一个失败)。如果生成的 Uni 发出一个失败,那么该失败会传递给下游。

invoke

onItem().invoke(Consumer<I> consumer)

调用同步使用者。这对于执行副作用操作尤其方便。下游会收到原始事件,或在使用者抛出异常时收到一个失败。

call

onItem().call(Function<I, Uni<?>>)

调用异步函数。这对于执行异步副作用操作尤其方便。下游会收到原始事件,或在使用者抛出异常或所生成的 Uni 发出一个失败时收到一个失败。

fail

onItem().failWith(Function<I, Throwable>)

收到事件时发出一个失败。

complete (Multi only)

onItem().complete()

在收到事件时发出完成事件。

Other patterns

Mutiny 提供许多其他功能。访问 Mutiny documentation查看更多示例,包括事件完整列表以及如何处理它们。

以下是几个常见指南:

Shortcuts

在使用 Uni 时,可能需要写 onItem(),这可能会比较繁琐。幸运的是,Mutiny 提供一些快捷方式让你的代码更简洁:

Shortcut Equivalent

uni.map(x → y)

uni.onItem().transform(x → y)

uni.flatMap(x → uni2)

uni.onItem().transformToUni(x → uni2)

uni.chain(x → uni2)

uni.onItem().transformToUni(x → uni2)

uni.invoke(x → System.out.println(x))

uni.onItem().invoke(x → System.out.println(x))

uni.call(x → uni2)

uni.onItem().call(x → uni2)

uni.eventually) → System.out.println("eventually"

uni.onItemOrFailure().invokeignoredItem, ignoredException) → System.out.println("eventually"

uni.eventually(() → uni2)

uni.onItemOrFailure().call((ignoredItem, ignoredException) → uni2)

uni.replaceWith(x)

uni.onItem().transform(ignored → x)

uni.replaceWith(uni2)

uni.onItem().transformToUni(ignored → uni2)

uni.replaceIfNullWith(x)

uni.onItem().ifNull().continueWith(x)

Reactive Streams

Mutiny 使用 Reactive Streams.`Multi`实现了 `Publisher`并强制执行反压协议。发送受到下游订阅者发送的请求的限制。因此,它不会使订阅者超载。请注意,在某些情况下,你无法遵循此协议(因为 Multi 发出的事件无法控制,例如时间或传感器测量)。在这种情况下,Mutiny 提供一种使用 `onOverflow()`控制溢出的方法。

Uni`未实现 Reactive Streams `Publisher。一个 `Uni`只能发出一个事件,因此订阅 `Uni`足以表示你打算使用结果,而不必使用请求协议过程。

Mutiny and Vert.x

Vert.x 工具包用于构建响应式应用程序和系统。它提供了遵循响应式原则(即非阻塞和异步)的大量库生态系统。Vert.x 是 Quarkus 的重要组成部分,因为它提供了响应式功能。

此外,整个 Vert.x API 也可以与 Quarkus 一起使用。为了提供统一的体验,Vert.x API 也可以使用 Mutiny 变体,即返回 Uni 和 Multi。

有关此 API 的更多详细信息请见: [role="bare"][role="bare"]https://quarkus.io/blog/mutiny-vertx/.

Mutiny Integration in Quarkus

Mutiny 在 Quarkus 中的集成超出了单纯的库。Mutiny 暴露允许 Quarkus 和 Mutiny 紧密集成的钩子:

  • 如果在 I/O 线程上运行,调用 awaittoIterable 将导致失败,从而防止阻塞 I/O 线程;

  • log() 运算符使用 Quarkus 日志记录器;

  • 默认 Mutiny 线程池是 Quarkus 工作线程池;

  • 使用 Mutiny Uni 和 Multi 时,默认情况下启用上下文传播

有关基础设施集成的更多详细信息,请参见 [role="bare"][role="bare"]https://smallrye.io/smallrye-mutiny/latest/guides/framework-integration/.