Vert.x Reference Guide

Vert.x是一种用于构建响应式应用程序的工具包。如 Quarkus Reactive Architecture中所述,Quarkus 是着重底层 Vert.x。

Vert.x is a toolkit for building reactive applications. As described in the Quarkus Reactive Architecture, Quarkus uses Vert.x underneath.

本指南是 Using Eclipse Vert.x API from a Quarkus Application指南的配套指南。它提供了有关由 Quarkus 所用 Vert.x 实例的用法和配置的更多高级详细信息。

This guide is the companion to the Using Eclipse Vert.x API from a Quarkus Application guide. It provides more advanced details about the usage and the configuration of the Vert.x instance used by Quarkus.

Access the Vert.x instance

要访问托管的 Vert.x 实例,请将 `quarkus-vertx`扩展添加到您的项目中。此依赖可能已经存在于您的项目中(作为传递依赖)。

To access the managed Vert.x instance, add the quarkus-vertx extension to your project. This dependency might already be available in your project (as a transitive dependency).

利用此扩展,您可以使用字段或构造函数注入的方式检索 Vert.x 的托管实例:

With this extension, you can retrieve the managed instance of Vert.x using either field or constructor injection:

@ApplicationScoped
public class MyBean {

    // Field injection
    @Inject Vertx vertx;

    // Constructor injection
    MyBean(Vertx vertx) {
        // ...
    }

}

您可以注入以下任一选项:

You can inject either the:

  • io.vertx.core.Vertx instance exposing the bare Vert.x API

  • io.vertx.mutiny.core.Vertx instance exposing the Mutiny API

我们建议使用 Mutiny 变体,因为它与 Quarkus 提供的其他响应式 API 集成。

We recommend using the Mutiny variant as it integrates with the other reactive APIs provided by Quarkus.

Mutiny

如果您不熟悉 Mutiny,请查看 Mutiny - an intuitive reactive programming library

If you are not familiar with Mutiny, check Mutiny - an intuitive reactive programming library.

有关 Vert.x Mutiny 变体的文档可在 [role="bare"][role="bare"]https://smallrye.io/smallrye-mutiny-vertx-bindings上查阅。

Documentation about the Vert.x Mutiny variant is available on [role="bare"]https://smallrye.io/smallrye-mutiny-vertx-bindings.

Configure the Vert.x instance

您可以通过 `application.properties`文件配置 Vert.x 实例。下表列出了受支持的属性:

You can configure the Vert.x instance from the application.properties file. The following table lists the supported properties:

Unresolved directive in vertx-reference.adoc - include::{generated-dir}/config/quarkus-vertx.adoc[]

另请参阅 Customize the Vert.x configuration,了解如何使用编程方式配置 Vert.x 实例。

See Customize the Vert.x configuration to configure the Vert.x instance using a programmatic approach.

Use Vert.x clients

除了 Vert.x 核心,还可以使用大多数 Vert.x 生态系统库。某些 Quarkus 扩展已经封装了 Vert.x 库。

In addition to Vert.x core, you can use most Vert.x ecosystem libraries. Some Quarkus extension already wraps Vert.x libraries.

Available APIs

下表列出了 Vert.x 生态系统中使用的 most 库。要访问这些 API,请向项目添加指示的扩展或依赖项。查看关联的文档以了解如何使用它们。

The following table lists the most used libraries from the Vert.x ecosystem. To access these APIs, add the indicated extension or dependency to your project. Check the associated documentation to learn how to use them.

API

Extension or Dependency

Documentation

AMQP Client

io.quarkus:quarkus-messaging-amqp (extension)

Getting Started to Quarkus Messaging with AMQP

Circuit Breaker

io.smallrye.reactive:smallrye-mutiny-vertx-circuit-breaker (external dependency)

[role="bare"]https://vertx.io/docs/vertx-circuit-breaker/java/

Consul Client

io.smallrye.reactive:smallrye-mutiny-vertx-consul-client (external dependency)

[role="bare"]https://vertx.io/docs/vertx-consul-client/java/

DB2 Client

io.quarkus:quarkus-reactive-db2-client (extension)

Reactive SQL Clients

Kafka Client

io.quarkus:quarkus-messaging-kafka (extension)

Apache Kafka Reference Guide

Mail Client

io.quarkus:quarkus-mailer (extension)

Sending emails using SMTP

MQTT Client

io.quarkus:quarkus-messaging-mqtt (extension)

No guide yet

MS SQL Client

io.quarkus:quarkus-reactive-mssql-client (extension)

Reactive SQL Clients

MySQL Client

io.quarkus:quarkus-reactive-mysql-client (extension)

Reactive SQL Clients

Oracle Client

io.quarkus:quarkus-reactive-oracle-client (extension)

Reactive SQL Clients

PostgreSQL Client

io.quarkus:quarkus-reactive-pg-client (extension)

Reactive SQL Clients

RabbitMQ Client

io.smallrye.reactive:smallrye-mutiny-vertx-rabbitmq-client (external dependency)

[role="bare"]https://vertx.io/docs/vertx-rabbitmq-client/java

Redis Client

io.quarkus:quarkus-redis-client (extension)

Using the Redis Client

Web Client

io.smallrye.reactive:smallrye-mutiny-vertx-web-client (external dependency)

[role="bare"]https://vertx.io/docs/vertx-web-client/java/

要详细了解 Vert.x Mutiny API 的使用方法,请参阅 [role="bare"][role="bare"]https://smallrye.io/smallrye-mutiny-vertx-bindings.

To learn more about the usage of the Vert.x Mutiny API, refer to [role="bare"]https://smallrye.io/smallrye-mutiny-vertx-bindings.

Use the Vert.x Web Client

本节提供了一个使用 Vert.x WebClient 的示例,示例是在 Quarkus REST(以前称为 RESTEasy Reactive)应用程序的环境中编写的。如上表所示,向项目添加以下依赖项:

This section gives an example using the Vert.x WebClient in the context of a Quarkus REST (formerly RESTEasy Reactive) application. As indicated in the table above, add the following dependency to your project:

pom.xml
<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-mutiny-vertx-web-client</artifactId>
</dependency>
build.gradle
implementation("io.smallrye.reactive:smallrye-mutiny-vertx-web-client")

现在,您可以在代码中创建 WebClient 的一个实例:

Now, in your code, you can create an instance of WebClient:

package org.acme.vertx;


import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import io.smallrye.mutiny.Uni;

import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.ext.web.client.WebClient;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClientOptions;

@Path("/fruit-data")
public class ResourceUsingWebClient {

    private final WebClient client;

    @Inject
    VertxResource(Vertx vertx) {
        this.client = WebClient.create(vertx);
    }

    @GET
    @Produces(MediaType.APPLICATION_JSON)
    @Path("/{name}")
    public Uni<JsonObject> getFruitData(String name) {
        return client.getAbs("https://.../api/fruit/" + name)
                .send()
                .onItem().transform(resp -> {
                    if (resp.statusCode() == 200) {
                        return resp.bodyAsJsonObject();
                    } else {
                        return new JsonObject()
                                .put("code", resp.statusCode())
                                .put("message", resp.bodyAsString());
                    }
                });
    }

}

此资源创建一个 WebClient,并且在收到请求后使用此客户端调用远程 HTTP API。它会根据结果转发接收到的响应,或创建一个封装错误的 JSON 对象。WebClient 是异步的(并且是非阻塞的),因此端点会返回一个 Uni

This resource creates a WebClient and, upon request, uses this client to invoke a remote HTTP API. Depending on the result, the response is forwarded as received, or it creates a JSON object wrapping the error. The WebClient is asynchronous (and non-blocking), to the endpoint returns a Uni.

该应用程序还可以作为本地可执行文件运行。但是,我们首先需要指示 Quarkus 启用 ssl(如果远程 API 使用 HTTPS)。打开 src/main/resources/application.properties 并添加:

The application can also run as a native executable. But, first, we need to instruct Quarkus to enable ssl (if the remote API uses HTTPS). Open the src/main/resources/application.properties and add:

quarkus.ssl.native=true

然后,使用以下命令创建本地可执行文件:

Then, create the native executable with:

Unresolved directive in vertx-reference.adoc - include::{includes}/devtools/build-native.adoc[]

Use Vert.x JSON

Vert.x API 通常依赖于 JSON。Vert.x 提供了两个便捷的类来处理 JSON 文档:io.vertx.core.json.JsonObjectio.vertx.core.json.JsonArray

Vert.x APIs often rely on JSON. Vert.x provides two convenient classes to manipulate JSON document: io.vertx.core.json.JsonObject and io.vertx.core.json.JsonArray.

JsonObject 可用于将对象映射到其 JSON 表示形式,并从 JSON 文档构建一个对象:

JsonObject can be used to map an object into its JSON representation and build an object from a JSON document:

// Map an object into JSON
Person person = ...;
JsonObject json = JsonObject.mapFrom(person);

// Build an object from JSON
json = new JsonObject();
person = json.mapTo(Person.class);

请注意,这些特性使用由 quarkus-jackson 扩展管理的映射器。请参阅 Jackson configuration 自定义映射。

Note that these features use the mapper managed by the quarkus-jackson extension. Refer to Jackson configuration to customize the mapping.

JSON 对象和 JSON 数组都支持作为 Quarkus HTTP 端点请求和响应主体(使用经典 RESTEasy 和 Quarkus REST)。考虑以下端点:

JSON Object and JSON Array are both supported as Quarkus HTTP endpoint requests and response bodies (using classic RESTEasy and Quarkus REST). Consider these endpoints:

package org.acme.vertx;

import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/hello")
@Produces(MediaType.APPLICATION_JSON)
public class VertxJsonResource {

    @GET
    @Path("{name}/object")
    public JsonObject jsonObject(String name) {
        return new JsonObject().put("Hello", name);
    }

    @GET
    @Path("{name}/array")
    public JsonArray jsonArray(String name) {
        return new JsonArray().add("Hello").add(name);
    }
}

[role="bare"][role="bare"]http://localhost:8080/hello/Quarkus/object 返回:

{"Hello":"Quarkus"}

[role="bare"][role="bare"]http://localhost:8080/hello/Quarkus/array 返回:

["Hello","Quarkus"]

无论 JSON 内容是请求主体还是封装在 UniMultiCompletionStagePublisher 中,此操作都很有效。

This works equally well when the JSON content is a request body or is wrapped in a Uni, Multi, CompletionStage or Publisher.

Use Verticles

verticles[Verticles] is "a simple, scalable, actor-like deployment and concurrency model" provided by _Vert.x.此模型并不声称是一个严格的 actor 模型实现,但它具有类似性,尤其是在并发性、扩展性和部署方面。要使用此模型,请编写 deploy 顶点,并通过向事件总线发送消息的方式进行通信。

Verticles is "a simple, scalable, actor-like deployment and concurrency model" provided by _Vert.x. This model does not claim to be a strict actor-model implementation, but it shares similarities, especially concerning concurrency, scaling, and deployment. To use this model, you write and deploy verticles, communicating by sending messages on the event bus.

您可以在 Quarkus 中部署 verticles。它支持:

You can deploy verticles in Quarkus. It supports:

  • bare verticle - Java classes extending io.vertx.core.AbstractVerticle

  • Mutiny verticle - Java classes extending io.smallrye.mutiny.vertx.core.AbstractVerticle

Deploy Verticles

若要部署顶点,请使用 deployVerticle

To deploy verticles, use the deployVerticle method:

@Inject Vertx vertx;

// ...
vertx.deployVerticle(MyVerticle.class.getName(), ar -> { });
vertx.deployVerticle(new MyVerticle(), ar -> { });

如果你使用 Vert.x 的 Mutiny 变体,请注意, deployVerticle 方法会返回一个 Uni,你将需要触发一个订阅来完成实际的部署。

If you use the Mutiny-variant of Vert.x, be aware that the deployVerticle method returns a Uni, and you would need to trigger a subscription to make the actual deployment.

后面会出现一个示例,说明如何在应用程序初始化期间部署 top vertex。

An example explaining how to deploy verticles during the initialization of the application will follow.

Use @ApplicationScoped beans as Verticle

一般来说,Vert.x 顶点不是 CDI Bean。因此不能使用注入。然而,在 Quarkus 中,你可以将顶点部署为 bean。请注意,在这种情况下,由 CDI(Quarkus 中的 Arc)负责创建实例。

In general, Vert.x verticles are not CDI beans. And so cannot use injection. However, in Quarkus, you can deploy verticles as beans. Note that in this case, CDI (Arc in Quarkus) is responsible for creating the instance.

以下片段提供了一个示例:

The following snippet provides an example:

package io.quarkus.vertx.verticles;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MyBeanVerticle extends AbstractVerticle {

    @ConfigProperty(name = "address") String address;

    @Override
    public Uni<Void> asyncStart() {
        return vertx.eventBus().consumer(address)
                .handler(m -> m.replyAndForget("hello"))
                .completionHandler();
    }
}

你不必注入 vertx 实例;相反,利用 AbstractVerticle 中的受保护字段。

You don’t have to inject the vertx instance; instead, leverage the protected field from AbstractVerticle.

然后,用以下内容部署顶点实例:

Then, deploy the verticle instances with:

package io.quarkus.vertx.verticles;

import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.core.Vertx;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;

@ApplicationScoped
public class VerticleDeployer {

    public void init(@Observes StartupEvent e, Vertx vertx, MyBeanVerticle verticle) {
         vertx.deployVerticle(verticle).await().indefinitely();
    }
}

如果你想部署每个公开的 AbstractVerticle,你可以使用:

If you want to deploy every exposed AbstractVerticle, you can use:

public void init(@Observes StartupEvent e, Vertx vertx, Instance<AbstractVerticle> verticles) {
    for (AbstractVerticle verticle : verticles) {
        vertx.deployVerticle(verticle).await().indefinitely();
    }
}

Create multiple verticles instances

使用 @ApplicationScoped 时,你将为顶点获取一个单一实例。拥有多个顶点实例对于共享其负载很有帮助。每个顶点都将与一个不同的 I/O 线程(Vert.x 事件循环)关联。

When using @ApplicationScoped, you will get a single instance for your verticle. Having multiple instances of verticles can be helpful to share the load among them. Each of them will be associated with a different I/O thread (Vert.x event loop).

若要部署多个顶点实例,请使用 @Dependent 作用域,而不是 @ApplicationScoped

To deploy multiple instances of your verticle, use the @Dependent scope instead of @ApplicationScoped:

package org.acme.verticle;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;

import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;

@Dependent
public class MyVerticle extends AbstractVerticle {

    @Override
    public Uni<Void> asyncStart() {
        return vertx.eventBus().consumer("address")
                .handler(m -> m.reply("Hello from " + this))
                .completionHandler();
    }
}

然后,按如下方式部署顶点:

Then, deploy your verticle as follows:

package org.acme.verticle;

import io.quarkus.runtime.StartupEvent;
import io.vertx.core.DeploymentOptions;
import io.vertx.mutiny.core.Vertx;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;

@ApplicationScoped
public class MyApp {

    void init(@Observes StartupEvent ev, Vertx vertx, Instance<MyVerticle> verticles) {
        vertx
                .deployVerticle(verticles::get, new DeploymentOptions().setInstances(2))
                .await().indefinitely();
    }
}

init 方法会接收一个 Instance<MyVerticle>。然后,你将一个提供器传递给 deployVerticle 方法。提供器仅仅调用 get() 方法。感谢 @Dependent 作用域,它每次调用都返回一个新实例。最后,你将所需的实例数传递给 DeploymentOptions,例如前一个示例中的两个。它会调用提供器两次,这将创建两个顶点实例。

The init method receives an Instance<MyVerticle>. Then, you pass a supplier to the deployVerticle method. The supplier is just calling the get() method. Thanks to the @Dependent scope, it returns a new instance on every call. Finally, you pass the desired number of instances to the DeploymentOptions, such as two in the previous example. It will call the supplier twice, which will create two instances of your verticle.

Use the Event Bus

Vert.x 采用内建的 event bus,你可以从 Quarkus 应用程序中使用它。因此,你的应用程序组件(CDI bean、资源等)可以使用异步事件进行交互,从而促进松散耦合。

Vert.x comes with a built-in event bus that you can use from your Quarkus application. So, your application components (CDI beans, resources…​) can interact using asynchronous events, thus promoting loose-coupling.

通过事件总线,可以将 messages 发送到 virtual addresses。事件总线提供三种类型的传递机制:

With the event bus, you send messages to virtual addresses. The event bus offers three types of delivery mechanisms:

  • point-to-point - send the message, one consumer receives it. If several consumers listen to the address, a round-robin is applied;

  • publish/subscribe - publish a message; all the consumers listening to the address are receiving the message;

  • request/reply - send the message and expect a response. The receiver can respond to the message in an asynchronous fashion.

所有这些传递机制都是非阻塞的,并且为构建响应式应用程序提供了基本条件。

All these delivery mechanisms are non-blocking and are providing one of the fundamental bricks to build reactive applications.

Consume events

在可以使用 Vert.x API 注册消费者的同时,Quarkus 也随附有声明性支持。若要使用事件,请使用 io.quarkus.vertx.ConsumeEvent 注解:

While you can use the Vert.x API to register consumers, Quarkus comes with declarative support. To consume events, use the io.quarkus.vertx.ConsumeEvent annotation:

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent                           (1)
    public String consume(String name) {    (2)
        return name.toUpperCase();
    }
}
1 If not set, the address is the fully qualified name of the bean; for instance, in this snippet, it’s org.acme.vertx.GreetingService.
2 The method parameter is the message body. If the method returns something, it’s the message response.

Configure the address

`@ConsumeEvent`注释可配置为设置地址:

The @ConsumeEvent annotation can be configured to set the address:

@ConsumeEvent("greeting")               (1)
public String consume(String name) {
    return name.toUpperCase();
}
1 Receive the messages sent to the greeting address

地址值可以是属性表达式。在这种情况下,将使用已配置的值: @ConsumeEvent("${my.consumer.address}")。此外,属性表达式可以指定一个默认值: @ConsumeEvent("${my.consumer.address:defaultAddress}")

The address value can be a property expression. In this case, the configured value is used instead: @ConsumeEvent("${my.consumer.address}"). Additionally, the property expression can specify a default value: @ConsumeEvent("${my.consumer.address:defaultAddress}").

Config Property Example
@ConsumeEvent("${my.consumer.address}")   (1)
public String consume(String name) {
    return name.toLowerCase();
}
1 Receive the messages sent to the address configured with the my.consumer.address key.

如果不存在具有指定键的配置属性并且未设置默认值,那么应用程序启动将失败。

If no config property with the specified key exists and no default value is set then the application startup fails.

Process events asynchronously

前面的示例使用同步处理。还可以通过返回 `io.smallrye.mutiny.Uni`或 `java.util.concurrent.CompletionStage`对异步处理进行处理:

The previous examples use synchronous processing. Asynchronous processing is also possible by returning either an io.smallrye.mutiny.Uni or a java.util.concurrent.CompletionStage:

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent
    public Uni<String> process(String name) {
        // return an Uni completed when the processing is finished.
        // You can also fail the Uni explicitly
    }
}
Mutiny

前面的示例使用 Mutiny 响应类型。如果您不熟悉 Mutiny,请查看 Mutiny - an intuitive reactive programming library

The previous example uses Mutiny reactive types. If you are not familiar with Mutiny, check Mutiny - an intuitive reactive programming library.

Blocking processing of events

默认情况下,使用事件的代码必须是 non-blocking,因为它是在 I/O 线程上调用的。如果您的处理正在阻止,请使用 `@io.smallrye.common.annotation.Blocking`注释:

By default, the code consuming the event must be non-blocking, as it’s called on an I/O thread. If your processing is blocking, use the @io.smallrye.common.annotation.Blocking annotation:

@ConsumeEvent(value = "blocking-consumer")
@Blocking
void consumeBlocking(String message) {
    // Something blocking
}

或者,您可以使用 `@ConsumeEvent`注释中的 `blocking`属性:

Alternatively, you can use the blocking attribute from the @ConsumeEvent annotation:

@ConsumeEvent(value = "blocking-consumer", blocking = true)
void consumeBlocking(String message) {
    // Something blocking
}

使用 `@Blocking`时,它会忽略 `@ConsumeEvent`的 `blocking`属性值。

When using @Blocking, it ignores the value of the blocking attribute of @ConsumeEvent.

Reply to events

注释有 `@ConsumeEvent`的方法的 _return_值用于响应传入的消息。例如,在下面的代码段中,返回的 `String`是响应。

The return value of a method annotated with @ConsumeEvent is used to respond to the incoming message. For instance, in the following snippet, the returned String is the response.

@ConsumeEvent("greeting")
public String consume(String name) {
    return name.toUpperCase();
}

您还可以返回 `Uni<T>`或 `CompletionStage<T>`来处理异步答复:

You can also return a Uni<T> or a CompletionStage<T> to handle asynchronous reply:

@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
    return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}

如果您使用上下文传播扩展,则可以注入一个 executor

You can inject an executor if you use the Context Propagation extension:

@Inject Executor executor;

Implement fire-and-forget interactions

您不必回复接收到的消息。通常,对于 _fire and forget_交互,消息会被使用,而发送方不需要知道它。要实现此模式,您的消费者方法返回 void

You don’t have to reply to received messages. Typically, for a fire and forget interaction, the messages are consumed, and the sender does not need to know about it. To implement this pattern, your consumer method returns void.

@ConsumeEvent("greeting")
public void consume(String event) {
    // Do something with the event
}

Consume messages (instead of events)

不同于直接使用 _payloads_的前一个示例,您还可以直接使用 Message

Unlike the previous example using the payloads directly, you can also use Message directly:

@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
    System.out.println(msg.address());
    System.out.println(msg.body());
}

Handle failures

如果使用 `@ConsumeEvent`注释的方法抛出异常,则:

