Vert.x Reference Guide

Access the Vert.x instance

要访问托管的 Vert.x 实例,请将 `quarkus-vertx`扩展添加到您的项目中。此依赖可能已经存在于您的项目中(作为传递依赖)。

利用此扩展,您可以使用字段或构造函数注入的方式检索 Vert.x 的托管实例:

@ApplicationScoped
public class MyBean {

    // Field injection
    @Inject Vertx vertx;

    // Constructor injection
    MyBean(Vertx vertx) {
        // ...
    }

}

您可以注入以下任一选项:

  • 公开 _bare_Vert.x API 的 `io.vertx.core.Vertx`实例

  • 公开 _Mutiny_API 的 `io.vertx.mutiny.core.Vertx`实例

我们建议使用 Mutiny 变体,因为它与 Quarkus 提供的其他响应式 API 集成。

Mutiny

如果您不熟悉 Mutiny,请查看 Mutiny - an intuitive reactive programming library

有关 Vert.x Mutiny 变体的文档可在 [role="bare"][role="bare"]https://smallrye.io/smallrye-mutiny-vertx-bindings上查阅。

Configure the Vert.x instance

您可以通过 `application.properties`文件配置 Vert.x 实例。下表列出了受支持的属性:

Unresolved include directive in modules/ROOT/pages/vertx-reference.adoc - include::../../../target/quarkus-generated-doc/config/quarkus-vertx.adoc[]

另请参阅 Customize the Vert.x configuration,了解如何使用编程方式配置 Vert.x 实例。

Use Vert.x clients

除了 Vert.x 核心,还可以使用大多数 Vert.x 生态系统库。某些 Quarkus 扩展已经封装了 Vert.x 库。

Available APIs

下表列出了 Vert.x 生态系统中使用的 most 库。要访问这些 API,请向项目添加指示的扩展或依赖项。查看关联的文档以了解如何使用它们。

API

Extension or Dependency

Documentation

AMQP Client

io.quarkus:quarkus-messaging-amqp (extension)

Getting Started to Quarkus Messaging with AMQP

Circuit Breaker

io.smallrye.reactive:smallrye-mutiny-vertx-circuit-breaker (external dependency)

[role="bare"]https://vertx.io/docs/vertx-circuit-breaker/java/

Consul Client

io.smallrye.reactive:smallrye-mutiny-vertx-consul-client (external dependency)

[role="bare"]https://vertx.io/docs/vertx-consul-client/java/

DB2 Client

io.quarkus:quarkus-reactive-db2-client (extension)

Reactive SQL Clients

Kafka Client

io.quarkus:quarkus-messaging-kafka (extension)

Apache Kafka Reference Guide

Mail Client

io.quarkus:quarkus-mailer (extension)

Sending emails using SMTP

MQTT Client

io.quarkus:quarkus-messaging-mqtt (extension)

No guide yet

MS SQL Client

io.quarkus:quarkus-reactive-mssql-client (extension)

Reactive SQL Clients

MySQL Client

io.quarkus:quarkus-reactive-mysql-client (extension)

Reactive SQL Clients

Oracle Client

io.quarkus:quarkus-reactive-oracle-client (extension)

Reactive SQL Clients

PostgreSQL Client

io.quarkus:quarkus-reactive-pg-client (extension)

Reactive SQL Clients

RabbitMQ Client

io.smallrye.reactive:smallrye-mutiny-vertx-rabbitmq-client (external dependency)

[role="bare"]https://vertx.io/docs/vertx-rabbitmq-client/java

Redis Client

io.quarkus:quarkus-redis-client (extension)

Using the Redis Client

Web Client

io.smallrye.reactive:smallrye-mutiny-vertx-web-client (external dependency)

[role="bare"]https://vertx.io/docs/vertx-web-client/java/

要详细了解 Vert.x Mutiny API 的使用方法,请参阅 [role="bare"][role="bare"]https://smallrye.io/smallrye-mutiny-vertx-bindings.

Use the Vert.x Web Client

本节提供了一个使用 Vert.x WebClient 的示例,示例是在 Quarkus REST(以前称为 RESTEasy Reactive)应用程序的环境中编写的。如上表所示,向项目添加以下依赖项:

pom.xml
<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-mutiny-vertx-web-client</artifactId>
</dependency>
build.gradle
implementation("io.smallrye.reactive:smallrye-mutiny-vertx-web-client")

现在,您可以在代码中创建 WebClient 的一个实例:

package org.acme.vertx;


import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import io.smallrye.mutiny.Uni;

import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.ext.web.client.WebClient;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClientOptions;

@Path("/fruit-data")
public class ResourceUsingWebClient {

    private final WebClient client;

    @Inject
    VertxResource(Vertx vertx) {
        this.client = WebClient.create(vertx);
    }

    @GET
    @Produces(MediaType.APPLICATION_JSON)
    @Path("/{name}")
    public Uni<JsonObject> getFruitData(String name) {
        return client.getAbs("https://.../api/fruit/" + name)
                .send()
                .onItem().transform(resp -> {
                    if (resp.statusCode() == 200) {
                        return resp.bodyAsJsonObject();
                    } else {
                        return new JsonObject()
                                .put("code", resp.statusCode())
                                .put("message", resp.bodyAsString());
                    }
                });
    }

}

此资源创建一个 WebClient,并且在收到请求后使用此客户端调用远程 HTTP API。它会根据结果转发接收到的响应,或创建一个封装错误的 JSON 对象。WebClient 是异步的(并且是非阻塞的),因此端点会返回一个 Uni

该应用程序还可以作为本地可执行文件运行。但是,我们首先需要指示 Quarkus 启用 ssl(如果远程 API 使用 HTTPS)。打开 src/main/resources/application.properties 并添加:

quarkus.ssl.native=true

然后,使用以下命令创建本地可执行文件:

CLI
quarkus build --native
Maven
./mvnw install -Dnative
Gradle
./gradlew build -Dquarkus.native.enabled=true

Use Vert.x JSON

Vert.x API 通常依赖于 JSON。Vert.x 提供了两个便捷的类来处理 JSON 文档:io.vertx.core.json.JsonObjectio.vertx.core.json.JsonArray

JsonObject 可用于将对象映射到其 JSON 表示形式,并从 JSON 文档构建一个对象:

// Map an object into JSON
Person person = ...;
JsonObject json = JsonObject.mapFrom(person);

// Build an object from JSON
json = new JsonObject();
person = json.mapTo(Person.class);

请注意,这些特性使用由 quarkus-jackson 扩展管理的映射器。请参阅 Jackson configuration 自定义映射。

JSON 对象和 JSON 数组都支持作为 Quarkus HTTP 端点请求和响应主体(使用经典 RESTEasy 和 Quarkus REST)。考虑以下端点:

package org.acme.vertx;

import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/hello")
@Produces(MediaType.APPLICATION_JSON)
public class VertxJsonResource {

    @GET
    @Path("{name}/object")
    public JsonObject jsonObject(String name) {
        return new JsonObject().put("Hello", name);
    }

    @GET
    @Path("{name}/array")
    public JsonArray jsonArray(String name) {
        return new JsonArray().add("Hello").add(name);
    }
}

[role="bare"][role="bare"]http://localhost:8080/hello/Quarkus/object 返回:

{"Hello":"Quarkus"}

[role="bare"][role="bare"]http://localhost:8080/hello/Quarkus/array 返回:

["Hello","Quarkus"]

无论 JSON 内容是请求主体还是封装在 UniMultiCompletionStagePublisher 中,此操作都很有效。

Use Verticles

verticles[Verticles] is "a simple, scalable, actor-like deployment and concurrency model" provided by _Vert.x.此模型并不声称是一个严格的 actor 模型实现,但它具有类似性,尤其是在并发性、扩展性和部署方面。要使用此模型,请编写 deploy 顶点,并通过向事件总线发送消息的方式进行通信。

您可以在 Quarkus 中部署 verticles。它支持:

  • bare 顶点 - 扩展 io.vertx.core.AbstractVerticle 的 Java 类

  • Mutiny 顶点 - 扩展 io.smallrye.mutiny.vertx.core.AbstractVerticle 的 Java 类

