Programmatic Construction

此功能专为与 @KafkaListener 一起使用而设计;然而,一些用户已请求有关如何以编程方式配置非阻塞重试的信息。以下 Spring Boot 应用程序提供了有关如何执行此操作的示例。

@SpringBootApplication
public class Application extends RetryTopicConfigurationSupport {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    RetryTopicConfiguration retryConfig(KafkaTemplate<String, String> template) {
        return RetryTopicConfigurationBuilder.newInstance()
                .maxAttempts(4)
                .autoCreateTopicsWith(2, (short) 1)
                .create(template);
    }

    @Bean
    TaskScheduler scheduler() {
        return new ThreadPoolTaskScheduler();
    }

    @Bean
    @Order(0)
    SmartInitializingSingleton dynamicRetry(RetryTopicConfigurer configurer, RetryTopicConfiguration config,
            KafkaListenerAnnotationBeanPostProcessor<?, ?> bpp, KafkaListenerContainerFactory<?> factory,
            Listener listener, KafkaListenerEndpointRegistry registry) {

        return () -> {
            KafkaListenerEndpointRegistrar registrar = bpp.getEndpointRegistrar();
            MethodKafkaListenerEndpoint<String, String> mainEndpoint = new MethodKafkaListenerEndpoint<>();
            EndpointProcessor endpointProcessor = endpoint -> {
                // customize as needed (e.g. apply attributes to retry endpoints).
                if (!endpoint.equals(mainEndpoint)) {
                    endpoint.setConcurrency(1);
                }
                // these are required
                endpoint.setMessageHandlerMethodFactory(bpp.getMessageHandlerMethodFactory());
                endpoint.setTopics("topic");
                endpoint.setId("id");
                endpoint.setGroupId("group");
            };
            mainEndpoint.setBean(listener);
            try {
                mainEndpoint.setMethod(Listener.class.getDeclaredMethod("onMessage", ConsumerRecord.class));
            }
            catch (NoSuchMethodException | SecurityException ex) {
                throw new IllegalStateException(ex);
            }
            mainEndpoint.setConcurrency(2);
            mainEndpoint.setTopics("topic");
            mainEndpoint.setId("id");
            mainEndpoint.setGroupId("group");
            configurer.processMainAndRetryListeners(endpointProcessor, mainEndpoint, config, registrar, factory,
                    "kafkaListenerContainerFactory");
        };
    }


    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("topic", "test");
        };
    }

}

@Component
class Listener implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println(KafkaUtils.format(record));
        throw new RuntimeException("test");
    }

}

仅当在刷新应用程序上下文之前处理配置时,才会自动创建主题,如上面的示例所示。要在运行时配置容器,则需要使用其他技术创建主题。