Implementing a gRPC Service

Generated Code

Quarkus 会为在 proto 文件中声明的服务生成一些实现类:

  1. 一个使用 Mutiny API 的 service interface

    • 类名是 ${JAVA_PACKAGE}.${NAME_OF_THE_SERVICE}

  2. 一个使用 gRPC API 的 implementation base

    • 类名结构如下: ${JAVA_PACKAGE}.${NAME_OF_THE_SERVICE}Grpc.${NAME_OF_THE_SERVICE}ImplBase

例如,如果您使用以下 proto 文件片段:

option java_package = "hello"; 1

service Greeter { 2
    rpc SayHello (HelloRequest) returns (HelloReply) {}
}
1 hello 是生成类的 Java 包。
2 Greeter 是服务名称。

然后服务接口是 hello.Greeter,实现基础是抽象静态嵌套类: hello.GreeterGrpc.GreeterImplBase

您需要实现 service interface 或使用服务实现 Bean 扩展 base class,如以下部分所述。

Implementing a Service with the Mutiny API

要使用 Mutiny API 实现 gRPC 服务,创建一个实现服务接口的类。然后,实现服务接口中定义的方法。如果您不想实现服务方法,只需从方法体中抛出 java.lang.UnsupportedOperationException (异常将自动转换为适当的 gRPC 异常)。最后,实现服务并添加 @GrpcService 注释:

import io.quarkus.grpc.GrpcService;
import hello.Greeter;

@GrpcService 1
public class HelloService implements Greeter { 2

    @Override
    public Uni<HelloReply> sayHello(HelloRequest request) {
        return Uni.createFrom().item(() ->
                HelloReply.newBuilder().setMessage("Hello " + request.getName()).build()
        );
    }
}
1 gRPC 服务实现 Bean 必须使用 @GrpcService 注释进行注释,并且不应声明任何其他 CDI 限定符。所有 gRPC 服务都有 jakarta.inject.Singleton 作用域。此外,请求上下文在服务调用期间始终处于活动状态。
2 hello.Greeter 是一款生成的服务器接口。

服务器实现 bean 还可以扩展 Mutiny 实现基,类名的结构如下:Mutiny${NAME_OF_THE_SERVICE}Grpc.${NAME_OF_THE_SERVICE}ImplBase

Implementing a Service with the default gRPC API

要使用默认 gRPC API 来实现 gRPC 服务器,请创建一个扩展默认实现基的类。然后,重写服务器接口中定义的方法。最后,实现服务器并添加 @GrpcService 注释:

import io.quarkus.grpc.GrpcService;

@GrpcService
public class HelloService extends GreeterGrpc.GreeterImplBase {

    @Override
    public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
        String name = request.getName();
        String message = "Hello " + name;
        responseObserver.onNext(HelloReply.newBuilder().setMessage(message).build());
        responseObserver.onCompleted();
    }
}

Blocking Service Implementation

默认情况下,gRPC 服务器的所有方法都会在事件循环上运行。因此,您必须 not 块。如果您的服务器逻辑必须阻塞,请使用 `io.smallrye.common.annotation.Blocking`注释该方法:

@Override
@Blocking
public Uni<HelloReply> sayHelloBlocking(HelloRequest request) {
    // Do something blocking before returning the Uni
}

Handling Streams

gRPC 允许接收和返回流:

service Streaming {
    rpc Source(Empty) returns (stream Item) {} // Returns a stream
    rpc Sink(stream Item) returns (Empty) {}   // Reads a stream
    rpc Pipe(stream Item) returns (stream Item) {}  // Reads a streams and return a streams
}

使用 Mutiny,您可以实现如下操作:

import io.quarkus.grpc.GrpcService;

@GrpcService
public class StreamingService implements Streaming {

    @Override
    public Multi<Item> source(Empty request) {
        // Just returns a stream emitting an item every 2ms and stopping after 10 items.
        return Multi.createFrom().ticks().every(Duration.ofMillis(2))
                .select().first(10)
                .map(l -> Item.newBuilder().setValue(Long.toString(l)).build());
    }

    @Override
    public Uni<Empty> sink(Multi<Item> request) {
        // Reads the incoming streams, consume all the items.
        return request
                .map(Item::getValue)
                .map(Long::parseLong)
                .collect().last()
                .map(l -> Empty.newBuilder().build());
    }

    @Override
    public Multi<Item> pipe(Multi<Item> request) {
        // Reads the incoming stream, compute a sum and return the cumulative results
        // in the outbound stream.
        return request
                .map(Item::getValue)
                .map(Long::parseLong)
                .onItem().scan(() -> 0L, Long::sum)
                .onItem().transform(l -> Item.newBuilder().setValue(Long.toString(l)).build());
    }
}

Health Check

对于已实现的服务器,Quarkus gRPC 会以以下格式公开健康信息:

syntax = "proto3";

package grpc.health.v1;

message HealthCheckRequest {
  string service = 1;
}

message HealthCheckResponse {
  enum ServingStatus {
    UNKNOWN = 0;
    SERVING = 1;
    NOT_SERVING = 2;
  }
  ServingStatus status = 1;
}

service Health {
  rpc Check(HealthCheckRequest) returns (HealthCheckResponse);

  rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}

客户端可以指定完全限定的服务器名称以获取特定服务器的运行状况,或者跳过指定服务器名称以获取 gRPC 服务器的常规状态。

有关更多详细信息,请参阅 gRPC documentation

此外,如果将 Quarkus SmallRye Health 添加到应用程序,则将在 MicroProfile Health 端点响应中添加针对 gRPC 服务器状态的就绪检查,即 /q/health

Reflection Service

Quarkus gRPC 服务器实现了 reflection service。此服务器允许 grpcurlgrpcox 等工具与您的服务器进行交互。

dev 模式下,默认情况下启用了反射服务器。在测试或生产模式下,您需要通过将 quarkus.grpc.server.enable-reflection-service 设置为 true 来明确启用它。

Quarkus 公开反射服务器 v1v1alpha

Scaling

默认情况下,quarkus-grpc 会启动一个在单个事件循环上运行的单个 gRPC 服务器。

如果您希望扩展服务器,您可以通过设置 quarkus.grpc.server.instances 来设置服务器实例的数量。

Server Configuration

Unresolved include directive in modules/ROOT/pages/grpc-service-implementation.adoc - include::../../../target/quarkus-generated-doc/config/quarkus-grpc_quarkus.grpc.server.adoc[]

当您禁用 quarkus.grpc.server.use-separate-server 时,您正在使用新的 Vert.x gRPC 服务器实现,它使用了现有的 HTTP 服务器。这意味着服务器端口现在是 8080 (或使用 quarkus.http.port 配置的端口)。此外,大多数其他配置属性不再适用,因为应正确配置 HTTP 服务器。

当您启用 quarkus.grpc.server.xds.enabled 时,xDS 应处理上述大部分配置。

Example of Configuration

Enabling TLS

要启用 TLS,请使用以下配置。

请注意,配置中的所有路径都指定类路径上的资源(通常来自 src/main/resources 或其子文件夹)或外部文件。

quarkus.grpc.server.ssl.certificate=tls/server.pem
quarkus.grpc.server.ssl.key=tls/server.key

配置 SSL/TLS 后,`plain-text`将自动禁用。

TLS with Mutual Auth

与相互认证一起使用 TLS,使用以下配置:

quarkus.grpc.server.ssl.certificate=tls/server.pem
quarkus.grpc.server.ssl.key=tls/server.key
quarkus.grpc.server.ssl.trust-store=tls/ca.jks
quarkus.grpc.server.ssl.trust-store-password=*****
quarkus.grpc.server.ssl.client-auth=REQUIRED

Server Interceptors

gRPC 服务器拦截器允许你在调用服务之前执行逻辑,如验证。

你可以通过创建实现 io.grpc.ServerInterceptor@ApplicationScoped Bean 来实现 gRPC 服务器拦截器:

@ApplicationScoped
// add @GlobalInterceptor for interceptors meant to be invoked for every service
public class MyInterceptor implements ServerInterceptor {

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall,
            Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
        // ...
    }
}

还可以在生产者方法中添加注释作为全局拦截器:

import io.quarkus.grpc.GlobalInterceptor;

import jakarta.enterprise.inject.Produces;

public class MyProducer {
    @GlobalInterceptor
    @Produces
    public MyInterceptor myInterceptor() {
        return new MyInterceptor();
    }
}

检查 ServerInterceptor JavaDoc 以正确实现你的拦截器。

若要将拦截器应用到所有公开服务,请使用 @io.quarkus.grpc.GlobalInterceptor 标注它。若要将拦截器应用到单个服务,请使用 @io.quarkus.grpc.RegisterInterceptor 在服务中注册它:

import io.quarkus.grpc.GrpcService;
import io.quarkus.grpc.RegisterInterceptor;

@GrpcService
@RegisterInterceptor(MyInterceptor.class)
public class StreamingService implements Streaming {
    // ...
}

如果你有多个服务器拦截器,则可以通过实现 jakarta.enterprise.inject.spi.Prioritized 接口来对它们排序。请注意,在调用特定于服务拦截器之前调用所有全局拦截器。

@ApplicationScoped
public class MyInterceptor implements ServerInterceptor, Prioritized {

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall,
            Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
        // ...
    }

    @Override
    public int getPriority() {
        return 10;
    }
}

具有最高优先级的拦截器被首先调用。如果拦截器没有实现 Prioritized 接口,使用的默认优先级为 0

Testing your services

测试 gRPC 服务最简单的方法是使用 gRPC 客户端,如 Consuming a gRPC Service 中所示。

请注意,在使用客户端测试未使用 TLS 的公开服务的情况下,无需提供任何配置。例如,若要测试上面定义的 HelloService,可以创建以下测试:

public class HelloServiceTest implements Greeter {

    @GrpcClient
    Greeter client;

    @Test
    void shouldReturnHello() {
        CompletableFuture<String> message = new CompletableFuture<>();
        client.sayHello(HelloRequest.newBuilder().setName("Quarkus").build())
                .subscribe().with(reply -> message.complete(reply.getMessage()));
        assertThat(message.get(5, TimeUnit.SECONDS)).isEqualTo("Hello Quarkus");
    }
}

Trying out your services manually

在开发模式下,你可以在 Quarkus Dev UI 中体验你的 gRPC 服务。只需转到 [role="bare"][role="bare"]http://localhost:8080/q/dev-ui 并单击 gRPC 磁贴下的 Services

请注意,你的应用程序需要公开“正常”HTTP 端口才能访问 Dev UI。如果你的应用程序不公开任何 HTTP 端点,你可以创建一个具有 quarkus-vertx-http 依赖性的专用概要文件:

    <profiles>
        <profile>
            <id>development</id>
            <dependencies>
                <dependency>
                    <groupId>io.quarkus</groupId>
                    <artifactId>quarkus-vertx-http</artifactId>
                </dependency>
            </dependencies>
        </profile>
    </profiles>

拥有它之后,你可以使用 mvn quarkus:dev -Pdevelopment 运行开发模式。

如果你使用 Gradle,你可以简单地添加对 quarkusDev 任务的依赖关系:

dependencies {
    quarkusDev 'io.quarkus:quarkus-vertx-http'
}

gRPC Server metrics

Enabling metrics collection

当应用程序还使用 quarkus-micrometer 扩展时,gRPC 服务器指标会被自动启用。Micrometer 收集应用程序实现的所有 gRPC 服务的指标。

比如,如果你将指标导出到 Prometheus,你将得到:

# HELP grpc_server_responses_sent_messages_total The total number of responses sent
# TYPE grpc_server_responses_sent_messages_total counter
grpc_server_responses_sent_messages_total{method="SayHello",methodType="UNARY",service="helloworld.Greeter",} 6.0
# HELP grpc_server_processing_duration_seconds The total time taken for the server to complete the call
# TYPE grpc_server_processing_duration_seconds summary
grpc_server_processing_duration_seconds_count{method="SayHello",methodType="UNARY",service="helloworld.Greeter",statusCode="OK",} 6.0
grpc_server_processing_duration_seconds_sum{method="SayHello",methodType="UNARY",service="helloworld.Greeter",statusCode="OK",} 0.016216771
# HELP grpc_server_processing_duration_seconds_max The total time taken for the server to complete the call
# TYPE grpc_server_processing_duration_seconds_max gauge
grpc_server_processing_duration_seconds_max{method="SayHello",methodType="UNARY",service="helloworld.Greeter",statusCode="OK",} 0.007985236
# HELP grpc_server_requests_received_messages_total The total number of requests received
# TYPE grpc_server_requests_received_messages_total counter
grpc_server_requests_received_messages_total{method="SayHello",methodType="UNARY",service="helloworld.Greeter",} 6.0

服务名称、方法和类型可以在 tags 中找到。

Disabling metrics collection

若 要在使用 quarkus-micrometer 时禁用 gRPC 服务器指标,请将以下属性添加到应用程序配置:

quarkus.micrometer.binder.grpc-server.enabled=false

Use virtual threads

若要在你的 gRPC 服务实现中使用虚拟线程,请检查专用的 guide

gRPC Server authorization

Quarkus 包括内置安全功能,当支持使用现有 Vert.x HTTP 服务器的 Vert.x gRPC 启用时,允许 authorization using annotations

Add the Quarkus Security extension

安全功能由 Quarkus Security 扩展提供,因此请确保你的 pom.xml 文件包含以下依赖关系:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-security</artifactId>
</dependency>

若要向现有 Maven 项目添加 Quarkus Security 扩展,请从你的项目基目录运行以下命令:

CLI
quarkus extension add {add-extension-extensions}
Maven
./mvnw quarkus:add-extension -Dextensions='{add-extension-extensions}'
Gradle
./gradlew addExtension --extensions='{add-extension-extensions}'

Overview of supported authentication mechanisms

一些受支持的身份验证机制内置于 Quarkus 中,而其他机制要求你添加扩展。下表将特定身份验证要求映射到你可以在 Quarkus 中使用的受支持机制:

Table 1. Authentication requirements and mechanisms
Authentication requirement Authentication mechanism

Username and password

Basic authentication

Client certificate

Mutual TLS authentication

Custom requirements

Custom authentication

Bearer access token

OIDC Bearer token authenticationJWTOAuth2

不要忘记至少安装一个基于所选身份验证要求提供 IdentityProvider 的扩展。请参考 Basic authentication guide,了解如何基于用户名和密码提供 IdentityProvider

如果您使用独立的 HTTP 服务器来处理 gRPC 请求,Custom authentication 是您的唯一选择。将 quarkus.grpc.server.use-separate-server 配置属性设置为 false,以便您可以使用其他机制。

Secure gRPC service

gRPC 服务可以像以下示例中一样,使用 standard security annotations 进行保护:

package org.acme.grpc.auth;

import hello.Greeter;
import io.quarkus.grpc.GrpcService;
import jakarta.annotation.security.RolesAllowed;

@GrpcService
public class HelloService implements Greeter {

    @RolesAllowed("admin")
    @Override
    public Uni<HelloReply> sayHello(HelloRequest request) {
        return Uni.createFrom().item(() ->
                HelloReply.newBuilder().setMessage("Hello " + request.getName()).build()
        );
    }
}

支持的大多数机制示例都发送了身份验证标头,请参阅使用 gRPC 服务指南的 gRPC Headers 部分,以了解更多有关 gRPC 标头的信息。

Basic authentication

Quarkus Security 为 Basic authentication 提供内置的身份验证支持。

quarkus.grpc.server.use-separate-server=false
quarkus.http.auth.basic=true 1
1 Enable the Basic authentication.
package org.acme.grpc.auth;

import static org.assertj.core.api.Assertions.assertThat;

import io.grpc.Metadata;
import io.quarkus.grpc.GrpcClient;
import io.quarkus.grpc.GrpcClientUtils;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.Test;

public class HelloServiceTest implements Greeter {

    @GrpcClient
    Greeter greeterClient;

    @Test
    void shouldReturnHello() {
        Metadata headers = new Metadata();
        headers.put("Authorization", "Basic am9objpqb2hu");
        var client = GrpcClientUtils.attachHeaders(greeterClient, headers);

        CompletableFuture<String> message = new CompletableFuture<>();
        client.sayHello(HelloRequest.newBuilder().setName("Quarkus").build())
                .subscribe().with(reply -> message.complete(reply.getMessage()));
        assertThat(message.get(5, TimeUnit.SECONDS)).isEqualTo("Hello Quarkus");
    }
}

Mutual TLS authentication

Quarkus 提供相互TLS (mTLS) 身份验证,以便您可以根据用户的 X.509 证书对用户进行身份验证。在本指南的 TLS with Mutual Auth 部分中,描述了对您的所有 gRPC 服务强制执行身份验证的最简单方法。但是,Quarkus Security 支持角色映射功能,您可以使用此功能执行更细粒度的访问控制。

quarkus.grpc.server.use-separate-server=false
quarkus.http.insecure-requests=disabled
quarkus.http.ssl.certificate.files=tls/server.pem
quarkus.http.ssl.certificate.key-files=tls/server.key
quarkus.http.ssl.certificate.trust-store-file=tls/ca.jks
quarkus.http.ssl.certificate.trust-store-password=**********
quarkus.http.ssl.client-auth=required
quarkus.http.auth.certificate-role-properties=role-mappings.txt     1
quarkus.native.additional-build-args=-H:IncludeResources=.*\\.txt
1 Adds certificate role mapping.
Example of the role mapping file
testclient=admin 1
1 testclient 证书 CN(公用名称)映射到 SecurityIdentity 角色 admin

Custom authentication

如果您始终实现一个或多个 GrpcSecurityMechanism bean,如果 Quarkus 提供的上文提到的机制无法满足您的需求,可以执行此操作。

Example of custom GrpcSecurityMechanism
package org.acme.grpc.auth;

import jakarta.inject.Singleton;

import io.grpc.Metadata;
import io.quarkus.security.credential.PasswordCredential;
import io.quarkus.security.identity.request.AuthenticationRequest;
import io.quarkus.security.identity.request.UsernamePasswordAuthenticationRequest;

@Singleton
public class CustomGrpcSecurityMechanism implements GrpcSecurityMechanism {

    private static final String AUTHORIZATION = "Authorization";

    @Override
    public boolean handles(Metadata metadata) {
        String authString = metadata.get(AUTHORIZATION);
        return authString != null && authString.startsWith("Custom ");
    }

    @Override
    public AuthenticationRequest createAuthenticationRequest(Metadata metadata) {
        final String authString = metadata.get(AUTHORIZATION);
        final String userName;
        final String password;
        // here comes your application logic that transforms 'authString' to user name and password
        return new UsernamePasswordAuthenticationRequest(userName, new PasswordCredential(password));
    }
}