Features
RetryTopic 批注和 RetryTopicConfiguration Bean 提供了广泛的功能,包括:
-
重试策略配置:支持各种回退策略,包括固定/指数回退和自定义策略。
-
全局超时:为重试操作设置超时,防止无限重试。
-
异常分类器:指定要重试的异常和不重试的异常,可遍历原因以查找嵌套异常。
-
主题包含/排除:确定 RetryTopicConfiguration Bean 处理或不处理哪些主题。
-
自动创建主题:通过 NewTopic Bean 自动创建所需的主题,可配置分区数和复制系数。
-
死信标题管理:自定义管理死信记录的故障标题的方式。
-
自定义 DeadLetterPublishingRecoverer:覆盖默认实现以修改发送到重试或死信主题的内容。
-
基于异常的 DLT 路由:根据引发的异常类型将消息路由到自定义死信主题。
@RetryableTopic
批注和 RetryTopicConfiguration
Bean 都可使用大多数功能。
BackOff Configuration
BackOff 配置依赖于 Spring Retry
项目中的 BackOffPolicy
界面。
它包括:
-
Fixed Back Off
-
Exponential Back Off
-
Random Exponential Back Off
-
Uniform Random Back Off
-
No Back Off
-
Custom Back Off
@RetryableTopic(attempts = 5,
backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(3_000)
.maxAttempts(4)
.create(template);
}
您还可以提供 Spring Retry 的 SleepingBackOffPolicy
界面自定义实现:
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.customBackOff(new MyCustomBackOffPolicy())
.maxAttempts(5)
.create(template);
}
默认后退策略是 |
|
第一次尝试计入 maxAttempts
,因此如果您提供了一个 maxAttempts
值 4,将会有原始尝试加上 3 次重试。
Global Timeout
您可为重试过程设置全局超时。如果达到该时间,则下次使用者引发异常时,消息将直接进入 DLT,或在没有 DLT 时结束处理。
@RetryableTopic(backoff = @Backoff(2_000), timeout = 5_000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(2_000)
.timeoutAfter(5_000)
.create(template);
}
默认情况下不设置超时,也可以提供 -1 作为超时值来实现此目的。 |
Exception Classifier
您可以指定要重试的异常以及不重试的异常。您还可将其设置为遍历原因以查找嵌套异常。
@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = true)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
throw new RuntimeException(new MyRetryException()); // will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.notRetryOn(MyDontRetryException.class)
.create(template);
}
默认的行为是重试所有异常,不遍历原因。 |
自 2.8.3 版以来,有一个全局列表包含致命异常,这些异常会导致记录在没有任何重试的情况下发送到 DLT。有关致命异常的默认列表,请参阅 DefaultErrorHandler。您可以通过覆盖扩展 `RetryTopicConfigurationSupport`的 `@Configuration`类中的 `configureNonBlockingRetries`方法来向此列表添加或从中删除异常。有关详细信息,请参阅 Configuring Global Settings and Features。
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
要禁用致命异常的分类,只需清除提供的列表。 |
Include and Exclude Topics
您可以通过 .includeTopic(String topic)、.includeTopics(Collection<String> topics)、.excludeTopic(String topic) 和 .excludeTopics(Collection<String> topics) 方法决定哪些主题将由 RetryTopicConfiguration
Bean 处理,哪些主题将不处理。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.includeTopics(List.of("my-included-topic", "my-other-included-topic"))
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.excludeTopic("my-excluded-topic")
.create(template);
}
默认行为是包含所有主题。 |
Topics AutoCreation
除非另有指定,否则框架将使用由 KafkaAdmin
Bean 消费的 NewTopic
Bean 自动创建所需的主题。您可以指定创建主题时分区的数量和副本系数,并且您可以关闭此功能。从 3.0 版本开始,默认副本系数为 -1
,表示使用 Broker 的默认值。如果您的 Broker 版本低于 2.4,您需要设置一个明确的值。
请注意,如果您不使用 Spring Boot,则需要提供一个 KafkaAdmin Bean 才能使用此功能。
@RetryableTopic(numPartitions = 2, replicationFactor = 3)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@RetryableTopic(autoCreateTopics = false)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.autoCreateTopicsWith(2, 3)
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.doNotAutoCreateRetryTopics()
.create(template);
}
默认情况下,这些主题会自动创建,其中包含一个分区,并且复制因子为 -1(表示使用代理默认值)。如果您的代理版本早于 2.4,则需要设置一个显式值。 |
Failure Header Management
考虑如何管理故障标头(原始标头和异常标头)时,框架将委派给 DeadLetterPublishingRecover
以决定附加标头还是替换标头。
默认情况下,它将显式将 appendOriginalHeaders
设置为 false
,并将 stripPreviousExceptionHeaders
留给 DeadLetterPublishingRecover
使用的默认值。
这意味着,默认情况下,只保留第一个“原始”标题和最后一个异常标题。这是为了避免在涉及许多重试步骤时(例如,由于堆栈跟踪标题)创建超大的消息。
有关更多信息,请参见 Managing Dead Letter Record Headers。
要重新配置框架以使用这些属性的不同设置,可以通过覆盖扩展 `RetryTopicConfigurationSupport`的 `@Configuration`类中的 `configureCustomizers`方法来配置 `DeadLetterPublishingRecoverer`自定义程序。有关详细信息,请参阅 Configuring Global Settings and Features。
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
dlpr.setAppendOriginalHeaders(true);
dlpr.setStripPreviousExceptionHeaders(false);
});
}
从 2.8.4 版开始,如果您希望添加自定义标题(除了工厂添加的重试信息标题),您可以向工厂添加一个 headersFunction
- factory.setHeadersFunction((rec, ex) → { … })
。
默认情况下,添加的任何标题都是累积的 - Kafka 标头可以包含多个值。从 2.9.5 版开始,如果函数返回的 Headers
包含类型为 DeadLetterPublishingRecoverer.SingleRecordHeader
的标题,那么该标题的任何现有值都将被删除,并且只会保留新的单个值。
Custom DeadLetterPublishingRecoverer
如 Failure Header Management 中所示,可以自定义框架创建的默认 DeadLetterPublishingRecoverer
实例。然而,对于某些用例,有必要对 DeadLetterPublishingRecoverer
进行子类化,例如覆盖 createProducerRecord()
以修改发送到重试(或死信)主题的内容。从版本 3.0.9 开始,您可以覆盖 RetryConfigurationSupport.configureDeadLetterPublishingContainerFactory()
方法以提供 DeadLetterPublisherCreator
实例,例如:
@Override
protected Consumer<DeadLetterPublishingRecovererFactory>
configureDeadLetterPublishingContainerFactory() {
return (factory) -> factory.setDeadLetterPublisherCreator(
(templateResolver, destinationResolver) ->
new CustomDLPR(templateResolver, destinationResolver));
}
建议您在构建自定义实例时使用提供的解析器。
Routing of messages to custom DLTs based on thrown exceptions
从 3.2.0 版开始,可以根据在处理期间引发的异常类型将消息路由到自定义 DLT。为了做到这一点,需要指定路由。路由自定义包括附加目标点的规范。目标点本身又包括两个设置:后缀
和 异常
。当在 exceptions
中指定了异常类型时,包含 后缀
的 DLT 将被视为消息在考虑通用 DLT 之前的目标主题。使用批注或 RetryTopicConfiguration
Bean 进行配置的示例:
@RetryableTopic(exceptionBasedDltRouting = {
@ExceptionBasedDltDestination(
suffix = "-deserialization", exceptions = {DeserializationException.class}
)}
)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class)))
.create(kafkaOperations)
.create(template);
}
suffix
出现在自定义 DLT 名称中的一般 dltTopicSuffix
之前。预先考虑示例,导致 DeserializationException
的消息将被路由到 my-annotated-topic-deserialization-dlt
,而不是 my-annotated-topic-dlt
。自定义 DLT 将根据 Topics AutoCreation 中规定的规则创建。