Mutiny - Async for mere mortals
An event-driven reactive programming API
Mutiny 与其他响应式编程库非常不同。它采用了不同的方法来设计你的程序。使用 Mutiny 时,所有内容都是事件驱动的:你收到事件,然后对此做出反应。此事件驱动方面融合了分布式系统的异步特性,并提供了一种优雅且精确的方法来表示延续。
Mutiny is very different from the other reactive programming libraries. It takes a different approach to design your program. With Mutiny everything is event-driven: you receive events, and you react to them. This event-driven aspect embraces the asynchronous nature of distributed systems and provides an elegant and precise way to express continuation.
Mutiny 提供了两种类型,它们既基于事件也基于惰性:
Mutiny offers two types that are both event-driven and lazy:
-
A
Uni
emits a single event (an item or a failure). Unis are convenient to represent asynchronous actions that return 0 or 1 result. A good example is the result of sending a message to a message broker queue. -
A
Multi
emits multiple events (n items, 1 failure or 1 completion). Multis can represent streams of items, potentially unbounded. A good example is receiving messages from a message broker queue.
这两种类型允许表示任何类型的交互。它们就是事件源。您可以观察它们 (subscription),当它们发出项目、故障或有界 Multi 中的完成事件时,您将收到通知。当您(订阅者)收到事件时,您可以对其进行处理(例如,转换和筛选)。借助 Mutiny,您可以编写代码(如 onX().action()),该代码可读作“在项目 X 上执行操作”。
These two types allow representing any type of interactions. They are event sources. You observe them (subscription) and you get notified when they emit an item, a failure, or, in the case of a bounded Multi, a completion event. When you (the subscriber) receive the event, you can process it (e.g., transform it, filter it). With Mutiny, you are going to write code like onX().action(), which reads as “on item X do action”.
如果您想进一步了解 Mutiny 及其背后的概念,请查看 the Mutiny Reference documentation。
If you want to know more about Mutiny, and the concepts behind it, check the Mutiny Reference documentation.
Mutiny in Quarkus
Mutiny 是在处理来自 Quarkus 的响应式特性时的主要 API。这意味着大多数扩展支持 Mutiny,方法是以返回 Uni 和 Multi 的 API 暴露(如响应式数据源或 Rest 客户端)或理解您的方法何时返回 Uni 或 Multi(如 Quarkus REST(以前称为 RESTEasy Reactive)或响应式消息传递)。
Mutiny is the primary API when dealing with the reactive features from Quarkus. It means that most extensions support Mutiny either by exposing an API returning Unis and Multis (such as reactive data sources or rest clients) or understanding when your methods return a Uni or a Multi (such as Quarkus REST (formerly RESTEasy Reactive) or Reactive Messaging).
这些集成使 Mutiny 成为使用 Quarkus 开发的每个响应式应用程序的突出且内聚的模型。此外,Mutiny 架构允许精细的死代码消除,从而改进了编译成本机时(例如使用 Quarkus 本机模式或 GraaIVM 本机映像编译器)的内存使用。
These integrations make Mutiny a prominent and cohesive model for every reactive application developed with Quarkus. In addition, Mutiny architecture allows fine-grain dead-code elimination which improves the memory usage when compiled to native (such as with Quarkus native mode or GraalVM native image compiler).
Why another reactive programming API?
经验丰富的响应式开发人员可能会疑惑,为什么 Quarkus 还会引入其他响应式编程 API,而现有的已有很多。Mutiny 采用了截然不同的角度:
Seasoned reactive developers may wonder why Quarkus introduced yet another reactive programming APIs while there are existing ones. Mutiny is taking a very different angle:
Event-Driven-Mutiny 将事件置于其设计的核心。借助 Mutiny,您可以观察事件、对它们做出反应,并创建优雅且可读的处理管道。不必拥有函数式编程的博士学位。
Event-Driven - Mutiny places events at the core of its design. With Mutiny, you observe events, react to them, and create elegant and readable processing pipelines. A Ph.D. in functional programming is not required.
Navigable-即使拥有智能代码补全,拥有数百种方法的类也很令人困惑。Mutiny 提供了一个可导航和显式的 API,引导您走向您需要的操作符。
Navigable - Even with intelligent code completion, classes with hundreds of methods are confusing. Mutiny provides a navigable and explicit API driving you towards the operator you need.
Non-Blocking I/O-Mutiny 是应对带有非阻塞 I/O(为 Quarkus提供支持)的应用程序异步特性的理想伴侣。声明性地组合操作、转换数据、强制执行进度、从故障中恢复,等等。
Non-Blocking I/O - Mutiny is the perfect companion to tame the asynchronous nature of applications with non-blocking I/O (which powers Quarkus). Declaratively compose operations, transform data, enforce progress, recover from failures, and more.
Made for an asynchronous world-Mutiny 可以用于任何异步应用程序,如事件驱动的微服务、基于消息的应用程序、网络实用程序、数据流处理,当然还有… 响应式应用程序!
Made for an asynchronous world - Mutiny can be used in any asynchronous application such as event-driven microservices, message-based applications, network utilities, data stream processing, and of course… reactive applications!
Reactive Streams and Converters Built-In-Mutiny 基于 Reactive Streams规范,因此它可以与任何其他响应式编程库集成。此外,它还提议转换器与其他流行库交互。
Reactive Streams and Converters Built-In - Mutiny is based on the Reactive Streams specification, and so it can be integrated with any other reactive programming library. In addition, it proposes converters to interact with other popular libraries.
Mutiny and the I/O Threads
Quarkus 由 reactive engine提供支持,在开发响应式应用程序时,您的代码将在少数几个 I/O 线程之一上执行。请记住,您永远不能阻塞这些线程,如果您这样做,模型将会崩溃。因此,您不能使用阻塞 I/O。您需要安排 I/O 操作并传递一个延续。
Quarkus is powered by a reactive engine, and when developing a reactive application, your code is executed on one of the few I/O threads. Remember, you must never block these threads, and the model would collapse if you do. So, you can’t use blocking I/O. Instead, you need to schedule the I/O operation and pass a continuation.
Mutiny 事件驱动范式为此量身定制。当 I/O 操作成功完成时,表示它的 Uni 将发出一个项目事件。当它失败时,它将发出一个故障事件。使用事件驱动 API 可以简单而自然地表示延续。
The Mutiny event-driven paradigm is tailored for this. When the I/O operation completes successfully, the Uni that represents it emits an item event. When it fails, it emits a failure event. The continuation is simply and naturally expressed using the event-driven API.
Mutiny through Examples
许多 Quarkus 扩展都公开了 Mutiny API。在本节中,我们将使用 MongoDB 扩展来说明如何使用 Mutiny。
Many Quarkus extensions expose Mutiny APIs. In this section, we use the MongoDB extension to illustrate how to use Mutiny.
让我们设想一个表示 periodic table中元素的简单结构:
Let’s imagine a simple structure representing an element from the periodic table:
public class Element {
public String name;
public String symbol;
public int position;
public Element(String name, String symbol, int position) {
this.name = name;
this.symbol = symbol;
this.position = position;
}
public Element() {
// Use by JSON mappers
}
}
此结构包含元素的名称、符号和位置。要检索和存储元素到一个 Mongo 集合,您可以使用以下代码:
This structure contains the name, symbol, and position of the element. To retrieve and store elements into a Mongo collection, you can use the following code:
@ApplicationScoped
public class ElementService {
final ReactiveMongoCollection<Element> collection;
@Inject
ElementService(ReactiveMongoClient client) {
collection = client.getDatabase("quarkus")
.getCollection("elements", Element.class);
}
public void add(Element element) {
Uni<InsertOneResult> insertion = collection.insertOne(element);
insertion
.onItem().transform(r -> r.getInsertedId().asString())
.subscribe().with(
result -> System.out.println("inserted " + result),
failure -> System.out.println("D'oh" + failure));
}
public void getAll() {
collection.find()
.subscribe().with(
element -> System.out.println("Element: " + element),
failure -> System.out.println("D'oh! " + failure),
() -> System.out.println("No more elements")
);
}
}
首先,注入 Mongo 客户端。请注意,它使用了响应式 varient (io.quarkus.mongodb.reactive.ReactiveMongoClient
)。在 initialize 方法中,我们检索和存储将插入元素的集合。
First, the Mongo client is injected.
Note that it uses the reactive variant (io.quarkus.mongodb.reactive.ReactiveMongoClient
).
In the initialize method, we retrieve and store the collection in which elements will be inserted.
add
方法在一个集合中插入元素。它接收元素作为一个参数并使用集合的响应式 API。与数据库交互涉及 I/O。响应式原则禁止在等待交互完成时进行阻塞。相反,我们调度操作并传递延续。insertOne
方法返回一个 Uni,即异步操作。经预定的 I/O 就是如此。我们现在需要表达延续,这使用 .onItem()
方法完成。.onItem()
允许配置在观察到的 Uni 发射项时需要发生的操作,在我们案例中,即在预定 I/O 完成时。在此示例中,我们提取插入的文档 ID。最后一步是订购。没有它,一切都不会发生。订购触发操作。订购方法还可以定义处理程序:id
成功时的值或插入失败时的失败。
The add
method inserts an element in the collection.
It receives the element as a parameter and uses the reactive API of the collection.
Interacting with the database involves I/Os.
The reactive principles forbid blocking while waiting for the interaction to complete.
Instead, we schedule the operation and pass a continuation.
The insertOne
method returns a Uni, i.e., an asynchronous operation.
That’s the scheduled I/O. We now need to express the continuation, which is done using the .onItem()
method.
.onItem()
allows configuring what needs to happen when the observed Uni emits an item, in our case when the scheduled I/Os completes.
In this example, we extract the inserted document id.
The final step is the subscription.
Without it, nothing would ever happen. Subscribing triggers the operation.
The subscription method can also define handlers: the id
value on success, or a failure when the insertion fails.
现在我们来了解第二个方法。它会检索所有存储的元素。在此案例中,它会返回多个项(每个存储的元素一项),所以我们正在使用一个 Multi
。就像插入一样,获取存储的元素涉及 I/O。find
是我们的操作。就像对于 Uni 一样,您需要订购以触发操作。订购者接收项事件、失败事件或所有元素全部接收后产生的完成事件。
Let’s now look at the second method.
It retrieves all the stored elements.
In this case, it returns multiple items (one per stored element), so we are using a Multi
.
As for the insertion, getting the stored elements involves I/Os.
The find
is our operation.
As for Uni, you need to subscribe to trigger the operation.
The subscriber receives item events, a failure event, or a completion event when all the elements have been received.
订购 Uni 或 Multi 至关重要,因为如果没有订购,操作永远不会执行。在 Quarkus 中,某些扩展为您处理订购。例如,在 Quarkus REST 中,您的 HTTP 方法可以返回 Uni 或 Multi,Quarkus REST 处理订购。
Subscribing to a Uni or a Multi is essential, as without it, the operation is never executed. In Quarkus some extensions deal with the subscription for you. For example, in Quarkus REST your HTTP methods can return a Uni or a Multi, and Quarkus REST handles the subscription.
Mutiny Patterns
上一部分中的示例故意简单。接下来我们来了解一些常见模式。
The example from the last section was simplistic on purpose. Let’s have a look at a few common patterns.
Observing events
你可以通过以下方法来观察各种类型的事件:
You can observe the various kind of events using:
on{event}().invoke(ev → System.out.println(ev));
例如,对于项,使用:onItem().invoke(item → …);
For example, for items use:
onItem().invoke(item → …);
对于失败,使用:onFailure().invoke(failure → …)
For failure, use:
onFailure().invoke(failure → …)
invoke
方法是同步的。有时候你需要执行异步操作。在这种情况下,使用 call
,如:onItem().call(item → someAsyncAction(item))
。请注意,call
不会更改项,它只调用异步操作,并且当该操作完成后,它会向下游发出原始项。
The invoke
method is synchronous.
Sometimes you need to execute an asynchronous action.
In this case use call
, as in: onItem().call(item → someAsyncAction(item))
.
Note that call
does not change the item, it just calls an asynchronous action, and when this one completes, it emits the original item downstream.
Transforming item
第一个仪表模式包括转换你收到的项事件。正如我们在上一节中看到的,它可以指示数据库中存储元素的成功插入。
The first instrumental pattern consists of transforming the item events you receive. As we have seen in the previous section, it could indicate the successful insertion, or the elements stored in the database.
转换项使用以下方法完成:onItem().transform(item → ….)
。
Transforming an item is done using: onItem().transform(item → ….)
.
可以在 Mutiny documentation 中找到有关转换的更多详细信息。
More details about transformation can be found in the Mutiny documentation.
Sequential composition
顺序组合允许对依赖异步操作进行链接。这是使用 onItem().transformToUni(item → …)
实现的。与 transform
不同,传递给 transformToUni
的函数返回一个 Uni。
Sequential composition allows chaining dependent asynchronous operations. This is achieved using onItem().transformToUni(item → …)
.
Unlike transform
, the function passed to transformToUni
returns a Uni.
Uni<String> uni1 = …
uni1
.onItem().transformToUni(item -> anotherAsynchronousAction(item));
可以在 Mutiny documentation 中找到有关异步转换的更多详细信息。
More details about asynchronous transformation can be found in the Mutiny documentation.
Failure handling
到目前为止,我们只处理项事件,但处理失败也是至关重要的。你可以使用 onFailure()
来处理失败。
So far we only handle the item events, but handling failure is essential. You can handle failures using onFailure()
.
例如,你可以使用 onFailure().recoverWithItem(fallback)
来恢复带有后备项:
For example, you can recover with a fallback item using onFailure().recoverWithItem(fallback)
:
Uni<String> uni1 = …
uni1
.onFailure().recoverWithItem(“my fallback value”);
你也可以重试操作,如下所示:
You can also retry the operation such as in:
Uni<String> uni1 = …
uni1
.onFailure().retry().atMost(5);
可以在 the handling failure documentation 和 the retrying on failures documentation 上找到有关故障恢复的更多信息。
More info about failure recovery can be found on the handling failure documentation and the retrying on failures documentation.
Events and Actions
下表列出了你可以为 Uni 和 Multi 接收的事件。它们中的每一个都与自己的组相关联(onX)。第二个表列出了你对事件可以执行的经典操作。请注意,有些组提供了更多可能。
The following tables list the events that you can receive for Uni and Multi. Each of them is associated with its own group (onX). The second table lists the classic action you can do upon an event. Note that some groups offer more possibilities.
Events from the upstream | Events from the downstream | |
---|---|---|
Uni |
Subscription (1), Item (0..1), failure (0..1) |
Cancellation |
Multi |
Subscription (1), Item (0..n), failure (0..1), completion (0..1) |
Cancellation, Request |
在 the event documentation上查看事件完整列表。
Check the full list of events on the event documentation.
Action | API | Comment |
---|---|---|
transform |
|
Transform the event into another event using a synchronous function. The downstream receives the result of the function (or a failure if the transformation failed). |
transformToUni |
|
Transform the event into another event using an asynchronous function. The downstream receives the item emitted by the produced Uni (or a failure if the transformation failed). If the produced Uni emits a failure, that failure is passed to the downstream. |
invoke |
|
Invokes the synchronous consumer. This is particularly convenient to execute side effects actions. The downstream receives the original event, or a failure if the consumer throws an exception |
call |
|
Invokes the asynchronous function. This is particularly convenient to execute asynchronous side effect actions.The downstream receives the original event, or a failure if the consumer throws an exception or if the produced Uni emits a failure. |
fail |
|
Emits a failure when it receives the event. |
complete (Multi only) |
|
Emits the completion event when it receives the event. |
Other patterns
Mutiny 提供许多其他功能。访问 Mutiny documentation查看更多示例,包括事件完整列表以及如何处理它们。
Mutiny provides lots of other features. Head over to the Mutiny documentation to see many more examples, including the whole list of events and how to handle them.
以下是几个常见指南:
Some frequently asked guides are the following:
-
merge vs. concatenation - [role="bare"]https://smallrye.io/smallrye-mutiny/latest/guides/merging-and-concatenating-streams/
-
controlling the emission thread - [role="bare"]https://smallrye.io/smallrye-mutiny/latest/guides/emit-on-vs-run-subscription-on/
-
explicit blocking - [role="bare"]https://smallrye.io/smallrye-mutiny/latest/guides/imperative-to-reactive/
Shortcuts
在使用 Uni 时,可能需要写 onItem()
,这可能会比较繁琐。幸运的是,Mutiny 提供一些快捷方式让你的代码更简洁:
When using Uni, having to write onItem()
can be cumbersome.
Fortunately, Mutiny provides a set of shortcut to make your code more concise:
Shortcut | Equivalent |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Reactive Streams
Mutiny 使用 Reactive Streams.`Multi`实现了 `Publisher`并强制执行反压协议。发送受到下游订阅者发送的请求的限制。因此,它不会使订阅者超载。请注意,在某些情况下,你无法遵循此协议(因为 Multi 发出的事件无法控制,例如时间或传感器测量)。在这种情况下,Mutiny 提供一种使用 `onOverflow()`控制溢出的方法。
Mutiny uses Reactive Streams.
Multi
implements Publisher
and enforces the back-pressure protocol.
Emissions are constrained by the request emitted from the downstream subscribers.
Thus, it does not overload the subscribers.
Note that in some cases, you can’t follow this protocol (because the Multi emits events that can’t be controlled, such as time, or measures sensors).
In this case, Mutiny provides a way to control the overflow using onOverflow()
.
Uni`未实现 Reactive Streams `Publisher
。一个 `Uni`只能发出一个事件,因此订阅 `Uni`足以表示你打算使用结果,而不必使用请求协议过程。
Uni
does not implement Reactive Streams Publisher
.
A Uni
can only emit one event, so subscribing to the Uni
is enough to express your intent to use the result and does not need the request protocol ceremony.
Mutiny and Vert.x
Vert.x 工具包用于构建响应式应用程序和系统。它提供了遵循响应式原则(即非阻塞和异步)的大量库生态系统。Vert.x 是 Quarkus 的重要组成部分,因为它提供了响应式功能。
Vert.x is a toolkit to build reactive applications and systems. It provides a huge ecosystem of libraries following the reactive principles (i.e., non-blocking and asynchronous). Vert.x is an essential part of Quarkus, as it provides its reactive capabilities.
此外,整个 Vert.x API 也可以与 Quarkus 一起使用。为了提供统一的体验,Vert.x API 也可以使用 Mutiny 变体,即返回 Uni 和 Multi。
In addition, the whole Vert.x API can be used with Quarkus. To provide a cohesive experience, the Vert.x API is also available using a Mutiny variant, i.e., returns Uni and Multi.
有关此 API 的更多详细信息请见: [role="bare"][role="bare"]https://quarkus.io/blog/mutiny-vertx/.
More details about this API can be found on: [role="bare"]https://quarkus.io/blog/mutiny-vertx/.
Mutiny Integration in Quarkus
Mutiny 在 Quarkus 中的集成超出了单纯的库。Mutiny 暴露允许 Quarkus 和 Mutiny 紧密集成的钩子:
The integration of Mutiny in Quarkus goes beyond just the library. Mutiny exposes hooks that allow Quarkus and Mutiny to be closely integrated:
-
Calling
await
ortoIterable
would fail if you are running on an I/O thread, preventing blocking the I/O thread; -
The
log()
operator use the Quarkus logger; -
The default Mutiny thread pool is the Quarkus worker thread pool;
-
Context Propagation is enabled by default when using Mutiny Uni and Multi
有关基础设施集成的更多详细信息,请参见 [role="bare"][role="bare"]https://smallrye.io/smallrye-mutiny/latest/guides/framework-integration/.
More details about the infrastructure integration can be found on [role="bare"]https://smallrye.io/smallrye-mutiny/latest/guides/framework-integration/.