Container factory

@KafkaListener Annotation 中所述,ConcurrentKafkaListenerContainerFactory 用于为带批注的方法创建容器。

As discussed in @KafkaListener Annotation, a ConcurrentKafkaListenerContainerFactory is used to create containers for annotated methods.

从 2.2 版开始,您可以使用相同的工厂创建任何 ConcurrentMessageListenerContainer。如果您想创建具有类似属性的多个容器或者想使用某些外部配置的工厂(例如 Spring Boot 自动配置提供的),这将非常有用。一旦创建容器,您可以进一步修改其属性,其中许多属性都是通过使用 container.getContainerProperties() 设置的。以下示例配置了 ConcurrentMessageListenerContainer

Starting with version 2.2, you can use the same factory to create any ConcurrentMessageListenerContainer. This might be useful if you want to create several containers with similar properties or you wish to use some externally configured factory, such as the one provided by Spring Boot auto-configuration. Once the container is created, you can further modify its properties, many of which are set by using container.getContainerProperties(). The following example configures a ConcurrentMessageListenerContainer:

@Bean
public ConcurrentMessageListenerContainer<String, String>(
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {

    ConcurrentMessageListenerContainer<String, String> container =
        factory.createContainer("topic1", "topic2");
    container.setMessageListener(m -> { ... } );
    return container;
}

这样创建的容器不会添加到端点注册表。它们应该创建为 @Bean 定义,以便在应用程序上下文中注册。

Containers created this way are not added to the endpoint registry. They should be created as @Bean definitions so that they are registered with the application context.

从 2.3.4 版开始,您可以向工厂添加一个 ContainerCustomizer,以便在创建和配置每个容器后进一步对其进行配置。

Starting with version 2.3.4, you can add a ContainerCustomizer to the factory to further configure each container after it has been created and configured.

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setContainerCustomizer(container -> { /* customize the container */ });
    return factory;
}

从 3.1 版开始,还可以通过在 KafkaListener 注释中指定 ContainerPostProcessor 的 bean 名称,在单个侦听器上应用相同类型的自定义。

Starting with version 3.1, it’s also possible to apply the same kind of customization on a single listener by specifying the bean name of a 'ContainerPostProcessor' on the KafkaListener annotation.

@Bean
public ContainerPostProcessor<String, String, AbstractMessageListenerContainer<String, String>> customContainerPostProcessor() {
    return container -> { /* customize the container */ };
}

...

@KafkaListener(..., containerPostProcessor="customContainerPostProcessor", ...)