Features

RetryTopic 批注和 RetryTopicConfiguration Bean 提供了广泛的功能,包括:

  • 重试策略配置:支持各种回退策略,包括固定/指数回退和自定义策略。

  • 全局超时:为重试操作设置超时,防止无限重试。

  • 异常分类器:指定要重试的异常和不重试的异常,可遍历原因以查找嵌套异常。

  • 主题包含/排除:确定 RetryTopicConfiguration Bean 处理或不处理哪些主题。

  • 自动创建主题:通过 NewTopic Bean 自动创建所需的主题,可配置分区数和复制系数。

  • 死信标题管理:自定义管理死信记录的故障标题的方式。

  • 自定义 DeadLetterPublishingRecoverer:覆盖默认实现以修改发送到重试或死信主题的内容。

  • 基于异常的 DLT 路由:根据引发的异常类型将消息路由到自定义死信主题。

@RetryableTopic 批注和 RetryTopicConfiguration Bean 都可使用大多数功能。

BackOff Configuration

BackOff 配置依赖于 Spring Retry 项目中的 BackOffPolicy 界面。

它包括:

  • Fixed Back Off

  • Exponential Back Off

  • Random Exponential Back Off

  • Uniform Random Back Off

  • No Back Off

  • Custom Back Off

@RetryableTopic(attempts = 5,
    backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackoff(3_000)
            .maxAttempts(4)
            .create(template);
}

您还可以提供 Spring Retry 的 SleepingBackOffPolicy 界面自定义实现:

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .customBackOff(new MyCustomBackOffPolicy())
            .maxAttempts(5)
            .create(template);
}

默认后退策略是 FixedBackOffPolicy,最大尝试次数为 3 次,间隔为 1000 毫秒。

ExponentialBackOffPolicy 的默认最大延迟为 30 秒。如果您的后退策略需要值大于该值,请相应调整 maxDelay 属性。

第一次尝试计入 maxAttempts,因此如果您提供了一个 maxAttempts 值 4,将会有原始尝试加上 3 次重试。

Global Timeout

您可为重试过程设置全局超时。如果达到该时间,则下次使用者引发异常时,消息将直接进入 DLT,或在没有 DLT 时结束处理。

@RetryableTopic(backoff = @Backoff(2_000), timeout = 5_000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackoff(2_000)
            .timeoutAfter(5_000)
            .create(template);
}

默认情况下不设置超时,也可以提供 -1 作为超时值来实现此目的。

Exception Classifier

您可以指定要重试的异常以及不重试的异常。您还可将其设置为遍历原因以查找嵌套异常。

@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = true)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    throw new RuntimeException(new MyRetryException()); // will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .notRetryOn(MyDontRetryException.class)
            .create(template);
}

默认的行为是重试所有异常,不遍历原因。

自 2.8.3 版以来,有一个全局列表包含致命异常,这些异常会导致记录在没有任何重试的情况下发送到 DLT。有关致命异常的默认列表,请参阅 DefaultErrorHandler。您可以通过覆盖扩展 `RetryTopicConfigurationSupport`的 `@Configuration`类中的 `configureNonBlockingRetries`方法来向此列表添加或从中删除异常。有关详细信息,请参阅 Configuring Global Settings and Features

@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
    nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}

要禁用致命异常的分类,只需清除提供的列表。

Include and Exclude Topics

您可以通过 .includeTopic(String topic)、.includeTopics(Collection<String> topics)、.excludeTopic(String topic) 和 .excludeTopics(Collection<String> topics) 方法决定哪些主题将由 RetryTopicConfiguration Bean 处理,哪些主题将不处理。

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .includeTopics(List.of("my-included-topic", "my-other-included-topic"))
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .excludeTopic("my-excluded-topic")
            .create(template);
}

默认行为是包含所有主题。

Topics AutoCreation

除非另有指定,否则框架将使用由 KafkaAdmin Bean 消费的 NewTopic Bean 自动创建所需的主题。您可以指定创建主题时分区的数量和副本系数,并且您可以关闭此功能。从 3.0 版本开始,默认副本系数为 -1,表示使用 Broker 的默认值。如果您的 Broker 版本低于 2.4,您需要设置一个明确的值。

