Manually starting Kafka Streams processors selectively
尽管通过 StreamsBuilderFactoryManager
上述方法将无条件地将自动启动 false
应用于应用程序中的所有 Kafka Streams 处理器,但通常希望仅对单独选择的 Kafka Streams 处理器不进行自动启动。例如,让我们假设应用程序中有三个不同的函数(处理器),并且您不想启动其中某个处理器作为应用程序启动的一部分。以下是一个此类情况的示例。
@Bean
public Function<KStream<?, ?>, KStream<?, ?>> process1() {
}
@Bean
public Consumer<KStream<?, ?>> process2() {
}
@Bean
public BiFunction<KStream<?, ?>, KTable<?, ?>, KStream<?, ?>> process3() {
}
在上述场景中,如果将 spring.kafka.streams.auto-startup
设置为 false
,则在应用程序启动期间,所有处理器都不会自动启动。在这种情况下,必须通过按照上述方法调用基础 StreamsBuilderFactoryManager
上的 start()
对它们进行编程启动。但是,如果我们有仅禁用一个处理器的用例,则必须对该处理器的各个绑定设置 auto-startup
。假设我们不想让 process3
函数自动启动。这是一个具有两个输入绑定的 BiFunction
- process3-in-0
和 process3-in-1
。为了避免此处理器自动启动,您可以选择其中任何一个输入绑定并在其上设置 auto-startup
。选择哪个绑定并不重要;如果您愿意,可以在两者上都将 auto-startup
设置为 false
,但一个就足够了。由于它们共享同一个工厂 bean,因此不必对这两个绑定都将 autoStartup 设置为 false,但出于清晰起见这样做可能更有意义。
以下是可用于禁用此处理器的自动启动的 Spring Cloud Stream 属性。
spring.cloud.stream.bindings.process3-in-0.consumer.auto-startup: false
或
spring.cloud.stream.bindings.process3-in-1.consumer.auto-startup: false
然后,您可以使用 REST 端点或使用 BindingsEndpoint
API(如下所示)手动启动处理器。为此,您需要确保类路径上具有 Spring Boot 执行器依赖项。
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/process3-in-0
或
@Autowired
BindingsEndpoint endpoint;
@Bean
public ApplicationRunner runner() {
return args -> {
endpoint.changeState("process3-in-0", State.STARTED);
};
}
有关此机制的更多详细信息,请参阅参考文档中的 this section。
在按本节中所述禁用 |