Interactive Queries

Kafka Streams binder API 公开了一个名为 InteractiveQueryService 的类,用于交互式查询状态存储区。您可以在应用程序中以 Spring bean 的形式访问它。从应用程序获取此 bean 的简单方法是 autowire 该 bean。

@Autowired
private InteractiveQueryService interactiveQueryService;

一旦您获得对该 bean 的访问权限,您就可以查询您感兴趣的特定状态存储区。如下所示。

ReadOnlyKeyValueStore<Object, Object> keyValueStore =
						interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());

在启动期间,上述用于检索存储区的调用方法可能会失败。例如,它可能仍在状态存储区的初始化过程中。在这种情况下,重试此操作将非常有用。Kafka Streams binder 提供了一个简单的重试机制来适应这种情况。 以下是可用于控制此重试的两个属性。

  • spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - 默认值为 1

  • spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - 默认值为 1000 毫秒。

如果有多个 kafka streams 应用程序正在运行,那么在您可以交互式查询它们之前,您需要识别托管您正在查询的特定键的应用程序实例。InteractiveQueryService API 提供了识别主机信息的方法。 为了让它工作,您必须如下配置 application.server 属性:

spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>

这里有一些代码片段:

org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
						key, keySerializer);

if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {

    //query from the store that is locally available
}
else {
    //query from the remote host
}

有关这些主机查找方法的更多信息,请参阅有关这些方法的 Javadoc。对于这些方法,在启动期间,如果底层的 KafkaStreams 对象尚未准备好,它们可能会抛出异常。上述重试属性也适用于这些方法。

Other API methods available through the InteractiveQueryService

使用以下 API 方法检索与给定存储和键的组合关联的 KeyQueryMetadata 对象。

public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)

使用以下 API 方法检索与给定存储和键的组合关联的 KakfaStreams 对象。

public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)

Customizing Store Query Parameters

有时,在通过 InteractiveQueryService 查询存储之前,需要微调存储查询参数。为此,从 binder 的 4.0.1 版本开始,你可以为 StoreQueryParametersCustomizer 提供一个 bean,这是一个带有 customize 方法的函数式接口,该方法将 StoreQueryParameter 作为参数。下面是其方法签名。

StoreQueryParameters<T> customize(StoreQueryParameters<T> storeQueryParameters);

使用此方法,应用程序可以进一步自定义 StoreQueryParameters,例如启用陈旧的存储。

当此 bean 存在于此应用程序中时,InteractiveQueryService 将在其查询状态存储之前调用其 customize 方法。

请记住,应用程序中必须有一个唯一的 StoreQueryParametersCustomizer bean。