Programmatic Construction
The feature is designed to be used with @KafkaListener; however, several users have requested information on how to configure non-blocking retries programmatically.
The following Spring Boot application provides an example of how to do so.
@SpringBootApplication
public class Application extends RetryTopicConfigurationSupport {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    RetryTopicConfiguration retryConfig(KafkaTemplate<String, String> template) {
        return RetryTopicConfigurationBuilder.newInstance()
                .maxAttempts(4)
                .autoCreateTopicsWith(2, (short) 1)
                .create(template);
    }

    @Bean
    TaskScheduler scheduler() {
        return new ThreadPoolTaskScheduler();
    }

    @Bean
    @Order(0)
    SmartInitializingSingleton dynamicRetry(RetryTopicConfigurer configurer, RetryTopicConfiguration config,
            KafkaListenerAnnotationBeanPostProcessor<?, ?> bpp, KafkaListenerContainerFactory<?> factory,
            Listener listener, KafkaListenerEndpointRegistry registry) {

        return () -> {
            KafkaListenerEndpointRegistrar registrar = bpp.getEndpointRegistrar();
            MethodKafkaListenerEndpoint<String, String> mainEndpoint = new MethodKafkaListenerEndpoint<>();
            EndpointProcessor endpointProcessor = endpoint -> {
                // customize as needed (e.g. apply attributes to retry endpoints).
                if (!endpoint.equals(mainEndpoint)) {
                    endpoint.setConcurrency(1);
                }
                // these are required
                endpoint.setMessageHandlerMethodFactory(bpp.getMessageHandlerMethodFactory());
                endpoint.setTopics("topic");
                endpoint.setId("id");
                endpoint.setGroupId("group");
            };
            mainEndpoint.setBean(listener);
            try {
                mainEndpoint.setMethod(Listener.class.getDeclaredMethod("onMessage", ConsumerRecord.class));
            }
            catch (NoSuchMethodException | SecurityException ex) {
                throw new IllegalStateException(ex);
            }
            mainEndpoint.setConcurrency(2);
            mainEndpoint.setTopics("topic");
            mainEndpoint.setId("id");
            mainEndpoint.setGroupId("group");
            configurer.processMainAndRetryListeners(endpointProcessor, mainEndpoint, config, registrar, factory,
                    "kafkaListenerContainerFactory");
        };
    }

    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("topic", "test");
        };
    }

}

@Component
class Listener implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println(KafkaUtils.format(record));
        throw new RuntimeException("test");
    }

}
Topics are created automatically only if the configuration is processed before the application context is refreshed, as in the above example. To configure containers at runtime, the topics must be created using some other technique.
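When containers are configured at runtime, one possible starting point is to list the topics that must exist before the containers start. The following is a minimal, plain-Java sketch of such a list; it is not part of Spring for Apache Kafka. The RetryTopicPlan class, the TopicSpec record and the plan method are hypothetical names introduced for illustration. The suffixes are supplied by the caller because the actual retry and dead-letter topic names depend on the framework version and the suffixing strategy in use; the "-retry" and "-dlt" values shown are assumptions and should be checked against the names your configuration produces. The partition count and replication factor mirror the autoCreateTopicsWith(2, (short) 1) call in the example above.

```java
import java.util.ArrayList;
import java.util.List;

public class RetryTopicPlan {

    /** Hypothetical holder for the settings a topic needs; not a Spring Kafka type. */
    public record TopicSpec(String name, int partitions, short replicationFactor) { }

    /**
     * Builds one spec for the main topic and one for each suffix supplied by the caller.
     * No naming convention is derived here; the caller decides which suffixes apply.
     */
    public static List<TopicSpec> plan(String mainTopic, List<String> suffixes,
            int partitions, short replicationFactor) {

        List<TopicSpec> specs = new ArrayList<>();
        specs.add(new TopicSpec(mainTopic, partitions, replicationFactor));
        for (String suffix : suffixes) {
            specs.add(new TopicSpec(mainTopic + suffix, partitions, replicationFactor));
        }
        return specs;
    }

    public static void main(String[] args) {
        // Assumed suffixes; verify the topic names your retry configuration actually uses.
        List<TopicSpec> specs = plan("topic", List.of("-retry", "-dlt"), 2, (short) 1);
        specs.forEach(spec -> System.out.println(spec.name()
                + " partitions=" + spec.partitions()
                + " replicas=" + spec.replicationFactor()));
    }

}
```

Each entry could then be handed to whatever admin tooling you use, for example by creating one NewTopic per entry and passing them to the Kafka AdminClient createTopics method before the containers are started.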