Vert.x Reference Guide
Vert.x是一种用于构建响应式应用程序的工具包。如 Quarkus Reactive Architecture中所述,Quarkus 是着重底层 Vert.x。 本指南是 Using Eclipse Vert.x API from a Quarkus Application指南的配套指南。它提供了有关由 Quarkus 所用 Vert.x 实例的用法和配置的更多高级详细信息。
- Access the Vert.x instance
- Configure the Vert.x instance
- Use Vert.x clients
- Use Vert.x JSON
- Use Verticles
- Use the Event Bus
- Consume events
- Configure the address
- Process events asynchronously
- Blocking processing of events
- Reply to events
- Implement fire-and-forget interactions
- Consume messages (instead of events)
- Handle failures
- Send messages
- Process events on virtual threads
- Use codecs
- Combine HTTP and the Event Bus
- Bidirectional communication with browsers by using SockJS
- Use native transports
- Use a Vert.x context-aware scheduler
- Use a Unix domain socket
- Use io_uring
- Deploy on read-only environments
- Customize the Vert.x configuration
Access the Vert.x instance
要访问托管的 Vert.x 实例,请将 `quarkus-vertx`扩展添加到您的项目中。此依赖可能已经存在于您的项目中(作为传递依赖)。
利用此扩展,您可以使用字段或构造函数注入的方式检索 Vert.x 的托管实例:
@ApplicationScoped
public class MyBean {
// Field injection
@Inject Vertx vertx;
// Constructor injection
MyBean(Vertx vertx) {
// ...
}
}
您可以注入以下任一选项:
-
公开 _bare_Vert.x API 的 `io.vertx.core.Vertx`实例
-
公开 _Mutiny_API 的 `io.vertx.mutiny.core.Vertx`实例
我们建议使用 Mutiny 变体,因为它与 Quarkus 提供的其他响应式 API 集成。
Mutiny
如果您不熟悉 Mutiny,请查看 Mutiny - an intuitive reactive programming library。 |
有关 Vert.x Mutiny 变体的文档可在 [role="bare"][role="bare"]https://smallrye.io/smallrye-mutiny-vertx-bindings上查阅。
Configure the Vert.x instance
您可以通过 `application.properties`文件配置 Vert.x 实例。下表列出了受支持的属性:
Unresolved include directive in modules/ROOT/pages/vertx-reference.adoc - include::../../../target/quarkus-generated-doc/config/quarkus-vertx.adoc[]
另请参阅 Customize the Vert.x configuration,了解如何使用编程方式配置 Vert.x 实例。
Use Vert.x clients
除了 Vert.x 核心,还可以使用大多数 Vert.x 生态系统库。某些 Quarkus 扩展已经封装了 Vert.x 库。
Available APIs
下表列出了 Vert.x 生态系统中使用的 most 库。要访问这些 API,请向项目添加指示的扩展或依赖项。查看关联的文档以了解如何使用它们。
API |
Extension or Dependency |
Documentation |
AMQP Client |
|
|
Circuit Breaker |
|
[role="bare"]https://vertx.io/docs/vertx-circuit-breaker/java/ |
Consul Client |
|
[role="bare"]https://vertx.io/docs/vertx-consul-client/java/ |
DB2 Client |
|
|
Kafka Client |
|
|
Mail Client |
|
|
MQTT Client |
|
No guide yet |
MS SQL Client |
|
|
MySQL Client |
|
|
Oracle Client |
|
|
PostgreSQL Client |
|
|
RabbitMQ Client |
|
[role="bare"]https://vertx.io/docs/vertx-rabbitmq-client/java |
Redis Client |
|
|
Web Client |
|
[role="bare"]https://vertx.io/docs/vertx-web-client/java/ |
要详细了解 Vert.x Mutiny API 的使用方法,请参阅 [role="bare"][role="bare"]https://smallrye.io/smallrye-mutiny-vertx-bindings.
Use the Vert.x Web Client
本节提供了一个使用 Vert.x WebClient
的示例,示例是在 Quarkus REST(以前称为 RESTEasy Reactive)应用程序的环境中编写的。如上表所示,向项目添加以下依赖项:
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-web-client</artifactId>
</dependency>
implementation("io.smallrye.reactive:smallrye-mutiny-vertx-web-client")
现在,您可以在代码中创建 WebClient
的一个实例:
package org.acme.vertx;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.ext.web.client.WebClient;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClientOptions;
@Path("/fruit-data")
public class ResourceUsingWebClient {
private final WebClient client;
@Inject
VertxResource(Vertx vertx) {
this.client = WebClient.create(vertx);
}
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/{name}")
public Uni<JsonObject> getFruitData(String name) {
return client.getAbs("https://.../api/fruit/" + name)
.send()
.onItem().transform(resp -> {
if (resp.statusCode() == 200) {
return resp.bodyAsJsonObject();
} else {
return new JsonObject()
.put("code", resp.statusCode())
.put("message", resp.bodyAsString());
}
});
}
}
此资源创建一个 WebClient
,并且在收到请求后使用此客户端调用远程 HTTP API。它会根据结果转发接收到的响应,或创建一个封装错误的 JSON 对象。WebClient
是异步的(并且是非阻塞的),因此端点会返回一个 Uni
。
该应用程序还可以作为本地可执行文件运行。但是,我们首先需要指示 Quarkus 启用 ssl(如果远程 API 使用 HTTPS)。打开 src/main/resources/application.properties
并添加:
quarkus.ssl.native=true
然后,使用以下命令创建本地可执行文件:
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
Use Vert.x JSON
Vert.x API 通常依赖于 JSON。Vert.x 提供了两个便捷的类来处理 JSON 文档:io.vertx.core.json.JsonObject
和 io.vertx.core.json.JsonArray
。
JsonObject
可用于将对象映射到其 JSON 表示形式,并从 JSON 文档构建一个对象:
// Map an object into JSON
Person person = ...;
JsonObject json = JsonObject.mapFrom(person);
// Build an object from JSON
json = new JsonObject();
person = json.mapTo(Person.class);
请注意,这些特性使用由 quarkus-jackson
扩展管理的映射器。请参阅 Jackson configuration 自定义映射。
JSON 对象和 JSON 数组都支持作为 Quarkus HTTP 端点请求和响应主体(使用经典 RESTEasy 和 Quarkus REST)。考虑以下端点:
package org.acme.vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/hello")
@Produces(MediaType.APPLICATION_JSON)
public class VertxJsonResource {
@GET
@Path("{name}/object")
public JsonObject jsonObject(String name) {
return new JsonObject().put("Hello", name);
}
@GET
@Path("{name}/array")
public JsonArray jsonArray(String name) {
return new JsonArray().add("Hello").add(name);
}
}
[role="bare"][role="bare"]http://localhost:8080/hello/Quarkus/object 返回:
{"Hello":"Quarkus"}
[role="bare"][role="bare"]http://localhost:8080/hello/Quarkus/array 返回:
["Hello","Quarkus"]
无论 JSON 内容是请求主体还是封装在 Uni
、Multi
、CompletionStage
或 Publisher
中,此操作都很有效。
Use Verticles
verticles[Verticles] is "a simple, scalable, actor-like deployment and concurrency model" provided by _Vert.x.此模型并不声称是一个严格的 actor 模型实现,但它具有类似性,尤其是在并发性、扩展性和部署方面。要使用此模型,请编写 deploy 顶点,并通过向事件总线发送消息的方式进行通信。
您可以在 Quarkus 中部署 verticles。它支持:
-
bare 顶点 - 扩展
io.vertx.core.AbstractVerticle
的 Java 类 -
Mutiny 顶点 - 扩展
io.smallrye.mutiny.vertx.core.AbstractVerticle
的 Java 类
Deploy Verticles
若要部署顶点,请使用 deployVerticle
:
@Inject Vertx vertx;
// ...
vertx.deployVerticle(MyVerticle.class.getName(), ar -> { });
vertx.deployVerticle(new MyVerticle(), ar -> { });
如果你使用 Vert.x 的 Mutiny 变体,请注意, deployVerticle
方法会返回一个 Uni
,你将需要触发一个订阅来完成实际的部署。
后面会出现一个示例,说明如何在应用程序初始化期间部署 top vertex。 |
Use @ApplicationScoped beans as Verticle
一般来说,Vert.x 顶点不是 CDI Bean。因此不能使用注入。然而,在 Quarkus 中,你可以将顶点部署为 bean。请注意,在这种情况下,由 CDI(Quarkus 中的 Arc)负责创建实例。
以下片段提供了一个示例:
package io.quarkus.vertx.verticles;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class MyBeanVerticle extends AbstractVerticle {
@ConfigProperty(name = "address") String address;
@Override
public Uni<Void> asyncStart() {
return vertx.eventBus().consumer(address)
.handler(m -> m.replyAndForget("hello"))
.completionHandler();
}
}
你不必注入 vertx
实例;相反,利用 AbstractVerticle
中的受保护字段。
然后,用以下内容部署顶点实例:
package io.quarkus.vertx.verticles;
import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
@ApplicationScoped
public class VerticleDeployer {
public void init(@Observes StartupEvent e, Vertx vertx, MyBeanVerticle verticle) {
vertx.deployVerticle(verticle).await().indefinitely();
}
}
如果你想部署每个公开的 AbstractVerticle
,你可以使用:
public void init(@Observes StartupEvent e, Vertx vertx, Instance<AbstractVerticle> verticles) {
for (AbstractVerticle verticle : verticles) {
vertx.deployVerticle(verticle).await().indefinitely();
}
}
Create multiple verticles instances
使用 @ApplicationScoped
时,你将为顶点获取一个单一实例。拥有多个顶点实例对于共享其负载很有帮助。每个顶点都将与一个不同的 I/O 线程(Vert.x 事件循环)关联。
若要部署多个顶点实例,请使用 @Dependent
作用域,而不是 @ApplicationScoped
:
package org.acme.verticle;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
@Dependent
public class MyVerticle extends AbstractVerticle {
@Override
public Uni<Void> asyncStart() {
return vertx.eventBus().consumer("address")
.handler(m -> m.reply("Hello from " + this))
.completionHandler();
}
}
然后,按如下方式部署顶点:
package org.acme.verticle;
import io.quarkus.runtime.StartupEvent;
import io.vertx.core.DeploymentOptions;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
@ApplicationScoped
public class MyApp {
void init(@Observes StartupEvent ev, Vertx vertx, Instance<MyVerticle> verticles) {
vertx
.deployVerticle(verticles::get, new DeploymentOptions().setInstances(2))
.await().indefinitely();
}
}
init
方法会接收一个 Instance<MyVerticle>
。然后,你将一个提供器传递给 deployVerticle
方法。提供器仅仅调用 get()
方法。感谢 @Dependent
作用域,它每次调用都返回一个新实例。最后,你将所需的实例数传递给 DeploymentOptions
,例如前一个示例中的两个。它会调用提供器两次,这将创建两个顶点实例。
Use the Event Bus
Vert.x 采用内建的 event bus,你可以从 Quarkus 应用程序中使用它。因此,你的应用程序组件(CDI bean、资源等)可以使用异步事件进行交互,从而促进松散耦合。
通过事件总线,可以将 messages 发送到 virtual addresses。事件总线提供三种类型的传递机制:
-
点对点 - 发送消息,由一个消费者接收。如果几个消费者监听该地址,则会应用轮询;
-
发布/订阅 - 发布消息;所有监听该地址的消费者都会收到该消息;
-
请求/回复 - 发送消息并期待回复。接收者可以使用异步方式回复消息。
所有这些传递机制都是非阻塞的,并且为构建响应式应用程序提供了基本条件。
Consume events
在可以使用 Vert.x API 注册消费者的同时,Quarkus 也随附有声明性支持。若要使用事件,请使用 io.quarkus.vertx.ConsumeEvent
注解:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent (1)
public String consume(String name) { (2)
return name.toUpperCase();
}
}
1 | 如果没有设置,该地址是 bean 的完全限定名称;例如,在这个代码段中,它是 org.acme.vertx.GreetingService 。 |
2 | 方法参数是消息正文。如果方法返回 something,则它就是消息应答。 |
Configure the address
`@ConsumeEvent`注释可配置为设置地址:
@ConsumeEvent("greeting") (1)
public String consume(String name) {
return name.toUpperCase();
}
1 | 接收发送到 `greeting`地址的消息 |
地址值可以是属性表达式。在这种情况下,将使用已配置的值: @ConsumeEvent("${my.consumer.address}")
。此外,属性表达式可以指定一个默认值: @ConsumeEvent("${my.consumer.address:defaultAddress}")
。
@ConsumeEvent("${my.consumer.address}") (1)
public String consume(String name) {
return name.toLowerCase();
}
1 | 接收发送到使用 `my.consumer.address`键配置的地址的消息。 |
如果不存在具有指定键的配置属性并且未设置默认值,那么应用程序启动将失败。 |
Process events asynchronously
前面的示例使用同步处理。还可以通过返回 `io.smallrye.mutiny.Uni`或 `java.util.concurrent.CompletionStage`对异步处理进行处理:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent
public Uni<String> process(String name) {
// return an Uni completed when the processing is finished.
// You can also fail the Uni explicitly
}
}
Mutiny
前面的示例使用 Mutiny 响应类型。如果您不熟悉 Mutiny,请查看 Mutiny - an intuitive reactive programming library。 |
Blocking processing of events
默认情况下,使用事件的代码必须是 non-blocking,因为它是在 I/O 线程上调用的。如果您的处理正在阻止,请使用 `@io.smallrye.common.annotation.Blocking`注释:
@ConsumeEvent(value = "blocking-consumer")
@Blocking
void consumeBlocking(String message) {
// Something blocking
}
或者,您可以使用 `@ConsumeEvent`注释中的 `blocking`属性:
@ConsumeEvent(value = "blocking-consumer", blocking = true)
void consumeBlocking(String message) {
// Something blocking
}
使用 `@Blocking`时,它会忽略 `@ConsumeEvent`的 `blocking`属性值。
Reply to events
注释有 `@ConsumeEvent`的方法的 _return_值用于响应传入的消息。例如,在下面的代码段中,返回的 `String`是响应。
@ConsumeEvent("greeting")
public String consume(String name) {
return name.toUpperCase();
}
您还可以返回 `Uni<T>`或 `CompletionStage<T>`来处理异步答复:
@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}
如果您使用上下文传播扩展,则可以注入一个
|
Implement fire-and-forget interactions
您不必回复接收到的消息。通常,对于 _fire and forget_交互,消息会被使用,而发送方不需要知道它。要实现此模式,您的消费者方法返回 void
。
@ConsumeEvent("greeting")
public void consume(String event) {
// Do something with the event
}
Consume messages (instead of events)
不同于直接使用 _payloads_的前一个示例,您还可以直接使用 Message
:
@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
System.out.println(msg.address());
System.out.println(msg.body());
}
Handle failures
如果使用 `@ConsumeEvent`注释的方法抛出异常,则:
-
如果设置了回复处理程序,那么将通过一个具有代码
ConsumeEvent#FAILURE_CODE
和异常消息的io.vertx.core.eventbus.ReplyException
将故障传回给发送者, -
如果没有设置回复处理程序,那么异常将被再次抛出(如果必要的话,封装在一个
RuntimeException
中),并且可以由默认异常处理程序 i.e.io.vertx.core.Vertx#exceptionHandler()
处理。
Send messages
发送和发布消息使用 Vert.x 事件总线:
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/async")
public class EventResource {
@Inject
EventBus bus; (1)
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name) (2)
.onItem().transform(Message::body);
}
}
1 | Inject the Event bus |
2 | 向地址 greeting 发送消息。消息负载是 name |
EventBus
对象提供以下方法:
-
将消息
send
到某个特定地址 - 一个消费者接收消息。 -
将消息
publish
到某个特定地址 - 所有的消费者接收消息。 -
request
消息并且期望一个回复
// Case 1
bus.sendAndForget("greeting", name)
// Case 2
bus.publish("greeting", name)
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
.onItem().transform(Message::body);
Process events on virtual threads
使用 @ConsumeEvent
注解的方法也可以使用 @RunOnVirtualThread
注解。在此情况下,该方法在虚拟线程上被调用。每个事件在不同的虚拟线程上被调用。
要使用此功能,请确保:
-
Java 运行时支持虚拟线程。
-
方法使用阻塞签名。
第二点意味着只有返回一个对象或 void
的方法才能使用 @RunOnVirtualThread
。返回一个 Uni
或一个 CompletionStage
cannot 的方法在虚拟线程上运行。
阅读 the virtual thread guide 以了解更多。
Use codecs
链接:https://vertx.io/docs/vertx-core/java/event_bus[Vert.x Event Bus] uses codecs to _serialize 和 deserialize 消息对象。Quarkus 为本地交付提供了一个默认编解码器。针对本地消费者的返回类型和消息体参数(即使用 @ConsumeEvent
注解的方法,其中 ConsumeEvent#local() == true
是默认值),此编解码器将被自动使用。
因此,您可以按照如下方式交换消息对象:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", new MyName(name))
.onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
return Uni.createFrom().item(() -> "Hello " + name.getName());
}
如果您想使用特定的编解码器,则需要在两端明确地设置它:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name,
new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
.onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting", codec = MyNameCodec.class) (2)
Uni<String> greeting(MyName name) {
return Uni.createFrom().item(() -> "Hello "+name.getName());
}
1 | 设置用于发送消息的编解码器的名称 |
2 | 设置用于接收消息的编解码器 |
Combine HTTP and the Event Bus
让我们重新审视一下 HTTP greeting 端点,并使用异步消息传递将调用委托给一个单独的 bean。它使用请求/响应分发机制。我们在 Jakarta REST 端点中发送消息,而不是实现其中的业务逻辑。另一个 bean 会使用 reply 机制接收此消息,然后发送响应。
在你的 HTTP 端点类中,注入事件总线并使用 request
方法向事件总线发送消息,并期待一个响应:
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/bus")
public class EventResource {
@Inject
EventBus bus;
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name) (1)
.onItem().transform(Message::body); (2)
}
}
1 | 将 name 发送到 greeting 地址并请求一个响应 |
2 | 当我们得到响应后,提取主体并将其发送给用户 |
HTTP 方法返回一个 |
我们需要一个侦听 greeting
地址的消费者。此消费者可以位于同一个类或另一个 Bean 中,例如:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent("greeting")
public String greeting(String name) {
return "Hello " + name;
}
}
此 Bean 接收姓名并返回 greeting 消息。
有了这些,/bus/quarkus
上的每个 HTTP 请求都会向事件总线发送一条消息、等待回复,而且当回复到达后,编写 HTTP 响应:
Hello Quarkus
为了更好地理解,让我们详细介绍一下 HTTP 请求/响应是如何处理的:
-
请求由
greeting
方法接收 -
包含 name 的消息被发送到事件总线
-
另一个 bean 接收此消息并计算出响应
-
该响应使用 reply 机制发送回来
-
一旦发送方收到回复,其内容就会被写入到 HTTP 响应中
Bidirectional communication with browsers by using SockJS
Vert.x 提供的 SockJS 网桥允许浏览器应用程序和 Quarkus 应用程序使用事件总线进行通信。它把两边连接起来。因此,双方都可以向另一方发送消息并收到对方发送的消息。它支持这三种传递机制。
SockJS 协商 Quarkus 应用程序和浏览器之间的通信通道。如果支持 WebSocket,它会使用它们;否则,它会降级为 SSE、长轮询等。
因此,要使用 SockJS,你需要配置网桥,特别是用来进行通信的地址:
package org.acme;
import io.vertx.core.Vertx;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.sockjs.SockJSBridgeOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import java.util.concurrent.atomic.AtomicInteger;
@ApplicationScoped
public class SockJsExample {
@Inject
Vertx vertx;
public void init(@Observes Router router) {
SockJSHandler sockJSHandler = SockJSHandler.create(vertx);
Router bridge = sockJSHandler.bridge(new SockJSBridgeOptions()
.addOutboundPermitted(new PermittedOptions().setAddress("ticks")));
router.route("/eventbus/*").subRouter(bridge);
AtomicInteger counter = new AtomicInteger();
vertx.setPeriodic(1000,
ignored -> vertx.eventBus().publish("ticks", counter.getAndIncrement()));
}
}
此代码将 SockJS 网桥配置为将所有针对 ticks
地址的消息发送到已连接的浏览器。更多有关配置的详细说明可以在 the Vert.x SockJS Bridge documentation 中找到。
浏览器必须使用 vertx-eventbus
JavaScript 库来使用消息:
<!doctype html>
<html>
<head>
<meta charset="utf-8"/>
<title>SockJS example - Quarkus</title>
<script src="https://code.jquery.com/jquery-3.3.1.min.js"
integrity="sha256-FgpCb/KJQlLNfOu91ta32o/NMZxltwRo8QtmkMRdAu8=" crossorigin="anonymous"></script>
<script type="application/javascript" src="https://cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/vertx3-eventbus-client@3.8.5/vertx-eventbus.min.js"></script>
</head>
<body>
<h1>SockJS Examples</h1>
<p><strong>Last Tick:</strong> <span id="tick"></span></p>
</body>
<script>
var eb = new EventBus('/eventbus');
eb.onopen = function () {
eb.registerHandler('ticks', function (error, message) {
$("#tick").html(message.body);
});
}
</script>
</html>
Use native transports
原生可执行文件中不支持原生传输。
如要使用 |
Vert.x 能够使用 Netty’s native transports,可以在特定平台上提供性能提升。若要启用它们,您必须包含平台的相应依赖项。通常建议同时拥有这两者,以保持应用程序与平台无关。Netty 足够智能,可以使用正确的依赖项,包括在不受支持的平台上根本不使用它们:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<classifier>osx-x86_64</classifier>
</dependency>
implementation("io.netty:netty-transport-native-epoll::linux-x86_64")
implementation("io.netty:netty-transport-native-kqueue::osx-x86_64")
您还必须显式配置 Vert.x 以使用原生传输。在 `application.properties`中添加:
quarkus.vertx.prefer-native-transport=true
或在 `application.yml`中:
quarkus:
vertx:
prefer-native-transport: true
如果一切运行良好,quarkus 将记录:
[io.qua.ver.cor.run.VertxCoreRecorder] (main) Vertx has Native Transport Enabled: true
Use a Vert.x context-aware scheduler
部分 Mutiny 运算符需要在执行器线程池上安排工作。一个很好的示例是 .onItem().delayIt().by(Duration.ofMillis(10)
,因为它需要此类执行器来延迟发射。
默认执行器由 `io.smallrye.mutiny.infrastructure.Infrastructure`返回,并且已由 Quarkus 配置和管理。
话虽如此,有时您需要确保在 Vert.x(已复制)上下文中运行操作,而不仅仅是在任意线程中运行。
`io.smallrye.mutiny.vertx.core.ContextAwareScheduler`界面提供了一个获取上下文感知调度程序的 API。此类调度程序使用以下内容进行配置:
-
根据您的选择委托
ScheduledExecutorService
(提示:可以重复使用Infrastructure.getDefaultWorkerPool()
),以及 -
从以下内容中调用上下文获取策略:
-
an explicit
Context
, or -
在当前线程或稍后调用 `Vertx::getOrCreateContext()`在调度请求发生时,或
-
调用
Vertx::currentContext()
,如果当前线程不是 Vert.x 线程,则将失败。
-
以下是一个 `ContextAwareScheduler`的使用示例:
class MyVerticle extends AbstractVerticle {
@Override
public Uni<Void> asyncStart() {
vertx.getOrCreateContext().put("foo", "bar");
var delegate = Infrastructure.getDefaultWorkerPool();
var scheduler = ContextAwareScheduler.delegatingTo(delegate)
.withCurrentContext();
return Uni.createFrom().voidItem()
.onItem().delayIt().onExecutor(scheduler).by(Duration.ofMillis(10))
.onItem().invoke(() -> {
// Prints "bar"
var ctx = vertx.getOrCreateContext();
System.out.println(ctx.get("foo"));
});
}
}
此示例通过捕获调用 `asyncStart()`的 Vert.x 事件循环的上下文来创建调度器。`delayIt`运算符使用该调度程序,我们可以检查我们在 `invoke`中获取的上下文是一个 Vert.x 复制的上下文,其中已传播键 `"foo"`的数据。
Use a Unix domain socket
如果从同一主机建立与 quarkus 服务的连接,则监听 Unix 域套接字时,我们可以消除 TCP 的开销。如果您使用 Envoy 之类的代理设置服务网格,则这种情况可能会发生,因为在这种情况下,对服务的访问会通过代理进行。
这段文字只能在支持 Use native transports 的平台上运作。
启用适当的 Use native transports,并设置以下环境属性:
quarkus.http.domain-socket=/var/run/io.quarkus.app.socket quarkus.http.domain-socket-enabled=true quarkus.vertx.prefer-native-transport=true
它本身不会禁用将默认在 0.0.0.0:8080
上打开的 tcp 套接字。可以明确禁用:
quarkus.http.host-enabled=false
这些属性可以通过 Java 的 -D
命令行参数或 application.properties
设置。
不要忘记添加本机传输依赖关系。有关详细信息,请参见 Use native transports。
确保您的应用程序具有向套接字写入的正确权限。
Use io_uring
io_uring
不受本机可执行文件支持。
|
io_uring
是一个 Linux 内核界面,允许您异步发送和接收数据。它为文件和网络 I/O 提供统一语义。它最初设计为针对块设备和文件,但此后获得了处理网络套接字等事务的能力。它有可能单独为网络 I/O 提供适度的性能优势,为混合文件和网络 I/O 应用程序工作负载提供更大的优势。
要了解更多有关 io_uring
的信息,我们推荐以下链接:
-
Why you should use io_uring for network I/O:io_uring 对网络 I/O 的主要好处是易于使用的现代异步 API,并为文件和网络 I/O 提供统一语义。io_uring 对网络 I/O 的潜在性能优势是减少系统调用的数量。这可以为大量小型操作提供最大的好处,因为系统调用的开销可能很大。
-
The Backend Revolution and Why io_uring Is So Important:io_uring API 使用两个环形缓冲区进行应用程序和内核之间的通信(因此得名),并以一种能够实现批处理请求和响应的方式设计。此外,它提供了一种在一次系统调用中提交多个请求的方法,这可以减少开销。
-
What exactly is io_uring?:io_uring 是一个 Linux 内核界面,可以有效地让您异步发送和接收数据。它最初设计为针对块设备和文件,但此后获得了处理网络套接字等事务的能力。
要使用 io_uring
,您需要向您的项目添加两个依赖关系并启用本机传输。首先向您的项目添加以下依赖关系:
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-native-io_uring</artifactId>
<version>0.0.21.Final</version> <!-- Update this version (https://github.com/netty/netty-incubator-transport-io_uring/tags) -->
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-io_uring-incubator</artifactId>
</dependency>
// Update the io_uring version by picking the latest from https://github.com/netty/netty-incubator-transport-io_uring/tags
implementation("io.netty.incubator:netty-incubator-transport-native-io_uring:0.0.21.Final")
implementation("io.vertx:vertx-io_uring-incubator")
然后,在 application.properties
中添加:
quarkus.vertx.prefer-native-transport=true
Can I use io_uring on my Linux machine?
要检查您是否可以在 Linux 机器上使用
如果打印出类似以上内容,您可以使用 |
Troubleshooting
|
域套接字还不受 io_uring 支持。
Vert.x 异步文件系统 API 尚未使用 io_uring。
Deploy on read-only environments
在仅读文件系统环境中,您可能会收到以下形式的错误:
java.lang.IllegalStateException: Failed to create cache dir
假设 /tmp/
可写,则可以通过将 vertx.cacheDirBase
属性设置为指向 /tmp/
中的目录来修复此问题,例如,在 Kubernetes 中,通过创建具有值 -Dvertx.cacheDirBase=/tmp/vertx
的环境变量 JAVA_OPTS
,或在 application.properties
中设置 quarkus.vertx.cache-directory
属性:
quarkus.vertx.cache-directory=/tmp/vertx
Customize the Vert.x configuration
托管 Vert.x 实例的配置可以使用 application.properties
文件提供,但也可以使用 special beans.CDI bean 暴露 io.quarkus.vertx.VertxOptionsCustomizer
接口可用于自定义 Vert.x 配置。例如,以下自定义程序会更改 tmp
基础目录:
@ApplicationScoped
public class MyCustomizer implements VertxOptionsCustomizer {
@Override
public void accept(VertxOptions options) {
options.setFileSystemOptions(new FileSystemOptions().setFileCacheDir("target"));
}
}
customizer Bean 接收 VertxOptions
(来自应用程序配置),并可以修改它们。