Quarkus Virtual Thread support with Reactive Messaging

本指南讲述了在 Quarkus 中编写消息处理应用程序时如何从 Java 虚拟线程中受益。

本指南重点介绍了在 Reactive Messaging 扩展中使用虚拟线程。请参阅 Writing simpler reactive REST services with Quarkus Virtual Thread support 以了解有关 Java 虚拟线程和 Quarkus 对 REST 服务的虚拟线程支持的更多信息。 有关特定 Reactive Messaging 扩展的参考指南,请参阅 Apache Kafka Reference GuideReactive Messaging AMQP 1.0 ConnectorReactive Messaging RabbitMQ ConnectorApache Pulsar Reference Guide

默认情况下,Reactive Messaging 会在事件循环线程上调用消息处理方法。请参阅 Quarkus Reactive Architecture documentation 以获取关于此主题的更多详细信息。但是,您有时需要将 Reactive Messaging 与诸如调用外部服务或数据库操作等阻塞处理结合起来。为此,您可以使用 @Blocking 注释,表明处理是 blocking,并且应在工作线程上运行。您可以在 SmallRye Reactive Messaging documentation 中阅读有关阻塞处理的更多内容。 在 Reactive Messaging 中加入 Quarkus 虚拟线程支持的理念在于将其卸载到虚拟线程上,而不是将其运行在事件循环线程或工作线程上。 若要在消息使用者方法上启用虚拟线程支持,只需将 @RunOnVirtualThread 注释添加到该方法中。如果 JDK 兼容(Java 19 或更高版本,我们建议使用 21+),则每条传入的消息都会卸载到一个新的虚拟线程上。然后,就会有可能在不阻塞虚拟线程所安装的平台线程的情况下执行阻塞操作。

Example using the Reactive Messaging Kafka extension

让我们看一个如何在虚拟线程上处理 Kafka 记录的示例。首先,确保将反应式消息传递扩展依赖项添加到您的构建文件中:

pom.xml
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-messaging-kafka")

您还需要确保使用的是 Java 19 或更高版本(我们推荐 21 及更高版本),您可在“pom.xml”文件中通过以下内容来强制执行此操作:

pom.xml
<properties>
    <maven.compiler.source>21</maven.compiler.source>
    <maven.compiler.target>21</maven.compiler.target>
</properties>

使用以下内容运行应用程序:

java -jar target/quarkus-app/quarkus-run.jar

或要使用 Quarkus 开发模式,请将以下内容插入到“quarkus-maven-plugin”配置中:

pom.xml
<maven.compiler.release>21</maven.compiler.release>

然后,您可以在用 `@RunOnVirtualThread`注释您的使用者方法上开始使用该注释,还可以用 `@Incoming`注释。在以下示例中,我们将使用 REST Client对 REST 端点进行阻塞调用:

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.rest.client.inject.RestClient;

import io.smallrye.common.annotation.RunOnVirtualThread;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceConsumer {

    @RestClient (2)
    PriceAlertService alertService;


    @Incoming("prices")
    @RunOnVirtualThread (1)
    public void consume(double price) {
        if (price > 90.0) {
            alertService.alert(price); (3)
        }
    }

    @Outgoing("prices-out") (4)
    public Multi<Double> randomPriceGenerator() {
        return Multi.createFrom().<Random, Double>generator(Random::new, (r, e) -> {
            e.emit(r.nextDouble(100));
            return r;
        });
    }


}
1 `@RunOnVirtualThread`在 `@Incoming`方法上的注释确保该方法将在虚拟线程上调用。
2 REST 客户端存根使用 `@RestClient`注释注入。
3 `alert`方法阻塞虚拟线程,直到 REST 调用返回。
4 这个 `@Outgoing`方法生成随机价格并将其写入一个由应用程序消耗的 Kafka 主题。

请注意,默认情况下,反应式消息传递消息处理按顺序进行,以保持消息的顺序。同样地,`@Blocking(ordered = false)`注释会改变此行为,使用 `@RunOnVirtualThread`强制并发消息处理而不保持顺序。

Use the @RunOnVirtualThread annotation

Methods signatures eligible to @RunOnVirtualThread

只有使用 @Blocking`注释的方法才能使用 `@RunOnVirtualThreads。符合资格的方法签名为:

  • @Outgoing("channel-out") O generator()

  • @Outgoing("channel-out") Message<O> generator()

  • @Incoming("channel-in") @Outgoing("channel-out") O process(I in)

  • @Incoming("channel-in") @Outgoing("channel-out") Message&lt;O&gt; process(I in)

  • @Incoming("channel-in") void consume(I in)

  • @Incoming("channel-in") Uni<Void> consume(I in)

  • @Incoming("channel-in") Uni<Void> consume(Message<I> msg)

  • @Incoming("channel-in") CompletionStage<Void> consume(I in)

  • @Incoming("channel-in") CompletionStage<Void> consume(Message<I> msg)

Use of @RunOnVirtualThread annotation on methods and classes

您可以使用 `@RunOnVirtualThread`注释:

  1. 直接在反应式消息传递方法上 - 此方法将被视为 _blocking_并执行在虚拟线程上

  2. 在包含反应式消息传递方法的类上 - 用 `@Blocking`注释的此类方法将执行在虚拟线程上,除非该注释定义配置为使用常规工作线程的池名称

例如,您可以在方法上直接使用 @RunOnVirtualThread

@ApplicationScoped
public class MyBean {

    @Incoming("in")
    @Outgoing("out")
    @RunOnVirtualThread
    public String process(String s) {
        // Called on a new virtual thread for every incoming message
    }
}

或者,您可以在类本身上使用 @RunOnVirtualThread

@ApplicationScoped
@RunOnVirtualThread
public class MyBean {

    @Incoming("in1")
    @Outgoing("out1")
    public String process(String s) {
        // Called on the event loop - no @Blocking annotation
    }

    @Incoming("in2")
    @Outgoing("out2")
    @Blocking
    public String process(String s) {
        // Call on a new virtual thread for every incoming message
    }

    @Incoming("in3")
    @Outgoing("out3")
    @Blocking("my-worker-pool")
    public String process(String s) {
        // Called on a regular worker thread from the pool named "my-worker-pool"
    }
}

Control the maximum concurrency

为了利用虚拟线程的轻量级特性,用 `@RunOnVirtualThread`注释的方法的默认最大并发数为 1024。与平台线程相反,虚拟线程不会池化而是在每条消息的基础上创建。因此,最大并发数单独适用于所有 `@RunOnVirtualThread`方法。

共有两种方式来自定义并发级别:

  1. `@RunOnVirtualThread`注释可以与 @Blocking注释一起使用来指定工作线程名称。[source, java]

@Incoming("prices")
@RunOnVirtualThread
@Blocking("my-worker")
public void consume(double price) {
    //...
}

然后,例如,将此方法的最大并发数设置为 30,使用配置属性 smallrye.messaging.worker.my-worker.max-concurrency=30`设置。 1. 对于尚未配置工作程序名称的每个 `@RunOnVirtualThread 方法,你可以使用 smallrye.messaging.worker.&lt;virtual-thread&gt;.max-concurrency 配置属性。