Configuration
从 2.9 版开始,对于默认配置,应该在带有 @Configuration
批注的类中使用 @EnableKafkaRetryTopic
批注。这会启用此功能以正确启动,并且可以访问注入的某些功能组件,以便在运行时进行查找。
Starting with version 2.9, for default configuration, the @EnableKafkaRetryTopic
annotation should be used in a @Configuration
annotated class.
This enables the feature to bootstrap properly and gives access to injecting some of the feature’s components to be looked up at runtime.
如果添加了此注释,则无需也添加 |
It is not necessary to also add |
而且,从该版本开始,对于该特性组件和全局特性的更高级配置,RetryTopicConfigurationSupport
类应该在 @Configuration
类中扩展,并覆盖适当的方法。有关更多详细信息,请参阅 Configuring Global Settings and Features。
Also, starting with that version, for more advanced configuration of the feature’s components and global features, the RetryTopicConfigurationSupport
class should be extended in a @Configuration
class, and the appropriate methods overridden.
For more details refer to Configuring Global Settings and Features.
默认情况下,重试主题的容器与主容器具有相同的并发性。从 3.0 版开始,您可以为重试容器设置不同的 并发性
(在批注中或在 RetryConfigurationBuilder
中)。
By default, the containers for the retry topics will have the same concurrency as the main container.
Starting with version 3.0, you can set a different concurrency
for the retry containers (either on the annotation, or in RetryConfigurationBuilder
).
只可使用上述技术之一,且只有一个 @Configuration
类可扩展 RetryTopicConfigurationSupport
。
Only one of the above techniques can be used, and only one @Configuration
class can extend RetryTopicConfigurationSupport
.
Using the @RetryableTopic
annotation
要为带 @KafkaListener
注释的方法配置重试主题和 DLT,你只需向它添加 @RetryableTopic
注释,Apache Kafka 的 Spring 将使用默认配置引导所需的所有主题和消费者。
To configure the retry topic and dlt for a @KafkaListener
annotated method, you just have to add the @RetryableTopic
annotation to it and Spring for Apache Kafka will bootstrap all the necessary topics and consumers with the default configurations.
@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
// ... message processing
}
自 3.2 起,@RetryableTopic
对类上 @KafkaListener
的支持将是:
Since 3.2, @RetryableTopic
support for @KafkaListener on a class would be:
@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public class ClassLevelRetryListener {
@KafkaHandler
public void processMessage(MyPojo message) {
// ... message processing
}
}
你可以通过用 @DltHandler
注释来指定类中处理 DLT 消息的方法。如果没有提供 DltHandler
方法,则创建默认使用者,它只记录消耗。
You can specify a method in the same class to process the dlt messages by annotating it with the @DltHandler
annotation.
If no DltHandler method is provided a default consumer is created which only logs the consumption.
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
如果您未指定 kafkaTemplate 名称,则会查找名称为 |
If you don’t specify a kafkaTemplate name a bean with name |
从 3.0 版开始,@RetryableTopic
注释可用作自定义注释的元注释;例如:
Starting with version 3.0, the @RetryableTopic
annotation can be used as a meta-annotation on custom annotations; for example:
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
static @interface MetaAnnotatedRetryableTopic {
@AliasFor(attribute = "concurrency", annotation = RetryableTopic.class)
String parallelism() default "3";
}
Using RetryTopicConfiguration
beans
你还可以通过在带 @Configuration
注释的类中创建 RetryTopicConfiguration
Bean 来配置非阻塞重试支持。
You can also configure the non-blocking retry support by creating RetryTopicConfiguration
beans in a @Configuration
annotated class.
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.create(template);
}
这将为使用默认配置带 @KafkaListener
注释的方法中的所有主题创建重试主题和 DLT 以及相应的使用者。需要 KafkaTemplate
实例进行消息转发。
This will create retry topics and a dlt, as well as the corresponding consumers, for all topics in methods annotated with @KafkaListener
using the default configurations. The KafkaTemplate
instance is required for message forwarding.
要对如何处理每个主题的非阻塞重试进行更细粒度的控制,可以提供多个 RetryTopicConfiguration
Bean。
To achieve more fine-grained control over how to handle non-blocking retrials for each topic, more than one RetryTopicConfiguration
bean can be provided.
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3000)
.maxAttempts(5)
.concurrency(1)
.includeTopics("my-topic", "my-other-topic")
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1000, 2, 5000)
.maxAttempts(4)
.excludeTopics("my-topic", "my-other-topic")
.retryOn(MyException.class)
.create(template);
}
重试主题和 DLT 的使用者将被分配到一个使用者组,该组的组 ID 是您在 |
The retry topics' and dlt’s consumers will be assigned to a consumer group with a group id that is the combination of the one which you provide in the |
如果消费者配置有 ErrorHandlingDeserializer
来处理反序列化异常,则配置 KafkaTemplate
及其生产者使用可以处理正常对象以及由反序列化异常导致的原始 byte[]
值的序列化程序非常重要。模板的通用值类型应为 Object
。一种方法是使用 DelegatingByTypeSerializer
;示例如下:
If the consumer is configured with an ErrorHandlingDeserializer
, to handle deserialization exceptions, it is important to configure the KafkaTemplate
and its producer with a serializer that can handle normal objects as well as raw byte[]
values, which result from deserialization exceptions.
The generic value type of the template should be Object
.
One technique is to use the DelegatingByTypeSerializer
; an example follows:
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
可以对同一个主题使用多个 @KafkaListener
注释,可以手动分配分区,也可以不手动分配分区,同时还可以使用非阻塞重试,但对于给定的主题只使用一个配置。最好为此类主题的配置使用一个单独的 RetryTopicConfiguration
Bean;如果针对同一个主题使用了多个 @RetryableTopic
注释,则所有注释都应具有相同的值,否则其中一个注释将应用于该主题的所有侦听器,而其他注释的值将被忽略。
Multiple @KafkaListener
annotations can be used for the same topic with or without manual partition assignment along with non-blocking retries, but only one configuration will be used for a given topic.
It’s best to use a single RetryTopicConfiguration
bean for configuration of such topics; if multiple @RetryableTopic
annotations are being used for the same topic, all of them should have the same values, otherwise one of them will be applied to all of that topic’s listeners and the other annotations' values will be ignored.
Configuring Global Settings and Features
自 2.9 起,用于配置组件的以前 Bean 覆盖方法已被删除(未弃用,由于上述 API 的实验性质)。这不会改变 RetryTopicConfiguration
Bean 方法,而只改变基础设施组件的配置。现在应该在(单个)@Configuration
类中扩展 RetryTopicConfigurationSupport
类,并覆盖适当的方法。示例如下:
Since 2.9, the previous bean overriding approach for configuring components has been removed (without deprecation, due to the aforementioned experimental nature of the API).
This does not change the RetryTopicConfiguration
beans approach - only infrastructure components' configurations.
Now the RetryTopicConfigurationSupport
class should be extended in a (single) @Configuration
class, and the proper methods overridden.
An example follows:
@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {
@Override
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
blockingRetries
.retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
.backOff(new FixedBackOff(3000, 3));
}
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
// Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
customizersConfigurer.customizeErrorHandler(eh -> {
eh.setSeekAfterError(false);
});
}
}
在使用这种配置方法的情况下,不应使用 @EnableKafkaRetryTopic
注释来防止由于重复的 Bean 而导致上下文启动失败。请改用简单的 @EnableKafka
注释。
When using this configuration approach, the @EnableKafkaRetryTopic
annotation should not be used to prevent context failing to start due to duplicated beans.
Use the simple @EnableKafka
annotation instead.
当 autoCreateTopics
为 true 时,将使用指定的分区数和复制因子创建主主题和重试主题。从版本 3.0 开始,默认复制因子为 -1
,表示使用代理默认值。如果代理版本低于 2.4,则需要设置显式值。若要针对特定主题(例如主主题或 DLT)覆盖这些值,只需添加带有所需属性的 NewTopic
@Bean
;这将覆盖自动创建属性。
When autoCreateTopics
is true, the main and retry topics will be created with the specified number of partitions and replication factor.
Starting with version 3.0, the default replication factor is -1
, meaning using the broker default.
If your broker version is earlier than 2.4, you will need to set an explicit value.
To override these values for a particular topic (e.g. the main topic or DLT), simply add a NewTopic
@Bean
with the required properties; that will override the auto creation properties.
默认情况下,记录将使用接收记录的原始分区发布到重试主题。如果重试主题的分区比主主题的分区少,则应适当配置框架;如下所示的一个示例。
By default, records are published to the retry topic(s) using the original partition of the received record. If the retry topics have fewer partitions than the main topic, you should configure the framework appropriately; an example follows.
@EnableKafka
@Configuration
public class Config extends RetryTopicConfigurationSupport {
@Override
protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
}
...
}
函数的参数是使用者记录和下一个主题的名称。可以返回特定分区数,或 null
以指示 KafkaProducer
应确定分区。
The parameters to the function are the consumer record and the name of the next topic.
You can return a specific partition number, or null
to indicate that the KafkaProducer
should determine the partition.
在默认情况下,当记录在重试主题中转换时,重试报头(尝试次数、时间戳)的所有值将被保留。从 2.9.6 版开始,如果你只想保留这些报头的最后一个值,请使用上面显示的 configureDeadLetterPublishingContainerFactory()
方法将工厂的 retainAllRetryHeaderValues
属性设置为 false
。
By default, all values of retry headers (number of attempts, timestamps) are retained when a record transitions through the retry topics.
Starting with version 2.9.6, if you want to retain just the last value of these headers, use the configureDeadLetterPublishingContainerFactory()
method shown above to set the factory’s retainAllRetryHeaderValues
property to false
.
Find RetryTopicConfiguration
尝试提供 RetryTopicConfiguration
实例,通过从 @RetryableTopic
注释创建实例,或在没有可用注释的情况下从 Bean 容器创建实例。
Attempts to provide an instance of RetryTopicConfiguration
by either creating one from a @RetryableTopic
annotation, or from the bean container if no annotation is available.
如果在容器中找到 Bean,则会检查提供的主题是否应由任何这样一个实例来处理。
If beans are found in the container, there’s a check to determine whether the provided topics should be handled by any of such instances.
如果提供了 @RetryableTopic
注释,则会查找 DltHandler
注释的方法。
If @RetryableTopic
annotation is provided, a DltHandler
annotated method is looked up.
自 3.2 起,在类上注释 @RetryableTopic
时提供新的 API 来创建 RetryTopicConfiguration
:
since 3.2, provide new API to Create RetryTopicConfiguration
when @RetryableTopic
annotated on a class:
@Bean
public RetryTopicConfiguration myRetryTopic() {
RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
return provider.findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean);
}
@RetryableTopic
public static class AnnotatedClass {
// NoOps
}