How the Pattern Works

如果消息处理失败,则消息将转发到具有回退时间戳的重试主题。然后,重试主题使用者会检查时间戳,如果没有过期,则暂停对该主题分区的消耗。当过期时,分区消耗恢复,并且再次消耗消息。如果消息处理再次失败,则会将消息转发到下一个重试主题,并且会重复该模式,直到成功处理,或尝试次数用尽并向死信主题(如果已配置)发送消息为止。

If message processing fails, the message is forwarded to a retry topic with a back off timestamp. The retry topic consumer then checks the timestamp and if it’s not due it pauses the consumption for that topic’s partition. When it is due the partition consumption is resumed, and the message is consumed again. If the message processing fails again the message will be forwarded to the next retry topic, and the pattern is repeated until a successful processing occurs, or the attempts are exhausted, and the message is sent to the Dead Letter Topic (if configured).

举例来说,如果你有一个“main-topic”主题,并希望设置具有 1000ms 指数回退和 2 倍数以及 4 次最大尝试的非阻塞重试,它将创建 main-topic-retry-1000、main-topic-retry-2000、main-topic-retry-4000 和 main-topic-dlt 主题,并配置相应的使用者。该框架还负责创建主题、设置和配置监听器。

To illustrate, if you have a "main-topic" topic, and want to set up non-blocking retry with an exponential backoff of 1000ms with a multiplier of 2 and 4 max attempts, it will create the main-topic-retry-1000, main-topic-retry-2000, main-topic-retry-4000 and main-topic-dlt topics and configure the respective consumers. The framework also takes care of creating the topics and setting up and configuring the listeners.

使用此策略后,你将失去 Kafka 对该主题的顺序保证。

By using this strategy you lose Kafka’s ordering guarantees for that topic.

你可以设置你首选的 AckMode 模式,但建议使用 RECORD

You can set the AckMode mode you prefer, but RECORD is suggested.

目前,此功能不支持类级的 @KafkaListener 注释。

At this time this functionality doesn’t support class level @KafkaListener annotations.

在将 asyncAcks 设置为 true 的手动 AckMode 中,DefaultErrorHandler 必须配置为将 seekAfterError 设置为 false。从 2.9.10、3.0.8 版开始,对于这样的配置,它将无条件地设置为 false。在较早的版本中,有必要覆盖 RetryConfigurationSupport.configureCustomizers() 方法以将该属性设置为 false

When using a manual AckMode with asyncAcks set to true, the DefaultErrorHandler must be configured with seekAfterError set to false. Starting with versions 2.9.10, 3.0.8, this will be set to false unconditionally for such configurations. With earlier versions, it was necessary to override the RetryConfigurationSupport.configureCustomizers() method to set the property to false.

@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
    customizersConfigurer.customizeErrorHandler(eh -> eh.setSeekAfterError(false));
}

此外,在这些版本之前,使用默认(日志记录)DLT 处理程序与任何种类的 AckMode 不兼容,无论 asyncAcks 属性如何。

In addition, before those versions, using the default (logging) DLT handler was not compatible with any kind of manual AckMode, regardless of the asyncAcks property.