Deploy Verticles

若要部署顶点,请使用 deployVerticle

@Inject Vertx vertx;

// ...
vertx.deployVerticle(MyVerticle.class.getName(), ar -> { });
vertx.deployVerticle(new MyVerticle(), ar -> { });

如果你使用 Vert.x 的 Mutiny 变体,请注意, deployVerticle 方法会返回一个 Uni,你将需要触发一个订阅来完成实际的部署。

后面会出现一个示例,说明如何在应用程序初始化期间部署 top vertex。

Use @ApplicationScoped beans as Verticle

一般来说,Vert.x 顶点不是 CDI Bean。因此不能使用注入。然而,在 Quarkus 中,你可以将顶点部署为 bean。请注意,在这种情况下,由 CDI(Quarkus 中的 Arc)负责创建实例。

以下片段提供了一个示例:

package io.quarkus.vertx.verticles;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MyBeanVerticle extends AbstractVerticle {

    @ConfigProperty(name = "address") String address;

    @Override
    public Uni<Void> asyncStart() {
        return vertx.eventBus().consumer(address)
                .handler(m -> m.replyAndForget("hello"))
                .completionHandler();
    }
}

你不必注入 vertx 实例;相反,利用 AbstractVerticle 中的受保护字段。

然后,用以下内容部署顶点实例:

package io.quarkus.vertx.verticles;

import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.core.Vertx;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;

@ApplicationScoped
public class VerticleDeployer {

    public void init(@Observes StartupEvent e, Vertx vertx, MyBeanVerticle verticle) {
         vertx.deployVerticle(verticle).await().indefinitely();
    }
}

如果你想部署每个公开的 AbstractVerticle,你可以使用:

public void init(@Observes StartupEvent e, Vertx vertx, Instance<AbstractVerticle> verticles) {
    for (AbstractVerticle verticle : verticles) {
        vertx.deployVerticle(verticle).await().indefinitely();
    }
}

Create multiple verticles instances

使用 @ApplicationScoped 时,你将为顶点获取一个单一实例。拥有多个顶点实例对于共享其负载很有帮助。每个顶点都将与一个不同的 I/O 线程(Vert.x 事件循环)关联。

若要部署多个顶点实例,请使用 @Dependent 作用域,而不是 @ApplicationScoped

package org.acme.verticle;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;

import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;

@Dependent
public class MyVerticle extends AbstractVerticle {

    @Override
    public Uni<Void> asyncStart() {
        return vertx.eventBus().consumer("address")
                .handler(m -> m.reply("Hello from " + this))
                .completionHandler();
    }
}

然后,按如下方式部署顶点:

package org.acme.verticle;

import io.quarkus.runtime.StartupEvent;
import io.vertx.core.DeploymentOptions;
import io.vertx.mutiny.core.Vertx;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;

@ApplicationScoped
public class MyApp {

    void init(@Observes StartupEvent ev, Vertx vertx, Instance<MyVerticle> verticles) {
        vertx
                .deployVerticle(verticles::get, new DeploymentOptions().setInstances(2))
                .await().indefinitely();
    }
}

init 方法会接收一个 Instance<MyVerticle>。然后,你将一个提供器传递给 deployVerticle 方法。提供器仅仅调用 get() 方法。感谢 @Dependent 作用域,它每次调用都返回一个新实例。最后,你将所需的实例数传递给 DeploymentOptions,例如前一个示例中的两个。它会调用提供器两次,这将创建两个顶点实例。

Use the Event Bus

Vert.x 采用内建的 event bus,你可以从 Quarkus 应用程序中使用它。因此,你的应用程序组件(CDI bean、资源等)可以使用异步事件进行交互,从而促进松散耦合。

通过事件总线,可以将 messages 发送到 virtual addresses。事件总线提供三种类型的传递机制:

  • 点对点 - 发送消息,由一个消费者接收。如果几个消费者监听该地址,则会应用轮询;

  • 发布/订阅 - 发布消息;所有监听该地址的消费者都会收到该消息;

  • 请求/回复 - 发送消息并期待回复。接收者可以使用异步方式回复消息。

