Using the event bus

Quarkus 允许不同的 Bean 使用异步事件进行交互,从而促进松散耦合。消息发送至 virtual addresses。它提供了 3 种传递机制:

  • 点对点 - 发送消息,由一个消费者接收。如果几个消费者监听该地址,则会应用轮询;

  • 发布/订阅 - 发布消息,所有监听该地址的使用者都会收到该消息;

  • 请求/应答 - 发送消息并期望一个响应。接收者可以以异步方式响应该消息

所有这些传递机制都是非阻塞的,并且提供构建响应式应用程序的基本构建模块之一。

异步消息传递功能允许回复消息,这是反应式消息传递不支持的。但是,它仅限于单事件行为(无流)和本地消息。

Installing

此机制使用 Vert.x EventBus,因此您需要启用 vertx 扩展才能使用此功能。如果您正在创建一个新项目,请按如下方式设置 extensions 参数:

CLI
quarkus create app {create-app-group-id}:{create-app-artifact-id} \
    --no-code
cd {create-app-artifact-id}

要创建一个 Gradle 项目,添加 --gradle--gradle-kotlin-dsl 选项。 有关如何安装和使用 Quarkus CLI 的详细信息,请参见 Quarkus CLI 指南。

Maven
mvn {quarkus-platform-groupid}:quarkus-maven-plugin:{quarkus-version}:create \
    -DprojectGroupId={create-app-group-id} \
    -DprojectArtifactId={create-app-artifact-id} \
    -DnoCode
cd {create-app-artifact-id}

要创建一个 Gradle 项目,添加 -DbuildTool=gradle-DbuildTool=gradle-kotlin-dsl 选项。

适用于 Windows 用户:

  • 如果使用 cmd,(不要使用反斜杠 \ ,并将所有内容放在同一行上)

  • 如果使用 Powershell,将 -D 参数用双引号引起来,例如 "-DprojectArtifactId={create-app-artifact-id}"

如果您已经创建了一个项目,则可以使用 add-extension 命令将 vertx 扩展添加到现有 Quarkus 项目:

CLI
quarkus extension add {add-extension-extensions}
Maven
./mvnw quarkus:add-extension -Dextensions='{add-extension-extensions}'
Gradle
./gradlew addExtension --extensions='{add-extension-extensions}'

否则,您可以手动将此内容添加到构建文件的依赖关系部分:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-vertx</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-vertx")

Consuming events

要使用事件,请使用 io.quarkus.vertx.ConsumeEvent 注释:

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent                           (1)
    public String consume(String name) {    (2)
        return name.toUpperCase();
    }
}
1 如果没有设置,则地址是 bean 的完整限定名,例如,在此代码段中,它是 org.acme.vertx.GreetingService
2 方法参数是消息正文。如果方法返回 something,则它是消息响应。

默认情况下,使用该事件的代码必须是 non-blocking,因为它是在 Vert.x 事件循环上调用的。如果你的处理存在阻塞,请使用 blocking 属性:

@ConsumeEvent(value = "blocking-consumer", blocking = true)
void consumeBlocking(String message) {
    // Something blocking
}

或者,你可以使用 @io.smallrye.common.annotation.Blocking 注解你的方法:

@ConsumeEvent(value = "blocking-consumer")
@Blocking
void consumeBlocking(String message) {
    // Something blocking
}

当使用 @Blocking 时,它会忽略 @ConsumeEventblocking 属性的值。请参阅 Quarkus Reactive Architecture documentation 以详细了解此主题。

通过返回 io.smallrye.mutiny.Unijava.util.concurrent.CompletionStage 也可以进行异步处理:

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent
    public CompletionStage<String> consume(String name) {
        // return a CompletionStage completed when the processing is finished.
        // You can also fail the CompletionStage explicitly
    }

    @ConsumeEvent
    public Uni<String> process(String name) {
        // return an Uni completed when the processing is finished.
        // You can also fail the Uni explicitly
    }
}
Mutiny

前面的示例使用 Mutiny 响应类型。如果您不熟悉 Mutiny,请查看 Mutiny - an intuitive reactive programming library

Configuring the address

`@ConsumeEvent`注释可配置为设置地址:

@ConsumeEvent("greeting")               (1)
public String consume(String name) {
    return name.toUpperCase();
}
1 接收发送到 `greeting`地址的消息

Replying

带有 @ConsumeEvent 注释的方法的 return 值用作对传入消息的响应。例如,在下面的代码段中,返回的 String 即响应。

@ConsumeEvent("greeting")
public String consume(String name) {
    return name.toUpperCase();
}

您还可以返回 `Uni<T>`或 `CompletionStage<T>`来处理异步答复:

@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
    return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}

如果您使用上下文传播扩展,则可以注入一个 executor

@Inject ManagedExecutor executor;

或者,你可以使用默认的 Quarkus 工作程序池:

Executor executor = Infrastructure.getDefaultWorkerPool();

Implementing fire and forget interactions

你不必回复收到的邮件。通常,对于 fire and forget 交互,消息会被消耗且发送者无需知道它。要实现此目的,你的使用者方法只需返回 void

@ConsumeEvent("greeting")
public void consume(String event) {
    // Do something with the event
}

Dealing with messages

如上所述,此机制基于 Vert.x 事件总线。因此,你还可以直接使用 Message