请注意,如果您不使用 Spring Boot,则需要提供一个 KafkaAdmin Bean 才能使用此功能。

@RetryableTopic(numPartitions = 2, replicationFactor = 3)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@RetryableTopic(autoCreateTopics = false)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .autoCreateTopicsWith(2, 3)
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotAutoCreateRetryTopics()
            .create(template);
}

默认情况下,这些主题会自动创建,其中包含一个分区,并且复制因子为 -1(表示使用代理默认值)。如果您的代理版本早于 2.4,则需要设置一个显式值。

Failure Header Management

考虑如何管理故障标头(原始标头和异常标头)时,框架将委派给 DeadLetterPublishingRecover 以决定附加标头还是替换标头。

默认情况下,它将显式将 appendOriginalHeaders 设置为 false,并将 stripPreviousExceptionHeaders 留给 DeadLetterPublishingRecover 使用的默认值。

这意味着,默认情况下,只保留第一个“原始”标题和最后一个异常标题。这是为了避免在涉及许多重试步骤时(例如,由于堆栈跟踪标题)创建超大的消息。

有关更多信息,请参见 Managing Dead Letter Record Headers

要重新配置框架以使用这些属性的不同设置,可以通过覆盖扩展 `RetryTopicConfigurationSupport`的 `@Configuration`类中的 `configureCustomizers`方法来配置 `DeadLetterPublishingRecoverer`自定义程序。有关详细信息,请参阅 Configuring Global Settings and Features

@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
    customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
        dlpr.setAppendOriginalHeaders(true);
        dlpr.setStripPreviousExceptionHeaders(false);
    });
}

从 2.8.4 版开始,如果您希望添加自定义标题(除了工厂添加的重试信息标题),您可以向工厂添加一个 headersFunction - factory.setHeadersFunction((rec, ex) → { …​ })

默认情况下,添加的任何标题都是累积的 - Kafka 标头可以包含多个值。从 2.9.5 版开始,如果函数返回的 Headers 包含类型为 DeadLetterPublishingRecoverer.SingleRecordHeader 的标题,那么该标题的任何现有值都将被删除,并且只会保留新的单个值。

Custom DeadLetterPublishingRecoverer

Failure Header Management 中所示,可以自定义框架创建的默认 DeadLetterPublishingRecoverer 实例。然而,对于某些用例,有必要对 DeadLetterPublishingRecoverer 进行子类化,例如覆盖 createProducerRecord() 以修改发送到重试(或死信)主题的内容。从版本 3.0.9 开始,您可以覆盖 RetryConfigurationSupport.configureDeadLetterPublishingContainerFactory() 方法以提供 DeadLetterPublisherCreator 实例,例如:

@Override
protected Consumer<DeadLetterPublishingRecovererFactory>
        configureDeadLetterPublishingContainerFactory() {

    return (factory) -> factory.setDeadLetterPublisherCreator(
            (templateResolver, destinationResolver) ->
                    new CustomDLPR(templateResolver, destinationResolver));
}

建议您在构建自定义实例时使用提供的解析器。

Routing of messages to custom DLTs based on thrown exceptions

从 3.2.0 版开始,可以根据在处理期间引发的异常类型将消息路由到自定义 DLT。为了做到这一点,需要指定路由。路由自定义包括附加目标点的规范。目标点本身又包括两个设置:后缀异常。当在 exceptions 中指定了异常类型时,包含 后缀 的 DLT 将被视为消息在考虑通用 DLT 之前的目标主题。使用批注或 RetryTopicConfiguration Bean 进行配置的示例:

@RetryableTopic(exceptionBasedDltRouting = {
    @ExceptionBasedDltDestination(
        suffix = "-deserialization", exceptions = {DeserializationException.class}
    )}
)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class)))
            .create(kafkaOperations)
            .create(template);
}

suffix 出现在自定义 DLT 名称中的一般 dltTopicSuffix 之前。预先考虑示例,导致 DeserializationException 的消息将被路由到 my-annotated-topic-deserialization-dlt,而不是 my-annotated-topic-dlt。自定义 DLT 将根据 Topics AutoCreation 中规定的规则创建。