Quarkus Virtual Thread support with Reactive Messaging
本指南讲述了在 Quarkus 中编写消息处理应用程序时如何从 Java 虚拟线程中受益。
This guide explains how to benefit from Java virtual threads when writing message processing applications in Quarkus.
本指南重点介绍了在 Reactive Messaging 扩展中使用虚拟线程。请参阅 Writing simpler reactive REST services with Quarkus Virtual Thread support 以了解有关 Java 虚拟线程和 Quarkus 对 REST 服务的虚拟线程支持的更多信息。 This guide focuses on using virtual threads with Reactive Messaging extensions. Please refer to Writing simpler reactive REST services with Quarkus Virtual Thread support to read more about Java virtual threads in general and the Quarkus Virtual Thread support for REST services. 有关特定 Reactive Messaging 扩展的参考指南,请参阅 Apache Kafka Reference Guide 、 Reactive Messaging AMQP 1.0 Connector 、 Reactive Messaging RabbitMQ Connector 或 Apache Pulsar Reference Guide。 For reference guides of specific Reactive Messaging extensions see Apache Kafka Reference Guide, Reactive Messaging AMQP 1.0 Connector, Reactive Messaging RabbitMQ Connector or Apache Pulsar Reference Guide. |
默认情况下,Reactive Messaging 会在事件循环线程上调用消息处理方法。请参阅 Quarkus Reactive Architecture documentation 以获取关于此主题的更多详细信息。但是,您有时需要将 Reactive Messaging 与诸如调用外部服务或数据库操作等阻塞处理结合起来。为此,您可以使用 @Blocking 注释,表明处理是 blocking,并且应在工作线程上运行。您可以在 SmallRye Reactive Messaging documentation 中阅读有关阻塞处理的更多内容。
By default, Reactive Messaging invokes message processing methods on an event-loop thread. See the Quarkus Reactive Architecture documentation for further details on this topic. But, you sometimes need to combine Reactive Messaging with blocking processing such as calling external services or database operations. For this, you can use the @Blocking annotation indicating that the processing is blocking and should be run on a worker thread. You can read more on the blocking processing in SmallRye Reactive Messaging documentation.
在 Reactive Messaging 中加入 Quarkus 虚拟线程支持的理念在于将其卸载到虚拟线程上,而不是将其运行在事件循环线程或工作线程上。
The idea behind Quarkus Virtual Thread support for Reactive Messaging is to offload the message processing on virtual threads, instead of running it on an event-loop thread or a worker thread.
若要在消息使用者方法上启用虚拟线程支持,只需将 @RunOnVirtualThread 注释添加到该方法中。如果 JDK 兼容(Java 19 或更高版本,我们建议使用 21+),则每条传入的消息都会卸载到一个新的虚拟线程上。然后,就会有可能在不阻塞虚拟线程所安装的平台线程的情况下执行阻塞操作。
To enable virtual thread support on a message consumer method, simply add the @RunOnVirtualThread annotation to the method. If the JDK is compatible (Java 19 or later versions, we recommend 21+) then each incoming message will be offloaded to a new virtual thread. It will then be possible to perform blocking operations without blocking the platform thread upon which the virtual thread is mounted.
Example using the Reactive Messaging Kafka extension
让我们看一个如何在虚拟线程上处理 Kafka 记录的示例。首先,确保将反应式消息传递扩展依赖项添加到您的构建文件中:
Let’s see an example of how to process Kafka records on virtual threads. First, make sure to have a reactive messaging extension dependency to your build file:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
implementation("io.quarkus:quarkus-messaging-kafka")
您还需要确保使用的是 Java 19 或更高版本(我们推荐 21 及更高版本),您可在“pom.xml
”文件中通过以下内容来强制执行此操作:
You also need to make sure that you are using Java 19 or later (we recommend 21+), this can be enforced in your pom.xml
file with the following:
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
</properties>
使用以下内容运行应用程序:
Run the application with:
java -jar target/quarkus-app/quarkus-run.jar
或要使用 Quarkus 开发模式,请将以下内容插入到“quarkus-maven-plugin
”配置中:
or to use the Quarkus Dev mode, insert the following to the quarkus-maven-plugin
configuration:
<maven.compiler.release>21</maven.compiler.release>
然后,您可以在用 `@RunOnVirtualThread`注释您的使用者方法上开始使用该注释,还可以用 `@Incoming`注释。在以下示例中,我们将使用 REST Client对 REST 端点进行阻塞调用:
Then you can start using the annotation @RunOnVirtualThread
on your consumer methods also annotated with @Incoming
.
In the following example we’ll use the REST Client to make a blocking call to a REST endpoint:
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import io.smallrye.common.annotation.RunOnVirtualThread;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class PriceConsumer {
@RestClient (2)
PriceAlertService alertService;
@Incoming("prices")
@RunOnVirtualThread (1)
public void consume(double price) {
if (price > 90.0) {
alertService.alert(price); (3)
}
}
@Outgoing("prices-out") (4)
public Multi<Double> randomPriceGenerator() {
return Multi.createFrom().<Random, Double>generator(Random::new, (r, e) -> {
e.emit(r.nextDouble(100));
return r;
});
}
}
1 | @RunOnVirtualThread annotation on the @Incoming method ensures that the method will be called on a virtual thread. |
2 | the REST client stub is injected with the @RestClient annotation. |
3 | alert method blocks the virtual thread until the REST call returns. |
4 | This @Outgoing method generates random prices and writes them a Kafka topic to be consumed back by the application. |
请注意,默认情况下,反应式消息传递消息处理按顺序进行,以保持消息的顺序。同样地,`@Blocking(ordered = false)`注释会改变此行为,使用 `@RunOnVirtualThread`强制并发消息处理而不保持顺序。
Note that by default Reactive Messaging message processing happens sequentially, preserving the order of messages.
In the same way, @Blocking(ordered = false)
annotation changes this behaviour,
using @RunOnVirtualThread
enforces concurrent message processing without preserving the order.
Use the @RunOnVirtualThread annotation
Methods signatures eligible to @RunOnVirtualThread
只有使用 @Blocking`注释的方法才能使用 `@RunOnVirtualThreads
。符合资格的方法签名为:
Only method can be annotated with @Blocking
can use @RunOnVirtualThreads
.
The eligible method signatures are:
-
@Outgoing("channel-out") O generator()
-
@Outgoing("channel-out") Message<O> generator()
-
@Incoming("channel-in") @Outgoing("channel-out") O process(I in)
-
@Incoming("channel-in") @Outgoing("channel-out") Message<O> process(I in)
-
@Incoming("channel-in") void consume(I in)
-
@Incoming("channel-in") Uni<Void> consume(I in)
-
@Incoming("channel-in") Uni<Void> consume(Message<I> msg)
-
@Incoming("channel-in") CompletionStage<Void> consume(I in)
-
@Incoming("channel-in") CompletionStage<Void> consume(Message<I> msg)
Use of @RunOnVirtualThread annotation on methods and classes
您可以使用 `@RunOnVirtualThread`注释:
You can use the @RunOnVirtualThread
annotation:
-
directly on a reactive messaging method - this method will be considered blocking and executed on a virtual thread
-
on the class containing reactive messaging methods - the methods from this class annotation with
@Blocking
will be executed on virtual thread, except if the annotation defines a pool name configured to use regular worker threads
例如,您可以在方法上直接使用 @RunOnVirtualThread
:
For example, you can use @RunOnVirtualThread
directly on the method:
@ApplicationScoped
public class MyBean {
@Incoming("in")
@Outgoing("out")
@RunOnVirtualThread
public String process(String s) {
// Called on a new virtual thread for every incoming message
}
}
或者,您可以在类本身上使用 @RunOnVirtualThread
:
Alternatively, you can use @RunOnVirtualThread
on the class itself:
@ApplicationScoped
@RunOnVirtualThread
public class MyBean {
@Incoming("in1")
@Outgoing("out1")
public String process(String s) {
// Called on the event loop - no @Blocking annotation
}
@Incoming("in2")
@Outgoing("out2")
@Blocking
public String process(String s) {
// Call on a new virtual thread for every incoming message
}
@Incoming("in3")
@Outgoing("out3")
@Blocking("my-worker-pool")
public String process(String s) {
// Called on a regular worker thread from the pool named "my-worker-pool"
}
}
Control the maximum concurrency
为了利用虚拟线程的轻量级特性,用 `@RunOnVirtualThread`注释的方法的默认最大并发数为 1024。与平台线程相反,虚拟线程不会池化而是在每条消息的基础上创建。因此,最大并发数单独适用于所有 `@RunOnVirtualThread`方法。
In order to leverage the lightweight nature of virtual threads, the default maximum concurrency for methods annotated with @RunOnVirtualThread
is 1024.
As opposed to platform threads, virtual threads are not pooled and created per message. Therefore the maximum concurrency applies separately to all @RunOnVirtualThread
methods.
共有两种方式来自定义并发级别:
There are two ways to customize the concurrency level:
-
The
@RunOnVirtualThread
annotation can be used together with the @Blocking annotation to specify a worker name.[source, java]
@Incoming("prices") @RunOnVirtualThread @Blocking("my-worker") public void consume(double price) { //... }
然后,例如,将此方法的最大并发数设置为 30,使用配置属性 `smallrye.messaging.worker.my-worker.max-concurrency=30`设置。
Then, for example, to set the maximum concurrency of this method down to 30, set using the config property smallrye.messaging.worker.my-worker.max-concurrency=30
.
1. For every @RunOnVirtualThread
method that is not configured with a worker name, you can use the config property smallrye.messaging.worker.<virtual-thread>.max-concurrency
.