State Store

当使用高级 DSL 且触发状态存储的适当调用时,Apache Kafka Streams 会自动创建状态存储。

如果您想要将传入的 KTable 绑定作为命名状态存储具体化,使用以下策略即可实现。

假设您有以下函数。

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
   ...
}

然后通过设置以下属性,传入的 KTable 数据将具体化为命名状态存储。

spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store

您可以在应用程序中将自定义状态存储定义为 bean,且绑定工具将检测到并将它们添加到 Kafka Streams 构建器中。尤其当使用了处理器 API 时,需要手动注册状态存储。为此,您可以在应用程序中将 StateStore 创建为 bean。以下是定义此类 bean 的示例。

@Bean
public StoreBuilder myStore() {
    return Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
            Serdes.Long());
}

@Bean
public StoreBuilder otherStore() {
    return Stores.windowStoreBuilder(
            Stores.persistentWindowStore("other-store",
                    1L, 3, 3L, false), Serdes.Long(),
            Serdes.Long());
}

之后,这些状态存储可直接由应用程序访问。

在引导期间,上述 bean 将由绑定程序处理,并传递给 Streams 构建器对象。

访问状态存储:

Processor<Object, Product>() {

    WindowStore<Object, String> state;

    @Override
    public void init(ProcessorContext processorContext) {
        state = (WindowStore)processorContext.getStateStore("mystate");
    }
    ...
}

当需要注册全局状态存储时,该方法无效。如需注册全局状态存储,请参阅以下关于自定义 StreamsBuilderFactoryBean 的部分。