Using the event bus
Quarkus allows different beans to interact using asynchronous events, thus promoting loose coupling. The messages are sent to virtual addresses. It offers three delivery mechanisms:
- point-to-point: send the message, one consumer receives it. If several consumers listen to the address, a round-robin is applied;
- publish/subscribe: publish a message, all the consumers listening to the address receive the message;
- request/reply: send the message and expect a response. The receiver can respond to the message asynchronously.
All these delivery mechanisms are non-blocking, and provide one of the fundamental building blocks for reactive applications.
The asynchronous message passing feature allows replying to messages, which Reactive Messaging does not support. However, it is limited to single-event behavior (no streams) and to local messages.
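To make the three delivery mechanisms concrete, here is a minimal plain-Java sketch. It is a toy model and does not use the Vert.x event bus: `ToyBus` and its methods `consumer`, `send`, `publish`, and `request` are hypothetical names chosen for illustration, and handlers run synchronously except in `request`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Toy in-memory bus; illustrative only, not the Vert.x EventBus API. */
class ToyBus {
    // address -> handlers registered on that address
    private final Map<String, List<Function<String, String>>> handlers = new HashMap<>();
    // address -> number of point-to-point deliveries so far (drives the round-robin)
    private final Map<String, Integer> cursor = new HashMap<>();

    void consumer(String address, Function<String, String> handler) {
        handlers.computeIfAbsent(address, a -> new ArrayList<>()).add(handler);
    }

    // Selects exactly one handler, rotating through the registered ones on each call.
    private Function<String, String> next(String address) {
        List<Function<String, String>> list = handlers.get(address);
        if (list == null || list.isEmpty()) {
            throw new IllegalStateException("No consumer on " + address);
        }
        int index = cursor.merge(address, 1, Integer::sum) - 1;
        return list.get(index % list.size());
    }

    // Point-to-point: one consumer receives the message, the return value is ignored.
    void send(String address, String body) {
        next(address).apply(body);
    }

    // Publish/subscribe: every consumer on the address receives the message.
    void publish(String address, String body) {
        for (Function<String, String> handler : handlers.getOrDefault(address, List.of())) {
            handler.apply(body);
        }
    }

    // Request/reply: one consumer receives the message and its return value
    // becomes the reply, which is delivered asynchronously.
    CompletableFuture<String> request(String address, String body) {
        Function<String, String> handler = next(address);
        return CompletableFuture.supplyAsync(() -> handler.apply(body));
    }
}
```

With two consumers registered on the same address, two successive `send` calls reach the first and then the second consumer, while a single `publish` call reaches both.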
Installing
This mechanism uses the Vert.x EventBus, so you need to enable the vertx extension to use this feature. If you are creating a new project, set the extensions parameter as follows:
Unresolved directive in reactive-event-bus.adoc - include::{includes}/devtools/create-app.adoc[]
If you already have a project, the vertx extension can be added to an existing Quarkus project with the add-extension command:
Unresolved directive in reactive-event-bus.adoc - include::{includes}/devtools/extension-add.adoc[]
Otherwise, you can manually add this to the dependencies section of your build file:
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-vertx</artifactId>
</dependency>
implementation("io.quarkus:quarkus-vertx")
Consuming events
To consume events, use the io.quarkus.vertx.ConsumeEvent annotation:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {

    @ConsumeEvent // (1)
    public String consume(String name) { // (2)
        return name.toUpperCase();
    }
}
1 | If not set, the address is the fully qualified name of the bean; in this snippet it is org.acme.vertx.GreetingService. |
2 | The method parameter is the message body. If the method returns something, it is the message response. |
By default, the code consuming the event must be non-blocking, as it’s called on the Vert.x event loop. If your processing is blocking, use the blocking attribute:
@ConsumeEvent(value = "blocking-consumer", blocking = true)
void consumeBlocking(String message) {
    // Something blocking
}
Alternatively, you can annotate your method with @io.smallrye.common.annotation.Blocking:
@ConsumeEvent(value = "blocking-consumer")
@Blocking
void consumeBlocking(String message) {
    // Something blocking
}
When using @Blocking, the value of the blocking attribute of @ConsumeEvent is ignored. See the Quarkus Reactive Architecture documentation for further details on this topic.
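The reason blocking code must be moved off the event loop can be illustrated with a small plain-Java model. This is only a sketch under stated assumptions: `Dispatcher`, its `dispatch` method, and the thread names are hypothetical, and the real dispatch is done by Quarkus and Vert.x, not by code like this. A single thread plays the role of the event loop, and handlers flagged as blocking are run on a separate worker pool so the single thread stays free.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.function.Function;

/** Plain-Java model of event-loop versus worker dispatch; not Quarkus code. */
class Dispatcher implements AutoCloseable {
    // One thread stands in for the event loop; it must never be blocked.
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor(daemon("event-loop"));
    // A small pool stands in for the worker threads used for blocking handlers.
    private final ExecutorService workers = Executors.newFixedThreadPool(2, daemon("worker"));

    private static ThreadFactory daemon(String name) {
        return runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true);
            return thread;
        };
    }

    // Runs the handler on the worker pool when blocking is true, else on the event loop.
    <T, R> CompletableFuture<R> dispatch(T message, Function<T, R> handler, boolean blocking) {
        ExecutorService target = blocking ? workers : eventLoop;
        return CompletableFuture.supplyAsync(() -> handler.apply(message), target);
    }

    @Override
    public void close() {
        eventLoop.shutdown();
        workers.shutdown();
    }
}
```

In this model, a handler dispatched with `blocking` set to `true` observes a thread named `worker`, while a non-blocking handler observes `event-loop`.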
Asynchronous processing is also possible by returning either an io.smallrye.mutiny.Uni or a java.util.concurrent.CompletionStage:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;
@ApplicationScoped
public class GreetingService {

    @ConsumeEvent
    public CompletionStage<String> consume(String name) {
        // Return a CompletionStage completed when the processing is finished.
        // You can also fail the CompletionStage explicitly.
        return CompletableFuture.supplyAsync(() -> name.toUpperCase());
    }

    @ConsumeEvent
    public Uni<String> process(String name) {
        // Return a Uni completed when the processing is finished.
        // You can also fail the Uni explicitly.
        return Uni.createFrom().item(() -> name.toUpperCase());
    }
}
Mutiny
The previous example uses Mutiny reactive types. If you are not familiar with Mutiny, check Mutiny - an intuitive reactive programming library.
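As an illustration of completing or explicitly failing a CompletionStage, the following sketch uses only the JDK. The class name `AsyncReplies` and the blank-name validation rule are assumptions made for the example; in a Quarkus bean the method would additionally carry the @ConsumeEvent annotation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/** JDK-only sketch of an asynchronous reply; the validation rule is hypothetical. */
class AsyncReplies {

    // Completes the stage with the reply, or fails it explicitly for invalid input.
    static CompletionStage<String> consume(String name) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        if (name == null || name.isBlank()) {
            // Explicit failure: the caller observes the exception instead of a value.
            reply.completeExceptionally(new IllegalArgumentException("name must not be blank"));
        } else {
            // Simulate work that finishes later on another thread.
            CompletableFuture.runAsync(() -> reply.complete(name.toUpperCase()));
        }
        return reply;
    }
}
```

The method returns immediately in both cases; the caller only sees the value or the failure once the stage is completed.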
Configuring the address
The @ConsumeEvent annotation can be configured to set the address:
@ConsumeEvent("greeting") // (1)
public String consume(String name) {
    return name.toUpperCase();
}
1 | Receive the messages sent to the greeting address |
Replying
The return value of a method annotated with @ConsumeEvent is used as the response to the incoming message. For instance, in the following snippet, the returned String is the response.
@ConsumeEvent("greeting")
public String consume(String name) {
    return name.toUpperCase();
}
You can also return a Uni<T> or a CompletionStage<T> to handle an asynchronous reply:
@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
    return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}
If you use the context propagation extension, you can inject an executor. Alternatively, you can use the default Quarkus worker pool.
Implementing fire and forget interactions
You don’t have to reply to received messages. Typically, for a fire and forget interaction, the messages are consumed and the sender does not need to know about it. To implement this, your consumer method just returns void:
@ConsumeEvent("greeting")
public void consume(String event) {
    // Do something with the event
}
Dealing with messages
As said above, this mechanism is based on the Vert.x event bus. So, you can also use Message directly:
@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
    System.out.println(msg.address());
    System.out.println(msg.body());
}
Handling Failures
If a method annotated with @ConsumeEvent throws an exception then:
- if a reply handler is set, the failure is propagated back to the sender via an io.vertx.core.eventbus.ReplyException with code ConsumeEvent#FAILURE_CODE and the exception message;
- if no reply handler is set, the exception is rethrown (and wrapped in a RuntimeException if necessary) and can be handled by the default exception handler, i.e. io.vertx.core.Vertx#exceptionHandler().
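The two branches can be modeled in plain Java to show the routing decision. This is a sketch, not the Quarkus implementation: `ToyReplyException` is a hypothetical stand-in for io.vertx.core.eventbus.ReplyException, `FailureRouting.deliver` is an invented helper, and the `FAILURE_CODE` value is a placeholder rather than the real ConsumeEvent#FAILURE_CODE constant. The wrapping in a RuntimeException is left out of the model.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical stand-in for a reply failure carrying a code and the original message. */
class ToyReplyException extends RuntimeException {
    final int failureCode;

    ToyReplyException(int failureCode, String message) {
        super(message);
        this.failureCode = failureCode;
    }
}

class FailureRouting {
    // Placeholder value, not the real ConsumeEvent#FAILURE_CODE.
    static final int FAILURE_CODE = 1;

    // Invokes the consumer. A non-null reply means the sender expects an answer,
    // so a failure goes back to the sender; otherwise it goes to the default handler.
    static void deliver(String body,
                        Function<String, String> consumer,
                        CompletableFuture<String> reply,
                        Consumer<Throwable> defaultExceptionHandler) {
        try {
            String result = consumer.apply(body);
            if (reply != null) {
                reply.complete(result);
            }
        } catch (RuntimeException e) {
            if (reply != null) {
                reply.completeExceptionally(new ToyReplyException(FAILURE_CODE, e.getMessage()));
            } else {
                defaultExceptionHandler.accept(e);
            }
        }
    }
}
```

A sender waiting on `reply` sees a `ToyReplyException` carrying the original exception message, whereas a fire and forget delivery hands the original exception to the default handler.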
Sending messages
Ok, we have seen how to receive messages, let’s now switch to the other side: the sender. Sending and publishing messages use the Vert.x event bus:
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/async")
public class EventResource {

    @Inject
    EventBus bus; // (1)

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name) // (2)
                .onItem().transform(Message::body);
    }
}
1 | Inject the Event bus |
2 | Send a message to the address greeting. The message payload is name. |
The EventBus object provides methods to:
- send a message to a specific address: one single consumer receives the message;
- publish a message to a specific address: all consumers receive the message;
- send a message and expect a reply asynchronously;
- send a message and expect a reply in a blocking manner.
// Case 1
bus.<String>requestAndForget("greeting", name);
// Case 2
bus.publish("greeting", name);
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
.onItem().transform(Message::body);
// Case 4
String response = bus.<String>requestAndAwait("greeting", name).body();
Putting things together - bridging HTTP and messages
Let’s revisit a greeting HTTP endpoint and use asynchronous message passing to delegate the call to a separate bean. It uses the request/reply dispatching mechanism. Instead of implementing the business logic inside the Jakarta REST endpoint, we send a message. This message is consumed by another bean and the response is sent using the reply mechanism.
First create a new project using:
Unresolved directive in reactive-event-bus.adoc - include::{includes}/devtools/create-app.adoc[]
You can already start the application in dev mode using:
Unresolved directive in reactive-event-bus.adoc - include::{includes}/devtools/dev.adoc[]
Then, create a new Jakarta REST resource with the following content:
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/async")
public class EventResource {

    @Inject
    EventBus bus;

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name) // (1)
                .onItem().transform(Message::body); // (2)
    }
}
1 | send the name to the greeting address and request a response |
2 | when we get the response, extract the body and send it to the user |
If you call this endpoint, you will wait and get a timeout. Indeed, no one is listening. So, we need a consumer listening on the greeting address. Create a GreetingService bean with the following content:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {

    @ConsumeEvent("greeting")
    public String greeting(String name) {
        return "Hello " + name;
    }
}
This bean receives the name, and returns the greeting message.
Now, open http://localhost:8080/async/Quarkus in your browser, and you should see:
Hello Quarkus
To better understand, let’s detail how the HTTP request/response has been handled:
- The request is received by the greeting method
- A message containing the name is sent to the event bus
- Another bean receives this message and computes the response
- This response is sent back using the reply mechanism
- Once the reply is received by the sender, the content is written to the HTTP response
This application can be packaged using:
Unresolved directive in reactive-event-bus.adoc - include::{includes}/devtools/build.adoc[]
You can also compile it as a native executable with:
Unresolved directive in reactive-event-bus.adoc - include::{includes}/devtools/build-native.adoc[]
Using codecs
The Vert.x Event Bus uses codecs to serialize and deserialize objects. Quarkus provides a default codec for local delivery. So you can exchange objects as follows:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", new MyName(name))
            .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello " + name.getName());
}
If you want to use a specific codec, you need to explicitly set it on both ends:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    // The payload must be a MyName so that it matches the codec and the consumer parameter.
    return bus.<String>request("greeting", new MyName(name),
            new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) // (1)
            .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting", codec = MyNameCodec.class) // (2)
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello " + name.getName());
}
1 | Set the name of the codec to use to send the message |
2 | Set the codec to use to receive the message |
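The snippets above refer to MyName and MyNameCodec without showing them. As a rough illustration of what a codec does, here is a plain-Java sketch of an encode/decode round trip. Everything in it is an assumption: the shape of `MyName` is inferred from the `getName()` call, and `ToyCodec` is a simplified, hypothetical contract. The real Vert.x `MessageCodec` interface is different and has more methods (including `encodeToWire`, `decodeFromWire`, `transform`, and `name`).

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical payload type matching the getName() accessor used above. */
final class MyName {
    private final String name;

    MyName(String name) {
        this.name = name;
    }

    String getName() {
        return name;
    }
}

/** Simplified, hypothetical codec contract; not the Vert.x MessageCodec interface. */
interface ToyCodec<T> {
    byte[] encode(T value);

    T decode(byte[] bytes);
}

/** Encodes a MyName as the UTF-8 bytes of its name and decodes it back. */
final class MyNameToyCodec implements ToyCodec<MyName> {
    @Override
    public byte[] encode(MyName value) {
        return value.getName().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public MyName decode(byte[] bytes) {
        return new MyName(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

Decoding the encoded form yields a new `MyName` with the same name, which is the property both ends rely on when they agree on a codec.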