Configuration
从 2.9 版开始,对于默认配置,应该在带有 @Configuration
批注的类中使用 @EnableKafkaRetryTopic
批注。这会启用此功能以正确启动,并且可以访问注入的某些功能组件,以便在运行时进行查找。
如果添加了此注释,则无需也添加 |
而且,从该版本开始,对于该特性组件和全局特性的更高级配置,RetryTopicConfigurationSupport
类应该在 @Configuration
类中扩展,并覆盖适当的方法。有关更多详细信息,请参阅 Configuring Global Settings and Features。
默认情况下,重试主题的容器与主容器具有相同的并发性。从 3.0 版开始,您可以为重试容器设置不同的 并发性
(在批注中或在 RetryConfigurationBuilder
中)。
只可使用上述技术之一,且只有一个 @Configuration
类可扩展 RetryTopicConfigurationSupport
。
Using the @RetryableTopic
annotation
要为带 @KafkaListener
注释的方法配置重试主题和 DLT,你只需向它添加 @RetryableTopic
注释,Apache Kafka 的 Spring 将使用默认配置引导所需的所有主题和消费者。
@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
// ... message processing
}
自 3.2 起,@RetryableTopic
对类上 @KafkaListener
的支持将是:
@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public class ClassLevelRetryListener {
@KafkaHandler
public void processMessage(MyPojo message) {
// ... message processing
}
}
你可以通过用 @DltHandler
注释来指定类中处理 DLT 消息的方法。如果没有提供 DltHandler
方法,则创建默认使用者,它只记录消耗。
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
如果您未指定 kafkaTemplate 名称,则会查找名称为 |
从 3.0 版开始,@RetryableTopic
注释可用作自定义注释的元注释;例如:
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
static @interface MetaAnnotatedRetryableTopic {
@AliasFor(attribute = "concurrency", annotation = RetryableTopic.class)
String parallelism() default "3";
}
Using RetryTopicConfiguration
beans
你还可以通过在带 @Configuration
注释的类中创建 RetryTopicConfiguration
Bean 来配置非阻塞重试支持。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.create(template);
}
这将为使用默认配置带 @KafkaListener
注释的方法中的所有主题创建重试主题和 DLT 以及相应的使用者。需要 KafkaTemplate
实例进行消息转发。
要对如何处理每个主题的非阻塞重试进行更细粒度的控制,可以提供多个 RetryTopicConfiguration
Bean。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3000)
.maxAttempts(5)
.concurrency(1)
.includeTopics("my-topic", "my-other-topic")
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1000, 2, 5000)
.maxAttempts(4)
.excludeTopics("my-topic", "my-other-topic")
.retryOn(MyException.class)
.create(template);
}
重试主题和 DLT 的使用者将被分配到一个使用者组,该组的组 ID 是您在 |
如果消费者配置有 ErrorHandlingDeserializer
来处理反序列化异常,则配置 KafkaTemplate
及其生产者使用可以处理正常对象以及由反序列化异常导致的原始 byte[]
值的序列化程序非常重要。模板的通用值类型应为 Object
。一种方法是使用 DelegatingByTypeSerializer
;示例如下:
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
可以对同一个主题使用多个 @KafkaListener
注释,可以手动分配分区,也可以不手动分配分区,同时还可以使用非阻塞重试,但对于给定的主题只使用一个配置。最好为此类主题的配置使用一个单独的 RetryTopicConfiguration
Bean;如果针对同一个主题使用了多个 @RetryableTopic
注释,则所有注释都应具有相同的值,否则其中一个注释将应用于该主题的所有侦听器,而其他注释的值将被忽略。
Configuring Global Settings and Features
自 2.9 起,用于配置组件的以前 Bean 覆盖方法已被删除(未弃用,由于上述 API 的实验性质)。这不会改变 RetryTopicConfiguration
Bean 方法,而只改变基础设施组件的配置。现在应该在(单个)@Configuration
类中扩展 RetryTopicConfigurationSupport
类,并覆盖适当的方法。示例如下:
@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {
@Override
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
blockingRetries
.retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
.backOff(new FixedBackOff(3000, 3));
}
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
// Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
customizersConfigurer.customizeErrorHandler(eh -> {
eh.setSeekAfterError(false);
});
}
}
在使用这种配置方法的情况下,不应使用 @EnableKafkaRetryTopic
注释来防止由于重复的 Bean 而导致上下文启动失败。请改用简单的 @EnableKafka
注释。
当 autoCreateTopics
为 true 时,将使用指定的分区数和复制因子创建主主题和重试主题。从版本 3.0 开始,默认复制因子为 -1
,表示使用代理默认值。如果代理版本低于 2.4,则需要设置显式值。若要针对特定主题(例如主主题或 DLT)覆盖这些值,只需添加带有所需属性的 NewTopic
@Bean
;这将覆盖自动创建属性。
默认情况下,记录将使用接收记录的原始分区发布到重试主题。如果重试主题的分区比主主题的分区少,则应适当配置框架;如下所示的一个示例。
@EnableKafka
@Configuration
public class Config extends RetryTopicConfigurationSupport {
@Override
protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
}
...
}
函数的参数是使用者记录和下一个主题的名称。可以返回特定分区数,或 null
以指示 KafkaProducer
应确定分区。
在默认情况下,当记录在重试主题中转换时,重试报头(尝试次数、时间戳)的所有值将被保留。从 2.9.6 版开始,如果你只想保留这些报头的最后一个值,请使用上面显示的 configureDeadLetterPublishingContainerFactory()
方法将工厂的 retainAllRetryHeaderValues
属性设置为 false
。
Find RetryTopicConfiguration
尝试提供 RetryTopicConfiguration
实例,通过从 @RetryableTopic
注释创建实例,或在没有可用注释的情况下从 Bean 容器创建实例。
如果在容器中找到 Bean,则会检查提供的主题是否应由任何这样一个实例来处理。
如果提供了 @RetryableTopic
注释,则会查找 DltHandler
注释的方法。
自 3.2 起,在类上注释 @RetryableTopic
时提供新的 API 来创建 RetryTopicConfiguration
:
@Bean
public RetryTopicConfiguration myRetryTopic() {
RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
return provider.findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean);
}
@RetryableTopic
public static class AnnotatedClass {
// NoOps
}