Starting `@KafkaListener`s in Sequence
一个常见的用例是在另一个侦听器使用主题中的所有记录后启动侦听器。例如,在处理其他主题中的记录之前,您可能需要将一个或多个紧凑主题的内容加载到内存中。从 2.7.3 版开始,推出了一个新组件 ContainerGroupSequencer
。它使用 @KafkaListener
的 containerGroup
属性将容器组合在一起,并在当前组中的所有容器进入空闲状态后启动下一组中的容器。
A common use case is to start a listener after another listener has consumed all the records in a topic.
For example, you may want to load the contents of one or more compacted topics into memory before processing records from other topics.
Starting with version 2.7.3, a new component ContainerGroupSequencer
has been introduced.
It uses the @KafkaListener’s `containerGroup
property to group containers together and start the containers in the next group, when all the containers in the current group have gone idle.
最好通过一个示例来说明。
It is best illustrated with an example.
@KafkaListener(id = "listen1", topics = "topic1", containerGroup = "g1", concurrency = "2")
public void listen1(String in) {
}
@KafkaListener(id = "listen2", topics = "topic2", containerGroup = "g1", concurrency = "2")
public void listen2(String in) {
}
@KafkaListener(id = "listen3", topics = "topic3", containerGroup = "g2", concurrency = "2")
public void listen3(String in) {
}
@KafkaListener(id = "listen4", topics = "topic4", containerGroup = "g2", concurrency = "2")
public void listen4(String in) {
}
@Bean
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
}
此处,我们在两个组“g1”和“g2”中拥有 4 个侦听器。
Here, we have 4 listeners in two groups, g1
and g2
.
在应用程序上下文初始化期间,序贯器将所有容器中提供的组中的 autoStartup
属性设置为 false
。它还为所有容器(尚未设置一个容器)设置 idleEventInterval
为提供的值(本例中为 5000 毫秒)。然后,当应用程序上下文启动序贯器时,将会启动第一组中的容器。在接收到 ListenerContainerIdleEvent
时,将会停止每个容器中各单独的子容器。ConcurrentMessageListenerContainer
中所有子容器停止时,父容器将停止。当组中所有容器停止时,将会启动下一组中的容器。组数或组中容器的数量没有限制。
During application context initialization, the sequencer sets the autoStartup
property of all the containers in the provided groups to false
.
It also sets the idleEventInterval
for any containers (that do not already have one set) to the supplied value (5000ms in this case).
Then, when the sequencer is started by the application context, the containers in the first group are started.
As ListenerContainerIdleEvent`s are received, each individual child container in each container is stopped.
When all child containers in a `ConcurrentMessageListenerContainer
are stopped, the parent container is stopped.
When all containers in a group have been stopped, the containers in the next group are started.
There is no limit to the number of groups or containers in a group.
默认情况下,最终组中的容器 (g2
以上)不会在空闲时停止。要修改此行为,在序贯器上将 stopLastGroupWhenIdle
设置为 true
。
By default, the containers in the final group (g2
above) are not stopped when they go idle.
To modify that behavior, set stopLastGroupWhenIdle
to true
on the sequencer.
顺便说一下,组中的容器之前添加到类型为 Collection<MessageListenerContainer>
的 Bean 中,其中 Bean 名称是 containerGroup
。这些集合现在已弃用,有利于名称为组名称且带有 .group
后缀的类型为 ContainerGroup
的 Bean;在以上示例中,将有 2 个 Bean g1.group
和 g2.group
。Collection
Bean 将在未来版本中删除。
As an aside, previously containers in each group were added to a bean of type Collection<MessageListenerContainer>
with the bean name being the containerGroup
.
These collections are now deprecated in favor of beans of type ContainerGroup
with a bean name that is the group name, suffixed with .group
; in the example above, there would be 2 beans g1.group
and g2.group
.
The Collection
beans will be removed in a future release.