Error Handling

Apache Kafka Streams provides the capability for natively handling exceptions from deserialization errors. For details on this support, please see the Apache Kafka Streams documentation. Out of the box, Apache Kafka Streams provides two kinds of deserialization exception handlers - LogAndContinueExceptionHandler and LogAndFailExceptionHandler. As the names indicate, the former will log the error and continue processing the next records, and the latter will log the error and fail. LogAndFailExceptionHandler is the default deserialization exception handler.

Handling Deserialization Exceptions in the Binder

Kafka Streams binder allows you to specify the deserialization exception handlers above using the following property.

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue

or

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail

In addition to the above two deserialization exception handlers, the binder also provides a third one for sending the erroneous records (poison pills) to a DLQ (dead letter queue) topic. Here is how you enable this DLQ exception handler.

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq

When the above property is set, all the records that fail deserialization are automatically sent to the DLQ topic.

You can set the topic name where the DLQ messages are published as below.

You can provide an implementation of DlqDestinationResolver, which is a functional interface. DlqDestinationResolver takes the ConsumerRecord and the exception as inputs and then allows you to specify a topic name as the output. By gaining access to the Kafka ConsumerRecord, the record headers can be introspected in the implementation of the BiFunction.

Here is an example of providing an implementation for DlqDestinationResolver.

@Bean
public DlqDestinationResolver dlqDestinationResolver() {
    return (rec, ex) -> {
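        // The full Kafka ConsumerRecord is available here, so its headers (rec.headers())
        // can also be inspected when deciding on a DLQ topic name.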
        if (rec.topic().equals("word1")) {
            return "topic1-dlq";
        }
        else {
            return "topic2-dlq";
        }
    };
}

One important thing to keep in mind when providing an implementation for DlqDestinationResolver is that the provisioner in the binder will not auto create topics for the application. This is because there is no way for the binder to infer the names of all the DLQ topics the implementation might send to. Therefore, if you provide DLQ names using this strategy, it is the application’s responsibility to ensure that those topics are created beforehand.
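
For example (a sketch, assuming spring-kafka's KafkaAdmin is configured in the application so that NewTopic beans are created on startup, and reusing the topic names from the resolver above), the DLQ topics could be pre-created by declaring them as beans with TopicBuilder.

@Bean
public NewTopic topic1Dlq() {
    // Pre-create the DLQ topics the resolver above may route to;
    // adjust partition and replica counts to your environment.
    return TopicBuilder.name("topic1-dlq").partitions(1).replicas(1).build();
}

@Bean
public NewTopic topic2Dlq() {
    return TopicBuilder.name("topic2-dlq").partitions(1).replicas(1).build();
}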

If a DlqDestinationResolver bean is present in the application, it takes precedence. If you do not want to follow this approach and would rather provide a static DLQ name through configuration, you can set the following property.

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)

If this is set, then the error records are sent to the topic custom-dlq. If the application is not using either of the above strategies, then it will create a DLQ topic with the name error.<input-topic-name>.<application-id>. For instance, if your binding’s destination topic is inputTopic and the application ID is process-applicationId, then the default DLQ topic is error.inputTopic.process-applicationId. It is always recommended to explicitly create a DLQ topic for each input binding if it is your intention to enable DLQ.

DLQ per input consumer binding

The property spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler is applicable for the entire application. This implies that if there are multiple functions in the same application, this property is applied to all of them. However, if you have multiple processors or multiple input bindings within a single processor, then you can use the finer-grained DLQ control that the binder provides per input consumer binding.

If you have the following processor,

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}

and you only want to enable DLQ on the first input binding and skipAndContinue on the second binding, then you can do so on the consumer as below.

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: skipAndContinue

Setting deserialization exception handlers this way takes precedence over setting them at the binder level.

DLQ partitioning

By default, records are published to the Dead-Letter topic using the same partition as the original record. This means the Dead-Letter topic must have at least as many partitions as the original topic.

To change this behavior, add a DlqPartitionFunction implementation as a @Bean to the application context. Only one such bean can be present. The function is provided with the consumer group (which is the same as the application ID in most situations), the failed ConsumerRecord and the exception. For example, if you always want to route to partition 0, you might use:

@Bean
public DlqPartitionFunction partitionFunction() {
    return (group, record, ex) -> 0;
}

If you set a consumer binding’s dlqPartitions property to 1 (and the binder’s minPartitionCount is equal to 1), there is no need to supply a DlqPartitionFunction; the framework will always use partition 0. If you set a consumer binding’s dlqPartitions property to a value greater than 1 (or the binder’s minPartitionCount is greater than 1), you must provide a DlqPartitionFunction bean, even if the partition count is the same as the original topic’s.
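
For example, to force a single-partition DLQ for the process-in-0 binding (change the binding name accordingly), you could set the following.

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqPartitions: 1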

A couple of things to keep in mind when using the exception handling feature in the Kafka Streams binder.

  • The property spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler is applicable for the entire application. This implies that if there are multiple functions in the same application, this property is applied to all of them.

  • The exception handling for deserialization works consistently with native deserialization and framework-provided message conversion.

Handling Production Exceptions in the Binder

Unlike the support for deserialization exception handlers described above, the binder does not provide a first-class mechanism for handling production exceptions. However, you can still configure production exception handlers using the StreamsBuilderFactoryBean customizer, about which you can find more details in a subsequent section below.
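
As a rough sketch (assuming a binder version that applies spring-kafka's StreamsBuilderFactoryBeanConfigurer beans; older versions expose an equivalent StreamsBuilderFactoryBeanCustomizer), a production exception handler could be set through the Kafka Streams configuration held by the factory bean, for example using Kafka's built-in DefaultProductionExceptionHandler.

@Bean
public StreamsBuilderFactoryBeanConfigurer productionExceptionHandlerConfigurer() {
    // Sets Kafka Streams' default.production.exception.handler on the configuration
    // managed by the factory bean; substitute your own ProductionExceptionHandler as needed.
    return factoryBean -> factoryBean.getStreamsConfiguration().put(
            StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
            DefaultProductionExceptionHandler.class);
}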

Runtime Error Handling

When it comes to handling errors from application code, i.e. from the business logic execution, it is usually up to the application to handle them, because the Kafka Streams binder does not have a way to interfere with the application code. However, to make things a bit easier for the application, the binder provides a convenient RecordRecoverableProcessor, with which you can dictate how application-level errors should be handled.

Consider the following code.

@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
    return input -> input
        .map(...);
}

If the business code inside your map call above throws an exception, it is your responsibility to handle that error. This is where RecordRecoverableProcessor comes in handy. By default, RecordRecoverableProcessor simply logs the error and lets the application move on. Suppose that you want to publish the failed record to a DLT rather than handling it within the application. In that case, you must use a specialized implementation of RecordRecoverableProcessor called DltAwareProcessor. Here is how you can do that.

@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process(DltPublishingContext dltPublishingContext) {
    return input -> input
        .process(() -> new DltAwareProcessor<>(record -> {
            throw new RuntimeException("error");
        }, "hello-dlt-1", dltPublishingContext));
}

The business logic code from the original map call has now been moved into a KStream#process method call, which takes a ProcessorSupplier. We then pass in the custom DltAwareProcessor, which is capable of publishing to a DLT. The constructor for DltAwareProcessor above takes three parameters - a Function that takes the input record and contains the business logic operation in its body, the DLT topic, and finally a DltPublishingContext. When the Function’s lambda expression throws an exception, the DltAwareProcessor sends the input record to the DLT. The DltPublishingContext provides DltAwareProcessor with the necessary publishing infrastructure beans. The DltPublishingContext is autoconfigured by the binder, so that you can inject it directly into the application.

If you do not want the binder to publish failed records to a DLT, then you must use the RecordRecoverableProcessor directly instead of the DltAwareProcessor. You can provide your own recoverer as a BiConsumer that takes the input Record and the exception as arguments. Assume a scenario in which you do not want to send the record to the DLT, but simply want to log the message and move on. Below is an example of how you can accomplish that.

@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
    return input -> input
        .process(() -> new RecordRecoverableProcessor<>(record -> {
            throw new RuntimeException("error");
        },
        (record, exception) -> {
            // Handle the record
        }));
}

In this case, when the record fails, the RecordRecoverableProcessor uses the user-provided recoverer, which is a BiConsumer that takes the failed record and the thrown exception as arguments.

Handling Record Keys in DltAwareProcessor

When sending failed records to a DLT using DltAwareProcessor, if you want to send the record keys to the DLT topic, then you need to set the proper serializer on the DLT binding. This is because DltAwareProcessor uses StreamBridge, which uses the regular Kafka binder (message-channel based), which by default uses a ByteArraySerializer for keys. In the case of record values, Spring Cloud Stream converts the payload to a proper byte[]; however, that is not the case with keys, as it simply passes along what it received in the header as the key. If you provide a non-byte-array key, that might cause class cast exceptions; to avoid them, you need to set a serializer on the DLT binding as below.

Assuming that the DLT destination is hello-dlt-1 and the record key is of String datatype.

spring.cloud.stream.kafka.bindings.hello-dlt-1.producer.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer