Manually starting Kafka Streams processors

Spring Cloud Stream Kafka Streams binder 提供了一个名为 StreamsBuilderFactoryManager 的抽象,它位于 Apache Kafka 的 Spring 的 StreamsBuilderFactoryBean 之上。此管理器 API 用于控制 binder 中基于处理器的多个 StreamsBuilderFactoryBean。因此,在使用 binder 时,如果你想手动控制应用程序中各种 StreamsBuilderFactoryBean 对象的自动启动,则需要使用 StreamsBuilderFactoryManager。可以使用属性 spring.kafka.streams.auto-startup 并将其设置为 false 以关闭处理器的自动启动。然后,在应用程序中,可以使用如下内容使用 StreamsBuilderFactoryManager 启动处理器。

Spring Cloud Stream Kafka Streams binder offers an abstraction called StreamsBuilderFactoryManager on top of the StreamsBuilderFactoryBean from Spring for Apache Kafka. This manager API is used for controlling the multiple StreamsBuilderFactoryBean per processor in a binder based application. Therefore, when using the binder, if you manually want to control the auto starting of the various StreamsBuilderFactoryBean objects in the application, you need to use StreamsBuilderFactoryManager. You can use the property spring.kafka.streams.auto-startup and set this to false in order to turn off auto starting of the processors. Then, in the application, you can use something as below to start the processors using StreamsBuilderFactoryManager.

@Bean
public ApplicationRunner runner(StreamsBuilderFactoryManager sbfm) {
    return args -> {
        sbfm.start();
    };
}

当你想要让你的应用程序在主线程中启动并让 Kafka Streams 处理器单独启动时,此功能非常方便。例如,当你有需要恢复的大型状态存储时,如果处理器以正常方式启动,这是默认情况,这可能会阻止你的应用程序启动。如果你使用某种形式的运行状况探测机制(例如 Kubernetes),它可能会认为应用程序已关闭并尝试重新启动。为了纠正此问题,你可以将 spring.kafka.streams.auto-startup 设置为 false 并遵循上述方法。

This feature is handy, when you want your application to start in the main thread and let Kafka Streams processors start separately. For example, when you have a large state store that needs to be restored, if the processors are started normally as is the default case, this may block your application to start. If you are using some sort of liveness probe mechanism (for example on Kubernetes), it may think that the application is down and attempt a restart. In order to correct this, you can set spring.kafka.streams.auto-startup to false and follow the approach above.

请记住,在使用 Spring Cloud Stream binder 时,你不会直接处理 Apache Kafka 的 Spring 中的 StreamsBuilderFactoryBean,而是 StreamsBuilderFactoryManager,因为 StreamsBuilderFactoryBean 对象在内部由 binder 管理。

Keep in mind that, when using the Spring Cloud Stream binder, you are not directly dealing with StreamsBuilderFactoryBean from Spring for Apache Kafka, rather StreamsBuilderFactoryManager, as the StreamsBuilderFactoryBean objects are internally managed by the binder.