Manually starting Kafka Streams processors
Spring Cloud Stream Kafka Streams binder 提供了一个名为 StreamsBuilderFactoryManager
的抽象,它位于 Apache Kafka 的 Spring 的 StreamsBuilderFactoryBean
之上。此管理器 API 用于控制 binder 中基于处理器的多个 StreamsBuilderFactoryBean
。因此,在使用 binder 时,如果你想手动控制应用程序中各种 StreamsBuilderFactoryBean
对象的自动启动,则需要使用 StreamsBuilderFactoryManager
。可以使用属性 spring.kafka.streams.auto-startup
并将其设置为 false
以关闭处理器的自动启动。然后,在应用程序中,可以使用如下内容使用 StreamsBuilderFactoryManager
启动处理器。
@Bean
public ApplicationRunner runner(StreamsBuilderFactoryManager sbfm) {
return args -> {
sbfm.start();
};
}
当你想要让你的应用程序在主线程中启动并让 Kafka Streams 处理器单独启动时,此功能非常方便。例如,当你有需要恢复的大型状态存储时,如果处理器以正常方式启动,这是默认情况,这可能会阻止你的应用程序启动。如果你使用某种形式的运行状况探测机制(例如 Kubernetes),它可能会认为应用程序已关闭并尝试重新启动。为了纠正此问题,你可以将 spring.kafka.streams.auto-startup
设置为 false
并遵循上述方法。
请记住,在使用 Spring Cloud Stream binder 时,你不会直接处理 Apache Kafka 的 Spring 中的 StreamsBuilderFactoryBean
,而是 StreamsBuilderFactoryManager
,因为 StreamsBuilderFactoryBean
对象在内部由 binder 管理。