Consuming a gRPC Service
gRPC clients can be injected in your application code.
Consuming gRPC services requires the gRPC classes to be generated. Place your proto files in src/main/proto and run mvn compile.
Stubs and Injection
gRPC generation provides several stubs, each offering a different way to consume a gRPC service. You can inject:
- a service interface using the Mutiny API,
- a blocking stub using the gRPC API,
- a reactive stub based on Mutiny,
- the gRPC io.grpc.Channel, which lets you create other types of stubs.
import io.grpc.Channel;
import io.quarkus.grpc.GrpcClient;
import hello.Greeter;
import hello.GreeterGrpc;
import hello.MutinyGreeterGrpc;

class MyBean {
    // A service interface using the Mutiny API
    @GrpcClient("helloService") // (1)
    Greeter greeter;

    // A reactive stub based on Mutiny
    @GrpcClient("helloService")
    MutinyGreeterGrpc.MutinyGreeterStub mutiny;

    // A blocking stub using the gRPC API
    @GrpcClient
    GreeterGrpc.GreeterBlockingStub helloService; // (2)

    // The gRPC channel, from which other types of stubs can be created
    @GrpcClient("hello-service")
    Channel channel;
}
1 | A gRPC client injection point must be annotated with the @GrpcClient qualifier. This qualifier can be used to specify the name that is used to configure the underlying gRPC client. For example, if you set it to hello-service, the host of the service is configured with the quarkus.grpc.clients.hello-service.host property. |
2 | If the name is not specified via the GrpcClient#value() then the field name is used instead, e.g. helloService in this particular case. |
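The naming rule in the two notes above can be sketched in plain Java. This is only an illustration with no Quarkus dependency: the class ClientNameSketch and its methods are hypothetical helpers, not part of the Quarkus API, and an empty annotation value is assumed here to mean "name not specified".

```java
// Hypothetical helper illustrating the naming rule; not a Quarkus API.
final class ClientNameSketch {

    // Returns the explicit @GrpcClient value if one was given, otherwise the field name.
    static String resolveClientName(String annotationValue, String fieldName) {
        if (annotationValue == null || annotationValue.isEmpty()) {
            return fieldName;
        }
        return annotationValue;
    }

    // Builds the configuration key that sets the host of the named client.
    static String hostProperty(String clientName) {
        return "quarkus.grpc.clients." + clientName + ".host";
    }

    public static void main(String[] args) {
        // Mirrors: @GrpcClient("hello-service") Channel channel;
        System.out.println(hostProperty(resolveClientName("hello-service", "channel")));
        // Mirrors: @GrpcClient GreeterGrpc.GreeterBlockingStub helloService;
        System.out.println(hostProperty(resolveClientName("", "helloService")));
    }
}
```

With the injection points shown earlier, the first call yields the key for the hello-service client and the second falls back to the field name helloService.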
The stub class names are derived from the service name used in your proto file. For example, if you use Greeter as a service name, as in:
option java_package = "hello";

service Greeter {
    rpc SayHello (HelloRequest) returns (HelloReply) {}
}
Then the service interface name is hello.Greeter, the Mutiny stub name is hello.MutinyGreeterGrpc.MutinyGreeterStub, and the blocking stub name is hello.GreeterGrpc.GreeterBlockingStub.
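The naming convention can be spelled out as simple string composition. The sketch below is a hypothetical helper (StubNames is not a generated or Quarkus class); it only reproduces the pattern visible in the names above, given a java_package and a service name.

```java
// Hypothetical helper that spells out the stub naming pattern described above.
final class StubNames {

    // <package>.<Service>
    static String serviceInterface(String javaPackage, String service) {
        return javaPackage + "." + service;
    }

    // <package>.Mutiny<Service>Grpc.Mutiny<Service>Stub
    static String mutinyStub(String javaPackage, String service) {
        return javaPackage + ".Mutiny" + service + "Grpc.Mutiny" + service + "Stub";
    }

    // <package>.<Service>Grpc.<Service>BlockingStub
    static String blockingStub(String javaPackage, String service) {
        return javaPackage + "." + service + "Grpc." + service + "BlockingStub";
    }

    public static void main(String[] args) {
        System.out.println(serviceInterface("hello", "Greeter"));
        System.out.println(mutinyStub("hello", "Greeter"));
        System.out.println(blockingStub("hello", "Greeter"));
    }
}
```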
Examples
Service Interface
import io.quarkus.grpc.GrpcClient;
import io.smallrye.mutiny.Uni;
import hello.Greeter;
import hello.HelloReply;
import hello.HelloRequest;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;

@Path("/hello")
public class ExampleResource {

    @GrpcClient // (1)
    Greeter hello;

    @GET
    @Path("/mutiny/{name}")
    public Uni<String> helloMutiny(String name) {
        // Call the service asynchronously and extract the message from the reply
        return hello.sayHello(HelloRequest.newBuilder().setName(name).build())
                .onItem().transform(HelloReply::getMessage);
    }
}
1 | The service name is derived from the injection point - the field name is used. The quarkus.grpc.clients.hello.host property must be set. |
Blocking Stub
import io.quarkus.grpc.GrpcClient;
import hello.GreeterGrpc;
import hello.HelloRequest;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;

@Path("/hello")
public class ExampleResource {

    @GrpcClient("hello") // (1)
    GreeterGrpc.GreeterBlockingStub blockingHelloService;

    @GET
    @Path("/blocking/{name}")
    public String helloBlocking(String name) {
        // The blocking stub returns the reply directly
        return blockingHelloService.sayHello(HelloRequest.newBuilder().setName(name).build()).getMessage();
    }
}
1 | The quarkus.grpc.clients.hello.host property must be set. |
Handling streams
gRPC allows sending and receiving streams:
service Streaming {
    rpc Source(Empty) returns (stream Item) {} // Returns a stream
    rpc Sink(stream Item) returns (Empty) {}   // Reads a stream
    rpc Pipe(stream Item) returns (stream Item) {}  // Reads a stream and returns a stream
}
Using the Mutiny stub, you can interact with these as follows:
package io.quarkus.grpc.example.streaming;

import io.grpc.examples.streaming.Empty;
import io.grpc.examples.streaming.Item;
import io.grpc.examples.streaming.MutinyStreamingGrpc;
import io.quarkus.grpc.GrpcClient;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/streaming")
@Produces(MediaType.APPLICATION_JSON)
public class StreamingEndpoint {

    @GrpcClient
    MutinyStreamingGrpc.MutinyStreamingStub streaming;

    @GET
    public Multi<String> invokeSource() {
        // Retrieve a stream
        return streaming.source(Empty.newBuilder().build())
                .onItem().transform(Item::getValue);
    }

    @GET
    @Path("sink/{max}")
    public Uni<Void> invokeSink(int max) {
        // Send a stream and wait for completion
        Multi<Item> inputs = Multi.createFrom().range(0, max)
                .map(i -> Integer.toString(i))
                .map(i -> Item.newBuilder().setValue(i).build());
        return streaming.sink(inputs).onItem().ignore().andContinueWithNull();
    }

    @GET
    @Path("/{max}")
    public Multi<String> invokePipe(int max) {
        // Send a stream and retrieve a stream
        Multi<Item> inputs = Multi.createFrom().range(0, max)
                .map(i -> Integer.toString(i))
                .map(i -> Item.newBuilder().setValue(i).build());
        return streaming.pipe(inputs).onItem().transform(Item::getValue);
    }
}
Client configuration
For each gRPC service you inject in your application, you can configure attributes using properties of the form quarkus.grpc.clients."client-name".<attribute>, such as the host, plain-text, deadline and ssl.* settings used in the examples below.
The client-name is the name set in the @GrpcClient annotation, or derived from the injection point if not explicitly defined.
The following examples use hello as the client name. Don't forget to replace it with the name you used in the @GrpcClient annotation.
When you enable quarkus.grpc.clients."client-name".xds.enabled, xDS should handle most of the configuration above.
Enabling TLS
To enable TLS, use the following configuration.
Note that all paths in the configuration may specify either a resource on the classpath (typically from src/main/resources or one of its subfolders) or an external file.
quarkus.grpc.clients.hello.host=localhost
# either a path to a classpath resource or to a file:
quarkus.grpc.clients.hello.ssl.trust-store=tls/ca.pem
When SSL/TLS is configured, plain-text is automatically disabled.
TLS with Mutual Auth
To use TLS with mutual authentication, use the following configuration:
quarkus.grpc.clients.hello.host=localhost
quarkus.grpc.clients.hello.plain-text=false
# all the following may use either a path to a classpath resource or to a file:
quarkus.grpc.clients.hello.ssl.certificate=tls/client.pem
quarkus.grpc.clients.hello.ssl.key=tls/client.key
quarkus.grpc.clients.hello.ssl.trust-store=tls/ca.pem
Client Stub Deadlines
If you need to configure a deadline for a gRPC stub, i.e. a duration of time after which the stub will always return the status error DEADLINE_EXCEEDED, you can specify it via the quarkus.grpc.clients."service-name".deadline configuration property, e.g.:
quarkus.grpc.clients.hello.host=localhost
# (1)
quarkus.grpc.clients.hello.deadline=2s
1 | Set the deadline for all injected stubs. |
Do not use this feature to implement an RPC timeout.
To implement an RPC timeout, either use Mutiny call.ifNoItem().after(…) or Fault Tolerance @Timeout.
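As a rough illustration of what a caller-side RPC timeout means, the sketch below uses only the JDK: CompletableFuture.orTimeout stands in for the Mutiny or Fault Tolerance mechanisms named above, and the future stands in for a pending gRPC call. The class CallTimeoutSketch and its methods are hypothetical and are not part of Quarkus, Mutiny, or gRPC.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// JDK-only stand-in for a per-call timeout; a real client would apply
// Mutiny's ifNoItem().after(...) or @Timeout to the gRPC call instead.
final class CallTimeoutSketch {

    // Fails 'call' with a TimeoutException if it has not completed within 'limit'.
    static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> call, Duration limit) {
        return call.orTimeout(limit.toMillis(), TimeUnit.MILLISECONDS);
    }

    // Waits for the call and reports whether it failed because the timeout elapsed.
    static boolean timedOut(CompletableFuture<?> call) {
        try {
            call.join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> slowCall = new CompletableFuture<>(); // never completes
        System.out.println(timedOut(withTimeout(slowCall, Duration.ofMillis(100))));
    }
}
```

The point of the sketch is that the timeout is attached to an individual call by the caller, rather than configured once for every injected stub.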
gRPC Headers
As with HTTP, gRPC calls can carry headers alongside the message. Headers can be useful, for example, for authentication.
To set headers for a gRPC call, create a client with headers attached and then perform the call on this client:
import jakarta.enterprise.context.ApplicationScoped;
import examples.Greeter;
import examples.HelloReply;
import examples.HelloRequest;
import io.grpc.Metadata;
import io.quarkus.grpc.GrpcClient;
import io.quarkus.grpc.GrpcClientUtils;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class MyService {

    // Metadata keys are typed; ASCII headers use the built-in string marshaller
    private static final Metadata.Key<String> MY_HEADER =
            Metadata.Key.of("my-header", Metadata.ASCII_STRING_MARSHALLER);

    @GrpcClient
    Greeter client;

    public Uni<HelloReply> doTheCall(String name) {
        Metadata extraHeaders = new Metadata();
        extraHeaders.put(MY_HEADER, "my-interface-value");

        Greeter alteredClient = GrpcClientUtils.attachHeaders(client, extraHeaders); // (1)
        return alteredClient.sayHello(HelloRequest.newBuilder().setName(name).build()); // (2)
    }
}
1 | Alter the client to make calls with the extraHeaders attached |
2 | Perform the call with the altered client. The original client remains unmodified |
GrpcClientUtils works with all flavors of clients.
Client Interceptors
A gRPC client interceptor can be implemented by a CDI bean that also implements the io.grpc.ClientInterceptor interface.
You can annotate an injected client with @io.quarkus.grpc.RegisterClientInterceptor to register the specified interceptor for that particular client instance.
The @RegisterClientInterceptor annotation is repeatable.
Alternatively, if you want to apply the interceptor to every injected client, annotate the interceptor bean with @io.quarkus.grpc.GlobalInterceptor.
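import jakarta.enterprise.context.ApplicationScoped;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.MethodDescriptor;
import io.quarkus.grpc.GlobalInterceptor;

@GlobalInterceptor // (1)
@ApplicationScoped
public class MyInterceptor implements ClientInterceptor {
    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
            CallOptions callOptions, Channel next) {
        // ... inspect or decorate the call here, then delegate to the next channel
        return next.newCall(method, callOptions);
    }
}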
1 | This interceptor is applied to all injected gRPC clients. |
It's also possible to annotate a producer method as a global interceptor:
import io.quarkus.grpc.GlobalInterceptor;
import jakarta.enterprise.inject.Produces;

public class MyProducer {
    @GlobalInterceptor
    @Produces
    public MyInterceptor myInterceptor() {
        return new MyInterceptor();
    }
}
Check the ClientInterceptor JavaDoc to properly implement your interceptor.
@RegisterClientInterceptor Example
import jakarta.enterprise.context.ApplicationScoped;
import io.quarkus.grpc.GrpcClient;
import io.quarkus.grpc.RegisterClientInterceptor;
import hello.Greeter;

@ApplicationScoped
class MyBean {
    @RegisterClientInterceptor(MySpecialInterceptor.class) // (1)
    @GrpcClient("helloService")
    Greeter greeter;
}
1 | Registers the MySpecialInterceptor for this particular client. |
When you have multiple client interceptors, you can order them by implementing the jakarta.enterprise.inject.spi.Prioritized interface:
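import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.spi.Prioritized;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.MethodDescriptor;

@ApplicationScoped
public class MyInterceptor implements ClientInterceptor, Prioritized {
    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
            CallOptions callOptions, Channel next) {
        // ... inspect or decorate the call here, then delegate to the next channel
        return next.newCall(method, callOptions);
    }

    @Override
    public int getPriority() {
        return 10;
    }
}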
Interceptors with the highest priority are called first.
The default priority, used if the interceptor does not implement the Prioritized interface, is 0.
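The ordering rule can be modeled in a few lines of plain Java. This is a hypothetical sketch that does not touch the gRPC or CDI APIs: Entry is an illustrative stand-in for an interceptor, with a null priority meaning the interceptor does not implement Prioritized. How interceptors with equal priority are ordered is not covered here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of the ordering rule; it does not use the gRPC or CDI APIs.
final class InterceptorOrderSketch {

    // Stand-in for an interceptor; 'priority' is null when Prioritized is not implemented.
    record Entry(String name, Integer priority) {
        int effectivePriority() {
            return priority == null ? 0 : priority; // the default priority is 0
        }
    }

    // Returns the interceptor names in call order: highest priority first.
    static List<String> callOrder(List<Entry> interceptors) {
        List<Entry> sorted = new ArrayList<>(interceptors);
        sorted.sort(Comparator.comparingInt(Entry::effectivePriority).reversed());
        List<String> names = new ArrayList<>();
        for (Entry e : sorted) {
            names.add(e.name());
        }
        return names;
    }

    public static void main(String[] args) {
        List<Entry> interceptors = List.of(
                new Entry("logging", null), // no Prioritized, so priority 0
                new Entry("auth", 10),
                new Entry("tracing", -5));
        System.out.println(callOrder(interceptors));
    }
}
```

In this model, an interceptor with priority 10 runs before one without Prioritized, which in turn runs before one with a negative priority.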
gRPC Client metrics
Enabling metrics collection
gRPC client metrics are automatically enabled when the application also uses the quarkus-micrometer extension. Micrometer collects the metrics of all the gRPC clients used by the application.
As an example, if you export the metrics to Prometheus, you will get:
# HELP grpc_client_responses_received_messages_total The total number of responses received
# TYPE grpc_client_responses_received_messages_total counter
grpc_client_responses_received_messages_total{method="SayHello",methodType="UNARY",service="helloworld.Greeter",} 6.0
# HELP grpc_client_requests_sent_messages_total The total number of requests sent
# TYPE grpc_client_requests_sent_messages_total counter
grpc_client_requests_sent_messages_total{method="SayHello",methodType="UNARY",service="helloworld.Greeter",} 6.0
# HELP grpc_client_processing_duration_seconds The total time taken for the client to complete the call, including network delay
# TYPE grpc_client_processing_duration_seconds summary
grpc_client_processing_duration_seconds_count{method="SayHello",methodType="UNARY",service="helloworld.Greeter",statusCode="OK",} 6.0
grpc_client_processing_duration_seconds_sum{method="SayHello",methodType="UNARY",service="helloworld.Greeter",statusCode="OK",} 0.167411625
# HELP grpc_client_processing_duration_seconds_max The total time taken for the client to complete the call, including network delay
# TYPE grpc_client_processing_duration_seconds_max gauge
grpc_client_processing_duration_seconds_max{method="SayHello",methodType="UNARY",service="helloworld.Greeter",statusCode="OK",} 0.136478028
The service name, method and type can be found in the tags.
Custom exception handling
If any of the gRPC services or server interceptors throws a (custom) exception, you can add your own ExceptionHandlerProvider as a CDI bean in your application to provide custom handling of those exceptions.
For example:
@ApplicationScoped
public class HelloExceptionHandlerProvider implements ExceptionHandlerProvider {
    @Override
    public <ReqT, RespT> ExceptionHandler<ReqT, RespT> createHandler(ServerCall.Listener<ReqT> listener,
            ServerCall<ReqT, RespT> serverCall, Metadata metadata) {
        return new HelloExceptionHandler<>(listener, serverCall, metadata);
    }

    @Override
    public Throwable transform(Throwable t) {
        if (t instanceof HelloException he) {
            return new StatusRuntimeException(Status.ABORTED.withDescription(he.getName()));
        } else {
            return ExceptionHandlerProvider.toStatusException(t, true);
        }
    }

    private static class HelloExceptionHandler<A, B> extends ExceptionHandler<A, B> {
        public HelloExceptionHandler(ServerCall.Listener<A> listener, ServerCall<A, B> call, Metadata metadata) {
            super(listener, call, metadata);
        }

        @Override
        protected void handleException(Throwable t, ServerCall<A, B> call, Metadata metadata) {
            StatusRuntimeException sre = (StatusRuntimeException) ExceptionHandlerProvider.toStatusException(t, true);
            Metadata trailers = sre.getTrailers() != null ? sre.getTrailers() : metadata;
            call.close(sre.getStatus(), trailers);
        }
    }
}
Dev Mode
By default, when starting the application in dev mode, a gRPC server is started, even if no services are configured. You can configure the gRPC extension's dev mode behavior using the quarkus.grpc.dev-mode properties.
Inject mock clients
In your @QuarkusTest, you can use @InjectMock to inject the Mutiny client of a gRPC service:
@QuarkusTest
public class GrpcMockTest {

    @InjectMock
    @GrpcClient("hello")
    Greeter greeter;

    @Test
    void test1() {
        HelloRequest request = HelloRequest.newBuilder().setName("neo").build();
        // Stub the mocked client to return a canned reply
        Mockito.when(greeter.sayHello(Mockito.any(HelloRequest.class)))
                .thenReturn(Uni.createFrom().item(HelloReply.newBuilder().setMessage("hello neo").build()));
        // assertEquals takes the expected value first, then the actual value
        Assertions.assertEquals("hello neo", greeter.sayHello(request).await().indefinitely().getMessage());
    }
}
Only the Mutiny client can be mocked; channels and other stubs cannot be mocked.