Using the event bus
Quarkus 允许不同的 Bean 使用异步事件进行交互,从而促进松散耦合。消息发送至 virtual addresses。它提供了 3 种传递机制:
-
点对点 - 发送消息,由一个消费者接收。如果几个消费者监听该地址,则会应用轮询;
-
发布/订阅 - 发布消息,所有监听该地址的使用者都会收到该消息;
-
请求/应答 - 发送消息并期望一个响应。接收者可以以异步方式响应该消息
所有这些传递机制都是非阻塞的,并且提供构建响应式应用程序的基本构建模块之一。
异步消息传递功能允许回复消息,这是反应式消息传递不支持的。但是,它仅限于单事件行为(无流)和本地消息。 |
Installing
此机制使用 Vert.x EventBus,因此您需要启用 vertx
扩展才能使用此功能。如果您正在创建一个新项目,请按如下方式设置 extensions
参数:
quarkus create app {create-app-group-id}:{create-app-artifact-id} \
--no-code
cd {create-app-artifact-id}
要创建一个 Gradle 项目,添加 --gradle
或 --gradle-kotlin-dsl
选项。
有关如何安装和使用 Quarkus CLI 的详细信息,请参见 Quarkus CLI 指南。
mvn {quarkus-platform-groupid}:quarkus-maven-plugin:{quarkus-version}:create \
-DprojectGroupId={create-app-group-id} \
-DprojectArtifactId={create-app-artifact-id} \
-DnoCode
cd {create-app-artifact-id}
要创建一个 Gradle 项目,添加 -DbuildTool=gradle
或 -DbuildTool=gradle-kotlin-dsl
选项。
适用于 Windows 用户:
-
如果使用 cmd,(不要使用反斜杠
\
,并将所有内容放在同一行上) -
如果使用 Powershell,将
-D
参数用双引号引起来,例如"-DprojectArtifactId={create-app-artifact-id}"
如果您已经创建了一个项目,则可以使用 add-extension
命令将 vertx
扩展添加到现有 Quarkus 项目:
quarkus extension add {add-extension-extensions}
./mvnw quarkus:add-extension -Dextensions='{add-extension-extensions}'
./gradlew addExtension --extensions='{add-extension-extensions}'
否则,您可以手动将此内容添加到构建文件的依赖关系部分:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx</artifactId>
</dependency>
implementation("io.quarkus:quarkus-vertx")
Consuming events
要使用事件,请使用 io.quarkus.vertx.ConsumeEvent
注释:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent (1)
public String consume(String name) { (2)
return name.toUpperCase();
}
}
1 | 如果没有设置,则地址是 bean 的完整限定名,例如,在此代码段中,它是 org.acme.vertx.GreetingService 。 |
2 | 方法参数是消息正文。如果方法返回 something,则它是消息响应。 |
默认情况下,使用该事件的代码必须是 non-blocking,因为它是在 Vert.x 事件循环上调用的。如果你的处理存在阻塞,请使用 blocking
属性:
@ConsumeEvent(value = "blocking-consumer", blocking = true)
void consumeBlocking(String message) {
// Something blocking
}
或者,你可以使用 @io.smallrye.common.annotation.Blocking
注解你的方法:
@ConsumeEvent(value = "blocking-consumer")
@Blocking
void consumeBlocking(String message) {
// Something blocking
}
当使用 @Blocking
时,它会忽略 @ConsumeEvent
的 blocking
属性的值。请参阅 Quarkus Reactive Architecture documentation 以详细了解此主题。
通过返回 io.smallrye.mutiny.Uni
或 java.util.concurrent.CompletionStage
也可以进行异步处理:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent
public CompletionStage<String> consume(String name) {
// return a CompletionStage completed when the processing is finished.
// You can also fail the CompletionStage explicitly
}
@ConsumeEvent
public Uni<String> process(String name) {
// return an Uni completed when the processing is finished.
// You can also fail the Uni explicitly
}
}
Mutiny
前面的示例使用 Mutiny 响应类型。如果您不熟悉 Mutiny,请查看 Mutiny - an intuitive reactive programming library。 |
Configuring the address
`@ConsumeEvent`注释可配置为设置地址:
@ConsumeEvent("greeting") (1)
public String consume(String name) {
return name.toUpperCase();
}
1 | 接收发送到 `greeting`地址的消息 |
Replying
带有 @ConsumeEvent
注释的方法的 return 值用作对传入消息的响应。例如,在下面的代码段中,返回的 String
即响应。
@ConsumeEvent("greeting")
public String consume(String name) {
return name.toUpperCase();
}
您还可以返回 `Uni<T>`或 `CompletionStage<T>`来处理异步答复:
@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}
如果您使用上下文传播扩展,则可以注入一个
或者,你可以使用默认的 Quarkus 工作程序池:
|
Implementing fire and forget interactions
你不必回复收到的邮件。通常,对于 fire and forget 交互,消息会被消耗且发送者无需知道它。要实现此目的,你的使用者方法只需返回 void
@ConsumeEvent("greeting")
public void consume(String event) {
// Do something with the event
}
Sending messages
好的,我们已经学习了如何接收消息,现在让我们切换到 other side:发送者。发送和发布消息使用 Vert.x 事件总线:
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/async")
public class EventResource {
@Inject
EventBus bus; (1)
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name) (2)
.onItem().transform(Message::body);
}
}
1 | Inject the Event bus |
2 | 向地址 greeting 发送消息。消息负载是 name |
EventBus
对象提供以下方法:
-
将消息
send
到某个特定地址 - 一个消费者接收消息。 -
将消息
publish
到某个特定地址 - 所有的消费者接收消息。 -
异步`send`一条消息并预期答复
-
以阻塞方式`send`一条消息并预期答复
// Case 1
bus.<String>requestAndForget("greeting", name);
// Case 2
bus.publish("greeting", name);
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
.onItem().transform(Message::body);
// Case 4
String response = bus.<String>requestAndAwait("greeting", name).body();
Putting things together - bridging HTTP and messages
让我们重新访问一个问候 HTTP 端点并使用异步消息传递来委派对一个分离 bean 的调用。它使用请求/应答调度机制。我们发送一条消息,而不是在 Jakarta REST 端点内部实现业务逻辑。此消息由另一个 bean 使用 reply 机制接收,并发送响应。
首先使用以下命令创建一个新项目:
quarkus create app {create-app-group-id}:{create-app-artifact-id} \
--no-code
cd {create-app-artifact-id}
要创建一个 Gradle 项目,添加 --gradle
或 --gradle-kotlin-dsl
选项。
有关如何安装和使用 Quarkus CLI 的详细信息,请参见 Quarkus CLI 指南。
mvn {quarkus-platform-groupid}:quarkus-maven-plugin:{quarkus-version}:create \
-DprojectGroupId={create-app-group-id} \
-DprojectArtifactId={create-app-artifact-id} \
-DnoCode
cd {create-app-artifact-id}
要创建一个 Gradle 项目,添加 -DbuildTool=gradle
或 -DbuildTool=gradle-kotlin-dsl
选项。
适用于 Windows 用户:
-
如果使用 cmd,(不要使用反斜杠
\
,并将所有内容放在同一行上) -
如果使用 Powershell,将
-D
参数用双引号引起来,例如"-DprojectArtifactId={create-app-artifact-id}"
你已经可以使用以下命令在 dev mode 中启动应用程序:
quarkus dev
./mvnw quarkus:dev
./gradlew --console=plain quarkusDev
然后,创建一个带有以下内容的新 Jakarta REST 资源:
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/async")
public class EventResource {
@Inject
EventBus bus;
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name) (1)
.onItem().transform(Message::body); (2)
}
}
1 | 将 name 发送到 greeting 地址并请求一个响应 |
2 | 当我们得到响应后,提取主体并将其发送给用户 |
如果你调用这个端点,你将等待并得到一个超时。事实上,没有人监听。所以,我们需要一个消费者监听 greeting
地址。创建一个带有以下内容的 GreetingService
bean:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent("greeting")
public String greeting(String name) {
return "Hello " + name;
}
}
这个 bean 接收姓名,并返回问候消息。
现在,在浏览器中打开 [role="bare"][role="bare"]http://localhost:8080/async/Quarkus,你将看到:
Hello Quarkus
为了更好地理解,让我们详细介绍一下 HTTP 请求/响应是如何处理的:
-
请求由
hello
方法接收 -
包含 name 的消息被发送到事件总线
-
另一个 bean 接收此消息并计算出响应
-
该响应使用 reply 机制发送回来
-
一旦发送方收到回复,其内容就会被写入到 HTTP 响应中
这个应用程序可以使用下列命令打包:
quarkus build
./mvnw install
./gradlew build
你也可以使用以下命令编译为原生可执行文件:
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
Using codecs
Vert.x Event Bus 使用编解码器将 serialize 和 deserialize 对象编解码。Quarkus 为本地传递提供了默认编解码器。所以你可以按如下方式交换对象:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", new MyName(name))
.onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
return Uni.createFrom().item(() -> "Hello " + name.getName());
}
如果你想使用一个特定的编解码器,你需在两端明确设置它:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name,
new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
.onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting", codec = MyNameCodec.class) (2)
Uni<String> greeting(MyName name) {
return Uni.createFrom().item(() -> "Hello "+name.getName());
}
1 | 设置用于发送消息的编解码器的名称 |
2 | 设置用于接收消息的编解码器 |