State Store
当使用高级 DSL 且触发状态存储的适当调用时,Apache Kafka Streams 会自动创建状态存储。
State stores are created automatically by Kafka Streams when the high level DSL is used and appropriate calls are made those trigger a state store.
如果您想要将传入的 KTable
绑定作为命名状态存储具体化,使用以下策略即可实现。
If you want to materialize an incoming KTable
binding as a named state store, then you can do so by using the following strategy.
假设您有以下函数。
Lets say you have the following function.
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
然后通过设置以下属性,传入的 KTable
数据将具体化为命名状态存储。
Then by setting the following property, the incoming KTable
data will be materialized in to the named state store.
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store
您可以在应用程序中将自定义状态存储定义为 bean,且绑定工具将检测到并将它们添加到 Kafka Streams 构建器中。尤其当使用了处理器 API 时,需要手动注册状态存储。为此,您可以在应用程序中将 StateStore 创建为 bean。以下是定义此类 bean 的示例。
You can define custom state stores as beans in your application and those will be detected and added to the Kafka Streams builder by the binder. Especially when the processor API is used, you need to register a state store manually. In order to do so, you can create the StateStore as a bean in the application. Here are examples of defining such beans.
@Bean
public StoreBuilder myStore() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
Serdes.Long());
}
@Bean
public StoreBuilder otherStore() {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore("other-store",
1L, 3, 3L, false), Serdes.Long(),
Serdes.Long());
}
之后,这些状态存储可直接由应用程序访问。
These state stores can be then accessed by the applications directly.
在引导期间,上述 bean 将由绑定程序处理,并传递给 Streams 构建器对象。
During the bootstrap, the above beans will be processed by the binder and passed on to the Streams builder object.
访问状态存储:
Accessing the state store:
Processor<Object, Product>() {
WindowStore<Object, String> state;
@Override
public void init(ProcessorContext processorContext) {
state = (WindowStore)processorContext.getStateStore("mystate");
}
...
}
当需要注册全局状态存储时,该方法无效。如需注册全局状态存储,请参阅以下关于自定义 StreamsBuilderFactoryBean
的部分。
This will not work when it comes to registering global state stores.
In order to register a global state store, please see the section below on customizing StreamsBuilderFactoryBean
.