Interactive Queries
Kafka Streams binder API 公开了一个名为 InteractiveQueryService
的类,用于交互式查询状态存储区。您可以在应用程序中以 Spring bean 的形式访问它。从应用程序获取此 bean 的简单方法是 autowire
该 bean。
Kafka Streams binder API exposes a class called InteractiveQueryService
to interactively query the state stores.
You can access this as a Spring bean in your application. An easy way to get access to this bean from your application is to autowire
the bean.
@Autowired
private InteractiveQueryService interactiveQueryService;
一旦您获得对该 bean 的访问权限,您就可以查询您感兴趣的特定状态存储区。如下所示。
Once you gain access to this bean, then you can query for the particular state-store that you are interested. See below.
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());
在启动期间,上述用于检索存储区的调用方法可能会失败。例如,它可能仍在状态存储区的初始化过程中。在这种情况下,重试此操作将非常有用。Kafka Streams binder 提供了一个简单的重试机制来适应这种情况。
During the startup, the above method call to retrieve the store might fail. For example, it might still be in the middle of initializing the state store. In such cases, it will be useful to retry this operation. Kafka Streams binder provides a simple retry mechanism to accommodate this.
以下是可用于控制此重试的两个属性。
Following are the two properties that you can use to control this retrying.
-
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - Default is
1
. -
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - Default is
1000
milliseconds.
如果有多个 kafka streams 应用程序正在运行,那么在您可以交互式查询它们之前,您需要识别托管您正在查询的特定键的应用程序实例。InteractiveQueryService
API 提供了识别主机信息的方法。
If there are multiple instances of the kafka streams application running, then before you can query them interactively, you need to identify which application instance hosts the particular key that you are querying.
InteractiveQueryService
API provides methods for identifying the host information.
为了让它工作,您必须如下配置 application.server
属性:
In order for this to work, you must configure the property application.server
as below:
spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>
这里有一些代码片段:
Here are some code snippets:
org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
key, keySerializer);
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
//query from the store that is locally available
}
else {
//query from the remote host
}
有关这些主机查找方法的更多信息,请参阅有关这些方法的 Javadoc。对于这些方法,在启动期间,如果底层的 KafkaStreams 对象尚未准备好,它们可能会抛出异常。上述重试属性也适用于这些方法。
For more information on these host finding methods, please see the Javadoc on the methods. For these methods also, during startup, if the underlying KafkaStreams objects are not ready, they might throw exceptions. The aforementioned retry properties are applicable for these methods as well.
Other API methods available through the InteractiveQueryService
使用以下 API 方法检索与给定存储和键的组合关联的 KeyQueryMetadata
对象。
Use the following API method to retrieve the KeyQueryMetadata
object associated with the combination of given store and key.
public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)
使用以下 API 方法检索与给定存储和键的组合关联的 KakfaStreams
对象。
Use the following API method to retrieve the KakfaStreams
object associated with the combination of given store and key.
public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)
Customizing Store Query Parameters
有时,在通过 InteractiveQueryService
查询存储之前,需要微调存储查询参数。为此,从 binder 的 4.0.1
版本开始,你可以为 StoreQueryParametersCustomizer
提供一个 bean,这是一个带有 customize
方法的函数式接口,该方法将 StoreQueryParameter
作为参数。下面是其方法签名。
Sometimes it is necessary that you need to fine tune the store query parameters before querying the store through InteractiveQueryService
.
For this purpose, starting with the 4.0.1
version of the binder, you can provide a bean for StoreQueryParametersCustomizer
which is a functional interface with a customize
method that takes a StoreQueryParameter
as the argument.
Here is its method signature.
StoreQueryParameters<T> customize(StoreQueryParameters<T> storeQueryParameters);
使用此方法,应用程序可以进一步自定义 StoreQueryParameters
,例如启用陈旧的存储。
Using this approach, applications can further customize the StoreQueryParameters
such as enabling stale stores.
当此 bean 存在于此应用程序中时,InteractiveQueryService
将在其查询状态存储之前调用其 customize
方法。
When this bean is present in this application, InteractiveQueryService
will call its customize
method before querying the state store.
请记住,应用程序中必须有一个唯一的 |
Keep in mind that, there must be a unique bean for |