If a method annotated with @ConsumeEvent throws an exception, then:

  • if a reply handler is set, then the failure is propagated back to the sender via an io.vertx.core.eventbus.ReplyException with code ConsumeEvent#FAILURE_CODE and the exception message,

  • if no reply handler is set, then the exception is rethrown (and wrapped in a RuntimeException if necessary) and can be handled by the default exception handler, i.e. io.vertx.core.Vertx#exceptionHandler().

Send messages

发送和发布消息使用 Vert.x 事件总线:

Sending and publishing messages use the Vert.x event bus:

package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/async")
public class EventResource {

    @Inject
    EventBus bus;                                            (1)

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)        (2)
                .onItem().transform(Message::body);
    }
}
1 Inject the Event bus
2 Send a message to the address greeting. Message payload is name

EventBus 对象提供以下方法:

The EventBus object provides methods to:

  1. send a message to a specific address - one single consumer receives the message.

  2. publish a message to a specific address - all consumers receive the messages.

  3. request a message and expect a reply

// Case 1
bus.sendAndForget("greeting", name)
// Case 2
bus.publish("greeting", name)
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
        .onItem().transform(Message::body);

Process events on virtual threads

使用 @ConsumeEvent 注解的方法也可以使用 @RunOnVirtualThread 注解。在此情况下,该方法在虚拟线程上被调用。每个事件在不同的虚拟线程上被调用。

Methods annotated with @ConsumeEvent can also be annotated with @RunOnVirtualThread. In this case, the method is invoked on a virtual thread. Each event is invoked on a different virtual thread.

要使用此功能,请确保:

To use this feature, make sure:

  1. Your Java runtime supports virtual threads.

  2. Your method uses a blocking signature.

第二点意味着只有返回一个对象或 void 的方法才能使用 @RunOnVirtualThread。返回一个 Uni 或一个 CompletionStage cannot 的方法在虚拟线程上运行。

The second point means only methods returning an object or void can use @RunOnVirtualThread. Methods returning a Uni or a CompletionStage cannot run on virtual threads.

阅读 the virtual thread guide 以了解更多。

Read the virtual thread guide for more details.

Use codecs

链接:https://vertx.io/docs/vertx-core/java/event_bus[Vert.x Event Bus] uses codecs to _serialize 和 deserialize 消息对象。Quarkus 为本地交付提供了一个默认编解码器。针对本地消费者的返回类型和消息体参数(即使用 @ConsumeEvent 注解的方法,其中 ConsumeEvent#local() == true 是默认值),此编解码器将被自动使用。

The Vert.x Event Bus uses codecs to _serialize and deserialize message objects. Quarkus provides a default codec for local delivery. This codec is automatically used for return types and message body parameters of local consumers, i.e. methods annotated with @ConsumeEvent whete ConsumeEvent#local() == true (which is the default).

因此,您可以按照如下方式交换消息对象:

So that you can exchange the message objects as follows:

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", new MyName(name))
        .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello " + name.getName());
}

如果您想使用特定的编解码器,则需要在两端明确地设置它:

If you want to use a specific codec, you need to set it on both ends explicitly:

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", name,
        new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
        .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting", codec = MyNameCodec.class)            (2)
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello "+name.getName());
}
1 Set the name of the codec to use to send the message
2 Set the codec to use to receive the message

Combine HTTP and the Event Bus

让我们重新审视一下 HTTP greeting 端点,并使用异步消息传递将调用委托给一个单独的 bean。它使用请求/响应分发机制。我们在 Jakarta REST 端点中发送消息,而不是实现其中的业务逻辑。另一个 bean 会使用 reply 机制接收此消息,然后发送响应。

Let’s revisit a greeting HTTP endpoint and use asynchronous message passing to delegate the call to a separated bean. It uses the request/reply dispatching mechanism. Instead of implementing the business logic inside the Jakarta REST endpoint, we are sending a message. Another bean consumes this message, and the response is sent using the reply mechanism.

在你的 HTTP 端点类中,注入事件总线并使用 request 方法向事件总线发送消息,并期待一个响应:

In your HTTP endpoint class, inject the event bus and uses the request method to send a message to the event bus and expect a response:

package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/bus")
public class EventResource {

    @Inject
    EventBus bus;

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)            (1)
                .onItem().transform(Message::body);            (2)
    }
}
1 send the name to the greeting address and request a response
2 when we get the response, extract the body and send it to the user

HTTP 方法返回一个 Uni。如果你使用的是 Quarkus REST,Uni 支持是内置的。如果你使用的是 classic RESTEasy,你需要向你的项目中添加 quarkus resteasy-mutiny 扩展。

the HTTP method returns a Uni. If you are using Quarkus REST, Uni support is built-in. If you are using classic RESTEasy, you need to add the quarkus resteasy-mutiny extension to your project.

我们需要一个侦听 greeting 地址的消费者。此消费者可以位于同一个类或另一个 Bean 中,例如:

We need a consumer listening on the greeting address. This consumer can be in the same class or another bean such as:

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent("greeting")
    public String greeting(String name) {
        return "Hello " + name;
    }

}

此 Bean 接收姓名并返回 greeting 消息。

This bean receives the name and returns the greeting message.

有了这些,/bus/quarkus 上的每个 HTTP 请求都会向事件总线发送一条消息、等待回复,而且当回复到达后,编写 HTTP 响应:

With this in place, every HTTP request on /bus/quarkus sends a message to the event bus, waits for a reply, and when this one arrives, writes the HTTP response:

Hello Quarkus

为了更好地理解,让我们详细介绍一下 HTTP 请求/响应是如何处理的:

To better understand, let’s detail how the HTTP request/response has been handled:

  1. The request is received by the greeting method

  2. a message containing the name is sent to the event bus

  3. Another bean receives this message and computes the response

  4. This response is sent back using the reply mechanism

  5. Once the reply is received by the sender, the content is written to the HTTP response

Bidirectional communication with browsers by using SockJS

Vert.x 提供的 SockJS 网桥允许浏览器应用程序和 Quarkus 应用程序使用事件总线进行通信。它把两边连接起来。因此,双方都可以向另一方发送消息并收到对方发送的消息。它支持这三种传递机制。

The SockJS bridge provided by Vert.x allows browser applications and Quarkus applications to communicate using the event bus. It connects both sides. So, both sides can send messages received on the other side. It supports the three delivery mechanisms.

SockJS 协商 Quarkus 应用程序和浏览器之间的通信通道。如果支持 WebSocket,它会使用它们;否则,它会降级为 SSE、长轮询等。

SockJS negotiates the communication channel between the Quarkus application and the browser. If WebSockets are supported, it uses them; otherwise, it degrades to SSE, long polling, etc.

因此,要使用 SockJS,你需要配置网桥,特别是用来进行通信的地址:

So use SockJS, you need to configure the bridge, especially the addresses that will be used to communicate:

package org.acme;

import io.vertx.core.Vertx;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.sockjs.SockJSBridgeOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import java.util.concurrent.atomic.AtomicInteger;

@ApplicationScoped
public class SockJsExample {

    @Inject
    Vertx vertx;

    public void init(@Observes Router router) {
        SockJSHandler sockJSHandler = SockJSHandler.create(vertx);
        Router bridge = sockJSHandler.bridge(new SockJSBridgeOptions()
                .addOutboundPermitted(new PermittedOptions().setAddress("ticks")));
        router.route("/eventbus/*").subRouter(bridge);

        AtomicInteger counter = new AtomicInteger();
        vertx.setPeriodic(1000,
                ignored -> vertx.eventBus().publish("ticks", counter.getAndIncrement()));
    }

}

此代码将 SockJS 网桥配置为将所有针对 ticks 地址的消息发送到已连接的浏览器。更多有关配置的详细说明可以在 the Vert.x SockJS Bridge documentation 中找到。

This code configures the SockJS bridge to send all the messages targeting the ticks address to the connected browsers. More detailed explanations about the configuration can be found on the Vert.x SockJS Bridge documentation.

浏览器必须使用 vertx-eventbus JavaScript 库来使用消息:

The browser must use the vertx-eventbus JavaScript library to consume the message:

<!doctype html>
<html>
<head>
    <meta charset="utf-8"/>
    <title>SockJS example - Quarkus</title>
    <script src="https://code.jquery.com/jquery-3.3.1.min.js"
            integrity="sha256-FgpCb/KJQlLNfOu91ta32o/NMZxltwRo8QtmkMRdAu8=" crossorigin="anonymous"></script>
    <script type="application/javascript" src="https://cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js"></script>
    <script src="https://cdn.jsdelivr.net/npm/vertx3-eventbus-client@3.8.5/vertx-eventbus.min.js"></script>
</head>
<body>

<h1>SockJS Examples</h1>

<p><strong>Last Tick:</strong> <span id="tick"></span></p>

</body>
<script>
    var eb = new EventBus('/eventbus');

    eb.onopen = function () {

        eb.registerHandler('ticks', function (error, message) {
            $("#tick").html(message.body);
        });
    }

</script>
</html>

Use native transports

原生可执行文件中不支持原生传输。

Native transports are not supported in native executables.

如要使用 io_uring,请参考 Use io_uring部分。

To use io_uring, refer to the Use io_uring section.

Vert.x 能够使用 Netty’s native transports,可以在特定平台上提供性能提升。若要启用它们,您必须包含平台的相应依赖项。通常建议同时拥有这两者,以保持应用程序与平台无关。Netty 足够智能,可以使用正确的依赖项,包括在不受支持的平台上根本不使用它们:

Vert.x is capable of using Netty’s native transports, which offers performance improvements on specific platforms. To enable them, you must include the appropriate dependency for your platform. It’s usually a good idea to have both to keep your application platform-agnostic. Netty is smart enough to use the correct one, that includes none at all on unsupported platforms:

pom.xml
<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-transport-native-epoll</artifactId>
  <classifier>linux-x86_64</classifier>
</dependency>

<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-transport-native-kqueue</artifactId>
  <classifier>osx-x86_64</classifier>
</dependency>
build.gradle
implementation("io.netty:netty-transport-native-epoll::linux-x86_64")

implementation("io.netty:netty-transport-native-kqueue::osx-x86_64")

您还必须显式配置 Vert.x 以使用原生传输。在 `application.properties`中添加:

You will also have to explicitly configure Vert.x to use the native transport. In application.properties add:

quarkus.vertx.prefer-native-transport=true

或在 `application.yml`中:

Or in application.yml:

quarkus:
  vertx:
    prefer-native-transport: true

如果一切运行良好,quarkus 将记录:

If all is well quarkus will log:

[io.qua.ver.cor.run.VertxCoreRecorder] (main) Vertx has Native Transport Enabled: true

Native Linux transport

在 Linux 上,您可以启用以下套接字选项:

On Linux you can enable the following socket options:

  • SO_REUSEPORT

quarkus.http.so-reuse-port=true
  • TCP_QUICKACK

quarkus.http.tcp-quick-ack=true
  • TCP_CORK

quarkus.http.tcp-cork=true
  • TCP_FASTOPEN