所有这些传递机制都是非阻塞的,并且为构建响应式应用程序提供了基本条件。

Consume events

在可以使用 Vert.x API 注册消费者的同时,Quarkus 也随附有声明性支持。若要使用事件,请使用 io.quarkus.vertx.ConsumeEvent 注解:

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent                           (1)
    public String consume(String name) {    (2)
        return name.toUpperCase();
    }
}
1 如果没有设置,该地址是 bean 的完全限定名称;例如,在这个代码段中,它是 org.acme.vertx.GreetingService
2 方法参数是消息正文。如果方法返回 something,则它就是消息应答。

Configure the address

`@ConsumeEvent`注释可配置为设置地址:

@ConsumeEvent("greeting")               (1)
public String consume(String name) {
    return name.toUpperCase();
}
1 接收发送到 `greeting`地址的消息

地址值可以是属性表达式。在这种情况下,将使用已配置的值: @ConsumeEvent("${my.consumer.address}")。此外,属性表达式可以指定一个默认值: @ConsumeEvent("${my.consumer.address:defaultAddress}")

Config Property Example
@ConsumeEvent("${my.consumer.address}")   (1)
public String consume(String name) {
    return name.toLowerCase();
}
1 接收发送到使用 `my.consumer.address`键配置的地址的消息。

如果不存在具有指定键的配置属性并且未设置默认值,那么应用程序启动将失败。

Process events asynchronously

前面的示例使用同步处理。还可以通过返回 `io.smallrye.mutiny.Uni`或 `java.util.concurrent.CompletionStage`对异步处理进行处理:

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent
    public Uni<String> process(String name) {
        // return an Uni completed when the processing is finished.
        // You can also fail the Uni explicitly
    }
}
Mutiny

前面的示例使用 Mutiny 响应类型。如果您不熟悉 Mutiny,请查看 Mutiny - an intuitive reactive programming library

Blocking processing of events

默认情况下,使用事件的代码必须是 non-blocking,因为它是在 I/O 线程上调用的。如果您的处理正在阻止,请使用 `@io.smallrye.common.annotation.Blocking`注释:

@ConsumeEvent(value = "blocking-consumer")
@Blocking
void consumeBlocking(String message) {
    // Something blocking
}

或者,您可以使用 `@ConsumeEvent`注释中的 `blocking`属性:

@ConsumeEvent(value = "blocking-consumer", blocking = true)
void consumeBlocking(String message) {
    // Something blocking
}

使用 `@Blocking`时,它会忽略 `@ConsumeEvent`的 `blocking`属性值。

Reply to events

注释有 `@ConsumeEvent`的方法的 _return_值用于响应传入的消息。例如,在下面的代码段中,返回的 `String`是响应。

@ConsumeEvent("greeting")
public String consume(String name) {
    return name.toUpperCase();
}

您还可以返回 `Uni<T>`或 `CompletionStage<T>`来处理异步答复:

@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
    return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}

如果您使用上下文传播扩展,则可以注入一个 executor

@Inject Executor executor;

Implement fire-and-forget interactions

您不必回复接收到的消息。通常,对于 _fire and forget_交互,消息会被使用,而发送方不需要知道它。要实现此模式,您的消费者方法返回 void

@ConsumeEvent("greeting")
public void consume(String event) {
    // Do something with the event
}

Consume messages (instead of events)

不同于直接使用 _payloads_的前一个示例,您还可以直接使用 Message

@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
    System.out.println(msg.address());
    System.out.println(msg.body());
}

Handle failures

如果使用 `@ConsumeEvent`注释的方法抛出异常,则:

  • 如果设置了回复处理程序,那么将通过一个具有代码 ConsumeEvent#FAILURE_CODE 和异常消息的 io.vertx.core.eventbus.ReplyException 将故障传回给发送者,

  • 如果没有设置回复处理程序,那么异常将被再次抛出(如果必要的话,封装在一个 RuntimeException 中),并且可以由默认异常处理程序 i.e. io.vertx.core.Vertx#exceptionHandler() 处理。

