Manually starting Kafka Streams processors

Spring Cloud Stream Kafka Streams binder 提供了一个名为 StreamsBuilderFactoryManager 的抽象,它位于 Apache Kafka 的 Spring 的 StreamsBuilderFactoryBean 之上。此管理器 API 用于控制 binder 中基于处理器的多个 StreamsBuilderFactoryBean。因此,在使用 binder 时,如果你想手动控制应用程序中各种 StreamsBuilderFactoryBean 对象的自动启动,则需要使用 StreamsBuilderFactoryManager。可以使用属性 spring.kafka.streams.auto-startup 并将其设置为 false 以关闭处理器的自动启动。然后,在应用程序中,可以使用如下内容使用 StreamsBuilderFactoryManager 启动处理器。

@Bean
public ApplicationRunner runner(StreamsBuilderFactoryManager sbfm) {
    return args -> {
        sbfm.start();
    };
}

当你想要让你的应用程序在主线程中启动并让 Kafka Streams 处理器单独启动时,此功能非常方便。例如,当你有需要恢复的大型状态存储时,如果处理器以正常方式启动,这是默认情况,这可能会阻止你的应用程序启动。如果你使用某种形式的运行状况探测机制(例如 Kubernetes),它可能会认为应用程序已关闭并尝试重新启动。为了纠正此问题,你可以将 spring.kafka.streams.auto-startup 设置为 false 并遵循上述方法。

请记住,在使用 Spring Cloud Stream binder 时,你不会直接处理 Apache Kafka 的 Spring 中的 StreamsBuilderFactoryBean,而是 StreamsBuilderFactoryManager,因为 StreamsBuilderFactoryBean 对象在内部由 binder 管理。