Implementing a gRPC Service
quarkus-grpc 会自动注册并提供公开为 CDI Bean 的 gRPC 服务实现。
实现 gRPC 服务需要生成 gRPC 类。将 proto
文件放入 src/main/proto
并运行 mvn compile
。
- Generated Code
- Implementing a Service with the Mutiny API
- Implementing a Service with the default gRPC API
- Blocking Service Implementation
- Handling Streams
- Health Check
- Reflection Service
- Scaling
- Server Configuration
- Example of Configuration
- Server Interceptors
- Testing your services
- Trying out your services manually
- gRPC Server metrics
- gRPC Server authorization
Generated Code
Quarkus 会为在 proto
文件中声明的服务生成一些实现类:
-
一个使用 Mutiny API 的 service interface
-
类名是
${JAVA_PACKAGE}.${NAME_OF_THE_SERVICE}
-
-
一个使用 gRPC API 的 implementation base 类
-
类名结构如下:
${JAVA_PACKAGE}.${NAME_OF_THE_SERVICE}Grpc.${NAME_OF_THE_SERVICE}ImplBase
-
例如,如果您使用以下 proto
文件片段:
option java_package = "hello"; 1
service Greeter { 2
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
1 | hello 是生成类的 Java 包。 |
2 | Greeter 是服务名称。 |
然后服务接口是 hello.Greeter
,实现基础是抽象静态嵌套类: hello.GreeterGrpc.GreeterImplBase
。
您需要实现 service interface 或使用服务实现 Bean 扩展 base class,如以下部分所述。
Implementing a Service with the Mutiny API
要使用 Mutiny API 实现 gRPC 服务,创建一个实现服务接口的类。然后,实现服务接口中定义的方法。如果您不想实现服务方法,只需从方法体中抛出 java.lang.UnsupportedOperationException
(异常将自动转换为适当的 gRPC 异常)。最后,实现服务并添加 @GrpcService
注释:
import io.quarkus.grpc.GrpcService;
import hello.Greeter;
@GrpcService 1
public class HelloService implements Greeter { 2
@Override
public Uni<HelloReply> sayHello(HelloRequest request) {
return Uni.createFrom().item(() ->
HelloReply.newBuilder().setMessage("Hello " + request.getName()).build()
);
}
}
1 | gRPC 服务实现 Bean 必须使用 @GrpcService 注释进行注释,并且不应声明任何其他 CDI 限定符。所有 gRPC 服务都有 jakarta.inject.Singleton 作用域。此外,请求上下文在服务调用期间始终处于活动状态。 |
2 | hello.Greeter 是一款生成的服务器接口。 |
服务器实现 bean 还可以扩展 Mutiny 实现基,类名的结构如下: |
Implementing a Service with the default gRPC API
要使用默认 gRPC API 来实现 gRPC 服务器,请创建一个扩展默认实现基的类。然后,重写服务器接口中定义的方法。最后,实现服务器并添加 @GrpcService
注释:
import io.quarkus.grpc.GrpcService;
@GrpcService
public class HelloService extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
String name = request.getName();
String message = "Hello " + name;
responseObserver.onNext(HelloReply.newBuilder().setMessage(message).build());
responseObserver.onCompleted();
}
}
Blocking Service Implementation
默认情况下,gRPC 服务器的所有方法都会在事件循环上运行。因此,您必须 not 块。如果您的服务器逻辑必须阻塞,请使用 `io.smallrye.common.annotation.Blocking`注释该方法:
@Override
@Blocking
public Uni<HelloReply> sayHelloBlocking(HelloRequest request) {
// Do something blocking before returning the Uni
}
Handling Streams
gRPC 允许接收和返回流:
service Streaming {
rpc Source(Empty) returns (stream Item) {} // Returns a stream
rpc Sink(stream Item) returns (Empty) {} // Reads a stream
rpc Pipe(stream Item) returns (stream Item) {} // Reads a streams and return a streams
}
使用 Mutiny,您可以实现如下操作:
import io.quarkus.grpc.GrpcService;
@GrpcService
public class StreamingService implements Streaming {
@Override
public Multi<Item> source(Empty request) {
// Just returns a stream emitting an item every 2ms and stopping after 10 items.
return Multi.createFrom().ticks().every(Duration.ofMillis(2))
.select().first(10)
.map(l -> Item.newBuilder().setValue(Long.toString(l)).build());
}
@Override
public Uni<Empty> sink(Multi<Item> request) {
// Reads the incoming streams, consume all the items.
return request
.map(Item::getValue)
.map(Long::parseLong)
.collect().last()
.map(l -> Empty.newBuilder().build());
}
@Override
public Multi<Item> pipe(Multi<Item> request) {
// Reads the incoming stream, compute a sum and return the cumulative results
// in the outbound stream.
return request
.map(Item::getValue)
.map(Long::parseLong)
.onItem().scan(() -> 0L, Long::sum)
.onItem().transform(l -> Item.newBuilder().setValue(Long.toString(l)).build());
}
}
Health Check
对于已实现的服务器,Quarkus gRPC 会以以下格式公开健康信息:
syntax = "proto3";
package grpc.health.v1;
message HealthCheckRequest {
string service = 1;
}
message HealthCheckResponse {
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
}
ServingStatus status = 1;
}
service Health {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}
客户端可以指定完全限定的服务器名称以获取特定服务器的运行状况,或者跳过指定服务器名称以获取 gRPC 服务器的常规状态。
有关更多详细信息,请参阅 gRPC documentation
此外,如果将 Quarkus SmallRye Health 添加到应用程序,则将在 MicroProfile Health 端点响应中添加针对 gRPC 服务器状态的就绪检查,即 /q/health
。
Reflection Service
Quarkus gRPC 服务器实现了 reflection service。此服务器允许 grpcurl 或 grpcox 等工具与您的服务器进行交互。
在 dev 模式下,默认情况下启用了反射服务器。在测试或生产模式下,您需要通过将 quarkus.grpc.server.enable-reflection-service
设置为 true
来明确启用它。
Quarkus 公开反射服务器 |
Scaling
默认情况下,quarkus-grpc 会启动一个在单个事件循环上运行的单个 gRPC 服务器。
如果您希望扩展服务器,您可以通过设置 quarkus.grpc.server.instances
来设置服务器实例的数量。
Server Configuration
Unresolved include directive in modules/ROOT/pages/grpc-service-implementation.adoc - include::../../../target/quarkus-generated-doc/config/quarkus-grpc_quarkus.grpc.server.adoc[]
当您禁用 quarkus.grpc.server.use-separate-server
时,您正在使用新的 Vert.x gRPC 服务器实现,它使用了现有的 HTTP 服务器。这意味着服务器端口现在是 8080
(或使用 quarkus.http.port
配置的端口)。此外,大多数其他配置属性不再适用,因为应正确配置 HTTP 服务器。
当您启用 quarkus.grpc.server.xds.enabled
时,xDS 应处理上述大部分配置。
Example of Configuration
Server Interceptors
gRPC 服务器拦截器允许你在调用服务之前执行逻辑,如验证。
你可以通过创建实现 io.grpc.ServerInterceptor
的 @ApplicationScoped
Bean 来实现 gRPC 服务器拦截器:
@ApplicationScoped
// add @GlobalInterceptor for interceptors meant to be invoked for every service
public class MyInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall,
Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
// ...
}
}
还可以在生产者方法中添加注释作为全局拦截器:
import io.quarkus.grpc.GlobalInterceptor;
import jakarta.enterprise.inject.Produces;
public class MyProducer {
@GlobalInterceptor
@Produces
public MyInterceptor myInterceptor() {
return new MyInterceptor();
}
}
检查 ServerInterceptor JavaDoc 以正确实现你的拦截器。 |
若要将拦截器应用到所有公开服务,请使用 @io.quarkus.grpc.GlobalInterceptor
标注它。若要将拦截器应用到单个服务,请使用 @io.quarkus.grpc.RegisterInterceptor
在服务中注册它:
import io.quarkus.grpc.GrpcService;
import io.quarkus.grpc.RegisterInterceptor;
@GrpcService
@RegisterInterceptor(MyInterceptor.class)
public class StreamingService implements Streaming {
// ...
}
如果你有多个服务器拦截器,则可以通过实现 jakarta.enterprise.inject.spi.Prioritized
接口来对它们排序。请注意,在调用特定于服务拦截器之前调用所有全局拦截器。
@ApplicationScoped
public class MyInterceptor implements ServerInterceptor, Prioritized {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall,
Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
// ...
}
@Override
public int getPriority() {
return 10;
}
}
具有最高优先级的拦截器被首先调用。如果拦截器没有实现 Prioritized
接口,使用的默认优先级为 0
。
Testing your services
测试 gRPC 服务最简单的方法是使用 gRPC 客户端,如 Consuming a gRPC Service 中所示。
请注意,在使用客户端测试未使用 TLS 的公开服务的情况下,无需提供任何配置。例如,若要测试上面定义的 HelloService
,可以创建以下测试:
public class HelloServiceTest implements Greeter {
@GrpcClient
Greeter client;
@Test
void shouldReturnHello() {
CompletableFuture<String> message = new CompletableFuture<>();
client.sayHello(HelloRequest.newBuilder().setName("Quarkus").build())
.subscribe().with(reply -> message.complete(reply.getMessage()));
assertThat(message.get(5, TimeUnit.SECONDS)).isEqualTo("Hello Quarkus");
}
}
Trying out your services manually
在开发模式下,你可以在 Quarkus Dev UI 中体验你的 gRPC 服务。只需转到 [role="bare"][role="bare"]http://localhost:8080/q/dev-ui 并单击 gRPC 磁贴下的 Services。
请注意,你的应用程序需要公开“正常”HTTP 端口才能访问 Dev UI。如果你的应用程序不公开任何 HTTP 端点,你可以创建一个具有 quarkus-vertx-http
依赖性的专用概要文件:
<profiles>
<profile>
<id>development</id>
<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx-http</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>
拥有它之后,你可以使用 mvn quarkus:dev -Pdevelopment
运行开发模式。
如果你使用 Gradle,你可以简单地添加对 quarkusDev
任务的依赖关系:
dependencies {
quarkusDev 'io.quarkus:quarkus-vertx-http'
}
gRPC Server metrics
Enabling metrics collection
当应用程序还使用 quarkus-micrometer
扩展时,gRPC 服务器指标会被自动启用。Micrometer 收集应用程序实现的所有 gRPC 服务的指标。
比如,如果你将指标导出到 Prometheus,你将得到:
# HELP grpc_server_responses_sent_messages_total The total number of responses sent
# TYPE grpc_server_responses_sent_messages_total counter
grpc_server_responses_sent_messages_total{method="SayHello",methodType="UNARY",service="helloworld.Greeter",} 6.0
# HELP grpc_server_processing_duration_seconds The total time taken for the server to complete the call
# TYPE grpc_server_processing_duration_seconds summary
grpc_server_processing_duration_seconds_count{method="SayHello",methodType="UNARY",service="helloworld.Greeter",statusCode="OK",} 6.0
grpc_server_processing_duration_seconds_sum{method="SayHello",methodType="UNARY",service="helloworld.Greeter",statusCode="OK",} 0.016216771
# HELP grpc_server_processing_duration_seconds_max The total time taken for the server to complete the call
# TYPE grpc_server_processing_duration_seconds_max gauge
grpc_server_processing_duration_seconds_max{method="SayHello",methodType="UNARY",service="helloworld.Greeter",statusCode="OK",} 0.007985236
# HELP grpc_server_requests_received_messages_total The total number of requests received
# TYPE grpc_server_requests_received_messages_total counter
grpc_server_requests_received_messages_total{method="SayHello",methodType="UNARY",service="helloworld.Greeter",} 6.0
服务名称、方法和类型可以在 tags 中找到。
Disabling metrics collection
若 要在使用 quarkus-micrometer
时禁用 gRPC 服务器指标,请将以下属性添加到应用程序配置:
quarkus.micrometer.binder.grpc-server.enabled=false
Use virtual threads
若要在你的 gRPC 服务实现中使用虚拟线程,请检查专用的 guide。
gRPC Server authorization
Quarkus 包括内置安全功能,当支持使用现有 Vert.x HTTP 服务器的 Vert.x gRPC 启用时,允许 authorization using annotations。
Add the Quarkus Security extension
安全功能由 Quarkus Security 扩展提供,因此请确保你的 pom.xml
文件包含以下依赖关系:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-security</artifactId>
</dependency>
若要向现有 Maven 项目添加 Quarkus Security 扩展,请从你的项目基目录运行以下命令:
quarkus extension add {add-extension-extensions}
./mvnw quarkus:add-extension -Dextensions='{add-extension-extensions}'
./gradlew addExtension --extensions='{add-extension-extensions}'
Overview of supported authentication mechanisms
一些受支持的身份验证机制内置于 Quarkus 中,而其他机制要求你添加扩展。下表将特定身份验证要求映射到你可以在 Quarkus 中使用的受支持机制:
Authentication requirement | Authentication mechanism |
---|---|
Username and password |
|
Client certificate |
|
Custom requirements |
|
Bearer access token |
不要忘记至少安装一个基于所选身份验证要求提供 IdentityProvider
的扩展。请参考 Basic authentication guide,了解如何基于用户名和密码提供 IdentityProvider
。
如果您使用独立的 HTTP 服务器来处理 gRPC 请求,Custom authentication 是您的唯一选择。将 |
Secure gRPC service
gRPC 服务可以像以下示例中一样,使用 standard security annotations 进行保护:
package org.acme.grpc.auth;
import hello.Greeter;
import io.quarkus.grpc.GrpcService;
import jakarta.annotation.security.RolesAllowed;
@GrpcService
public class HelloService implements Greeter {
@RolesAllowed("admin")
@Override
public Uni<HelloReply> sayHello(HelloRequest request) {
return Uni.createFrom().item(() ->
HelloReply.newBuilder().setMessage("Hello " + request.getName()).build()
);
}
}
支持的大多数机制示例都发送了身份验证标头,请参阅使用 gRPC 服务指南的 gRPC Headers 部分,以了解更多有关 gRPC 标头的信息。
Basic authentication
Quarkus Security 为 Basic authentication 提供内置的身份验证支持。
quarkus.grpc.server.use-separate-server=false
quarkus.http.auth.basic=true 1
1 | Enable the Basic authentication. |
package org.acme.grpc.auth;
import static org.assertj.core.api.Assertions.assertThat;
import io.grpc.Metadata;
import io.quarkus.grpc.GrpcClient;
import io.quarkus.grpc.GrpcClientUtils;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.Test;
public class HelloServiceTest implements Greeter {
@GrpcClient
Greeter greeterClient;
@Test
void shouldReturnHello() {
Metadata headers = new Metadata();
headers.put("Authorization", "Basic am9objpqb2hu");
var client = GrpcClientUtils.attachHeaders(greeterClient, headers);
CompletableFuture<String> message = new CompletableFuture<>();
client.sayHello(HelloRequest.newBuilder().setName("Quarkus").build())
.subscribe().with(reply -> message.complete(reply.getMessage()));
assertThat(message.get(5, TimeUnit.SECONDS)).isEqualTo("Hello Quarkus");
}
}
Mutual TLS authentication
Quarkus 提供相互TLS (mTLS) 身份验证,以便您可以根据用户的 X.509 证书对用户进行身份验证。在本指南的 TLS with Mutual Auth 部分中,描述了对您的所有 gRPC 服务强制执行身份验证的最简单方法。但是,Quarkus Security 支持角色映射功能,您可以使用此功能执行更细粒度的访问控制。
quarkus.grpc.server.use-separate-server=false
quarkus.http.insecure-requests=disabled
quarkus.http.ssl.certificate.files=tls/server.pem
quarkus.http.ssl.certificate.key-files=tls/server.key
quarkus.http.ssl.certificate.trust-store-file=tls/ca.jks
quarkus.http.ssl.certificate.trust-store-password=**********
quarkus.http.ssl.client-auth=required
quarkus.http.auth.certificate-role-properties=role-mappings.txt 1
quarkus.native.additional-build-args=-H:IncludeResources=.*\\.txt
1 | Adds certificate role mapping. |
testclient=admin 1
1 | 将 testclient 证书 CN(公用名称)映射到 SecurityIdentity 角色 admin 。 |
Custom authentication
如果您始终实现一个或多个 GrpcSecurityMechanism
bean,如果 Quarkus 提供的上文提到的机制无法满足您的需求,可以执行此操作。
GrpcSecurityMechanism
package org.acme.grpc.auth;
import jakarta.inject.Singleton;
import io.grpc.Metadata;
import io.quarkus.security.credential.PasswordCredential;
import io.quarkus.security.identity.request.AuthenticationRequest;
import io.quarkus.security.identity.request.UsernamePasswordAuthenticationRequest;
@Singleton
public class CustomGrpcSecurityMechanism implements GrpcSecurityMechanism {
private static final String AUTHORIZATION = "Authorization";
@Override
public boolean handles(Metadata metadata) {
String authString = metadata.get(AUTHORIZATION);
return authString != null && authString.startsWith("Custom ");
}
@Override
public AuthenticationRequest createAuthenticationRequest(Metadata metadata) {
final String authString = metadata.get(AUTHORIZATION);
final String userName;
final String password;
// here comes your application logic that transforms 'authString' to user name and password
return new UsernamePasswordAuthenticationRequest(userName, new PasswordCredential(password));
}
}