Starting `@KafkaListener`s in Sequence
一个常见的用例是在另一个侦听器使用主题中的所有记录后启动侦听器。例如,在处理其他主题中的记录之前,您可能需要将一个或多个紧凑主题的内容加载到内存中。从 2.7.3 版开始,推出了一个新组件 ContainerGroupSequencer
。它使用 @KafkaListener
的 containerGroup
属性将容器组合在一起,并在当前组中的所有容器进入空闲状态后启动下一组中的容器。
最好通过一个示例来说明。
@KafkaListener(id = "listen1", topics = "topic1", containerGroup = "g1", concurrency = "2")
public void listen1(String in) {
}
@KafkaListener(id = "listen2", topics = "topic2", containerGroup = "g1", concurrency = "2")
public void listen2(String in) {
}
@KafkaListener(id = "listen3", topics = "topic3", containerGroup = "g2", concurrency = "2")
public void listen3(String in) {
}
@KafkaListener(id = "listen4", topics = "topic4", containerGroup = "g2", concurrency = "2")
public void listen4(String in) {
}
@Bean
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
}
此处,我们在两个组“g1”和“g2”中拥有 4 个侦听器。
在应用程序上下文初始化期间,序贯器将所有容器中提供的组中的 autoStartup
属性设置为 false
。它还为所有容器(尚未设置一个容器)设置 idleEventInterval
为提供的值(本例中为 5000 毫秒)。然后,当应用程序上下文启动序贯器时,将会启动第一组中的容器。在接收到 ListenerContainerIdleEvent
时,将会停止每个容器中各单独的子容器。ConcurrentMessageListenerContainer
中所有子容器停止时,父容器将停止。当组中所有容器停止时,将会启动下一组中的容器。组数或组中容器的数量没有限制。
默认情况下,最终组中的容器 (g2
以上)不会在空闲时停止。要修改此行为,在序贯器上将 stopLastGroupWhenIdle
设置为 true
。
顺便说一下,组中的容器之前添加到类型为 Collection<MessageListenerContainer>
的 Bean 中,其中 Bean 名称是 containerGroup
。这些集合现在已弃用,有利于名称为组名称且带有 .group
后缀的类型为 ContainerGroup
的 Bean;在以上示例中,将有 2 个 Bean g1.group
和 g2.group
。Collection
Bean 将在未来版本中删除。