DLT Strategies

该框架提供了一些用于处理 DLT 的策略。你可以提供用于 DLT 处理的方法,使用默认日志记录方法,或者根本不使用 DLT。此外,你还可以选择在 DLT 处理失败时发生的情况。

The framework provides a few strategies for working with DLTs. You can provide a method for DLT processing, use the default logging method, or have no DLT at all. Also you can choose what happens if DLT processing fails.

DLT Processing Method

您可以指定用于处理主题的 DLT 的方法以及在处理失败时采取的行为。

You can specify the method used to process the DLT for the topic, as well as the behavior if that processing fails.

为此,您可以在具有 @RetryableTopic 注释的类的某个方法中使用 @DltHandler 注释。请注意,该方法将用于该类内所有带 @RetryableTopic 注释的方法。

To do that you can use the @DltHandler annotation in a method of the class with the @RetryableTopic annotation(s). Note that the same method will be used for all the @RetryableTopic annotated methods within that class.

@RetryableTopic
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@DltHandler
public void processMessage(MyPojo message) {
    // ... message processing, persistence, etc
}

还可以通过 RetryTopicConfigurationBuilder.dltHandlerMethod(String, String) 方法提供 DLT 处理程序方法,并传递作为处理 DLT 消息的 bean 名称和方法名称的参数。

The DLT handler method can also be provided through the RetryTopicConfigurationBuilder.dltHandlerMethod(String, String) method, passing as arguments the bean name and method name that should process the DLT’s messages.

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
            .create(template);
}

@Component
public class MyCustomDltProcessor {

    private final MyDependency myDependency;

    public MyCustomDltProcessor(MyDependency myDependency) {
        this.myDependency = myDependency;
    }

    public void processDltMessage(MyPojo message) {
        // ... message processing, persistence, etc
    }
}

如果没有提供 DLT 处理程序,则使用默认 RetryTopicConfigurer.LoggingDltListenerHandlerMethod

If no DLT handler is provided, the default RetryTopicConfigurer.LoggingDltListenerHandlerMethod is used.

从 2.8 版开始,如果您根本不想在此应用程序中使用 DLT,包括使用默认处理程序(或想要推迟使用),您可以控制 DLT 容器的启动与否,而不考虑容器工厂的 autoStartup 属性。

Starting with version 2.8, if you don’t want to consume from the DLT in this application at all, including by the default handler (or you wish to defer consumption), you can control whether or not the DLT container starts, independent of the container factory’s autoStartup property.

使用 @RetryableTopic 注释时,将 autoStartDltHandler 属性设置为 false;使用配置构建器时,使用 autoStartDltHandler(false)

When using the @RetryableTopic annotation, set the autoStartDltHandler property to false; when using the configuration builder, use autoStartDltHandler(false) .

您稍后可以通过 KafkaListenerEndpointRegistry 启动 DLT 处理程序。

You can later start the DLT handler via the KafkaListenerEndpointRegistry.

DLT Failure Behavior

如果 DLT 处理失败,有两种可用行为:ALWAYS_RETRY_ON_ERRORFAIL_ON_ERROR

Should the DLT processing fail, there are two possible behaviors available: ALWAYS_RETRY_ON_ERROR and FAIL_ON_ERROR.

在第一种情况下,记录会转发回 DLT 主题,这样不会阻止其他 DLT 记录的处理。在后一种情况下,使用者在不转发该消息的情况下结束执行。

In the former the record is forwarded back to the DLT topic so it doesn’t block other DLT records' processing. In the latter the consumer ends the execution without forwarding the message.

@RetryableTopic(dltProcessingFailureStrategy =
            DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
            .doNotRetryOnDltFailure()
            .create(template);
}

默认行为是 ALWAYS_RETRY_ON_ERROR

The default behavior is to ALWAYS_RETRY_ON_ERROR.

从 2.8.3 版本开始,如果记录导致抛出致命异常(例如 DeserializationException),ALWAYS_RETRY_ON_ERROR 将不会将记录重新路由到 DLT,因为通常情况下总是会抛出此类异常。

Starting with version 2.8.3, ALWAYS_RETRY_ON_ERROR will NOT route a record back to the DLT if the record causes a fatal exception to be thrown, such as a DeserializationException, because, generally, such exceptions will always be thrown.

被视为严重的异常:

Exceptions that are considered fatal are:

  • DeserializationException

  • MessageConversionException

  • ConversionException

  • MethodArgumentResolutionException

  • NoSuchMethodException

  • ClassCastException

您可以使用 DestinationTopicResolver bean 上的方法向此列表中添加和从中删除异常。

You can add exceptions to and remove exceptions from this list using methods on the DestinationTopicResolver bean.

有关更多信息,请参阅 Exception Classifier

See Exception Classifier for more information.

Configuring No DLT

该框架还提供了不为该主题配置 DLT 的可能性。在这种情况下,在重试用尽后,处理过程会简单地结束。

The framework also provides the possibility of not configuring a DLT for the topic. In this case after retrials are exhausted the processing simply ends.

@RetryableTopic(dltProcessingFailureStrategy =
            DltStrategy.NO_DLT)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotConfigureDlt()
            .create(template);
}