quarkus.http.tcp-fast-open=true

Native macOS transport

在 macOS Sierra 及更高版本上,您可以启用以下套接字选项:

On macOS Sierra and above you can enable the following socket options:

  • SO_REUSEPORT

quarkus.http.so-reuse-port=true

Use a Vert.x context-aware scheduler

部分 Mutiny 运算符需要在执行器线程池上安排工作。一个很好的示例是 .onItem().delayIt().by(Duration.ofMillis(10),因为它需要此类执行器来延迟发射。

Some Mutiny operators need to schedule work on an executor thread pool. A good example is .onItem().delayIt().by(Duration.ofMillis(10) as it needs such an executor to delay emissions.

默认执行器由 `io.smallrye.mutiny.infrastructure.Infrastructure`返回,并且已由 Quarkus 配置和管理。

The default executor is returned by io.smallrye.mutiny.infrastructure.Infrastructure and it is already configured and managed by Quarkus.

话虽如此,有时您需要确保在 Vert.x(已复制)上下文中运行操作,而不仅仅是在任意线程中运行。

That being said, there are cases where you need to make sure that an operation is run on a Vert.x (duplicated) context and not just on any random thread.

`io.smallrye.mutiny.vertx.core.ContextAwareScheduler`界面提供了一个获取上下文感知调度程序的 API。此类调度程序使用以下内容进行配置:

The io.smallrye.mutiny.vertx.core.ContextAwareScheduler interface offers an API to obtain context-aware schedulers. Such a scheduler is configured with:

  1. a delegate ScheduledExecutorService of your choice (hint: you can reuse Infrastructure.getDefaultWorkerPool()), and

  2. a context fetching strategy among:

    • an explicit Context, or

    • calling Vertx::getOrCreateContext() either on the current thread or later when the scheduling request happens, or

    • calling Vertx::currentContext(), which fails if the current thread is not a Vert.x thread.

以下是一个 `ContextAwareScheduler`的使用示例:

Here is a sample where ContextAwareScheduler is used:

class MyVerticle extends AbstractVerticle {

    @Override
    public Uni<Void> asyncStart() {
        vertx.getOrCreateContext().put("foo", "bar");

        var delegate = Infrastructure.getDefaultWorkerPool();
        var scheduler = ContextAwareScheduler.delegatingTo(delegate)
            .withCurrentContext();

        return Uni.createFrom().voidItem()
                .onItem().delayIt().onExecutor(scheduler).by(Duration.ofMillis(10))
                .onItem().invoke(() -> {
                    // Prints "bar"
                    var ctx = vertx.getOrCreateContext();
                    System.out.println(ctx.get("foo"));
                });
    }
}

此示例通过捕获调用 `asyncStart()`的 Vert.x 事件循环的上下文来创建调度器。`delayIt`运算符使用该调度程序,我们可以检查我们在 `invoke`中获取的上下文是一个 Vert.x 复制的上下文,其中已传播键 `"foo"`的数据。

In this example a scheduler is created by capturing the context of the Vert.x event-loop that calls asyncStart(). The delayIt operator uses that scheduler, and we can check that the context that we get in invoke is a Vert.x duplicated context where the data for key "foo" has been propagated.

Use a Unix domain socket

如果从同一主机建立与 quarkus 服务的连接,则监听 Unix 域套接字时,我们可以消除 TCP 的开销。如果您使用 Envoy 之类的代理设置服务网格,则这种情况可能会发生,因为在这种情况下,对服务的访问会通过代理进行。

Listening on a Unix domain socket allows us to dispense with the overhead of TCP if the connection to the quarkus service is established from the same host. This can happen if access to the service goes through a proxy which is often the case if you’re setting up a service mesh with a proxy like Envoy.

这段文字只能在支持 Use native transports 的平台上运作。

This will only work on platforms that support Use native transports.

启用适当的 Use native transports,并设置以下环境属性:

Enable the appropriate Use native transports and set the following environment property:

quarkus.http.domain-socket=/var/run/io.quarkus.app.socket
quarkus.http.domain-socket-enabled=true

quarkus.vertx.prefer-native-transport=true

它本身不会禁用将默认在 0.0.0.0:8080 上打开的 tcp 套接字。可以明确禁用:

By itself this will not disable the tcp socket which by default will open on 0.0.0.0:8080. It can be explicitly disabled:

quarkus.http.host-enabled=false

这些属性可以通过 Java 的 -D 命令行参数或 application.properties 设置。

These properties can be set through Java’s -D command line parameter or on application.properties.

不要忘记添加本机传输依赖关系。有关详细信息,请参见 Use native transports

Do not forget to add the native transport dependency. See Use native transports for details.

确保您的应用程序具有向套接字写入的正确权限。

Make sure your application has the right permissions to write to the socket.

Use io_uring

io_uring 不受本机可执行文件支持。

io_uring is not supported in native executables.

io_uring 支持仍处于试验阶段。

io_uring support is experimental

io_uring 是一个 Linux 内核界面,允许您异步发送和接收数据。它为文件和网络 I/O 提供统一语义。它最初设计为针对块设备和文件,但此后获得了处理网络套接字等事务的能力。它有可能单独为网络 I/O 提供适度的性能优势,为混合文件和网络 I/O 应用程序工作负载提供更大的优势。

io_uring is a Linux kernel interface that allows you to send and receive data asynchronously. It provides unified semantics for both file and network I/O. It was originally designed to target block devices and files but has since gained the ability to work with things like network sockets. It has the potential to provide modest performance benefits to network I/O on its own and greater benefits for mixed file and network I/O application workloads.

要了解更多有关 io_uring 的信息,我们推荐以下链接:

To learn more about io_uring, we recommend the following links:

  • Why you should use io_uring for network I/O: The main benefit of io_uring for network I/O is a modern asynchronous API that is straightforward to use and provides unified semantics for file and network I/O. A potential performance benefit of io_uring for network I/O is reducing the number of syscalls. This could provide the biggest benefit for high volumes of small operations where the overhead of system calls can be significant.

  • The Backend Revolution and Why io_uring Is So Important: The io_uring API uses two ring buffers for communication between application and kernel (hence the API name) and designed in a way that enables natural batching of requests and responses. Besides, it provides a way to submit multiple requests in one system call, which can reduce overhead.

  • What exactly is io_uring?: io_uring is a Linux kernel interface to efficiently allow you to send and receive data asynchronously. It was originally designed to target block devices and files but has since gained the ability to work with things like network sockets.

要使用 io_uring,您需要向您的项目添加两个依赖关系并启用本机传输。首先向您的项目添加以下依赖关系:

To use io_uring, you need to add two dependencies to your project and enable native transport. First add the following dependencies to your project:

pom.xml
<dependency>
    <groupId>io.netty.incubator</groupId>
    <artifactId>netty-incubator-transport-native-io_uring</artifactId>
    <version>0.0.21.Final</version> <!-- Update this version (https://github.com/netty/netty-incubator-transport-io_uring/tags) -->
    <classifier>linux-x86_64</classifier>
</dependency>
<dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-io_uring-incubator</artifactId>
</dependency>
build.gradle
// Update the io_uring version by picking the latest from https://github.com/netty/netty-incubator-transport-io_uring/tags
implementation("io.netty.incubator:netty-incubator-transport-native-io_uring:0.0.21.Final")
implementation("io.vertx:vertx-io_uring-incubator")

然后,在 application.properties 中添加:

Then, in the application.properties, add:

quarkus.vertx.prefer-native-transport=true
Can I use io_uring on my Linux machine?

要检查您是否可以在 Linux 机器上使用 io_uring,请执行以下命令:

To check if you can use io_uring on your Linux machine, execute the following command:

> grep io_uring_setup /proc/kallsyms
0000000000000000 t __pfx_io_uring_setup
0000000000000000 t io_uring_setup
0000000000000000 T __pfx___x64_sys_io_uring_setup
0000000000000000 T __x64_sys_io_uring_setup
0000000000000000 T __pfx___ia32_sys_io_uring_setup
0000000000000000 T __ia32_sys_io_uring_setup
0000000000000000 d event_exit__io_uring_setup
0000000000000000 d event_enter__io_uring_setup
0000000000000000 d __syscall_meta__io_uring_setup
0000000000000000 d args__io_uring_setup
0000000000000000 d types__io_uring_setup
0000000000000000 d __event_exit__io_uring_setup
0000000000000000 d __event_enter__io_uring_setup
0000000000000000 d __p_syscall_meta__io_uring_setup

如果打印出类似以上内容,您可以使用 io_uring

If it prints something like above, you can use io_uring.

Troubleshooting

io_uring 支持仍处于实验阶段。如果您看到一些奇怪的行为,请检查 Netty io_uring FAQ。此外, netty io_uring was slower than epoll 问题描述了一些配置错误。

io_uring support is still experimental. Check the Netty io_uring FAQ if you see some odd behavior. Also, the netty io_uring was slower than epoll issue describes a few configuration mistakes.

域套接字还不受 io_uring 支持。

Domain sockets are not yet supported with io_uring.

Vert.x 异步文件系统 API 尚未使用 io_uring。

The Vert.x asynchronous file system API does not use io_uring yet.

Deploy on read-only environments

在仅读文件系统环境中,您可能会收到以下形式的错误:

In environments with read only file systems you may receive errors of the form:

java.lang.IllegalStateException: Failed to create cache dir

假设 /tmp/ 可写,则可以通过将 vertx.cacheDirBase 属性设置为指向 /tmp/ 中的目录来修复此问题,例如,在 Kubernetes 中,通过创建具有值 -Dvertx.cacheDirBase=/tmp/vertx 的环境变量 JAVA_OPTS,或在 application.properties 中设置 quarkus.vertx.cache-directory 属性:

Assuming /tmp/ is writable this can be fixed by setting the vertx.cacheDirBase property to point to a directory in /tmp/ for instance in Kubernetes by creating an environment variable JAVA_OPTS with the value -Dvertx.cacheDirBase=/tmp/vertx, or setting the quarkus.vertx.cache-directory property in application.properties:

quarkus.vertx.cache-directory=/tmp/vertx

Customize the Vert.x configuration

托管 Vert.x 实例的配置可以使用 application.properties 文件提供,但也可以使用 special beans.CDI bean 暴露 io.quarkus.vertx.VertxOptionsCustomizer 接口可用于自定义 Vert.x 配置。例如,以下自定义程序会更改 tmp 基础目录:

The configuration of the managed Vert.x instance can be provided using the application.properties file, but also using special beans. CDI beans exposing the io.quarkus.vertx.VertxOptionsCustomizer interface can be used to customize the Vert.x configuration. For example, the following customizer change the tmp base directory:

@ApplicationScoped
public class MyCustomizer implements VertxOptionsCustomizer {

    @Override
    public void accept(VertxOptions options) {
        options.setFileSystemOptions(new FileSystemOptions().setFileCacheDir("target"));
    }
}

customizer Bean 接收 VertxOptions (来自应用程序配置),并可以修改它们。

The customizer beans received the VertxOptions (coming from the application configuration), and can modify them.