Quarkus Virtual Thread support for gRPC services
本指南解释了在实现 gRPC 服务时如何从 Java 虚拟线程中受益。
本指南重点介绍如何将虚拟线程与 gRPC 扩展结合使用。请参阅“Writing simpler reactive REST services with Quarkus Virtual Thread support”以详细了解常规的 Java 虚拟线程和 Quarkus 虚拟线程支持。 |
默认情况下,Quarkus gRPC 扩展在事件循环线程上调用服务方法。有关此主题的更多详细信息,请参阅“Quarkus Reactive Architecture documentation”。但是,您还可以使用“ @Blocking”注解来指示该服务为“blocking”并且应在工作线程上运行。 Quarkus 虚拟线程对 gRPC 服务提供支持的理念是将服务方法调用卸载到虚拟线程上,而不是在事件循环线程或工作线程上运行。 要在服务方法上启用虚拟线程支持,只需将“ @RunOnVirtualThread”注解添加到该方法中即可。如果 JDK 兼容(Java 19 或更高版本 - 我们建议使用 21 及更高版本),则调用将被卸载到新的虚拟线程。然后,可以执行阻塞操作而不会阻塞已装载虚拟线程的平台线程。
Configuring gRPC services to use virtual threads
我们来看看一个使用虚拟线程实现 gRPC 服务的示例。首先,确保在构建文件中添加 gRPC 扩展依赖项:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-grpc</artifactId>
</dependency>
implementation("io.quarkus:quarkus-grpc")
您还需要确保使用的是 Java 19 或更高版本(我们推荐 21 及更高版本),您可在“pom.xml
”文件中通过以下内容来强制执行此操作:
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
</properties>
使用以下命令运行您的应用程序:
java -jar target/quarkus-app/quarkus-run.jar
或要使用 Quarkus 开发模式,请将以下内容插入到“quarkus-maven-plugin
”配置中:
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus.version}</version>
<executions>
<execution>
<goals>
<goal>build</goal>
<goal>generate-code</goal>
<goal>generate-code-tests</goal>
</goals>
</execution>
</executions>
<configuration>
<source>21</source>
<target>21</target>
</configuration>
</plugin>
然后,您可以在服务实现中开始使用注解“@RunOnVirtualThread
”:
package io.quarkus.grpc.example.streaming;
import com.google.protobuf.ByteString;
import com.google.protobuf.EmptyProtos;
import io.grpc.testing.integration.Messages;
import io.grpc.testing.integration.TestService;
import io.quarkus.grpc.GrpcService;
import io.smallrye.common.annotation.RunOnVirtualThread;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
@GrpcService
public class TestServiceImpl implements TestService {
@RunOnVirtualThread
@Override
public Uni<EmptyProtos.Empty> emptyCall(EmptyProtos.Empty request) {
return Uni.createFrom().item(EmptyProtos.Empty.newBuilder().build());
}
@RunOnVirtualThread
@Override
public Uni<Messages.SimpleResponse> unaryCall(Messages.SimpleRequest request) {
var value = request.getPayload().getBody().toStringUtf8();
var resp = Messages.SimpleResponse.newBuilder()
.setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFromUtf8(value.toUpperCase())).build())
.build();
return Uni.createFrom().item(resp);
}
@Override
@RunOnVirtualThread
public Multi<Messages.StreamingOutputCallResponse> streamingOutputCall(Messages.StreamingOutputCallRequest request) {
var value = request.getPayload().getBody().toStringUtf8();
return Multi.createFrom().<String> emitter(emitter -> {
emitter.emit(value.toUpperCase());
emitter.emit(value.toUpperCase());
emitter.emit(value.toUpperCase());
emitter.complete();
}).map(v -> Messages.StreamingOutputCallResponse.newBuilder()
.setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFromUtf8(v)).build())
.build());
}
}
接收“streams”的 gRPC 方法,例如“Multi
”,不能使用“@RunOnVirtualThread
”,因为该方法不能阻塞,并且必须立即产生其结果(“Multi
”或“Uni
”)。