Send messages

发送和发布消息使用 Vert.x 事件总线:

package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/async")
public class EventResource {

    @Inject
    EventBus bus;                                            (1)

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)        (2)
                .onItem().transform(Message::body);
    }
}
1 Inject the Event bus
2 向地址 greeting 发送消息。消息负载是 name

EventBus 对象提供以下方法:

  1. 将消息 send 到某个特定地址 - 一个消费者接收消息。

  2. 将消息 publish 到某个特定地址 - 所有的消费者接收消息。

  3. request 消息并且期望一个回复

// Case 1
bus.sendAndForget("greeting", name)
// Case 2
bus.publish("greeting", name)
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
        .onItem().transform(Message::body);

Process events on virtual threads

使用 @ConsumeEvent 注解的方法也可以使用 @RunOnVirtualThread 注解。在此情况下,该方法在虚拟线程上被调用。每个事件在不同的虚拟线程上被调用。

要使用此功能,请确保:

  1. Java 运行时支持虚拟线程。

  2. 方法使用阻塞签名。

第二点意味着只有返回一个对象或 void 的方法才能使用 @RunOnVirtualThread。返回一个 Uni 或一个 CompletionStage cannot 的方法在虚拟线程上运行。

阅读 the virtual thread guide 以了解更多。

Use codecs

链接:https://vertx.io/docs/vertx-core/java/event_bus[Vert.x Event Bus] uses codecs to _serialize 和 deserialize 消息对象。Quarkus 为本地交付提供了一个默认编解码器。针对本地消费者的返回类型和消息体参数(即使用 @ConsumeEvent 注解的方法,其中 ConsumeEvent#local() == true 是默认值),此编解码器将被自动使用。

因此,您可以按照如下方式交换消息对象:

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", new MyName(name))
        .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello " + name.getName());
}

如果您想使用特定的编解码器,则需要在两端明确地设置它:

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", name,
        new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
        .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting", codec = MyNameCodec.class)            (2)
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello "+name.getName());
}
1 设置用于发送消息的编解码器的名称
2 设置用于接收消息的编解码器

Combine HTTP and the Event Bus

让我们重新审视一下 HTTP greeting 端点,并使用异步消息传递将调用委托给一个单独的 bean。它使用请求/响应分发机制。我们在 Jakarta REST 端点中发送消息,而不是实现其中的业务逻辑。另一个 bean 会使用 reply 机制接收此消息,然后发送响应。

在你的 HTTP 端点类中,注入事件总线并使用 request 方法向事件总线发送消息,并期待一个响应:

package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/bus")
public class EventResource {

    @Inject
    EventBus bus;

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)            (1)
                .onItem().transform(Message::body);            (2)
    }
}
1 name 发送到 greeting 地址并请求一个响应
2 当我们得到响应后,提取主体并将其发送给用户

HTTP 方法返回一个 Uni。如果你使用的是 Quarkus REST,Uni 支持是内置的。如果你使用的是 classic RESTEasy,你需要向你的项目中添加 quarkus resteasy-mutiny 扩展。

我们需要一个侦听 greeting 地址的消费者。此消费者可以位于同一个类或另一个 Bean 中,例如:

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent("greeting")
    public String greeting(String name) {
        return "Hello " + name;
    }

}

此 Bean 接收姓名并返回 greeting 消息。

有了这些,/bus/quarkus 上的每个 HTTP 请求都会向事件总线发送一条消息、等待回复,而且当回复到达后,编写 HTTP 响应:

Hello Quarkus

为了更好地理解,让我们详细介绍一下 HTTP 请求/响应是如何处理的:

  1. 请求由 greeting 方法接收

  2. 包含 name 的消息被发送到事件总线

  3. 另一个 bean 接收此消息并计算出响应

  4. 该响应使用 reply 机制发送回来

  5. 一旦发送方收到回复,其内容就会被写入到 HTTP 响应中

Bidirectional communication with browsers by using SockJS

Vert.x 提供的 SockJS 网桥允许浏览器应用程序和 Quarkus 应用程序使用事件总线进行通信。它把两边连接起来。因此,双方都可以向另一方发送消息并收到对方发送的消息。它支持这三种传递机制。

SockJS 协商 Quarkus 应用程序和浏览器之间的通信通道。如果支持 WebSocket,它会使用它们;否则,它会降级为 SSE、长轮询等。

因此,要使用 SockJS,你需要配置网桥,特别是用来进行通信的地址:

package org.acme;

import io.vertx.core.Vertx;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.sockjs.SockJSBridgeOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import java.util.concurrent.atomic.AtomicInteger;

@ApplicationScoped
public class SockJsExample {

    @Inject
    Vertx vertx;

    public void init(@Observes Router router) {
        SockJSHandler sockJSHandler = SockJSHandler.create(vertx);
        Router bridge = sockJSHandler.bridge(new SockJSBridgeOptions()
                .addOutboundPermitted(new PermittedOptions().setAddress("ticks")));
        router.route("/eventbus/*").subRouter(bridge);

        AtomicInteger counter = new AtomicInteger();
        vertx.setPeriodic(1000,
                ignored -> vertx.eventBus().publish("ticks", counter.getAndIncrement()));
    }

}

此代码将 SockJS 网桥配置为将所有针对 ticks 地址的消息发送到已连接的浏览器。更多有关配置的详细说明可以在 the Vert.x SockJS Bridge documentation 中找到。

浏览器必须使用 vertx-eventbus JavaScript 库来使用消息:

<!doctype html>
<html>
<head>
    <meta charset="utf-8"/>
    <title>SockJS example - Quarkus</title>
    <script src="https://code.jquery.com/jquery-3.3.1.min.js"
            integrity="sha256-FgpCb/KJQlLNfOu91ta32o/NMZxltwRo8QtmkMRdAu8=" crossorigin="anonymous"></script>
    <script type="application/javascript" src="https://cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js"></script>
    <script src="https://cdn.jsdelivr.net/npm/vertx3-eventbus-client@3.8.5/vertx-eventbus.min.js"></script>
</head>
<body>

<h1>SockJS Examples</h1>

<p><strong>Last Tick:</strong> <span id="tick"></span></p>

</body>
<script>
    var eb = new EventBus('/eventbus');

    eb.onopen = function () {

        eb.registerHandler('ticks', function (error, message) {
            $("#tick").html(message.body);
        });
    }

</script>
</html>

Use native transports

原生可执行文件中不支持原生传输。

如要使用 io_uring,请参考 Use io_uring部分。

Vert.x 能够使用 Netty’s native transports,可以在特定平台上提供性能提升。若要启用它们,您必须包含平台的相应依赖项。通常建议同时拥有这两者,以保持应用程序与平台无关。Netty 足够智能,可以使用正确的依赖项,包括在不受支持的平台上根本不使用它们:

pom.xml
<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-transport-native-epoll</artifactId>
  <classifier>linux-x86_64</classifier>
</dependency>

<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-transport-native-kqueue</artifactId>
  <classifier>osx-x86_64</classifier>
</dependency>
build.gradle
implementation("io.netty:netty-transport-native-epoll::linux-x86_64")

implementation("io.netty:netty-transport-native-kqueue::osx-x86_64")

您还必须显式配置 Vert.x 以使用原生传输。在 `application.properties`中添加:

quarkus.vertx.prefer-native-transport=true

或在 `application.yml`中:

quarkus:
  vertx:
    prefer-native-transport: true

如果一切运行良好,quarkus 将记录:

[io.qua.ver.cor.run.VertxCoreRecorder] (main) Vertx has Native Transport Enabled: true

Native Linux transport

在 Linux 上,您可以启用以下套接字选项:

  • SO_REUSEPORT

quarkus.http.so-reuse-port=true
  • TCP_QUICKACK

quarkus.http.tcp-quick-ack=true
  • TCP_CORK

quarkus.http.tcp-cork=true
  • TCP_FASTOPEN

quarkus.http.tcp-fast-open=true

Native macOS transport

