Features
The @RetryableTopic annotation and RetryTopicConfiguration beans provide a wide range of features, including:
- Retry policy configuration: various back off policies, including fixed and exponential back off as well as custom policies.
- Global timeouts: a time limit for the retrying process, preventing endless retries.
- Exception classification: which exceptions to retry on and which not to, optionally traversing causes to look up nested exceptions.
- Topic inclusion/exclusion: which topics a RetryTopicConfiguration bean does or does not handle.
- Topics auto-creation: the required topics are created automatically via NewTopic beans, with a configurable number of partitions and replication factor.
- Failure header management: customize how failure headers of dead-letter records are managed.
- Custom DeadLetterPublishingRecoverer: override the default implementation to modify the contents sent to the retry or dead-letter topics.
- Exception-based DLT routing: route messages to custom dead-letter topics based on the type of exception thrown.
Most of the features are available both for the @RetryableTopic annotation and the RetryTopicConfiguration beans.
BackOff Configuration
The BackOff configuration relies on the BackOffPolicy interface from the Spring Retry project.
It includes:
- Fixed Back Off
- Exponential Back Off
- Random Exponential Back Off
- Uniform Random Back Off
- No Back Off
- Custom Back Off
@RetryableTopic(attempts = 5,
        backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackoff(3_000)
            .maxAttempts(4)
            .create(template);
}
You can also provide a custom implementation of Spring Retry’s SleepingBackOffPolicy interface:
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .customBackOff(new MyCustomBackOffPolicy())
            .maxAttempts(5)
            .create(template);
}
The default back off policy is FixedBackOff with a maximum of 3 attempts and 1000ms intervals.

There is a 30-second default maximum delay for the ExponentialBackOffPolicy. If your back off policy requires delays with values bigger than that, adjust the maxDelay property accordingly.
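For example, a minimal sketch of raising that cap through the annotation’s maxDelay attribute, using only the attributes already shown above:

@RetryableTopic(attempts = 6,
        backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 60_000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // per-attempt delays grow as 1s, 2s, 4s, ... and are capped at 60s by maxDelay
    // ... message processing
}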
The first attempt counts against maxAttempts, so if you provide a maxAttempts value of 4 there will be the original attempt plus 3 retries.
Global Timeout
You can set the global timeout for the retrying process. If that time is reached, the next time the consumer throws an exception the message goes straight to the DLT, or just ends the processing if no DLT is available.
@RetryableTopic(backoff = @Backoff(2_000), timeout = 5_000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackoff(2_000)
            .timeoutAfter(5_000)
            .create(template);
}
The default is having no timeout set, which can also be achieved by providing -1 as the timeout value.
Exception Classifier
You can specify which exceptions you want to retry on and which you do not. You can also set it to traverse the causes to look up nested exceptions.
@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = true)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    throw new RuntimeException(new MyRetryException()); // will retry
}

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .notRetryOn(MyDontRetryException.class)
            .create(template);
}
The default behavior is retrying on all exceptions and not traversing causes.
Since 2.8.3 there’s a global list of fatal exceptions which will cause the record to be sent to the DLT without any retries. See DefaultErrorHandler for the default list of fatal exceptions. You can add or remove exceptions to and from this list by overriding the configureNonBlockingRetries method in a @Configuration class that extends RetryTopicConfigurationSupport. See Configuring Global Settings and Features for more information.
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
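    // records failing with MyNonBlockingException are now sent straight to the DLT, with no retries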
    nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
To disable fatal exceptions' classification, just clear the provided list.
Include and Exclude Topics
You can decide which topics will and will not be handled by a RetryTopicConfiguration bean via the .includeTopic(String topic), .includeTopics(Collection<String> topics), .excludeTopic(String topic), and .excludeTopics(Collection<String> topics) methods.
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .includeTopics(List.of("my-included-topic", "my-other-included-topic"))
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .excludeTopic("my-excluded-topic")
            .create(template);
}
The default behavior is to include all topics.
Topics AutoCreation
Unless otherwise specified the framework will auto create the required topics using NewTopic beans that are consumed by the KafkaAdmin bean. You can specify the number of partitions and the replication factor with which the topics will be created, and you can turn this feature off. Starting with version 3.0, the default replication factor is -1, meaning using the broker default. If your broker version is earlier than 2.4, you will need to set an explicit value.

Note that if you’re not using Spring Boot you’ll have to provide a KafkaAdmin bean in order to use this feature.
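For reference, a minimal sketch of such a bean for a non-Boot application (the localhost:9092 address is a placeholder for your cluster):

@Bean
public KafkaAdmin kafkaAdmin() {
    // the NewTopic beans (including retry and DLT topics) are picked up
    // by this admin and created at application startup
    Map<String, Object> config = new HashMap<>();
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return new KafkaAdmin(config);
}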
@RetryableTopic(numPartitions = 2, replicationFactor = 3)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@RetryableTopic(autoCreateTopics = false)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .autoCreateTopicsWith(2, 3)
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotAutoCreateRetryTopics()
            .create(template);
}
By default the topics are autocreated with one partition and a replication factor of -1 (meaning using the broker default). If your broker version is earlier than 2.4, you will need to set an explicit value.
Failure Header Management
When considering how to manage failure headers (original headers and exception headers), the framework delegates to the DeadLetterPublishingRecoverer to decide whether to append or replace the headers.

By default, it explicitly sets appendOriginalHeaders to false and leaves stripPreviousExceptionHeaders to the default used by the DeadLetterPublishingRecoverer.
This means that only the first "original" and last exception headers are retained with the default configuration. This is to avoid creation of excessively large messages (due to the stack trace header, for example) when many retry steps are involved.
See Managing Dead Letter Record Headers for more information.
To reconfigure the framework to use different settings for these properties, configure a DeadLetterPublishingRecoverer customizer by overriding the configureCustomizers method in a @Configuration class that extends RetryTopicConfigurationSupport. See Configuring Global Settings and Features for more details.
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
    customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
        dlpr.setAppendOriginalHeaders(true);
        dlpr.setStripPreviousExceptionHeaders(false);
    });
}
Starting with version 2.8.4, if you wish to add custom headers (in addition to the retry information headers added by the factory), you can add a headersFunction to the factory: factory.setHeadersFunction((rec, ex) -> { ... }).
By default, any headers added will be cumulative - Kafka headers can contain multiple values. Starting with version 2.9.5, if the Headers returned by the function contains a header of type DeadLetterPublishingRecoverer.SingleRecordHeader, then any existing values for that header will be removed and only the new single value will remain.
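A minimal sketch combining the two, assuming the DeadLetterPublishingRecovererFactory exposed by configureDeadLetterPublishingContainerFactory() (shown in the next section); the x-failure-topic header name is made up for illustration:

@Override
protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
    return factory -> factory.setHeadersFunction((rec, ex) -> {
        Headers headers = new RecordHeaders();
        // a SingleRecordHeader replaces any previous values instead of accumulating them
        headers.add(new DeadLetterPublishingRecoverer.SingleRecordHeader(
                "x-failure-topic", rec.topic().getBytes(StandardCharsets.UTF_8)));
        return headers;
    });
}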
Custom DeadLetterPublishingRecoverer
As can be seen in Failure Header Management, it is possible to customize the default DeadLetterPublishingRecoverer instances created by the framework. However, for some use cases, it is necessary to subclass the DeadLetterPublishingRecoverer, for example to override createProducerRecord() to modify the contents sent to the retry (or dead-letter) topics. Starting with version 3.0.9, you can override the RetryTopicConfigurationSupport.configureDeadLetterPublishingContainerFactory() method to provide a DeadLetterPublisherCreator instance, for example:
@Override
protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
    return (factory) -> factory.setDeadLetterPublisherCreator(
            (templateResolver, destinationResolver) ->
                    new CustomDLPR(templateResolver, destinationResolver));
}
It is recommended that you use the provided resolvers when constructing the custom instance.
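For illustration, a minimal sketch of such a subclass; the createProducerRecord() signature shown here follows recent 3.x lines, so verify it against the version you use:

public class CustomDLPR extends DeadLetterPublishingRecoverer {

    public CustomDLPR(Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver,
            BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
        super(templateResolver, destinationResolver);
    }

    @Override
    protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
            TopicPartition topicPartition, Headers headers, byte[] key, byte[] value) {
        // adjust the outgoing record here, e.g. replace or redact the payload,
        // before it is published to the retry or dead-letter topic
        return super.createProducerRecord(record, topicPartition, headers, key, value);
    }
}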
Routing of messages to custom DLTs based on thrown exceptions
Starting with version 3.2.0, it’s possible to route messages to custom DLTs based on the type of the exception thrown during their processing. In order to do that, you need to specify the routing. Routing customization consists of the specification of additional destinations. Destinations in turn consist of two settings: the suffix and exceptions. When the exception type specified in exceptions has been thrown, the DLT containing the suffix will be considered as the target topic for the message before the general purpose DLT is considered. Examples of configuration using either annotations or RetryTopicConfiguration beans:
@RetryableTopic(exceptionBasedDltRouting = {
        @ExceptionBasedDltDestination(
                suffix = "-deserialization", exceptions = {DeserializationException.class}
        )}
)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class)))
            .create(template);
}
The suffix is placed before the general dltTopicSuffix in the custom DLT name. Considering the presented examples, a message that caused a DeserializationException will be routed to my-annotated-topic-deserialization-dlt instead of my-annotated-topic-dlt. Custom DLTs will be created following the same rules as stated in Topics AutoCreation.