DLT Strategies
该框架提供了一些用于处理 DLT 的策略。你可以提供用于 DLT 处理的方法,使用默认日志记录方法,或者根本不使用 DLT。此外,你还可以选择在 DLT 处理失败时发生的情况。
DLT Processing Method
您可以指定用于处理主题的 DLT 的方法以及在处理失败时采取的行为。
为此,您可以在具有 @RetryableTopic
注释的类的某个方法中使用 @DltHandler
注释。请注意,该方法将用于该类内所有带 @RetryableTopic
注释的方法。
@RetryableTopic
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
还可以通过 RetryTopicConfigurationBuilder.dltHandlerMethod(String, String)
方法提供 DLT 处理程序方法,并传递作为处理 DLT 消息的 bean 名称和方法名称的参数。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
.create(template);
}
@Component
public class MyCustomDltProcessor {
private final MyDependency myDependency;
public MyCustomDltProcessor(MyDependency myDependency) {
this.myDependency = myDependency;
}
public void processDltMessage(MyPojo message) {
// ... message processing, persistence, etc
}
}
如果没有提供 DLT 处理程序,则使用默认 |
从 2.8 版开始,如果您根本不想在此应用程序中使用 DLT,包括使用默认处理程序(或想要推迟使用),您可以控制 DLT 容器的启动与否,而不考虑容器工厂的 autoStartup
属性。
使用 @RetryableTopic
注释时,将 autoStartDltHandler
属性设置为 false
;使用配置构建器时,使用 autoStartDltHandler(false)
。
您稍后可以通过 KafkaListenerEndpointRegistry
启动 DLT 处理程序。
DLT Failure Behavior
如果 DLT 处理失败,有两种可用行为:ALWAYS_RETRY_ON_ERROR
和 FAIL_ON_ERROR
。
在第一种情况下,记录会转发回 DLT 主题,这样不会阻止其他 DLT 记录的处理。在后一种情况下,使用者在不转发该消息的情况下结束执行。
@RetryableTopic(dltProcessingFailureStrategy =
DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
.doNotRetryOnDltFailure()
.create(template);
}
默认行为是 |
从 2.8.3 版本开始,如果记录导致抛出致命异常(例如 DeserializationException
),ALWAYS_RETRY_ON_ERROR
将不会将记录重新路由到 DLT,因为通常情况下总是会抛出此类异常。
被视为严重的异常:
-
DeserializationException
-
MessageConversionException
-
ConversionException
-
MethodArgumentResolutionException
-
NoSuchMethodException
-
ClassCastException
您可以使用 DestinationTopicResolver
bean 上的方法向此列表中添加和从中删除异常。
有关更多信息,请参阅 Exception Classifier。
Configuring No DLT
该框架还提供了不为该主题配置 DLT 的可能性。在这种情况下,在重试用尽后,处理过程会简单地结束。
@RetryableTopic(dltProcessingFailureStrategy =
DltStrategy.NO_DLT)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.doNotConfigureDlt()
.create(template);
}