Consuming a gRPC Service

gRPC 客户端可以注入到你的应用程序代码中。

消费 gRPC 服务需要生成 gRPC 类。将 proto 文件放在 src/main/proto 中并运行 mvn compile

Stubs and Injection

gRPC 生成提供多个存根,为消费 gRPC 服务提供不同的方法。你可以注入:

  • 使用 Mutiny API 的服务接口,

  • 使用 gRPC API 的阻塞存根,

  • 基于 Mutiny 的响应存根,

  • gRPC io.grpc.Channel,它允许你创建其他类型的存根。

import io.quarkus.grpc.GrpcClient;

import hello.Greeter;
import hello.GreeterGrpc.GreeterBlockingStub;
import hello.MutinyGreeterGrpc.MutinyGreeterStub;

class MyBean {

   // A service interface using the Mutiny API
   @GrpcClient("helloService")                   (1)
   Greeter greeter;

   // A reactive stub based on Mutiny
   @GrpcClient("helloService")
   MutinyGreeterGrpc.MutinyGreeterStub mutiny;

   // A blocking stub using the gRPC API
   @GrpcClient
   GreeterGrpc.GreeterBlockingStub helloService; (2)

   @GrpcClient("hello-service")
   Channel channel;

}
1 gRPC 客户端注入点必须用 @GrpcClient 限定符进行注释。此限定符可用于指定用于配置底层 gRPC 客户端的名称。例如,如果你将其设置为 hello-service,那么使用 quarkus.grpc.clients.hello-service.host 配置服务的主机。
2 如果名称不是通过 GrpcClient#value() 指定的,那么将使用字段名称,例如,在本例中为 helloService

存根类名称派生自 proto 文件中使用的服务名称。例如,如果你使用 Greeter 作为服务名称,如下所示:

option java_package = "hello";

service Greeter {
    rpc SayHello (HelloRequest) returns (HelloReply) {}
}

那么服务接口名称为: hello.Greeter,Mutiny 存根名称为: hello.MutinyGreeterGrpc.MutinyGreeterStub,阻塞存根名称为: hello.GreeterGrpc.GreeterBlockingStub

Examples

Service Interface

import io.quarkus.grpc.GrpcClient;
import io.smallrye.mutiny.Uni;

import hello.Greeter;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/hello")
public class ExampleResource {

   @GrpcClient 1
   Greeter hello;

   @GET
   @Path("/mutiny/{name}")
   public Uni<String> helloMutiny(String name) {
      return hello.sayHello(HelloRequest.newBuilder().setName(name).build())
            .onItem().transform(HelloReply::getMessage);
   }
}
1 服务名称派生自注入点 - 使用字段名称。必须设置 quarkus.grpc.clients.hello.host 属性。

Blocking Stub

import io.quarkus.grpc.GrpcClient;

import hello.GreeterGrpc.GreeterBlockingStub;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/hello")
public class ExampleResource {

   @GrpcClient("hello") 1
   GreeterGrpc.GreeterBlockingStub blockingHelloService;

   @GET
   @Path("/blocking/{name}")
   public String helloBlocking(String name) {
      return blockingHelloService.sayHello(HelloRequest.newBuilder().setName(name).build()).getMessage();
   }
}
1 必须设置 quarkus.grpc.clients.hello.host 属性。

Handling streams

gRPC 允许发送和接收流:

service Streaming {
    rpc Source(Empty) returns (stream Item) {} // Returns a stream
    rpc Sink(stream Item) returns (Empty) {}   // Reads a stream
    rpc Pipe(stream Item) returns (stream Item) {}  // Reads a streams and return a streams
}

使用 Mutiny 存根,你可以按如下方式与这些流交互:

package io.quarkus.grpc.example.streaming;

import io.grpc.examples.streaming.Empty;
import io.grpc.examples.streaming.Item;
import io.grpc.examples.streaming.MutinyStreamingGrpc;
import io.quarkus.grpc.GrpcClient;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/streaming")
@Produces(MediaType.APPLICATION_JSON)
public class StreamingEndpoint {

    @GrpcClient
    MutinyStreamingGrpc.MutinyStreamingStub streaming;

    @GET
    public Multi<String> invokeSource() {
        // Retrieve a stream
        return streaming.source(Empty.newBuilder().build())
                .onItem().transform(Item::getValue);
    }

    @GET
    @Path("sink/{max}")
    public Uni<Void> invokeSink(int max) {
        // Send a stream and wait for completion
        Multi<Item> inputs = Multi.createFrom().range(0, max)
                .map(i -> Integer.toString(i))
                .map(i -> Item.newBuilder().setValue(i).build());
        return streaming.sink(inputs).onItem().ignore().andContinueWithNull();
    }

    @GET
    @Path("/{max}")
    public Multi<String> invokePipe(int max) {
        // Send a stream and retrieve a stream
        Multi<Item> inputs = Multi.createFrom().range(0, max)
                .map(i -> Integer.toString(i))
                .map(i -> Item.newBuilder().setValue(i).build());
        return streaming.pipe(inputs).onItem().transform(Item::getValue);
    }

}

Client configuration

对于你在应用程序中注入的每个 gRPC 服务,你可以配置以下属性:

Unresolved include directive in modules/ROOT/pages/grpc-service-consumption.adoc - include::../../../target/quarkus-generated-doc/config/quarkus-grpc_quarkus.grpc-client.adoc[]

`client-name`是在 `@GrpcClient`中设置的名称,或者如果没有显式定义,则派生自注入点。

以下示例使用 _hello_作为客户端名称。别忘了使用 `@GrpcClient`注解中使用的名称替换它。

启用 `quarkus.grpc.clients."client-name".xds.enabled`后,xDS 应处理上面配置的大部分内容。

Enabling TLS

启用 TLS,使用以下配置。请注意,配置中的所有路径都可以指定类路径(通常来自 `src/main/resources`或其子文件夹)上的资源或外部文件。

quarkus.grpc.clients.hello.host=localhost

# either a path to a classpath resource or to a file:
quarkus.grpc.clients.hello.ssl.trust-store=tls/ca.pem

配置 SSL/TLS 后,`plain-text`将自动禁用。

TLS with Mutual Auth

与相互认证一起使用 TLS,使用以下配置:

quarkus.grpc.clients.hello.host=localhost
quarkus.grpc.clients.hello.plain-text=false

# all the following may use either a path to a classpath resource or to a file:
quarkus.grpc.clients.hello.ssl.certificate=tls/client.pem
quarkus.grpc.clients.hello.ssl.key=tls/client.key
quarkus.grpc.clients.hello.ssl.trust-store=tls/ca.pem

Client Stub Deadlines

如果你需要为 gRPC 存根配置一个截止时间,即指定一个时段,之后存根将始终返回状态错误 DEADLINE_EXCEEDED。你可以通过 `quarkus.grpc.clients."service-name".deadline`配置属性指定截止时间,例如:

quarkus.grpc.clients.hello.host=localhost
quarkus.grpc.clients.hello.deadline=2s 1
1 设置所有注入存根的截止时间。

不要使用此功能来实现 RPC 超时。为了实现 RPC 超时,可以使用 Mutiny call.ifNoItem().after(…​)`或故障容错 `@Timeout

gRPC Headers

类似于 HTTP,gRPC 调用除了消息外,还可以携带标头。标头在身份验证中很有用。

为 gRPC 调用设置标头,创建一个已附加标头的客户端,然后在此客户端上执行调用:

import jakarta.enterprise.context.ApplicationScoped;

import examples.Greeter;
import examples.HelloReply;
import examples.HelloRequest;
import io.grpc.Metadata;
import io.quarkus.grpc.GrpcClient;
import io.quarkus.grpc.GrpcClientUtils;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class MyService {
    @GrpcClient
    Greeter client;

    public Uni<HelloReply> doTheCall() {
        Metadata extraHeaders = new Metadata();
        if (headers) {
            extraHeaders.put("my-header", "my-interface-value");
        }

        Greeter alteredClient = GrpcClientUtils.attachHeaders(client, extraHeaders); (1)

        return alteredClient.sayHello(HelloRequest.newBuilder().setName(name).build()); (2)
    }
}
1 修改客户端以执行附加 `extraHeaders`的调用
2 使用修改后的客户端执行调用。原始客户端保持不变

`GrpcClientUtils`适用于各种客户端。

Client Interceptors

gRPC 客户端拦截器可以通过 CDI Bean 实现,其中还实现了 io.grpc.ClientInterceptor`接口。你可以使用 `@io.quarkus.grpc.RegisterClientInterceptor`为注入的客户端添加注释以注册针对特定客户端实例指定的拦截器。@RegisterClientInterceptor`注释是可重复的。或者,如果你想将拦截器应用于任何注入的客户端,则使用 `@io.quarkus.grpc.GlobalInterceptor`为拦截器 Bean 添加注释。

Global Client Interceptor Example
import io.quarkus.grpc.GlobalInterceptor;

import io.grpc.ClientInterceptor;

@GlobalInterceptor 1
@ApplicationScoped
public class MyInterceptor implements ClientInterceptor {

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
            CallOptions callOptions, Channel next) {
       // ...
    }
}
1 该拦截器应用于所有注入的 gRPC 客户端。

还可以在生产者方法中添加注释作为全局拦截器:

import io.quarkus.grpc.GlobalInterceptor;

import jakarta.enterprise.inject.Produces;

public class MyProducer {
    @GlobalInterceptor
    @Produces
    public MyInterceptor myInterceptor() {
        return new MyInterceptor();
    }
}

查看 ClientInterceptor JavaDoc以正确实现你的拦截器。

@RegisterClientInterceptor Example
import io.quarkus.grpc.GrpcClient;
import io.quarkus.grpc.RegisterClientInterceptor;

import hello.Greeter;

@ApplicationScoped
class MyBean {

    @RegisterClientInterceptor(MySpecialInterceptor.class) 1
    @GrpcClient("helloService")
    Greeter greeter;
}
1 为这个特定客户端注册 MySpecialInterceptor

当你有多个客户端拦截器时,你可以通过实现 jakarta.enterprise.inject.spi.Prioritized 接口对它们进行排序:

@ApplicationScoped
public class MyInterceptor implements ClientInterceptor, Prioritized {

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
            CallOptions callOptions, Channel next) {
       // ...
    }

    @Override
    public int getPriority() {
        return 10;
    }
}

具有最高优先级的拦截器被首先调用。如果拦截器没有实现 Prioritized 接口,使用的默认优先级为 0

gRPC Client metrics

Enabling metrics collection

当应用程序同时也使用了 quarkus-micrometer 扩展时,gRPC 客户端指标功能被自动启用。Micrometer 收集应用程序使用的所有 gRPC 客户端的指标。

比如,如果你将指标导出到 Prometheus,你将得到:

# HELP grpc_client_responses_received_messages_total The total number of responses received
# TYPE grpc_client_responses_received_messages_total counter
grpc_client_responses_received_messages_total{method="SayHello",methodType="UNARY",service="helloworld.Greeter",} 6.0
# HELP grpc_client_requests_sent_messages_total The total number of requests sent
# TYPE grpc_client_requests_sent_messages_total counter
grpc_client_requests_sent_messages_total{method="SayHello",methodType="UNARY",service="helloworld.Greeter",} 6.0
# HELP grpc_client_processing_duration_seconds The total time taken for the client to complete the call, including network delay
# TYPE grpc_client_processing_duration_seconds summary
grpc_client_processing_duration_seconds_count{method="SayHello",methodType="UNARY",service="helloworld.Greeter",statusCode="OK",} 6.0
grpc_client_processing_duration_seconds_sum{method="SayHello",methodType="UNARY",service="helloworld.Greeter",statusCode="OK",} 0.167411625
# HELP grpc_client_processing_duration_seconds_max The total time taken for the client to complete the call, including network delay
# TYPE grpc_client_processing_duration_seconds_max gauge
grpc_client_processing_duration_seconds_max{method="SayHello",methodType="UNARY",service="helloworld.Greeter",statusCode="OK",} 0.136478028

服务名称、方法和类型可以在 tags 中找到。

Disabling metrics collection

要在使用 quarkus-micrometer 时禁用 gRPC 客户端指标,请将下列属性添加到应用程序配置:

quarkus.micrometer.binder.grpc-client.enabled=false

Custom exception handling

如果任何 gRPC 服务或服务器拦截器抛出(自定义)异常,你可以将你自己的 ExceptionHandlerProvider 添加为应用程序中的 CDI bean,以提供对该异常的自定义处理。

例如:

@ApplicationScoped
public class HelloExceptionHandlerProvider implements ExceptionHandlerProvider {
    @Override
    public <ReqT, RespT> ExceptionHandler<ReqT, RespT> createHandler(ServerCall.Listener<ReqT> listener,
            ServerCall<ReqT, RespT> serverCall, Metadata metadata) {
        return new HelloExceptionHandler<>(listener, serverCall, metadata);
    }

    @Override
    public Throwable transform(Throwable t) {
        if (t instanceof HelloException he) {
            return new StatusRuntimeException(Status.ABORTED.withDescription(he.getName()));
        } else {
            return ExceptionHandlerProvider.toStatusException(t, true);
        }
    }

    private static class HelloExceptionHandler<A, B> extends ExceptionHandler<A, B> {
        public HelloExceptionHandler(ServerCall.Listener<A> listener, ServerCall<A, B> call, Metadata metadata) {
            super(listener, call, metadata);
        }

        @Override
        protected void handleException(Throwable t, ServerCall<A, B> call, Metadata metadata) {
            StatusRuntimeException sre = (StatusRuntimeException) ExceptionHandlerProvider.toStatusException(t, true);
            Metadata trailers = sre.getTrailers() != null ? sre.getTrailers() : metadata;
            call.close(sre.getStatus(), trailers);
        }
    }
}

Dev Mode

默认情况下,在 dev 模式下启动应用程序,即使未配置任何服务,也会启动 gRPC 服务器。可以使用以下属性配置 gRPC 扩展的 dev 模式行为。

Unresolved include directive in modules/ROOT/pages/grpc-service-consumption.adoc - include::../../../target/quarkus-generated-doc/config/quarkus-grpc_quarkus.grpc.dev-mode.adoc[]

Inject mock clients

在你的 @QuarkusTest 中,你可以使用 @InjectMock 注入 gRPC 服务的 Mutiny 客户端:

@QuarkusTest
public class GrpcMockTest {

    @InjectMock
    @GrpcClient("hello")
    Greeter greeter;

    @Test
    void test1() {
        HelloRequest request = HelloRequest.newBuilder().setName("neo").build();
        Mockito.when(greeter.sayHello(Mockito.any(HelloRequest.class)))
                .thenReturn(Uni.createFrom().item(HelloReply.newBuilder().setMessage("hello neo").build()));
        Assertions.assertEquals(greeter.sayHello(request).await().indefinitely().getMessage(), "hello neo");
    }
}

只有 Mutiny 客户端可以 mocked,通道和其他存根无法被模拟。