在 macOS Sierra 及更高版本上,您可以启用以下套接字选项:

  • SO_REUSEPORT

quarkus.http.so-reuse-port=true

Use a Vert.x context-aware scheduler

部分 Mutiny 运算符需要在执行器线程池上安排工作。一个很好的示例是 .onItem().delayIt().by(Duration.ofMillis(10),因为它需要此类执行器来延迟发射。

默认执行器由 `io.smallrye.mutiny.infrastructure.Infrastructure`返回,并且已由 Quarkus 配置和管理。

话虽如此,有时您需要确保在 Vert.x(已复制)上下文中运行操作,而不仅仅是在任意线程中运行。

`io.smallrye.mutiny.vertx.core.ContextAwareScheduler`界面提供了一个获取上下文感知调度程序的 API。此类调度程序使用以下内容进行配置:

  1. 根据您的选择委托 ScheduledExecutorService(提示:可以重复使用 Infrastructure.getDefaultWorkerPool()),以及

  2. 从以下内容中调用上下文获取策略:

    • an explicit Context, or

    • 在当前线程或稍后调用 `Vertx::getOrCreateContext()`在调度请求发生时,或

    • 调用 Vertx::currentContext(),如果当前线程不是 Vert.x 线程,则将失败。

以下是一个 `ContextAwareScheduler`的使用示例:

class MyVerticle extends AbstractVerticle {

    @Override
    public Uni<Void> asyncStart() {
        vertx.getOrCreateContext().put("foo", "bar");

        var delegate = Infrastructure.getDefaultWorkerPool();
        var scheduler = ContextAwareScheduler.delegatingTo(delegate)
            .withCurrentContext();

        return Uni.createFrom().voidItem()
                .onItem().delayIt().onExecutor(scheduler).by(Duration.ofMillis(10))
                .onItem().invoke(() -> {
                    // Prints "bar"
                    var ctx = vertx.getOrCreateContext();
                    System.out.println(ctx.get("foo"));
                });
    }
}

此示例通过捕获调用 `asyncStart()`的 Vert.x 事件循环的上下文来创建调度器。`delayIt`运算符使用该调度程序,我们可以检查我们在 `invoke`中获取的上下文是一个 Vert.x 复制的上下文,其中已传播键 `"foo"`的数据。

Use a Unix domain socket

如果从同一主机建立与 quarkus 服务的连接,则监听 Unix 域套接字时,我们可以消除 TCP 的开销。如果您使用 Envoy 之类的代理设置服务网格,则这种情况可能会发生,因为在这种情况下,对服务的访问会通过代理进行。

这段文字只能在支持 Use native transports 的平台上运作。

启用适当的 Use native transports,并设置以下环境属性:

quarkus.http.domain-socket=/var/run/io.quarkus.app.socket
quarkus.http.domain-socket-enabled=true

quarkus.vertx.prefer-native-transport=true

它本身不会禁用将默认在 0.0.0.0:8080 上打开的 tcp 套接字。可以明确禁用:

quarkus.http.host-enabled=false

这些属性可以通过 Java 的 -D 命令行参数或 application.properties 设置。

不要忘记添加本机传输依赖关系。有关详细信息,请参见 Use native transports

确保您的应用程序具有向套接字写入的正确权限。

Use io_uring

io_uring 不受本机可执行文件支持。

io_uring 支持仍处于试验阶段。

io_uring 是一个 Linux 内核界面,允许您异步发送和接收数据。它为文件和网络 I/O 提供统一语义。它最初设计为针对块设备和文件,但此后获得了处理网络套接字等事务的能力。它有可能单独为网络 I/O 提供适度的性能优势,为混合文件和网络 I/O 应用程序工作负载提供更大的优势。

要了解更多有关 io_uring 的信息,我们推荐以下链接:

  • Why you should use io_uring for network I/O:io_uring 对网络 I/O 的主要好处是易于使用的现代异步 API,并为文件和网络 I/O 提供统一语义。io_uring 对网络 I/O 的潜在性能优势是减少系统调用的数量。这可以为大量小型操作提供最大的好处,因为系统调用的开销可能很大。

  • The Backend Revolution and Why io_uring Is So Important:io_uring API 使用两个环形缓冲区进行应用程序和内核之间的通信(因此得名),并以一种能够实现批处理请求和响应的方式设计。此外,它提供了一种在一次系统调用中提交多个请求的方法,这可以减少开销。

  • What exactly is io_uring?:io_uring 是一个 Linux 内核界面,可以有效地让您异步发送和接收数据。它最初设计为针对块设备和文件,但此后获得了处理网络套接字等事务的能力。

要使用 io_uring,您需要向您的项目添加两个依赖关系并启用本机传输。首先向您的项目添加以下依赖关系:

pom.xml
<dependency>
    <groupId>io.netty.incubator</groupId>
    <artifactId>netty-incubator-transport-native-io_uring</artifactId>
    <version>0.0.21.Final</version> <!-- Update this version (https://github.com/netty/netty-incubator-transport-io_uring/tags) -->
    <classifier>linux-x86_64</classifier>
</dependency>
<dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-io_uring-incubator</artifactId>
</dependency>
build.gradle
// Update the io_uring version by picking the latest from https://github.com/netty/netty-incubator-transport-io_uring/tags
implementation("io.netty.incubator:netty-incubator-transport-native-io_uring:0.0.21.Final")
implementation("io.vertx:vertx-io_uring-incubator")

然后,在 application.properties 中添加:

quarkus.vertx.prefer-native-transport=true
Can I use io_uring on my Linux machine?

要检查您是否可以在 Linux 机器上使用 io_uring,请执行以下命令:

> grep io_uring_setup /proc/kallsyms
0000000000000000 t __pfx_io_uring_setup
0000000000000000 t io_uring_setup
0000000000000000 T __pfx___x64_sys_io_uring_setup
0000000000000000 T __x64_sys_io_uring_setup
0000000000000000 T __pfx___ia32_sys_io_uring_setup
0000000000000000 T __ia32_sys_io_uring_setup
0000000000000000 d event_exit__io_uring_setup
0000000000000000 d event_enter__io_uring_setup
0000000000000000 d __syscall_meta__io_uring_setup
0000000000000000 d args__io_uring_setup
0000000000000000 d types__io_uring_setup
0000000000000000 d __event_exit__io_uring_setup
0000000000000000 d __event_enter__io_uring_setup
0000000000000000 d __p_syscall_meta__io_uring_setup

如果打印出类似以上内容,您可以使用 io_uring

Troubleshooting

io_uring 支持仍处于实验阶段。如果您看到一些奇怪的行为,请检查 Netty io_uring FAQ。此外, netty io_uring was slower than epoll 问题描述了一些配置错误。

域套接字还不受 io_uring 支持。

Vert.x 异步文件系统 API 尚未使用 io_uring。

Deploy on read-only environments

在仅读文件系统环境中,您可能会收到以下形式的错误:

java.lang.IllegalStateException: Failed to create cache dir

假设 /tmp/ 可写,则可以通过将 vertx.cacheDirBase 属性设置为指向 /tmp/ 中的目录来修复此问题,例如,在 Kubernetes 中,通过创建具有值 -Dvertx.cacheDirBase=/tmp/vertx 的环境变量 JAVA_OPTS,或在 application.properties 中设置 quarkus.vertx.cache-directory 属性:

quarkus.vertx.cache-directory=/tmp/vertx

Customize the Vert.x configuration

托管 Vert.x 实例的配置可以使用 application.properties 文件提供,但也可以使用 special beans.CDI bean 暴露 io.quarkus.vertx.VertxOptionsCustomizer 接口可用于自定义 Vert.x 配置。例如,以下自定义程序会更改 tmp 基础目录:

@ApplicationScoped
public class MyCustomizer implements VertxOptionsCustomizer {

    @Override
    public void accept(VertxOptions options) {
        options.setFileSystemOptions(new FileSystemOptions().setFileCacheDir("target"));
    }
}

customizer Bean 接收 VertxOptions (来自应用程序配置),并可以修改它们。