Quarkus Virtual Thread support for gRPC services

本指南解释了在实现 gRPC 服务时如何从 Java 虚拟线程中受益。

本指南重点介绍如何将虚拟线程与 gRPC 扩展结合使用。请参阅“Writing simpler reactive REST services with Quarkus Virtual Thread support”以详细了解常规的 Java 虚拟线程和 Quarkus 虚拟线程支持。

默认情况下,Quarkus gRPC 扩展在事件循环线程上调用服务方法。有关此主题的更多详细信息,请参阅“Quarkus Reactive Architecture documentation”。但是,您还可以使用“ @Blocking”注解来指示该服务为“blocking”并且应在工作线程上运行。 Quarkus 虚拟线程对 gRPC 服务提供支持的理念是将服务方法调用卸载到虚拟线程上,而不是在事件循环线程或工作线程上运行。 要在服务方法上启用虚拟线程支持,只需将“ @RunOnVirtualThread”注解添加到该方法中即可。如果 JDK 兼容(Java 19 或更高版本 - 我们建议使用 21 及更高版本),则调用将被卸载到新的虚拟线程。然后,可以执行阻塞操作而不会阻塞已装载虚拟线程的平台线程。

Configuring gRPC services to use virtual threads

我们来看看一个使用虚拟线程实现 gRPC 服务的示例。首先,确保在构建文件中添加 gRPC 扩展依赖项:

pom.xml
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-grpc</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-grpc")

您还需要确保使用的是 Java 19 或更高版本(我们推荐 21 及更高版本),您可在“pom.xml”文件中通过以下内容来强制执行此操作:

pom.xml
<properties>
    <maven.compiler.source>21</maven.compiler.source>
    <maven.compiler.target>21</maven.compiler.target>
</properties>

使用以下命令运行您的应用程序:

java -jar target/quarkus-app/quarkus-run.jar

或要使用 Quarkus 开发模式,请将以下内容插入到“quarkus-maven-plugin”配置中:

pom.xml
<plugin>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-maven-plugin</artifactId>
    <version>${quarkus.version}</version>
    <executions>
        <execution>
            <goals>
                <goal>build</goal>
                <goal>generate-code</goal>
                <goal>generate-code-tests</goal>
            </goals>
        </execution>
    </executions>
    <configuration>
      <source>21</source>
      <target>21</target>
    </configuration>
</plugin>

然后,您可以在服务实现中开始使用注解“@RunOnVirtualThread”:

package io.quarkus.grpc.example.streaming;

import com.google.protobuf.ByteString;
import com.google.protobuf.EmptyProtos;

import io.grpc.testing.integration.Messages;
import io.grpc.testing.integration.TestService;
import io.quarkus.grpc.GrpcService;
import io.smallrye.common.annotation.RunOnVirtualThread;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

@GrpcService
public class TestServiceImpl implements TestService {

    @RunOnVirtualThread
    @Override
    public Uni<EmptyProtos.Empty> emptyCall(EmptyProtos.Empty request) {
        return Uni.createFrom().item(EmptyProtos.Empty.newBuilder().build());
    }

    @RunOnVirtualThread
    @Override
    public Uni<Messages.SimpleResponse> unaryCall(Messages.SimpleRequest request) {
        var value = request.getPayload().getBody().toStringUtf8();
        var resp = Messages.SimpleResponse.newBuilder()
                .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFromUtf8(value.toUpperCase())).build())
                .build();
        return Uni.createFrom().item(resp);
    }

    @Override
    @RunOnVirtualThread
    public Multi<Messages.StreamingOutputCallResponse> streamingOutputCall(Messages.StreamingOutputCallRequest request) {
        var value = request.getPayload().getBody().toStringUtf8();
        return Multi.createFrom().<String> emitter(emitter -> {
            emitter.emit(value.toUpperCase());
            emitter.emit(value.toUpperCase());
            emitter.emit(value.toUpperCase());
            emitter.complete();
        }).map(v -> Messages.StreamingOutputCallResponse.newBuilder()
                .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFromUtf8(v)).build())
                .build());
    }
}
Example 1. Limitations

接收“streams”的 gRPC 方法,例如“Multi”,不能使用“@RunOnVirtualThread”,因为该方法不能阻塞,并且必须立即产生其结果(“Multi”或“Uni”)。