Using the event bus

Quarkus allows different beans to interact using asynchronous events, which promotes loose coupling. Messages are sent to virtual addresses. The event bus offers three delivery mechanisms:

  • point-to-point - send the message; one consumer receives it. If several consumers listen to the address, they are selected in round-robin fashion;

  • publish/subscribe - publish a message; all the consumers listening to the address receive it;

  • request/reply - send the message and expect a response. The receiver can respond to the message asynchronously.
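
To make the difference between the three mechanisms concrete, here is a minimal sketch of a toy in-memory bus in plain Java. It is not the Vert.x EventBus: ToyBus and all of its methods are hypothetical names used only for illustration, and it assumes synchronous, single-threaded delivery, which the real event bus does not use. It only shows which consumers receive a message in each mode.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Toy model of the three delivery mechanisms (hypothetical, NOT the Vert.x API).
final class ToyBus {
    private final Map<String, List<Function<String, String>>> consumers = new HashMap<>();
    private final Map<String, Integer> sendCounts = new HashMap<>();

    void register(String address, Function<String, String> consumer) {
        consumers.computeIfAbsent(address, a -> new ArrayList<>()).add(consumer);
    }

    // Selects the next consumer for the address in round-robin order.
    private Function<String, String> next(String address) {
        List<Function<String, String>> list = consumers.get(address);
        if (list == null || list.isEmpty()) {
            // Toy behaviour only: fail fast when nobody listens.
            throw new IllegalStateException("No consumer registered on " + address);
        }
        int count = sendCounts.merge(address, 1, Integer::sum);
        return list.get((count - 1) % list.size());
    }

    // Point-to-point: one consumer receives the message; its return value is dropped.
    void send(String address, String body) {
        next(address).apply(body);
    }

    // Publish/subscribe: every consumer on the address receives the message.
    void publish(String address, String body) {
        for (Function<String, String> consumer : consumers.getOrDefault(address, List.of())) {
            consumer.apply(body);
        }
    }

    // Request/reply: one consumer receives the message; its return value is the reply.
    CompletableFuture<String> request(String address, String body) {
        try {
            return CompletableFuture.completedFuture(next(address).apply(body));
        } catch (RuntimeException e) {
            return CompletableFuture.failedFuture(e);
        }
    }
}
```

With two consumers registered on the same address, two consecutive send calls reach one consumer each, while a single publish call reaches both.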

All these delivery mechanisms are non-blocking and provide one of the fundamental building blocks for reactive applications.

The asynchronous message passing feature allows replying to messages, which Reactive Messaging does not support. However, it is limited to single-event behavior (no streams) and to local messages.

Installing

This mechanism uses the Vert.x EventBus, so you need to enable the vertx extension to use this feature. If you are creating a new project, set the extensions parameter as follows:

Unresolved directive in reactive-event-bus.adoc - include::{includes}/devtools/create-app.adoc[]

If you already have a Quarkus project, you can add the vertx extension to it with the add-extension command:

Unresolved directive in reactive-event-bus.adoc - include::{includes}/devtools/extension-add.adoc[]

Otherwise, you can manually add this to the dependencies section of your build file:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-vertx</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-vertx")

Consuming events

To consume events, use the io.quarkus.vertx.ConsumeEvent annotation:

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent                           (1)
    public String consume(String name) {    (2)
        return name.toUpperCase();
    }
}
1 If not set, the address is the fully qualified name of the bean; in this snippet, it is org.acme.vertx.GreetingService.
2 The method parameter is the message body. If the method returns a value, that value is the message response.

By default, the code consuming the event must be non-blocking, as it’s called on the Vert.x event loop. If your processing is blocking, use the blocking attribute:

@ConsumeEvent(value = "blocking-consumer", blocking = true)
void consumeBlocking(String message) {
    // Something blocking
}

Alternatively, you can annotate your method with @io.smallrye.common.annotation.Blocking:

@ConsumeEvent(value = "blocking-consumer")
@Blocking
void consumeBlocking(String message) {
    // Something blocking
}

When @Blocking is used, the value of the blocking attribute of @ConsumeEvent is ignored. See the Quarkus Reactive Architecture documentation for further details on this topic.

Asynchronous processing is also possible by returning either an io.smallrye.mutiny.Uni or a java.util.concurrent.CompletionStage:

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent
    public CompletionStage<String> consume(String name) {
        // return a CompletionStage completed when the processing is finished.
        // You can also fail the CompletionStage explicitly
        return CompletableFuture.completedFuture(name.toUpperCase());
    }

    @ConsumeEvent
    public Uni<String> process(String name) {
        // return a Uni completed when the processing is finished.
        // You can also fail the Uni explicitly
        return Uni.createFrom().item(name.toUpperCase());
    }
}
Mutiny

The previous example uses Mutiny reactive types. If you are not familiar with Mutiny, check Mutiny - an intuitive reactive programming library.
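
The comments in the example above mention completing or failing the returned CompletionStage. As a minimal sketch of what that can look like using only java.util.concurrent: the AsyncReplies class and its shout method are hypothetical names, and the blank-name check is an assumption chosen purely to show an explicit failure.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical helper showing a stage that either completes or fails explicitly.
final class AsyncReplies {

    static CompletionStage<String> shout(String name) {
        if (name == null || name.isBlank()) {
            // Fail the stage explicitly instead of throwing from the method.
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("name must not be blank"));
        }
        // Complete the stage asynchronously, once the work is done.
        return CompletableFuture.supplyAsync(() -> name.toUpperCase());
    }
}
```

A consumer method declared with a CompletionStage<String> return type could delegate to a helper like this one.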

Configuring the address

The @ConsumeEvent annotation can be configured to set the address:

@ConsumeEvent("greeting")               (1)
public String consume(String name) {
    return name.toUpperCase();
}
1 Receive the messages sent to the greeting address

Replying

The return value of a method annotated with @ConsumeEvent is used as the response to the incoming message. For instance, in the following snippet, the returned String is the response.

@ConsumeEvent("greeting")
public String consume(String name) {
    return name.toUpperCase();
}

You can also return a Uni<T> or a CompletionStage<T> to reply asynchronously:

@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
    return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}

You can inject an executor if you use the Context Propagation extension:

@Inject ManagedExecutor executor;

Alternatively, you can use the default Quarkus worker pool:

Executor executor = Infrastructure.getDefaultWorkerPool();

Implementing fire and forget interactions

You don’t have to reply to received messages. Typically, for a fire and forget interaction, the messages are consumed and the sender does not need to know about it. To implement this, have your consumer method return void:

@ConsumeEvent("greeting")
public void consume(String event) {
    // Do something with the event
}

Dealing with messages

As mentioned above, this mechanism is based on the Vert.x event bus, so you can also use Message directly:

@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
    System.out.println(msg.address());
    System.out.println(msg.body());
}

Handling failures

If a method annotated with @ConsumeEvent throws an exception, then:

  • if a reply handler is set then the failure is propagated back to the sender via an io.vertx.core.eventbus.ReplyException with code ConsumeEvent#FAILURE_CODE and the exception message,

  • if no reply handler is set then the exception is rethrown (and wrapped in a RuntimeException if necessary) and can be handled by the default exception handler, i.e. io.vertx.core.Vertx#exceptionHandler().

Sending messages

Now that we have seen how to receive messages, let’s switch to the other side: the sender. Sending and publishing messages both use the Vert.x event bus:

package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/async")
public class EventResource {

    @Inject
    EventBus bus;                                       (1)

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)        (2)
                .onItem().transform(Message::body);
    }
}
1 Inject the event bus
2 Send a message to the greeting address. The message payload is name.

The EventBus object provides methods to:

  1. send a message to a specific address - one single consumer receives the message.

  2. publish a message to a specific address - all consumers receive the message.

  3. send a message and expect a reply asynchronously.

  4. send a message and expect a reply in a blocking manner.

// Case 1
bus.<String>requestAndForget("greeting", name);
// Case 2
bus.publish("greeting", name);
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
        .onItem().transform(Message::body);
// Case 4
String response = bus.<String>requestAndAwait("greeting", name).body();

Putting things together - bridging HTTP and messages

Let’s revisit a greeting HTTP endpoint and use asynchronous message passing to delegate the call to a separate bean. It uses the request/reply dispatching mechanism. Instead of implementing the business logic inside the Jakarta REST endpoint, we send a message. This message is consumed by another bean, and the response is sent using the reply mechanism.

First, create a new project using:

Unresolved directive in reactive-event-bus.adoc - include::{includes}/devtools/create-app.adoc[]

You can already start the application in dev mode using:

Unresolved directive in reactive-event-bus.adoc - include::{includes}/devtools/dev.adoc[]

Then, create a new Jakarta REST resource with the following content:

src/main/java/org/acme/vertx/EventResource.java
package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/async")
public class EventResource {

    @Inject
    EventBus bus;

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)            (1)
                .onItem().transform(Message::body);            (2)
    }
}
1 Send the name to the greeting address and request a response
2 When we get the response, extract the body and send it to the user

If you call this endpoint now, the request waits and eventually times out, because no one is listening. So, we need a consumer listening on the greeting address. Create a GreetingService bean with the following content:

src/main/java/org/acme/vertx/GreetingService.java
package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent("greeting")
    public String greeting(String name) {
        return "Hello " + name;
    }

}

This bean receives the name, and returns the greeting message.

Now, open your browser to http://localhost:8080/async/Quarkus, and you should see:

Hello Quarkus

To better understand what happened, let’s walk through how the HTTP request/response was handled:

  1. The request is received by the greeting method of EventResource

  2. A message containing the name is sent to the event bus

  3. Another bean receives this message and computes the response

  4. This response is sent back using the reply mechanism

  5. Once the reply is received by the sender, the content is written to the HTTP response

This application can be packaged using:

Unresolved directive in reactive-event-bus.adoc - include::{includes}/devtools/build.adoc[]

You can also compile it as a native executable with:

Unresolved directive in reactive-event-bus.adoc - include::{includes}/devtools/build-native.adoc[]

Using codecs

The Vert.x Event Bus uses codecs to serialize and deserialize objects. Quarkus provides a default codec for local delivery. So you can exchange objects as follows:

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", new MyName(name))
        .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello " + name.getName());
}
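
The MyName type used above is not shown in this guide. A minimal sketch, assuming it is a plain immutable holder built from a String and exposing getName(), which are the only two operations the snippet relies on; the field name and the null check are assumptions:

```java
import java.util.Objects;

// Hypothetical payload type matching the calls made in the snippet above.
final class MyName {
    private final String name;

    public MyName(String name) {
        // Assumption: a null name is rejected early rather than sent on the bus.
        this.name = Objects.requireNonNull(name, "name");
    }

    public String getName() {
        return name;
    }

    @Override
    public String toString() {
        return "MyName[" + name + "]";
    }
}
```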

If you want to use a specific codec, you need to explicitly set it on both ends:

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", new MyName(name),
        new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
        .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting", codec = MyNameCodec.class)            (2)
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello " + name.getName());
}
1 Set the name of the codec to use to send the message
2 Set the codec to use to receive the message