Topic Naming
重试主题和 DLT 是通过为主题添加提供或默认值的后缀来命名的,后缀要么是延迟,要么是该主题的索引。
Retry topics and DLT are named by suffixing the main topic with a provided or default value, appended by either the delay or index for that topic.
示例:
Examples:
“my-topic”→ “my-topic-retry-0”、“my-topic-retry-1”、“my-topic-dlt”
"my-topic" → "my-topic-retry-0", "my-topic-retry-1", …, "my-topic-dlt"
“my-other-topic”→ “my-topic-myRetrySuffix-1000”、“my-topic-myRetrySuffix-2000”、“my-topic-myDltSuffix”
"my-other-topic" → "my-topic-myRetrySuffix-1000", "my-topic-myRetrySuffix-2000", …, "my-topic-myDltSuffix"
默认行为是为每次尝试创建单独的重试主题,并附加索引值:retry-0、retry-1、…、retry-n。因此,默认情况下,重试主题的数量是配置的 |
The default behavior is to create separate retry topics for each attempt, appended with an index value: retry-0, retry-1, …, retry-n.
Therefore, by default the number of retry topics is the configured |
在使用指数后退时,您可以 configure the suffixes选择是追加 the attempt index or delay、使用 single retry topic when using fixed backoff,还是使用 single retry topic for the attempts with the maxInterval。
You can configure the suffixes, choose whether to append the attempt index or delay, use a single retry topic when using fixed backoff, and use a single retry topic for the attempts with the maxInterval when using exponential backoffs.
Retry Topics and DLT Suffixes
你可以指定重试主题和 DLT 主题将使用的后缀。
You can specify the suffixes that will be used by the retry and DLT topics.
@RetryableTopic(retryTopicSuffix = "-my-retry-suffix", dltTopicSuffix = "-my-dlt-suffix")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.retryTopicSuffix("-my-retry-suffix")
.dltTopicSuffix("-my-dlt-suffix")
.create(template);
}
默认后缀对于重试主题和 dlt 分别是“-retry”和“-dlt”。 |
The default suffixes are "-retry" and "-dlt", for retry topics and dlt respectively. |
Appending the Topic’s Index or Delay
你可以在后缀之后附加主题的索引或延迟值。
You can either append the topic’s index or delay values after the suffix.
@RetryableTopic(topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.suffixTopicsWithIndexValues()
.create(template);
}
默认行为是使用延迟值作为后缀,对于具有多个主题的固定延迟配置除外,在这种情况下,主题将使用主题的索引作为后缀。 |
The default behavior is to suffix with the delay values, except for fixed delay configurations with multiple topics, in which case the topics are suffixed with the topic’s index. |
Single Topic for Fixed Delay Retries
如果你正在使用固定延迟策略(如 FixedBackOffPolicy
或 NoBackOffPolicy
),可以使用单个主题来完成无阻塞重试。该主题将加上提供或默认的后缀,并且不会附加索引或延迟值。
If you’re using fixed delay policies such as FixedBackOffPolicy
or NoBackOffPolicy
you can use a single topic to accomplish the non-blocking retries.
This topic will be suffixed with the provided or default suffix, and will not have either the index or the delay values appended.
以前的 |
The previous |
@RetryableTopic(backoff = @Backoff(2_000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(3_000)
.maxAttempts(5)
.useSingleTopicForFixedDelays()
.create(template);
}
默认行为是为每次尝试创建单独的重试主题,并附加其索引值:retry-0、retry-1、… |
The default behavior is creating separate retry topics for each attempt, appended with their index values: retry-0, retry-1, … |
Single Topic for maxInterval Exponential Delay
如果你正在使用指数退避策略(ExponentialBackOffPolicy
),可以使用单个重试主题来完成延迟是已配置的 maxInterval
的尝试的无阻塞重试。
If you’re using exponential backoff policy (ExponentialBackOffPolicy
), you can use a single retry topic to accomplish the non-blocking retries of the attempts whose delays are the configured maxInterval
.
这个“最终”重试主题将加上提供或默认的后缀,并且将附加索引或 maxInterval
值。
This "final" retry topic will be suffixed with the provided or default suffix, and will have either the index or the maxInterval
value appended.
通过选择对使用 |
By opting to use a single topic for the retries with the |
从 3.2 开始,默认行为在使用指数退避时,针对相同的时间间隔重新使用重试主题,重试主题带有延迟值的后缀,最后一个重试主题针对相同的时间间隔(对应于 maxInterval
延迟)重复使用。
Starting 3.2, the default behavior is reuses the retry topic for the same intervals, when using exponential backoff, the retry topics are suffixed with the delay values, with the last retry topic reuses for the same intervals(corresponding to the maxInterval
delay).
例如,使用 initialInterval=1_000
、multiplier=2
和 maxInterval=16_000
配置指数退避时,为了持续尝试一小时,需要将 maxAttempts
配置为 229,默认情况下需要的重试主题如下:
For instance, when configuring the exponential backoff with initialInterval=1_000
, multiplier=2
, and maxInterval=16_000
, in order to keep trying for one hour, one would need to configure maxAttempts
as 229, and by default the needed retry topics would be:
-
-retry-1000
-
-retry-2000
-
-retry-4000
-
-retry-8000
-
-retry-16000
当使用重试主题的数量等于已配置的 maxAttempts
减去 1 的策略时,最后一个重试主题(对应于 maxInterval
延迟)将带有附加索引的后缀:
When using the strategy that work with the number of retry topics equal to the configured maxAttempts
minus 1, the last retry topic (corresponding to the maxInterval
delay) being suffixed with an additional index would be:
-
-retry-1000
-
-retry-2000
-
-retry-4000
-
-retry-8000
-
-retry-16000-0
-
-retry-16000-1
-
-retry-16000-2
-
…
-
-retry-16000-224
如果需要多个主题,可以使用以下配置执行此操作。
If multiple topics are required, then that can be done using the following configuration.
@RetryableTopic(attempts = 230,
backoff = @Backoff(delay = 1_000, multiplier = 2, maxDelay = 16_000),
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1_000, 2, 16_000)
.maxAttempts(230)
.useSingleTopicForSameIntervals()
.create(template);
}
Custom Naming Strategies
可以通过注册实现 RetryTopicNamesProviderFactory
的 Bean 来实现更复杂的命名策略。默认实现为 SuffixingRetryTopicNamesProviderFactory
,并且可以通过以下方式注册其他实现:
More complex naming strategies can be accomplished by registering a bean that implements RetryTopicNamesProviderFactory
.
The default implementation is SuffixingRetryTopicNamesProviderFactory
and a different implementation can be registered in the following way:
@Override
protected RetryTopicComponentFactory createComponentFactory() {
return new RetryTopicComponentFactory() {
@Override
public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
return new CustomRetryTopicNamesProviderFactory();
}
};
}
例如,以下实现除标准后缀外,还向重试/dlt 主题名称添加前缀:
As an example, the following implementation, in addition to the standard suffix, adds a prefix to retry/dlt topics names:
public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {
@Override
public RetryTopicNamesProvider createRetryTopicNamesProvider(
DestinationTopic.Properties properties) {
if (properties.isMainEndpoint()) {
return new SuffixingRetryTopicNamesProvider(properties);
}
else {
return new SuffixingRetryTopicNamesProvider(properties) {
@Override
public String getTopicName(String topic) {
return "my-prefix-" + super.getTopicName(topic);
}
};
}
}
}