Container factory
如 @KafkaListener
Annotation 中所述,ConcurrentKafkaListenerContainerFactory
用于为带批注的方法创建容器。
从 2.2 版开始,您可以使用相同的工厂创建任何 ConcurrentMessageListenerContainer
。如果您想创建具有类似属性的多个容器或者想使用某些外部配置的工厂(例如 Spring Boot 自动配置提供的),这将非常有用。一旦创建容器,您可以进一步修改其属性,其中许多属性都是通过使用 container.getContainerProperties()
设置的。以下示例配置了 ConcurrentMessageListenerContainer
:
@Bean
public ConcurrentMessageListenerContainer<String, String>(
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> container =
factory.createContainer("topic1", "topic2");
container.setMessageListener(m -> { ... } );
return container;
}
这样创建的容器不会添加到端点注册表。它们应该创建为 @Bean
定义,以便在应用程序上下文中注册。
从 2.3.4 版开始,您可以向工厂添加一个 ContainerCustomizer
,以便在创建和配置每个容器后进一步对其进行配置。
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setContainerCustomizer(container -> { /* customize the container */ });
return factory;
}
从 3.1 版开始,还可以通过在 KafkaListener
注释中指定 ContainerPostProcessor
的 bean 名称,在单个侦听器上应用相同类型的自定义。
@Bean
public ContainerPostProcessor<String, String, AbstractMessageListenerContainer<String, String>> customContainerPostProcessor() {
return container -> { /* customize the container */ };
}
...
@KafkaListener(..., containerPostProcessor="customContainerPostProcessor", ...)