@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
    System.out.println(msg.address());
    System.out.println(msg.body());
}

Handling Failures

如果带有 @ConsumeEvent 注释的方法抛出异常,则:

  • 如果设置了回复处理程序,则故障将通过带有代码 ConsumeEvent#FAILURE_CODE 和异常消息的 io.vertx.core.eventbus.ReplyException 传播回发送者,

  • 如果没有设置回复处理程序,则异常将重新抛出(并在必要时封装在 RuntimeException 中),并且可以由默认异常处理程序(即 io.vertx.core.Vertx#exceptionHandler())处理。

Sending messages

好的,我们已经学习了如何接收消息,现在让我们切换到 other side:发送者。发送和发布消息使用 Vert.x 事件总线:

package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/async")
public class EventResource {

    @Inject
    EventBus bus;                                       (1)

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)        (2)
                .onItem().transform(Message::body);
    }
}
1 Inject the Event bus
2 向地址 greeting 发送消息。消息负载是 name

EventBus 对象提供以下方法:

  1. 将消息 send 到某个特定地址 - 一个消费者接收消息。

  2. 将消息 publish 到某个特定地址 - 所有的消费者接收消息。

  3. 异步`send`一条消息并预期答复

  4. 以阻塞方式`send`一条消息并预期答复

// Case 1
bus.<String>requestAndForget("greeting", name);
// Case 2
bus.publish("greeting", name);
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
        .onItem().transform(Message::body);
// Case 4
String response = bus.<String>requestAndAwait("greeting", name).body();

Putting things together - bridging HTTP and messages

让我们重新访问一个问候 HTTP 端点并使用异步消息传递来委派对一个分离 bean 的调用。它使用请求/应答调度机制。我们发送一条消息,而不是在 Jakarta REST 端点内部实现业务逻辑。此消息由另一个 bean 使用 reply 机制接收,并发送响应。

首先使用以下命令创建一个新项目:

CLI
quarkus create app {create-app-group-id}:{create-app-artifact-id} \
    --no-code
cd {create-app-artifact-id}

要创建一个 Gradle 项目,添加 --gradle--gradle-kotlin-dsl 选项。 有关如何安装和使用 Quarkus CLI 的详细信息,请参见 Quarkus CLI 指南。

Maven
mvn {quarkus-platform-groupid}:quarkus-maven-plugin:{quarkus-version}:create \
    -DprojectGroupId={create-app-group-id} \
    -DprojectArtifactId={create-app-artifact-id} \
    -DnoCode
cd {create-app-artifact-id}

要创建一个 Gradle 项目,添加 -DbuildTool=gradle-DbuildTool=gradle-kotlin-dsl 选项。

适用于 Windows 用户:

  • 如果使用 cmd,(不要使用反斜杠 \ ,并将所有内容放在同一行上)

  • 如果使用 Powershell,将 -D 参数用双引号引起来,例如 "-DprojectArtifactId={create-app-artifact-id}"

你已经可以使用以下命令在 dev mode 中启动应用程序:

CLI
quarkus dev
Maven
./mvnw quarkus:dev
Gradle
./gradlew --console=plain quarkusDev

然后,创建一个带有以下内容的新 Jakarta REST 资源:

src/main/java/org/acme/vertx/EventResource.java
package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/async")
public class EventResource {

    @Inject
    EventBus bus;

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)            (1)
                .onItem().transform(Message::body);            (2)
    }
}
1 name 发送到 greeting 地址并请求一个响应
2 当我们得到响应后,提取主体并将其发送给用户

如果你调用这个端点,你将等待并得到一个超时。事实上,没有人监听。所以,我们需要一个消费者监听 greeting 地址。创建一个带有以下内容的 GreetingService bean:

src/main/java/org/acme/vertx/GreetingService.java
package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent("greeting")
    public String greeting(String name) {
        return "Hello " + name;
    }

}

这个 bean 接收姓名,并返回问候消息。

现在,在浏览器中打开 [role="bare"][role="bare"]http://localhost:8080/async/Quarkus,你将看到:

Hello Quarkus

为了更好地理解,让我们详细介绍一下 HTTP 请求/响应是如何处理的:

  1. 请求由 hello 方法接收

  2. 包含 name 的消息被发送到事件总线

  3. 另一个 bean 接收此消息并计算出响应

  4. 该响应使用 reply 机制发送回来

  5. 一旦发送方收到回复,其内容就会被写入到 HTTP 响应中

这个应用程序可以使用下列命令打包:

CLI
quarkus build
Maven
./mvnw install
Gradle
./gradlew build

你也可以使用以下命令编译为原生可执行文件:

CLI
quarkus build --native
Maven
./mvnw install -Dnative
Gradle
./gradlew build -Dquarkus.native.enabled=true

Using codecs

Vert.x Event Bus 使用编解码器将 serializedeserialize 对象编解码。Quarkus 为本地传递提供了默认编解码器。所以你可以按如下方式交换对象:

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", new MyName(name))
        .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello " + name.getName());
}

如果你想使用一个特定的编解码器,你需在两端明确设置它:

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", name,
        new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
        .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting", codec = MyNameCodec.class)            (2)
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello "+name.getName());
}
1 设置用于发送消息的编解码器的名称
2 设置用于接收消息的编解码器