Topic Naming

重试主题和 DLT 是通过为主题添加提供或默认值的后缀来命名的,后缀要么是延迟,要么是该主题的索引。 示例: “my-topic”→ “my-topic-retry-0”、“my-topic-retry-1”、“my-topic-dlt” “my-other-topic”→ “my-topic-myRetrySuffix-1000”、“my-topic-myRetrySuffix-2000”、“my-topic-myDltSuffix”

默认行为是为每次尝试创建单独的重试主题,并附加索引值:retry-0、retry-1、…​、retry-n。因此,默认情况下,重试主题的数量是配置的 maxAttempts 减去 1。

Retry Topics and DLT Suffixes

你可以指定重试主题和 DLT 主题将使用的后缀。

@RetryableTopic(retryTopicSuffix = "-my-retry-suffix", dltTopicSuffix = "-my-dlt-suffix")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .retryTopicSuffix("-my-retry-suffix")
            .dltTopicSuffix("-my-dlt-suffix")
            .create(template);
}

默认后缀对于重试主题和 dlt 分别是“-retry”和“-dlt”。

Appending the Topic’s Index or Delay

你可以在后缀之后附加主题的索引或延迟值。

@RetryableTopic(topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .suffixTopicsWithIndexValues()
            .create(template);
    }

默认行为是使用延迟值作为后缀,对于具有多个主题的固定延迟配置除外,在这种情况下,主题将使用主题的索引作为后缀。

Single Topic for Fixed Delay Retries

如果你正在使用固定延迟策略(如 FixedBackOffPolicyNoBackOffPolicy),可以使用单个主题来完成无阻塞重试。该主题将加上提供或默认的后缀,并且不会附加索引或延迟值。

以前的 FixedDelayStrategy 已被弃用,可以用 SameIntervalTopicReuseStrategy 取代。

@RetryableTopic(backoff = @Backoff(2_000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackoff(3_000)
            .maxAttempts(5)
            .useSingleTopicForFixedDelays()
            .create(template);
}

默认行为是为每次尝试创建单独的重试主题,并附加其索引值:retry-0、retry-1、…​

Single Topic for maxInterval Exponential Delay

如果你正在使用指数退避策略(ExponentialBackOffPolicy),可以使用单个重试主题来完成延迟是已配置的 maxInterval 的尝试的无阻塞重试。

这个“最终”重试主题将加上提供或默认的后缀,并且将附加索引或 maxInterval 值。

通过选择对使用 maxInterval 延迟的重试使用单个主题,配置指数重试策略可能更可行,这种策略会持续重试很长时间,因为此方法不需要大量主题。

从 3.2 开始,默认行为在使用指数退避时,针对相同的时间间隔重新使用重试主题,重试主题带有延迟值的后缀,最后一个重试主题针对相同的时间间隔(对应于 maxInterval 延迟)重复使用。

例如,使用 initialInterval=1_000multiplier=2maxInterval=16_000 配置指数退避时,为了持续尝试一小时,需要将 maxAttempts 配置为 229,默认情况下需要的重试主题如下:

  • -retry-1000

  • -retry-2000

  • -retry-4000

  • -retry-8000

  • -retry-16000

当使用重试主题的数量等于已配置的 maxAttempts 减去 1 的策略时,最后一个重试主题(对应于 maxInterval 延迟)将带有附加索引的后缀:

  • -retry-1000

  • -retry-2000

  • -retry-4000

  • -retry-8000

  • -retry-16000-0

  • -retry-16000-1

  • -retry-16000-2

  • …​

  • -retry-16000-224

如果需要多个主题,可以使用以下配置执行此操作。

@RetryableTopic(attempts = 230,
    backoff = @Backoff(delay = 1_000, multiplier = 2, maxDelay = 16_000),
    sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(1_000, 2, 16_000)
            .maxAttempts(230)
            .useSingleTopicForSameIntervals()
            .create(template);
}

Custom Naming Strategies

可以通过注册实现 RetryTopicNamesProviderFactory 的 Bean 来实现更复杂的命名策略。默认实现为 SuffixingRetryTopicNamesProviderFactory,并且可以通过以下方式注册其他实现:

@Override
protected RetryTopicComponentFactory createComponentFactory() {
    return new RetryTopicComponentFactory() {
        @Override
        public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
            return new CustomRetryTopicNamesProviderFactory();
        }
    };
}

例如,以下实现除标准后缀外,还向重试/dlt 主题名称添加前缀:

public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {

    @Override
    public RetryTopicNamesProvider createRetryTopicNamesProvider(
                DestinationTopic.Properties properties) {

        if (properties.isMainEndpoint()) {
            return new SuffixingRetryTopicNamesProvider(properties);
        }
        else {
            return new SuffixingRetryTopicNamesProvider(properties) {

                @Override
                public String getTopicName(String topic) {
                    return "my-prefix-" + super.getTopicName(topic);
                }

            };
        }
    }

}