Error Handling

Apache Kafka Streams 提供了原生地处理反序列化错误的异常的功能。有关此支持的详细信息,请参见 this。开箱即用,Apache Kafka Streams 提供了两种类型的反序列化异常处理程序 - LogAndContinueExceptionHandlerLogAndFailExceptionHandler。顾名思义,前者将记录错误并继续处理后续记录,后者将记录错误并失败。LogAndFailExceptionHandler 是默认的反序列化异常处理程序。

Handling Deserialization Exceptions in the Binder

Kafka Streams Binder 允许使用以下属性指定上述反序列化异常处理程序。

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail

除了上述两个反序列化异常处理程序之外,该 Binder 还提供了一个第三个处理程序,用于将错误记录(毒丸)发送到 DLQ(死信队列)主题。以下是启用此 DLQ 异常处理程序的方法。

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq

当设置上述属性时,反序列化错误中的所有记录都会自动发送到 DLQ 主题。

您可以按照如下方式设置 DLQ 消息发布到的主题名称。

您可以提供一个 DlqDestinationResolver 实现,它是一个函数式接口。DlqDestinationResolverConsumerRecord 和异常作为输入,然后允许指定一个主题名称作为输出。通过访问 Kafka ConsumerRecord,可以在 BiFunction 的实现中内省检查 Header 记录。

以下是提供 DlqDestinationResolver 实现的示例。

@Bean
public DlqDestinationResolver dlqDestinationResolver() {
    return (rec, ex) -> {
        if (rec.topic().equals("word1")) {
            return "topic1-dlq";
        }
        else {
            return "topic2-dlq";
        }
    };
}

在提供 DlqDestinationResolver 的实现时需要记住的一件重要事情是,绑定中的供应者不会为应用程序自动创建主题。这是因为绑定无法推断出实现可能发送到的所有 DLQ 主题的名称。因此,如果你使用此策略提供了 DLQ 名称,则应用程序有责任确保事先创建了那些主题。

如果 DlqDestinationResolver 作为 Bean 存在于应用程序中,它将具有更高的优先级。如果您不想遵循此方法,而是使用配置提供一个静态 DLQ 名称,那么您可以设置以下属性。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)

如果已设置,那么错误记录将发送到主题 custom-dlq。如果应用程序没有使用上述任一策略,那么它将创建一个名为 error.<input-topic-name>.<application-id> 的 DLQ 主题。例如,如果 Binding 的目标主题是 inputTopic 且应用程序 ID 是 process-applicationId, 那么默认的 DLQ 主题是 error.inputTopic.process-applicationId。在每次输入 Binding 打算启用 DLQ 时,始终建议明确为其创建一个 DLQ 主题。

DLQ per input consumer binding

属性 spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler 适用于整个应用程序。这意味着如果同一应用程序中有多个函数,则此属性适用于它们全部。但是,如果你在单个处理程序中有多个处理程序或多个输入绑定,则可以使用 Binder 为每个输入使用者绑定提供的更细粒度的 DLQ 控制。

如果你有以下处理程序,

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}

并且你仅希望在第一个输入绑定上启用 DLQ,并在第二个绑定上跳过并继续,则可以在使用者中按如下方式执行此操作。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq``spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: skipAndContinue

这样设置反序列化异常处理程序的优先级高于在 Binder 级别进行设置。

DLQ partitioning

默认情况下,记录使用与原始记录相同的分区发布到 Dead-Letter 主题。这意味着 Dead-Letter 主题必须至少与原始记录有相同数量的分区。

若要更改此行为,请将 DlqPartitionFunction 实现作为 @Bean 添加到应用程序上下文。仅可以存在一项此类 Bean。此函数随使用者组一起提供(在大多数情况下与应用程序 ID 相同),已失败 ConsumerRecord 和异常。例如,如果你始终希望路由到分区 0,则可以使用:

@Bean
public DlqPartitionFunction partitionFunction() {
    return (group, record, ex) -> 0;
}

如果您将消费者绑定的 dlqPartitions 属性设置为 1(并且黏合剂的 minPartitionCount 等于 1),则无需提供 DlqPartitionFunction;框架将始终使用分区 0。如果您将消费者绑定的 dlqPartitions 属性设置为大于 1 的值(或者黏合剂的 minPartitionCount 大于 1),则即使分区计数与原始主题相同,您 must 也会提供一个 DlqPartitionFunction Bean。

