Vert.x Reference Guide
Vert.x is a toolkit for building reactive applications. As described in the Quarkus Reactive Architecture, Quarkus uses Vert.x underneath.
This guide is the companion to the Using Eclipse Vert.x API from a Quarkus Application guide. It provides more advanced details about the usage and configuration of the Vert.x instance used by Quarkus.
Access the Vert.x instance
To access the managed Vert.x instance, add the `quarkus-vertx` extension to your project. This dependency might already be available in your project (as a transitive dependency).
With this extension, you can retrieve the managed instance of Vert.x using either field or constructor injection:
@ApplicationScoped
public class MyBean {
// Field injection
@Inject Vertx vertx;
// Constructor injection
MyBean(Vertx vertx) {
// ...
}
}
You can inject either:
- the `io.vertx.core.Vertx` instance, exposing the bare Vert.x API
- the `io.vertx.mutiny.core.Vertx` instance, exposing the Mutiny API
We recommend using the Mutiny variant as it integrates with the other reactive APIs provided by Quarkus.
Mutiny
If you are not familiar with Mutiny, check Mutiny - an intuitive reactive programming library.
Documentation about the Vert.x Mutiny variant is available on https://smallrye.io/smallrye-mutiny-vertx-bindings.
Configure the Vert.x instance
You can configure the Vert.x instance from the `application.properties` file. The following table lists the supported properties:
[Table not available: generated configuration reference for the `quarkus-vertx` extension (config/quarkus-vertx.adoc)]
See Customize the Vert.x configuration to configure the Vert.x instance using a programmatic approach.
Use Vert.x clients
In addition to Vert.x core, you can use most Vert.x ecosystem libraries. Some Quarkus extensions already wrap Vert.x libraries.
Available APIs
The following table lists the most used libraries from the Vert.x ecosystem. To access these APIs, add the indicated extension or dependency to your project. Check the associated documentation to learn how to use them.
API | Extension or Dependency | Documentation
AMQP Client | |
Circuit Breaker | | https://vertx.io/docs/vertx-circuit-breaker/java/
Consul Client | | https://vertx.io/docs/vertx-consul-client/java/
DB2 Client | |
Kafka Client | |
Mail Client | |
MQTT Client | | No guide yet
MS SQL Client | |
MySQL Client | |
Oracle Client | |
PostgreSQL Client | |
RabbitMQ Client | | https://vertx.io/docs/vertx-rabbitmq-client/java
Redis Client | |
Web Client | | https://vertx.io/docs/vertx-web-client/java/
To learn more about the usage of the Vert.x Mutiny API, refer to https://smallrye.io/smallrye-mutiny-vertx-bindings.
Use the Vert.x Web Client
This section gives an example using the Vert.x `WebClient` in the context of a Quarkus REST (formerly RESTEasy Reactive) application. As indicated in the table above, add the following dependency to your project:
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-web-client</artifactId>
</dependency>
implementation("io.smallrye.reactive:smallrye-mutiny-vertx-web-client")
Now, in your code, you can create an instance of `WebClient`:
package org.acme.vertx;

import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.ext.web.client.WebClient;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClientOptions;

@Path("/fruit-data")
public class ResourceUsingWebClient {

    private final WebClient client;

    // The constructor name must match the class name
    @Inject
    ResourceUsingWebClient(Vertx vertx) {
        this.client = WebClient.create(vertx);
    }

    @GET
    @Produces(MediaType.APPLICATION_JSON)
    @Path("/{name}")
    public Uni<JsonObject> getFruitData(String name) {
        return client.getAbs("https://.../api/fruit/" + name)
                .send()
                .onItem().transform(resp -> {
                    if (resp.statusCode() == 200) {
                        // Forward the response body as received
                        return resp.bodyAsJsonObject();
                    } else {
                        // Wrap the error into a JSON object
                        return new JsonObject()
                                .put("code", resp.statusCode())
                                .put("message", resp.bodyAsString());
                    }
                });
    }
}
This resource creates a `WebClient` and, upon request, uses this client to invoke a remote HTTP API. Depending on the result, the response is forwarded as received, or it creates a JSON object wrapping the error. The `WebClient` is asynchronous (and non-blocking), so the endpoint returns a `Uni`.
The application can also run as a native executable. But first, we need to instruct Quarkus to enable SSL (if the remote API uses HTTPS). Open `src/main/resources/application.properties` and add:
quarkus.ssl.native=true
Then, create the native executable with the native build command of your build tool, for example `quarkus build --native` with the Quarkus CLI or `./mvnw install -Dnative` with Maven.
Use Vert.x JSON
Vert.x APIs often rely on JSON. Vert.x provides two convenient classes to manipulate JSON documents: `io.vertx.core.json.JsonObject` and `io.vertx.core.json.JsonArray`.
`JsonObject` can be used to map an object into its JSON representation and build an object from a JSON document:
// Map an object into JSON
Person person = ...;
JsonObject json = JsonObject.mapFrom(person);
// Build an object from JSON
json = new JsonObject();
person = json.mapTo(Person.class);
Note that these features use the mapper managed by the `quarkus-jackson` extension. Refer to Jackson configuration to customize the mapping.
JSON Object and JSON Array are both supported as Quarkus HTTP endpoint request and response bodies (using classic RESTEasy and Quarkus REST). Consider these endpoints:
package org.acme.vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/hello")
@Produces(MediaType.APPLICATION_JSON)
public class VertxJsonResource {
@GET
@Path("{name}/object")
public JsonObject jsonObject(String name) {
return new JsonObject().put("Hello", name);
}
@GET
@Path("{name}/array")
public JsonArray jsonArray(String name) {
return new JsonArray().add("Hello").add(name);
}
}
http://localhost:8080/hello/Quarkus/object returns:
{"Hello":"Quarkus"}
http://localhost:8080/hello/Quarkus/array returns:
["Hello","Quarkus"]
This works equally well when the JSON content is a request body or is wrapped in a `Uni`, `Multi`, `CompletionStage` or `Publisher`.
Use Verticles
Verticles is "a simple, scalable, actor-like deployment and concurrency model" provided by Vert.x. This model does not claim to be a strict actor-model implementation, but it shares similarities, especially concerning concurrency, scaling, and deployment. To use this model, you write and deploy verticles, which communicate by sending messages on the event bus.
You can deploy verticles in Quarkus. It supports:
- bare verticles: Java classes extending `io.vertx.core.AbstractVerticle`
- Mutiny verticles: Java classes extending `io.smallrye.mutiny.vertx.core.AbstractVerticle`
Deploy Verticles
To deploy verticles, use the `deployVerticle` method:
@Inject Vertx vertx;
// ...
vertx.deployVerticle(MyVerticle.class.getName(), ar -> { });
vertx.deployVerticle(new MyVerticle(), ar -> { });
If you use the Mutiny variant of Vert.x, be aware that the `deployVerticle` method returns a `Uni`, and you need to trigger a subscription for the deployment to actually happen.
An example explaining how to deploy verticles during the initialization of the application will follow.
Use @ApplicationScoped beans as Verticle
In general, Vert.x verticles are not CDI beans, and so they cannot use injection. However, in Quarkus, you can deploy verticles as beans. Note that in this case, CDI (Arc in Quarkus) is responsible for creating the instance.
The following snippet provides an example:
package io.quarkus.vertx.verticles;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class MyBeanVerticle extends AbstractVerticle {
@ConfigProperty(name = "address") String address;
@Override
public Uni<Void> asyncStart() {
return vertx.eventBus().consumer(address)
.handler(m -> m.replyAndForget("hello"))
.completionHandler();
}
}
You don’t have to inject the `vertx` instance; instead, leverage the protected field from `AbstractVerticle`.
Then, deploy the verticle instances with:
package io.quarkus.vertx.verticles;
import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
@ApplicationScoped
public class VerticleDeployer {
public void init(@Observes StartupEvent e, Vertx vertx, MyBeanVerticle verticle) {
vertx.deployVerticle(verticle).await().indefinitely();
}
}
If you want to deploy every exposed `AbstractVerticle`, you can use:
public void init(@Observes StartupEvent e, Vertx vertx, Instance<AbstractVerticle> verticles) {
for (AbstractVerticle verticle : verticles) {
vertx.deployVerticle(verticle).await().indefinitely();
}
}
Create multiple verticle instances
When using `@ApplicationScoped`, you get a single instance of your verticle. Having multiple instances of a verticle can be helpful to share the load among them. Each of them will be associated with a different I/O thread (Vert.x event loop).
To deploy multiple instances of your verticle, use the `@Dependent` scope instead of `@ApplicationScoped`:
package org.acme.verticle;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
@Dependent
public class MyVerticle extends AbstractVerticle {
@Override
public Uni<Void> asyncStart() {
return vertx.eventBus().consumer("address")
.handler(m -> m.reply("Hello from " + this))
.completionHandler();
}
}
Then, deploy your verticle as follows:
package org.acme.verticle;
import io.quarkus.runtime.StartupEvent;
import io.vertx.core.DeploymentOptions;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
@ApplicationScoped
public class MyApp {
void init(@Observes StartupEvent ev, Vertx vertx, Instance<MyVerticle> verticles) {
vertx
.deployVerticle(verticles::get, new DeploymentOptions().setInstances(2))
.await().indefinitely();
}
}
The `init` method receives an `Instance<MyVerticle>`. Then, you pass a supplier to the `deployVerticle` method. The supplier just calls the `get()` method. Thanks to the `@Dependent` scope, it returns a new instance on every call. Finally, you pass the desired number of instances to the `DeploymentOptions`, such as two in the previous example. It will call the supplier twice, which will create two instances of your verticle.
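The supplier mechanism described above can be illustrated with a small JDK-only sketch. It is a toy model rather than the Vert.x deployment code: `SupplierDeploymentSketch`, `FakeVerticle` and `deploy` are hypothetical names introduced only for this illustration, and the constructor reference stands in for what `Instance::get` returns for a `@Dependent` bean.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Toy model of supplier-based deployment; the names are hypothetical, not Vert.x API.
class SupplierDeploymentSketch {

    // Stands in for a @Dependent verticle: each call to the supplier creates a new object.
    static class FakeVerticle { }

    // Mimics a deployer honoring "instances = n": it calls the supplier n times.
    static <T> List<T> deploy(Supplier<T> supplier, int instances) {
        List<T> deployed = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            deployed.add(supplier.get());
        }
        return deployed;
    }

    public static void main(String[] args) {
        List<FakeVerticle> verticles = deploy(FakeVerticle::new, 2);
        System.out.println("instances: " + verticles.size());                      // instances: 2
        System.out.println("distinct: " + (verticles.get(0) != verticles.get(1))); // distinct: true
    }
}
```

With an `@ApplicationScoped` bean, the supplier would hand back the same object on every call, which is why the `@Dependent` scope is needed for multiple instances.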
Use the Event Bus
Vert.x comes with a built-in event bus that you can use from your Quarkus application. So, your application components (CDI beans, resources…) can interact using asynchronous events, thus promoting loose coupling.
With the event bus, you send messages to virtual addresses. The event bus offers three types of delivery mechanisms:
- point-to-point: send the message, and one consumer receives it. If several consumers listen to the address, a round-robin is applied;
- publish/subscribe: publish a message, and all the consumers listening to the address receive it;
- request/reply: send the message and expect a response. The receiver can respond to the message in an asynchronous fashion.
All these delivery mechanisms are non-blocking and provide one of the fundamental bricks to build reactive applications.
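To make the three delivery mechanisms concrete, here is a deliberately simplified, synchronous toy model written with the JDK only. It is not how Vert.x implements the event bus (the real one is asynchronous and non-blocking), and `ToyEventBus` with its methods is a hypothetical class used only to show which consumers get a message in each case.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Toy, synchronous model of the delivery mechanisms. Hypothetical names, not Vert.x API.
class ToyEventBus {
    private final Map<String, List<Function<String, String>>> consumers = new HashMap<>();
    private final Map<String, Integer> cursors = new HashMap<>();

    // Registers a consumer on a virtual address.
    void consumer(String address, Function<String, String> handler) {
        consumers.computeIfAbsent(address, a -> new ArrayList<>()).add(handler);
    }

    // Picks exactly one consumer per call, in round-robin order.
    private Function<String, String> next(String address) {
        List<Function<String, String>> handlers = consumers.getOrDefault(address, List.of());
        if (handlers.isEmpty()) {
            throw new IllegalStateException("No consumer registered on " + address);
        }
        int cursor = cursors.merge(address, 1, Integer::sum) - 1;
        return handlers.get(cursor % handlers.size());
    }

    // Point-to-point: one consumer receives the message, the result is ignored.
    void send(String address, String body) {
        next(address).apply(body);
    }

    // Request/reply: one consumer receives the message and its result is the reply.
    String request(String address, String body) {
        return next(address).apply(body);
    }

    // Publish/subscribe: every consumer receives the message.
    // The results are returned only so the demo can show who was called.
    List<String> publish(String address, String body) {
        List<String> results = new ArrayList<>();
        for (Function<String, String> handler : consumers.getOrDefault(address, List.of())) {
            results.add(handler.apply(body));
        }
        return results;
    }

    public static void main(String[] args) {
        ToyEventBus bus = new ToyEventBus();
        bus.consumer("greeting", name -> "A:" + name);
        bus.consumer("greeting", name -> "B:" + name);
        System.out.println(bus.request("greeting", "Quarkus")); // A:Quarkus
        System.out.println(bus.request("greeting", "Quarkus")); // B:Quarkus
        System.out.println(bus.publish("greeting", "Quarkus")); // [A:Quarkus, B:Quarkus]
    }
}
```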
Consume events
While you can use the Vert.x API to register consumers, Quarkus comes with declarative support. To consume events, use the `io.quarkus.vertx.ConsumeEvent` annotation:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent (1)
public String consume(String name) { (2)
return name.toUpperCase();
}
}
1 | If not set, the address is the fully qualified name of the bean; for instance, in this snippet, it’s org.acme.vertx.GreetingService. |
2 | The method parameter is the message body. If the method returns something, it’s the message response. |
Configure the address
The `@ConsumeEvent` annotation can be configured to set the address:
@ConsumeEvent("greeting") (1)
public String consume(String name) {
return name.toUpperCase();
}
1 | Receive the messages sent to the greeting address |
The address value can be a property expression. In this case, the configured value is used instead: `@ConsumeEvent("${my.consumer.address}")`. Additionally, the property expression can specify a default value: `@ConsumeEvent("${my.consumer.address:defaultAddress}")`.
@ConsumeEvent("${my.consumer.address}") (1)
public String consume(String name) {
return name.toLowerCase();
}
1 | Receive the messages sent to the address configured with the my.consumer.address key. |
If no config property with the specified key exists and no default value is set, then the application startup fails.
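The resolution rules for the address (plain value, configured value, default value, failure) can be summarized with a minimal JDK-only sketch. It only mirrors the behavior described above and is not the Quarkus configuration code: `AddressExpressionSketch` and `resolve` are hypothetical names, a plain `Map` stands in for the application configuration, and only a single `${key}` or `${key:default}` expression is handled.

```java
import java.util.Map;
import java.util.NoSuchElementException;

// Minimal model of the address resolution rules; hypothetical names, not Quarkus API.
class AddressExpressionSketch {

    static String resolve(String value, Map<String, String> config) {
        if (!value.startsWith("${") || !value.endsWith("}")) {
            return value; // A plain address such as "greeting" is used as-is
        }
        String expression = value.substring(2, value.length() - 1);
        int separator = expression.indexOf(':');
        String key = separator < 0 ? expression : expression.substring(0, separator);
        String configured = config.get(key);
        if (configured != null) {
            return configured; // The configured value wins over the default
        }
        if (separator >= 0) {
            return expression.substring(separator + 1); // Fall back to the default value
        }
        // No configured value and no default: modeled here as an exception
        throw new NoSuchElementException("No config property for key " + key);
    }

    public static void main(String[] args) {
        Map<String, String> config = Map.of("my.consumer.address", "orders");
        System.out.println(resolve("greeting", config));                          // greeting
        System.out.println(resolve("${my.consumer.address}", config));            // orders
        System.out.println(resolve("${other.address:defaultAddress}", config));   // defaultAddress
    }
}
```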
Process events asynchronously
The previous examples use synchronous processing. Asynchronous processing is also possible by returning either an `io.smallrye.mutiny.Uni` or a `java.util.concurrent.CompletionStage`:
package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent
    public Uni<String> process(String name) {
        // Return a Uni completed when the processing is finished.
        // You can also fail the Uni explicitly, for example with Uni.createFrom().failure(...).
        return Uni.createFrom().item(() -> name.toUpperCase());
    }
}
Mutiny
The previous example uses Mutiny reactive types. If you are not familiar with Mutiny, check Mutiny - an intuitive reactive programming library.
Blocking processing of events
By default, the code consuming the event must be non-blocking, as it’s called on an I/O thread. If your processing is blocking, use the `@io.smallrye.common.annotation.Blocking` annotation:
@ConsumeEvent(value = "blocking-consumer")
@Blocking
void consumeBlocking(String message) {
// Something blocking
}
Alternatively, you can use the `blocking` attribute from the `@ConsumeEvent` annotation:
@ConsumeEvent(value = "blocking-consumer", blocking = true)
void consumeBlocking(String message) {
// Something blocking
}
When `@Blocking` is used, the value of the `blocking` attribute of `@ConsumeEvent` is ignored.
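The reason for marking a consumer as blocking is thread placement: a blocking consumer should run on a worker thread, so that the I/O thread stays free. The JDK-only sketch below models that dispatch decision with two plain executors. It is an illustration under that assumption, not the Quarkus dispatching code, and `BlockingDispatchSketch`, `dispatch`, `runDemo` and the thread names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy model of "blocking consumers run on a worker thread"; hypothetical names.
class BlockingDispatchSketch {

    // Runs the consumer on the worker pool when it is blocking, else on the event loop,
    // and returns the name of the thread that executed it.
    static String dispatch(boolean blocking, ExecutorService eventLoop, ExecutorService workers)
            throws Exception {
        ExecutorService target = blocking ? workers : eventLoop;
        Callable<String> consumer = () -> Thread.currentThread().getName();
        return target.submit(consumer).get();
    }

    // Dispatches one non-blocking and one blocking consumer, then shuts the pools down.
    static String[] runDemo() {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop-0"));
        ExecutorService workers = Executors.newFixedThreadPool(2, r -> new Thread(r, "worker"));
        try {
            return new String[] {
                dispatch(false, eventLoop, workers),
                dispatch(true, eventLoop, workers)
            };
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            eventLoop.shutdown();
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = runDemo();
        System.out.println("non-blocking consumer ran on: " + threads[0]); // event-loop-0
        System.out.println("blocking consumer ran on: " + threads[1]);     // worker
    }
}
```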
Reply to events
The return value of a method annotated with `@ConsumeEvent` is used to respond to the incoming message. For instance, in the following snippet, the returned `String` is the response.
@ConsumeEvent("greeting")
public String consume(String name) {
return name.toUpperCase();
}
You can also return a `Uni<T>` or a `CompletionStage<T>` to handle an asynchronous reply:
@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}
If you use the Context Propagation extension, you can inject an `executor`.
Implement fire-and-forget interactions
You don’t have to reply to received messages. Typically, for a fire-and-forget interaction, the messages are consumed, and the sender does not need to know about it. To implement this pattern, your consumer method returns `void`.
@ConsumeEvent("greeting")
public void consume(String event) {
// Do something with the event
}
Consume messages (instead of events)
Unlike the previous examples, which use the payloads directly, you can also consume the `Message` directly:
@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
System.out.println(msg.address());
System.out.println(msg.body());
}
Handle failures
If a method annotated with `@ConsumeEvent` throws an exception, then:
- if a reply handler is set, the failure is propagated back to the sender via an `io.vertx.core.eventbus.ReplyException` with code `ConsumeEvent#FAILURE_CODE` and the exception message;
- if no reply handler is set, the exception is rethrown (and wrapped in a `RuntimeException` if necessary) and can be handled by the default exception handler, i.e. `io.vertx.core.Vertx#exceptionHandler()`.
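The two failure paths above can be sketched with plain JDK types. This is a simplified model, not the Quarkus or Vert.x code: a `CompletableFuture` stands in for the reply seen by the sender, an `IllegalStateException` stands in for the `ReplyException`, and `FailureHandlingSketch`, `request` and `send` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

// Toy model of the two failure paths; hypothetical names, not Quarkus or Vert.x API.
class FailureHandlingSketch {

    // A reply is expected: the consumer failure travels back to the sender
    // as a failed reply carrying the exception message.
    static CompletableFuture<String> request(Function<String, String> consumer, String body) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        try {
            reply.complete(consumer.apply(body));
        } catch (RuntimeException e) {
            reply.completeExceptionally(new IllegalStateException("Consumer failed: " + e.getMessage()));
        }
        return reply;
    }

    // No reply is expected: the exception is handed to a default exception handler.
    static void send(Function<String, String> consumer, String body,
                     Consumer<Throwable> defaultExceptionHandler) {
        try {
            consumer.apply(body);
        } catch (RuntimeException e) {
            defaultExceptionHandler.accept(e);
        }
    }

    public static void main(String[] args) {
        Function<String, String> failing = body -> {
            throw new IllegalArgumentException("boom");
        };
        // The sender can observe and recover from the failure
        String recovered = request(failing, "hello")
                .exceptionally(failure -> "fallback")
                .join();
        System.out.println(recovered); // fallback
        // Without a reply, only the default handler sees the failure
        send(failing, "hello", failure -> System.out.println("handled: " + failure.getMessage())); // handled: boom
    }
}
```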
Send messages
Sending and publishing messages use the Vert.x event bus:
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/async")
public class EventResource {
@Inject
EventBus bus; (1)
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name) (2)
.onItem().transform(Message::body);
}
}
1 | Inject the Event bus |
2 | Send a message to the address greeting. The message payload is name. |
The `EventBus` object provides methods to:
- `send` a message to a specific address: one single consumer receives the message.
- `publish` a message to a specific address: all consumers receive the message.
- `request`: send a message and expect a reply.
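// Case 1: send to a single consumer, without waiting for a reply
bus.sendAndForget("greeting", name);
// Case 2: publish to all the consumers
bus.publish("greeting", name);
// Case 3: send a request and expect a reply
Uni<String> response = bus.<String>request("address", "hello, how are you?")
        .onItem().transform(Message::body);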
Process events on virtual threads
Methods annotated with `@ConsumeEvent` can also be annotated with `@RunOnVirtualThread`. In this case, the method is invoked on a virtual thread. Each event is processed on a different virtual thread.
To use this feature, make sure:
- Your Java runtime supports virtual threads.
- Your method uses a blocking signature.
The second point means only methods returning an object or `void` can use `@RunOnVirtualThread`. Methods returning a `Uni` or a `CompletionStage` cannot run on virtual threads.
Read the virtual thread guide for more details.
Use codecs
The Vert.x Event Bus (https://vertx.io/docs/vertx-core/java/event_bus) uses codecs to serialize and deserialize message objects. Quarkus provides a default codec for local delivery. This codec is automatically used for return types and message body parameters of local consumers, i.e. methods annotated with `@ConsumeEvent` where `ConsumeEvent#local() == true` (which is the default).
This lets you exchange message objects as follows:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", new MyName(name))
.onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
return Uni.createFrom().item(() -> "Hello " + name.getName());
}
If you want to use a specific codec, you need to set it on both ends explicitly:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name,
new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
.onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting", codec = MyNameCodec.class) (2)
Uni<String> greeting(MyName name) {
return Uni.createFrom().item(() -> "Hello "+name.getName());
}
1 | Set the name of the codec to use to send the message |
2 | Set the codec to use to receive the message |
Combine HTTP and the Event Bus
Let’s revisit a greeting HTTP endpoint and use asynchronous message passing to delegate the call to a separate bean. It uses the request/reply dispatching mechanism. Instead of implementing the business logic inside the Jakarta REST endpoint, we send a message. Another bean consumes this message, and the response is sent using the reply mechanism.
In your HTTP endpoint class, inject the event bus and use the `request` method to send a message to the event bus and expect a response:
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/bus")
public class EventResource {
@Inject
EventBus bus;
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name) (1)
.onItem().transform(Message::body); (2)
}
}
1 | send the name to the greeting address and request a response |
2 | when we get the response, extract the body and send it to the user |
The HTTP method returns a `Uni`.
We need a consumer listening on the `greeting` address. This consumer can be in the same class or another bean such as:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent("greeting")
public String greeting(String name) {
return "Hello " + name;
}
}
This bean receives the name and returns the greeting message.
With this in place, every HTTP request on `/bus/quarkus` sends a message to the event bus, waits for a reply, and when the reply arrives, writes the HTTP response:
Hello Quarkus
To better understand, let’s detail how the HTTP request/response has been handled:
- The request is received by the `greeting` method
- A message containing the name is sent to the event bus
- Another bean receives this message and computes the response
- This response is sent back using the reply mechanism
- Once the reply is received by the sender, the content is written to the HTTP response
Bidirectional communication with browsers by using SockJS
The SockJS bridge provided by Vert.x allows browser applications and Quarkus applications to communicate using the event bus. It connects both sides, so each side can send messages that are received on the other side. It supports the three delivery mechanisms.
SockJS negotiates the communication channel between the Quarkus application and the browser. If WebSockets are supported, it uses them; otherwise, it degrades to SSE, long polling, etc.
To use SockJS, you need to configure the bridge, especially the addresses that will be used to communicate:
package org.acme;
import io.vertx.core.Vertx;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.sockjs.SockJSBridgeOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import java.util.concurrent.atomic.AtomicInteger;
@ApplicationScoped
public class SockJsExample {
@Inject
Vertx vertx;
public void init(@Observes Router router) {
SockJSHandler sockJSHandler = SockJSHandler.create(vertx);
Router bridge = sockJSHandler.bridge(new SockJSBridgeOptions()
.addOutboundPermitted(new PermittedOptions().setAddress("ticks")));
router.route("/eventbus/*").subRouter(bridge);
AtomicInteger counter = new AtomicInteger();
vertx.setPeriodic(1000,
ignored -> vertx.eventBus().publish("ticks", counter.getAndIncrement()));
}
}
This code configures the SockJS bridge to send all the messages targeting the `ticks` address to the connected browsers. More detailed explanations about the configuration can be found in the Vert.x SockJS Bridge documentation.
The browser must use the `vertx-eventbus` JavaScript library to consume the messages:
<!doctype html>
<html>
<head>
<meta charset="utf-8"/>
<title>SockJS example - Quarkus</title>
<script src="https://code.jquery.com/jquery-3.3.1.min.js"
integrity="sha256-FgpCb/KJQlLNfOu91ta32o/NMZxltwRo8QtmkMRdAu8=" crossorigin="anonymous"></script>
<script type="application/javascript" src="https://cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/vertx3-eventbus-client@3.8.5/vertx-eventbus.min.js"></script>
</head>
<body>
<h1>SockJS Examples</h1>
<p><strong>Last Tick:</strong> <span id="tick"></span></p>
</body>
<script>
var eb = new EventBus('/eventbus');
eb.onopen = function () {
eb.registerHandler('ticks', function (error, message) {
$("#tick").html(message.body);
});
}
</script>
</html>
Use native transports
Native transports are not supported in native executables.
To use |
Vert.x is capable of using Netty’s native transports, which offer performance improvements on specific platforms. To enable them, you must include the appropriate dependency for your platform. It’s usually a good idea to include both to keep your application platform-agnostic. Netty is smart enough to use the correct one, or none at all on unsupported platforms:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<classifier>osx-x86_64</classifier>
</dependency>
implementation("io.netty:netty-transport-native-epoll::linux-x86_64")
implementation("io.netty:netty-transport-native-kqueue::osx-x86_64")
You will also have to explicitly configure Vert.x to use the native transport. In `application.properties`, add:
quarkus.vertx.prefer-native-transport=true
Or in `application.yml`:
quarkus:
  vertx:
    prefer-native-transport: true
If all is well, Quarkus will log:
[io.qua.ver.cor.run.VertxCoreRecorder] (main) Vertx has Native Transport Enabled: true
Use a Vert.x context-aware scheduler
部分 Mutiny 运算符需要在执行器线程池上安排工作。一个很好的示例是 .onItem().delayIt().by(Duration.ofMillis(10)
,因为它需要此类执行器来延迟发射。
Some Mutiny operators need to schedule work on an executor thread pool.
A good example is .onItem().delayIt().by(Duration.ofMillis(10)
as it needs such an executor to delay emissions.
默认执行器由 `io.smallrye.mutiny.infrastructure.Infrastructure`返回,并且已由 Quarkus 配置和管理。
The default executor is returned by io.smallrye.mutiny.infrastructure.Infrastructure
and it is already configured and managed by Quarkus.
话虽如此,有时您需要确保在 Vert.x(已复制)上下文中运行操作,而不仅仅是在任意线程中运行。
That being said, there are cases where you need to make sure that an operation is run on a Vert.x (duplicated) context and not just on any random thread.
The `io.smallrye.mutiny.vertx.core.ContextAwareScheduler` interface offers an API to obtain context-aware schedulers. Such a scheduler is configured with:
- a delegate `ScheduledExecutorService` of your choice (hint: you can reuse `Infrastructure.getDefaultWorkerPool()`), and
- a context fetching strategy among:
  - an explicit `Context`, or
  - calling `Vertx::getOrCreateContext()` either on the current thread or later when the scheduling request happens, or
  - calling `Vertx::currentContext()`, which fails if the current thread is not a Vert.x thread.
Here is a sample where `ContextAwareScheduler` is used:
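import java.time.Duration;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import io.smallrye.mutiny.vertx.core.ContextAwareScheduler;

class MyVerticle extends AbstractVerticle {

    @Override
    public Uni<Void> asyncStart() {
        // Store some data in the context of the calling event loop
        vertx.getOrCreateContext().put("foo", "bar");

        // Wrap the default worker pool so that tasks run on the captured context
        var delegate = Infrastructure.getDefaultWorkerPool();
        var scheduler = ContextAwareScheduler.delegatingTo(delegate)
                .withCurrentContext();

        return Uni.createFrom().voidItem()
                .onItem().delayIt().onExecutor(scheduler).by(Duration.ofMillis(10))
                .onItem().invoke(() -> {
                    // Prints "bar"
                    var ctx = vertx.getOrCreateContext();
                    System.out.println(ctx.get("foo"));
                });
    }
}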
In this example, a scheduler is created by capturing the context of the Vert.x event loop that calls `asyncStart()`. The `delayIt` operator uses that scheduler, and we can check that the context we get in `invoke` is a Vert.x duplicated context to which the data for key `"foo"` has been propagated.
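The underlying idea (capture a context when the scheduler is created, then reinstall it when the delayed task runs) can be pictured with plain JDK classes. The following is a conceptual sketch only: it uses a `ThreadLocal` map as a stand-in for a Vert.x context and is not how `ContextAwareScheduler` is implemented. `ContextCaptureSketch`, `CURRENT`, `runDelayed`, and `demo` are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class ContextCaptureSketch {

    // Stand-in for a Vert.x context: data attached to the current thread.
    static final ThreadLocal<Map<String, String>> CURRENT =
            ThreadLocal.withInitial(Map::of);

    // Schedules the task on the delegate and runs it with the context
    // that was current on the calling thread at scheduling time.
    static <T> T runDelayed(ScheduledExecutorService delegate, long delayMillis, Callable<T> task) {
        Map<String, String> captured = CURRENT.get();
        Callable<T> wrapped = () -> {
            Map<String, String> previous = CURRENT.get();
            CURRENT.set(captured);
            try {
                return task.call();
            } finally {
                CURRENT.set(previous); // restore the worker thread's own state
            }
        };
        try {
            return delegate.schedule(wrapped, delayMillis, TimeUnit.MILLISECONDS).get();
        } catch (Exception e) {
            throw new IllegalStateException("Delayed task failed", e);
        }
    }

    // Mirrors the sample above: put "foo" in the context, read it after a delay.
    static String demo() {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        try {
            CURRENT.set(Map.of("foo", "bar"));
            return runDelayed(pool, 10, () -> CURRENT.get().get("foo"));
        } finally {
            pool.shutdownNow();
            CURRENT.remove();
        }
    }
}
```

Without the wrapping step, the task would run on the pool thread with that thread's own (empty) data, which is the situation a context-aware scheduler is meant to avoid.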
Use a Unix domain socket
Listening on a Unix domain socket allows us to dispense with the overhead of TCP if the connection to the Quarkus service is established from the same host. This can happen if access to the service goes through a proxy, which is often the case if you're setting up a service mesh with a proxy like Envoy.
This only works on platforms that support native transports (see Use native transports).
Enable the appropriate native transport (see Use native transports) and set the following properties:
quarkus.http.domain-socket=/var/run/io.quarkus.app.socket
quarkus.http.domain-socket-enabled=true
quarkus.vertx.prefer-native-transport=true
By itself, this does not disable the TCP socket, which by default opens on `0.0.0.0:8080`. It can be explicitly disabled:
quarkus.http.host-enabled=false
These properties can be set through Java's `-D` command line parameter or in `application.properties`.
Do not forget to add the native transport dependency. See Use native transports for details.
Make sure your application has the right permissions to write to the socket.
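To check from the same host that something is listening on the domain socket and that the current user is allowed to connect, a plain JDK client can be enough. The following is a minimal sketch, assuming Java 16 or later (which added Unix domain socket channels to the JDK). `DomainSocketProbe` and `canConnect` are hypothetical names, and the default path is the one from the configuration above.

```java
import java.io.IOException;
import java.net.StandardProtocolFamily;
import java.net.UnixDomainSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;

final class DomainSocketProbe {

    // Returns true if a blocking connection to the socket file succeeds.
    // A missing file, insufficient permissions, or a platform without
    // Unix domain socket support all result in false.
    static boolean canConnect(Path socketPath) {
        try (SocketChannel channel = SocketChannel.open(StandardProtocolFamily.UNIX)) {
            return channel.connect(UnixDomainSocketAddress.of(socketPath));
        } catch (IOException | UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Path socket = Path.of(args.length > 0 ? args[0] : "/var/run/io.quarkus.app.socket");
        System.out.println(socket + " reachable: " + canConnect(socket));
    }
}
```

The probe only opens and closes a connection; it does not send an HTTP request.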
Use io_uring
`io_uring` is not supported in native executables.
`io_uring` is a Linux kernel interface that allows you to send and receive data asynchronously. It provides unified semantics for both file and network I/O. It was originally designed to target block devices and files but has since gained the ability to work with things like network sockets. It has the potential to provide modest performance benefits to network I/O on its own and greater benefits for mixed file and network I/O application workloads.
To learn more about `io_uring`, we recommend the following links:
- Why you should use io_uring for network I/O: The main benefit of io_uring for network I/O is a modern asynchronous API that is straightforward to use and provides unified semantics for file and network I/O. A potential performance benefit of io_uring for network I/O is reducing the number of syscalls. This could provide the biggest benefit for high volumes of small operations where the overhead of system calls can be significant.
- The Backend Revolution and Why io_uring Is So Important: The io_uring API uses two ring buffers for communication between application and kernel (hence the API name) and is designed in a way that enables natural batching of requests and responses. Besides, it provides a way to submit multiple requests in one system call, which can reduce overhead.
- What exactly is io_uring?: io_uring is a Linux kernel interface to efficiently allow you to send and receive data asynchronously. It was originally designed to target block devices and files but has since gained the ability to work with things like network sockets.
To use `io_uring`, you need to add two dependencies to your project and enable native transport. First, add the following dependencies to your project:
<dependency>
    <groupId>io.netty.incubator</groupId>
    <artifactId>netty-incubator-transport-native-io_uring</artifactId>
    <version>0.0.21.Final</version> <!-- Update this version (https://github.com/netty/netty-incubator-transport-io_uring/tags) -->
    <classifier>linux-x86_64</classifier>
</dependency>
<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-io_uring-incubator</artifactId>
</dependency>
// Update the io_uring version by picking the latest from https://github.com/netty/netty-incubator-transport-io_uring/tags
implementation("io.netty.incubator:netty-incubator-transport-native-io_uring:0.0.21.Final")
implementation("io.vertx:vertx-io_uring-incubator")
Then, in `application.properties`, add:
quarkus.vertx.prefer-native-transport=true
Can I use io_uring on my Linux machine?
To check if you can use io_uring on your Linux machine
If it prints something like above, you can use io_uring.
Troubleshooting
Domain sockets are not yet supported with io_uring.
The Vert.x asynchronous file system API does not use io_uring yet.
Deploy on read-only environments
In environments with read-only file systems, you may receive errors of the form:
java.lang.IllegalStateException: Failed to create cache dir
Assuming `/tmp/` is writable, this can be fixed by setting the `vertx.cacheDirBase` property to point to a directory in `/tmp/`, for instance in Kubernetes by creating an environment variable `JAVA_OPTS` with the value `-Dvertx.cacheDirBase=/tmp/vertx`, or by setting the `quarkus.vertx.cache-directory` property in `application.properties`:
quarkus.vertx.cache-directory=/tmp/vertx
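Before pointing the cache directory at a new location, it can help to confirm that the location is actually writable in the target environment. The following is a minimal sketch using only the JDK. `CacheDirCheck` and `isUsableCacheDir` are hypothetical names and not part of Quarkus or Vert.x, and the `/tmp/vertx` default simply mirrors the value used above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class CacheDirCheck {

    // Returns true if the directory can be created (or already exists)
    // and a file can be written to and deleted from it.
    static boolean isUsableCacheDir(Path dir) {
        try {
            Files.createDirectories(dir);
            Path probe = Files.createTempFile(dir, "probe", ".tmp");
            Files.delete(probe);
            return true;
        } catch (IOException | SecurityException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Reads the same system property that -Dvertx.cacheDirBase sets.
        Path dir = Path.of(System.getProperty("vertx.cacheDirBase", "/tmp/vertx"));
        System.out.println(dir + " usable: " + isUsableCacheDir(dir));
    }
}
```

On a read-only file system the check returns false for paths outside a writable mount, which is the condition behind the error shown earlier.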
Customize the Vert.x configuration
The configuration of the managed Vert.x instance can be provided using the `application.properties` file, but also using special beans. CDI beans exposing the `io.quarkus.vertx.VertxOptionsCustomizer` interface can be used to customize the Vert.x configuration. For example, the following customizer changes the `tmp` base directory:
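import jakarta.enterprise.context.ApplicationScoped;
import io.quarkus.vertx.VertxOptionsCustomizer;
import io.vertx.core.VertxOptions;
import io.vertx.core.file.FileSystemOptions;

@ApplicationScoped
public class MyCustomizer implements VertxOptionsCustomizer {
    @Override
    public void accept(VertxOptions options) {
        // Store the Vert.x file cache under "target"
        options.setFileSystemOptions(new FileSystemOptions().setFileCacheDir("target"));
    }
}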
The customizer beans receive the `VertxOptions` (coming from the application configuration) and can modify them.