Manually starting Kafka Streams processors selectively
尽管通过 StreamsBuilderFactoryManager
上述方法将无条件地将自动启动 false
应用于应用程序中的所有 Kafka Streams 处理器,但通常希望仅对单独选择的 Kafka Streams 处理器不进行自动启动。例如,让我们假设应用程序中有三个不同的函数(处理器),并且您不想启动其中某个处理器作为应用程序启动的一部分。以下是一个此类情况的示例。
While the approach laid out above will unconditionally apply auto start false
to all the Kafka Streams processors in the application through StreamsBuilderFactoryManager
, it is often desirable that only individually selected Kafka Streams processors are not auto started.
For instance, let us assume that you have three different functions (processors) in your application and for one of the processors, you do not want to start it as part of the application startup.
Here is an example of such a situation.
@Bean
public Function<KStream<?, ?>, KStream<?, ?>> process1() {
}
@Bean
public Consumer<KStream<?, ?>> process2() {
}
@Bean
public BiFunction<KStream<?, ?>, KTable<?, ?>, KStream<?, ?>> process3() {
}
在上述场景中,如果将 spring.kafka.streams.auto-startup
设置为 false
,则在应用程序启动期间,所有处理器都不会自动启动。在这种情况下,必须通过按照上述方法调用基础 StreamsBuilderFactoryManager
上的 start()
对它们进行编程启动。但是,如果我们有仅禁用一个处理器的用例,则必须对该处理器的各个绑定设置 auto-startup
。假设我们不想让 process3
函数自动启动。这是一个具有两个输入绑定的 BiFunction
- process3-in-0
和 process3-in-1
。为了避免此处理器自动启动,您可以选择其中任何一个输入绑定并在其上设置 auto-startup
。选择哪个绑定并不重要;如果您愿意,可以在两者上都将 auto-startup
设置为 false
,但一个就足够了。由于它们共享同一个工厂 bean,因此不必对这两个绑定都将 autoStartup 设置为 false,但出于清晰起见这样做可能更有意义。
In this scenario above, if you set spring.kafka.streams.auto-startup
to false
, then none of the processors will auto start during the application startup.
In that case, you have to programmatically start them as described above by calling start()
on the underlying StreamsBuilderFactoryManager
.
However, if we have a use case to selectively disable only one processor, then you have to set auto-startup
on the individual binding for that processor.
Let us assume that we don’t want our process3
function to auto start.
This is a BiFunction
with two input bindings - process3-in-0
and process3-in-1
.
In order to avoid auto start for this processor, you can pick any of these input bindings and set auto-startup
on them.
It does not matter which binding you pick; if you wish, you can set auto-startup
to false
on both of them, but one will be sufficient.
Because they share the same factory bean, you don’t have to set autoStartup to false on both bindings, but it probably makes sense to do so, for clarity.
以下是可用于禁用此处理器的自动启动的 Spring Cloud Stream 属性。
Here is the Spring Cloud Stream property that you can use to disable auto startup for this processor.
spring.cloud.stream.bindings.process3-in-0.consumer.auto-startup: false
或
or
spring.cloud.stream.bindings.process3-in-1.consumer.auto-startup: false
然后,您可以使用 REST 端点或使用 BindingsEndpoint
API(如下所示)手动启动处理器。为此,您需要确保类路径上具有 Spring Boot 执行器依赖项。
Then, you can manually start the processor either using the REST endpoint or using the BindingsEndpoint
API as shown below.
For this, you need to ensure that you have the Spring Boot actuator dependency on the classpath.
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/process3-in-0
或
or
@Autowired
BindingsEndpoint endpoint;
@Bean
public ApplicationRunner runner() {
return args -> {
endpoint.changeState("process3-in-0", State.STARTED);
};
}
有关此机制的更多详细信息,请参阅参考文档中的 this section。
See this section from the reference docs for more details on this mechanism.
在按本节中所述禁用 |
When controlling the bindings by disabling |