在 Apache Kafka Streams Binder 中使用异常处理功能时需要记住以下几点。

  • 属性 spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler 适用于整个应用程序。这意味着如果同一应用程序中有多个函数,此属性将应用于所有函数。

  • 用于反序列化的异常处理与本机反序列化和框架提供的消息转换一致。

Handling Production Exceptions in the Binder

与上面描述的反序列化异常处理程序支持不同,Binder 并未为处理生产异常提供此类一级机制。然而,你仍然可以使用 StreamsBuilderFactoryBean customizer 配置生产异常处理程序,你可以在下面后续部分中找到更多详细信息。

Runtime Error Handling

在处理应用程序代码(即来自业务逻辑执行)中的错误时,通常由应用程序处理。因为 Kafka Streams Binder 没有办法干预应用程序代码。但是,为了使应用程序更容易,Binder 提供了一个方便的 RecordRecoverableProcessor,你可以使用它指定如何处理应用程序级别的错误。

考虑以下代码。

@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
    return input -> input
        .map(...);
}

如果你上面的 map 调用中的业务代码抛出异常,则你需要负责处理该错误。这是 RecordRecoverableProcessor 派上用场的地方。默认情况下,RecordRecoverableProcessor 仅会记录错误并让应用程序继续运行。假设你希望将失败的记录发布到 DLT,而不是在应用程序中对其进行处理。在这种情况下,你必须使用名为 DltAwareProcessorRecordRecoverableProcessor 的自定义实现。你可以按如下方式进行操作。

@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process(DltPublishingContext dltSenderContext) {
    return input -> input
        .process(() -> new DltAwareProcessor<>(record -> {
					throw new RuntimeException("error");
				}, "hello-dlt-1", dltPublishingContext));
}

原始 map 调用中的业务逻辑代码现已移到 KStream#process 方法调用的部分,该调用采用 ProcessorSupplier。然后,我们传入自定义 DltAwareProcessor,,它能够发布到 DLT。上面 DltAwareProcessor 的构造函数采用三个参数 - 一个接受输入记录然后作为 Function 主体的业务逻辑操作的 Function DLT 主题,最后是 DltPublishingContext。当 Function’s lambda expression throws an exception, the `DltAwareProcessor 会将输入记录发送到 DLT。DltPublishingContextDltAwareProcessor 提供必需的发布基础架构 bean。DltPublishingContext 将由 Binder 自动配置,以便你可以直接将其注入应用程序。

如果你不希望 Binder 将失败的记录发布到 DLT,则必须直接使用 DltAwareProcessor 而不是 RecordRecoverableProcessor。你可以提供自己的恢复程序作为 BiConsumer,它采用输入 Record 和异常作为参数。假设一个场景,在这种场景中,你不想将记录发送到 DLT,而只是记录该消息并继续执行。下面是你可以实现此目标的一个示例。

@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
    return input -> input
        .process(() -> new RecordRecoverableProcessor<>(record -> {
					throw new RuntimeException("error");
				},
                (record, exception) -> {
                  // Handle the record
                }));
}

在这种情况下,当记录失败时,RecordRecoverableProcessor 使用用户提供的恢复程序,该恢复程序是 BiConsumer,它采用失败的记录和抛出的异常作为参数。

Handling Record Keys in DltAwareProcessor

使用 DltAwareProcessor`将失败的记录发送到 DLT 时,如果您想将记录键发送到 DLT 主题,则需要在 DLT 绑定上设置正确的序列化器。这是因为 `DltAwareProcessor`使用 `StreamBridge,它使用常规 Kafka 绑定器(基于消息通道),默认情况下,它对键使用 ByteArraySerializer。对于记录值,Spring Cloud Stream 将有效负载转换为正确的 byte[];然而,键并非如此,因为它只是传递它在标题中作为键收到的内容。如果您提供了非字节数组键,则可能会导致类型转换异常,为了避免这种情况,您需要在 DLT 绑定上设置一个序列化器,如下所示。

假设 DLT 目标是 hello-dlt-1 且记录键属于 String 数据类型。

spring.cloud.stream.kafka.bindings.hello-dlt-1.producer.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer