Consuming a gRPC Service
gRPC 客户端可以注入到你的应用程序代码中。
消费 gRPC 服务需要生成 gRPC 类。将 proto
文件放在 src/main/proto
中并运行 mvn compile
。
Stubs and Injection
gRPC 生成提供多个存根,为消费 gRPC 服务提供不同的方法。你可以注入:
-
使用 Mutiny API 的服务接口,
-
使用 gRPC API 的阻塞存根,
-
基于 Mutiny 的响应存根,
-
gRPC
io.grpc.Channel
,它允许你创建其他类型的存根。
import io.quarkus.grpc.GrpcClient;
import hello.Greeter;
import hello.GreeterGrpc.GreeterBlockingStub;
import hello.MutinyGreeterGrpc.MutinyGreeterStub;
class MyBean {
// A service interface using the Mutiny API
@GrpcClient("helloService") (1)
Greeter greeter;
// A reactive stub based on Mutiny
@GrpcClient("helloService")
MutinyGreeterGrpc.MutinyGreeterStub mutiny;
// A blocking stub using the gRPC API
@GrpcClient
GreeterGrpc.GreeterBlockingStub helloService; (2)
@GrpcClient("hello-service")
Channel channel;
}
1 | gRPC 客户端注入点必须用 @GrpcClient 限定符进行注释。此限定符可用于指定用于配置底层 gRPC 客户端的名称。例如,如果你将其设置为 hello-service ,那么使用 quarkus.grpc.clients.hello-service.host 配置服务的主机。 |
2 | 如果名称不是通过 GrpcClient#value() 指定的,那么将使用字段名称,例如,在本例中为 helloService 。 |
存根类名称派生自 proto
文件中使用的服务名称。例如,如果你使用 Greeter
作为服务名称,如下所示:
option java_package = "hello";
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
那么服务接口名称为: hello.Greeter
,Mutiny 存根名称为: hello.MutinyGreeterGrpc.MutinyGreeterStub
,阻塞存根名称为: hello.GreeterGrpc.GreeterBlockingStub
。
Examples
Service Interface
import io.quarkus.grpc.GrpcClient;
import io.smallrye.mutiny.Uni;
import hello.Greeter;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/hello")
public class ExampleResource {
@GrpcClient 1
Greeter hello;
@GET
@Path("/mutiny/{name}")
public Uni<String> helloMutiny(String name) {
return hello.sayHello(HelloRequest.newBuilder().setName(name).build())
.onItem().transform(HelloReply::getMessage);
}
}
1 | 服务名称派生自注入点 - 使用字段名称。必须设置 quarkus.grpc.clients.hello.host 属性。 |
Blocking Stub
import io.quarkus.grpc.GrpcClient;
import hello.GreeterGrpc.GreeterBlockingStub;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/hello")
public class ExampleResource {
@GrpcClient("hello") 1
GreeterGrpc.GreeterBlockingStub blockingHelloService;
@GET
@Path("/blocking/{name}")
public String helloBlocking(String name) {
return blockingHelloService.sayHello(HelloRequest.newBuilder().setName(name).build()).getMessage();
}
}
1 | 必须设置 quarkus.grpc.clients.hello.host 属性。 |
Handling streams
gRPC 允许发送和接收流:
service Streaming {
rpc Source(Empty) returns (stream Item) {} // Returns a stream
rpc Sink(stream Item) returns (Empty) {} // Reads a stream
rpc Pipe(stream Item) returns (stream Item) {} // Reads a streams and return a streams
}
使用 Mutiny 存根,你可以按如下方式与这些流交互:
package io.quarkus.grpc.example.streaming;
import io.grpc.examples.streaming.Empty;
import io.grpc.examples.streaming.Item;
import io.grpc.examples.streaming.MutinyStreamingGrpc;
import io.quarkus.grpc.GrpcClient;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/streaming")
@Produces(MediaType.APPLICATION_JSON)
public class StreamingEndpoint {
@GrpcClient
MutinyStreamingGrpc.MutinyStreamingStub streaming;
@GET
public Multi<String> invokeSource() {
// Retrieve a stream
return streaming.source(Empty.newBuilder().build())
.onItem().transform(Item::getValue);
}
@GET
@Path("sink/{max}")
public Uni<Void> invokeSink(int max) {
// Send a stream and wait for completion
Multi<Item> inputs = Multi.createFrom().range(0, max)
.map(i -> Integer.toString(i))
.map(i -> Item.newBuilder().setValue(i).build());
return streaming.sink(inputs).onItem().ignore().andContinueWithNull();
}
@GET
@Path("/{max}")
public Multi<String> invokePipe(int max) {
// Send a stream and retrieve a stream
Multi<Item> inputs = Multi.createFrom().range(0, max)
.map(i -> Integer.toString(i))
.map(i -> Item.newBuilder().setValue(i).build());
return streaming.pipe(inputs).onItem().transform(Item::getValue);
}
}
Client configuration
对于你在应用程序中注入的每个 gRPC 服务,你可以配置以下属性:
Unresolved include directive in modules/ROOT/pages/grpc-service-consumption.adoc - include::../../../target/quarkus-generated-doc/config/quarkus-grpc_quarkus.grpc-client.adoc[]
`client-name`是在 `@GrpcClient`中设置的名称,或者如果没有显式定义,则派生自注入点。
以下示例使用 _hello_作为客户端名称。别忘了使用 `@GrpcClient`注解中使用的名称替换它。
启用 `quarkus.grpc.clients."client-name".xds.enabled`后,xDS 应处理上面配置的大部分内容。
Enabling TLS
启用 TLS,使用以下配置。请注意,配置中的所有路径都可以指定类路径(通常来自 `src/main/resources`或其子文件夹)上的资源或外部文件。
quarkus.grpc.clients.hello.host=localhost
# either a path to a classpath resource or to a file:
quarkus.grpc.clients.hello.ssl.trust-store=tls/ca.pem
配置 SSL/TLS 后,`plain-text`将自动禁用。 |
TLS with Mutual Auth
与相互认证一起使用 TLS,使用以下配置:
quarkus.grpc.clients.hello.host=localhost
quarkus.grpc.clients.hello.plain-text=false
# all the following may use either a path to a classpath resource or to a file:
quarkus.grpc.clients.hello.ssl.certificate=tls/client.pem
quarkus.grpc.clients.hello.ssl.key=tls/client.key
quarkus.grpc.clients.hello.ssl.trust-store=tls/ca.pem
Client Stub Deadlines
如果你需要为 gRPC 存根配置一个截止时间,即指定一个时段,之后存根将始终返回状态错误 DEADLINE_EXCEEDED
。你可以通过 `quarkus.grpc.clients."service-name".deadline`配置属性指定截止时间,例如:
quarkus.grpc.clients.hello.host=localhost
quarkus.grpc.clients.hello.deadline=2s 1
1 | 设置所有注入存根的截止时间。 |
不要使用此功能来实现 RPC 超时。为了实现 RPC 超时,可以使用 Mutiny call.ifNoItem().after(…)`或故障容错 `@Timeout
。
gRPC Headers
类似于 HTTP,gRPC 调用除了消息外,还可以携带标头。标头在身份验证中很有用。
为 gRPC 调用设置标头,创建一个已附加标头的客户端,然后在此客户端上执行调用:
import jakarta.enterprise.context.ApplicationScoped;
import examples.Greeter;
import examples.HelloReply;
import examples.HelloRequest;
import io.grpc.Metadata;
import io.quarkus.grpc.GrpcClient;
import io.quarkus.grpc.GrpcClientUtils;
import io.smallrye.mutiny.Uni;
@ApplicationScoped
public class MyService {
@GrpcClient
Greeter client;
public Uni<HelloReply> doTheCall() {
Metadata extraHeaders = new Metadata();
if (headers) {
extraHeaders.put("my-header", "my-interface-value");
}
Greeter alteredClient = GrpcClientUtils.attachHeaders(client, extraHeaders); (1)
return alteredClient.sayHello(HelloRequest.newBuilder().setName(name).build()); (2)
}
}
1 | 修改客户端以执行附加 `extraHeaders`的调用 |
2 | 使用修改后的客户端执行调用。原始客户端保持不变 |
`GrpcClientUtils`适用于各种客户端。
Client Interceptors
gRPC 客户端拦截器可以通过 CDI Bean 实现,其中还实现了 io.grpc.ClientInterceptor`接口。你可以使用 `@io.quarkus.grpc.RegisterClientInterceptor`为注入的客户端添加注释以注册针对特定客户端实例指定的拦截器。
@RegisterClientInterceptor`注释是可重复的。或者,如果你想将拦截器应用于任何注入的客户端,则使用 `@io.quarkus.grpc.GlobalInterceptor`为拦截器 Bean 添加注释。
import io.quarkus.grpc.GlobalInterceptor;
import io.grpc.ClientInterceptor;
@GlobalInterceptor 1
@ApplicationScoped
public class MyInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
// ...
}
}
1 | 该拦截器应用于所有注入的 gRPC 客户端。 |
还可以在生产者方法中添加注释作为全局拦截器:
import io.quarkus.grpc.GlobalInterceptor;
import jakarta.enterprise.inject.Produces;
public class MyProducer {
@GlobalInterceptor
@Produces
public MyInterceptor myInterceptor() {
return new MyInterceptor();
}
}
查看 ClientInterceptor JavaDoc以正确实现你的拦截器。 |
@RegisterClientInterceptor
Exampleimport io.quarkus.grpc.GrpcClient;
import io.quarkus.grpc.RegisterClientInterceptor;
import hello.Greeter;
@ApplicationScoped
class MyBean {
@RegisterClientInterceptor(MySpecialInterceptor.class) 1
@GrpcClient("helloService")
Greeter greeter;
}
1 | 为这个特定客户端注册 MySpecialInterceptor 。 |
当你有多个客户端拦截器时,你可以通过实现 jakarta.enterprise.inject.spi.Prioritized
接口对它们进行排序:
@ApplicationScoped
public class MyInterceptor implements ClientInterceptor, Prioritized {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
// ...
}
@Override
public int getPriority() {
return 10;
}
}
具有最高优先级的拦截器被首先调用。如果拦截器没有实现 Prioritized
接口,使用的默认优先级为 0
。
gRPC Client metrics
Enabling metrics collection
当应用程序同时也使用了 quarkus-micrometer
扩展时,gRPC 客户端指标功能被自动启用。Micrometer 收集应用程序使用的所有 gRPC 客户端的指标。
比如,如果你将指标导出到 Prometheus,你将得到:
# HELP grpc_client_responses_received_messages_total The total number of responses received
# TYPE grpc_client_responses_received_messages_total counter
grpc_client_responses_received_messages_total{method="SayHello",methodType="UNARY",service="helloworld.Greeter",} 6.0
# HELP grpc_client_requests_sent_messages_total The total number of requests sent
# TYPE grpc_client_requests_sent_messages_total counter
grpc_client_requests_sent_messages_total{method="SayHello",methodType="UNARY",service="helloworld.Greeter",} 6.0
# HELP grpc_client_processing_duration_seconds The total time taken for the client to complete the call, including network delay
# TYPE grpc_client_processing_duration_seconds summary
grpc_client_processing_duration_seconds_count{method="SayHello",methodType="UNARY",service="helloworld.Greeter",statusCode="OK",} 6.0
grpc_client_processing_duration_seconds_sum{method="SayHello",methodType="UNARY",service="helloworld.Greeter",statusCode="OK",} 0.167411625
# HELP grpc_client_processing_duration_seconds_max The total time taken for the client to complete the call, including network delay
# TYPE grpc_client_processing_duration_seconds_max gauge
grpc_client_processing_duration_seconds_max{method="SayHello",methodType="UNARY",service="helloworld.Greeter",statusCode="OK",} 0.136478028
服务名称、方法和类型可以在 tags 中找到。
Custom exception handling
如果任何 gRPC 服务或服务器拦截器抛出(自定义)异常,你可以将你自己的 ExceptionHandlerProvider 添加为应用程序中的 CDI bean,以提供对该异常的自定义处理。
例如:
@ApplicationScoped
public class HelloExceptionHandlerProvider implements ExceptionHandlerProvider {
@Override
public <ReqT, RespT> ExceptionHandler<ReqT, RespT> createHandler(ServerCall.Listener<ReqT> listener,
ServerCall<ReqT, RespT> serverCall, Metadata metadata) {
return new HelloExceptionHandler<>(listener, serverCall, metadata);
}
@Override
public Throwable transform(Throwable t) {
if (t instanceof HelloException he) {
return new StatusRuntimeException(Status.ABORTED.withDescription(he.getName()));
} else {
return ExceptionHandlerProvider.toStatusException(t, true);
}
}
private static class HelloExceptionHandler<A, B> extends ExceptionHandler<A, B> {
public HelloExceptionHandler(ServerCall.Listener<A> listener, ServerCall<A, B> call, Metadata metadata) {
super(listener, call, metadata);
}
@Override
protected void handleException(Throwable t, ServerCall<A, B> call, Metadata metadata) {
StatusRuntimeException sre = (StatusRuntimeException) ExceptionHandlerProvider.toStatusException(t, true);
Metadata trailers = sre.getTrailers() != null ? sre.getTrailers() : metadata;
call.close(sre.getStatus(), trailers);
}
}
}
Dev Mode
默认情况下,在 dev 模式下启动应用程序,即使未配置任何服务,也会启动 gRPC 服务器。可以使用以下属性配置 gRPC 扩展的 dev 模式行为。
Unresolved include directive in modules/ROOT/pages/grpc-service-consumption.adoc - include::../../../target/quarkus-generated-doc/config/quarkus-grpc_quarkus.grpc.dev-mode.adoc[]
Inject mock clients
在你的 @QuarkusTest
中,你可以使用 @InjectMock
注入 gRPC 服务的 Mutiny 客户端:
@QuarkusTest
public class GrpcMockTest {
@InjectMock
@GrpcClient("hello")
Greeter greeter;
@Test
void test1() {
HelloRequest request = HelloRequest.newBuilder().setName("neo").build();
Mockito.when(greeter.sayHello(Mockito.any(HelloRequest.class)))
.thenReturn(Uni.createFrom().item(HelloReply.newBuilder().setMessage("hello neo").build()));
Assertions.assertEquals(greeter.sayHello(request).await().indefinitely().getMessage(), "hello neo");
}
}
只有 Mutiny 客户端可以 mocked,通道和其他存根无法被模拟。