Handling Exceptions

This section describes how to handle various exceptions that may arise when you use Spring for Apache Kafka.

Listener Error Handlers

Starting with version 2.0, the @KafkaListener annotation has a new attribute: errorHandler.

You can use the errorHandler to provide the bean name of a KafkaListenerErrorHandler implementation. This functional interface has one method, as the following listing shows:

@FunctionalInterface
public interface KafkaListenerErrorHandler {

    Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;

}

You have access to the spring-messaging Message<?> object produced by the message converter and the exception that was thrown by the listener, which is wrapped in a ListenerExecutionFailedException. The error handler can throw the original or a new exception, which is thrown to the container. Anything returned by the error handler is ignored.
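
For example, a sketch of a listener that references a KafkaListenerErrorHandler bean named validationErrorHandler (the bean, listener, and topic names are illustrative):

@KafkaListener(id = "main", topics = "someTopic", errorHandler = "validationErrorHandler")
public void listen(Thing thing) {
    ...
}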

Starting with version 2.7, you can set the rawRecordHeader property on the MessagingMessageConverter and BatchMessagingMessageConverter which causes the raw ConsumerRecord to be added to the converted Message<?> in the KafkaHeaders.RAW_DATA header. This is useful, for example, if you wish to use a DeadLetterPublishingRecoverer in a listener error handler. It might be used in a request/reply scenario where you wish to send a failure result to the sender, after some number of retries, after capturing the failed record in a dead letter topic.

@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
    return (msg, ex) -> {
        if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
            recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
            return "FAILED";
        }
        throw ex;
    };
}

It has a sub-interface (ConsumerAwareListenerErrorHandler) that has access to the consumer object, through the following method:

Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);

Another sub-interface (ManualAckListenerErrorHandler) provides access to the Acknowledgment object when using manual AckModes.

Object handleError(Message<?> message, ListenerExecutionFailedException exception,
			Consumer<?, ?> consumer, @Nullable Acknowledgment ack);

In either case, you should NOT perform any seeks on the consumer because the container would be unaware of them.

Container Error Handlers

Starting with version 2.8, the legacy ErrorHandler and BatchErrorHandler interfaces have been superseded by a new CommonErrorHandler. These error handlers can handle errors for both record and batch listeners, allowing a single listener container factory to create containers for both types of listener. CommonErrorHandler implementations to replace most legacy framework error handler implementations are provided.

See Migrating Custom Legacy Error Handler Implementations to CommonErrorHandler for information to migrate custom error handlers to CommonErrorHandler.

When transactions are being used, no error handlers are configured, by default, so that the exception will roll back the transaction. Error handling for transactional containers is handled by the AfterRollbackProcessor. If you provide a custom error handler when using transactions, it must throw an exception if you want the transaction rolled back.

This interface has a default method isAckAfterHandle() which is called by the container to determine whether the offset(s) should be committed if the error handler returns without throwing an exception; it returns true by default.

Typically, the error handlers provided by the framework will throw an exception when the error is not "handled" (e.g. after performing a seek operation). By default, such exceptions are logged by the container at ERROR level. All of the framework error handlers extend KafkaExceptionLogLevelAware which allows you to control the level at which these exceptions are logged.

/**
 * Set the level at which the exception thrown by this handler is logged.
 * @param logLevel the level (default ERROR).
 */
public void setLogLevel(KafkaException.Level logLevel) {
    ...
}
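
For example, a sketch that lowers the level at which a DefaultErrorHandler logs such exceptions; the back off values are illustrative:

DefaultErrorHandler handler = new DefaultErrorHandler(new FixedBackOff(1000L, 2L));
// log exceptions thrown by this handler at WARN instead of the default ERROR
handler.setLogLevel(KafkaException.Level.WARN);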

You can specify a global error handler to be used for all listeners in the container factory. The following example shows how to do so:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setCommonErrorHandler(myErrorHandler);
    ...
    return factory;
}

By default, if an annotated listener method throws an exception, it is thrown to the container, and the message is handled according to the container configuration.

The container commits any pending offset commits before calling the error handler.

If you are using Spring Boot, you simply need to add the error handler as a @Bean and Boot will add it to the auto-configured factory.
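
A minimal sketch for Boot; the back off values are illustrative:

@Bean
public DefaultErrorHandler errorHandler() {
    // retried twice with a 1 second back off, then the failure is logged (the default recoverer)
    return new DefaultErrorHandler(new FixedBackOff(1000L, 2L));
}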

Back Off Handlers

Error handlers such as the DefaultErrorHandler use a BackOff to determine how long to wait before retrying a delivery. Starting with version 2.9, you can configure a custom BackOffHandler. The default handler simply suspends the thread until the back off time passes (or the container is stopped). The framework also provides the ContainerPausingBackOffHandler which pauses the listener container until the back off time passes and then resumes the container. This is useful when the delays are longer than the max.poll.interval.ms consumer property. Note that the resolution of the actual back off time will be affected by the pollTimeout container property.

DefaultErrorHandler

This new error handler replaces the SeekToCurrentErrorHandler and RecoveringBatchErrorHandler, which have been the default error handlers for several releases now. One difference is that the fallback behavior for batch listeners (when an exception other than a BatchListenerFailedException is thrown) is the equivalent of the Retrying Complete Batches.

Starting with version 2.9, the DefaultErrorHandler can be configured to provide the same semantics as seeking the unprocessed record offsets as discussed below, but without actually seeking. Instead, the records are retained by the listener container and resubmitted to the listener after the error handler exits (and after performing a single paused poll(), to keep the consumer alive; if Non-Blocking Retries or a ContainerPausingBackOffHandler are being used, the pause may extend over multiple polls). The error handler returns a result to the container that indicates whether the current failing record can be resubmitted, or if it was recovered and then it will not be sent to the listener again. To enable this mode, set the property seekAfterError to false.
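
A minimal sketch of enabling this mode; the back off values are illustrative:

DefaultErrorHandler handler = new DefaultErrorHandler(new FixedBackOff(1000L, 2L));
// retain unprocessed records in the container and resubmit them instead of seeking
handler.setSeekAfterError(false);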

The error handler can recover (skip) a record that keeps failing. By default, after ten failures, the failed record is logged (at the ERROR level). You can configure the handler with a custom recoverer (BiConsumer) and a BackOff that controls the delivery attempts and delays between each. Using a FixedBackOff with FixedBackOff.UNLIMITED_ATTEMPTS causes (effectively) infinite retries. The following example configures recovery after three tries:

DefaultErrorHandler errorHandler =
    new DefaultErrorHandler((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

To configure the listener container with a customized instance of this handler, add it to the container factory.

For example, with the @KafkaListener container factory, you can add DefaultErrorHandler as follows:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
    return factory;
}

For a record listener, this will retry a delivery up to 2 times (3 delivery attempts) with a back off of 1 second, instead of the default configuration (FixedBackOff(0L, 9)). Failures are simply logged after retries are exhausted.

As an example, if the poll returns six records (two from each partition 0, 1, 2) and the listener throws an exception on the fourth record, the container acknowledges the first three messages by committing their offsets. The DefaultErrorHandler seeks to offset 1 for partition 1 and offset 0 for partition 2. The next poll() returns the three unprocessed records.

If the AckMode was BATCH, the container commits the offsets for the first two partitions before calling the error handler.

For a batch listener, the listener must throw a BatchListenerFailedException indicating which records in the batch failed.

The sequence of events is:

  • Commit the offsets of the records before the index.

  • If retries are not exhausted, perform seeks so that all the remaining records (including the failed record) will be redelivered.

  • If retries are exhausted, attempt recovery of the failed record (default log only) and perform seeks so that the remaining records (excluding the failed record) will be redelivered. The recovered record’s offset is committed.

  • If retries are exhausted and recovery fails, seeks are performed as if retries are not exhausted.

Starting with version 2.9, the DefaultErrorHandler can be configured to provide the same semantics as seeking the unprocessed record offsets as discussed above, but without actually seeking. Instead, the error handler creates a new ConsumerRecords<?, ?> containing just the unprocessed records, which will then be submitted to the listener (after performing a single paused poll(), to keep the consumer alive). To enable this mode, set the property seekAfterError to false.

The default recoverer logs the failed record after retries are exhausted. You can use a custom recoverer, or one provided by the framework such as the DeadLetterPublishingRecoverer.
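
For example, a sketch that publishes failed records to the default <topic>.DLT destination after the back off is exhausted:

@Bean
public DefaultErrorHandler errorHandler(KafkaOperations<Object, Object> template) {
    // two retries with a 1 second back off, then publish to the dead-letter topic
    return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template), new FixedBackOff(1000L, 2L));
}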

When using a POJO batch listener (e.g. List<Thing>), and you don’t have the full consumer record to add to the exception, you can just add the index of the record that failed:

@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
    for (int i = 0; i < things.size(); i++) {
        try {
            process(things.get(i));
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Failed to process", i);
        }
    }
}

When the container is configured with AckMode.MANUAL_IMMEDIATE, the error handler can be configured to commit the offset of recovered records; set the commitRecovered property to true.
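
A minimal sketch, assuming a ConsumerRecordRecoverer named recoverer is in scope:

DefaultErrorHandler handler = new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2L));
// with AckMode.MANUAL_IMMEDIATE, also commit the offset of a recovered (skipped) record
handler.setCommitRecovered(true);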

When using transactions, similar functionality is provided by the DefaultAfterRollbackProcessor. See After-rollback Processor.

The DefaultErrorHandler considers certain exceptions to be fatal, and retries are skipped for such exceptions; the recoverer is invoked on the first failure. The exceptions that are considered fatal, by default, are:

  • DeserializationException

  • MessageConversionException

  • ConversionException

  • MethodArgumentResolutionException

  • NoSuchMethodException

  • ClassCastException

since these exceptions are unlikely to be resolved on a retried delivery.

You can add more exception types to the not-retryable category, or completely replace the map of classified exceptions. See the Javadocs for DefaultErrorHandler.addNotRetryableException() and DefaultErrorHandler.setClassifications() for more information, as well as those for the spring-retry BinaryExceptionClassifier.

Here is an example that adds IllegalArgumentException to the not-retryable exceptions:

@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
    DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
    handler.addNotRetryableExceptions(IllegalArgumentException.class);
    return handler;
}

The error handler can be configured with one or more RetryListeners, receiving notifications of retry and recovery progress. Starting with version 2.8.10, methods for batch listeners were added.

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

    default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
    }

    default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
    }

}

See the JavaDocs for more information.
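
For example, a sketch that registers a listener logging each failed delivery (the logger is assumed to be defined in the surrounding class):

DefaultErrorHandler handler = new DefaultErrorHandler(new FixedBackOff(1000L, 2L));
handler.setRetryListeners((record, ex, deliveryAttempt) ->
        logger.warn("Delivery attempt {} failed for {}-{}@{}",
                deliveryAttempt, record.topic(), record.partition(), record.offset()));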

If the recoverer fails (throws an exception), the failed record will be included in the seeks. If the recoverer fails, the BackOff will be reset by default and redeliveries will again go through the back offs before recovery is attempted again. To skip retries after a recovery failure, set the error handler’s resetStateOnRecoveryFailure to false.

You can provide the error handler with a BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> to determine the BackOff to use, based on the failed record and/or the exception:

handler.setBackOffFunction((record, ex) -> { ... });

If the function returns null, the handler’s default BackOff will be used.
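
For example, a fuller sketch of such a function; SomeTransientException is a hypothetical exception type:

handler.setBackOffFunction((record, ex) -> {
    if (ex instanceof SomeTransientException) { // hypothetical transient failure
        return new FixedBackOff(5000L, 10L);
    }
    return null; // fall back to the handler's default BackOff
});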

resetStateOnExceptionChange 设置为 true,如果失败之间的异常类型发生更改,则将重新启动重试序列(包括选择新的 BackOff(如果已配置))。当为 false(2.9 版本之前的默认值)时,异常类型不被考虑。

Set resetStateOnExceptionChange to true and the retry sequence will be restarted (including the selection of a new BackOff, if so configured) if the exception type changes between failures. When false (the default before version 2.9), the exception type is not considered.

Starting with version 2.9, this is now true by default.

See also Delivery Attempts Header.

Conversion Errors with Batch Error Handlers

Starting with version 2.8, batch listeners can now properly handle conversion errors, when using a MessageConverter with a ByteArrayDeserializer, a BytesDeserializer or a StringDeserializer, as well as a DefaultErrorHandler. When a conversion error occurs, the payload is set to null and a deserialization exception is added to the record headers, similar to the ErrorHandlingDeserializer. A list of ConversionExceptions is available in the listener, so the listener can throw a BatchListenerFailedException indicating the first index at which a conversion exception occurred.

Example:

@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
    for (int i = 0; i < in.size(); i++) {
        Thing thing = in.get(i);
        if (thing == null && exceptions.get(i) != null) {
            throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
        }
        process(thing);
    }
}

Retrying Complete Batches

This is now the fallback behavior of the DefaultErrorHandler for a batch listener where the listener throws an exception other than a BatchListenerFailedException.

There is no guarantee that, when a batch is redelivered, the batch has the same number of records and/or the redelivered records are in the same order. It is impossible, therefore, to easily maintain retry state for a batch. The FallbackBatchErrorHandler takes the following approach. If a batch listener throws an exception that is not a BatchListenerFailedException, the retries are performed from the in-memory batch of records. In order to avoid a rebalance during an extended retry sequence, the error handler pauses the consumer, polls it before sleeping for the back off, for each retry, and calls the listener again. If/when retries are exhausted, the ConsumerRecordRecoverer is called for each record in the batch. If the recoverer throws an exception, or the thread is interrupted during its sleep, the batch of records will be redelivered on the next poll. Before exiting, regardless of the outcome, the consumer is resumed.

This mechanism cannot be used with transactions.

While waiting for a BackOff interval, the error handler will loop with a short sleep until the desired delay is reached, while checking to see if the container has been stopped, allowing the sleep to exit soon after the stop() rather than causing a delay.

Container Stopping Error Handlers

The CommonContainerStoppingErrorHandler stops the container if the listener throws an exception. For record listeners, when the AckMode is RECORD, offsets for already processed records are committed. For record listeners, when the AckMode is any manual value, offsets for already acknowledged records are committed. For record listeners, when the AckMode is BATCH, or for batch listeners, the entire batch is replayed when the container is restarted.

After the container stops, an exception that wraps the ListenerExecutionFailedException is thrown. This is to cause the transaction to roll back (if transactions are enabled).
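
A minimal sketch of wiring this handler into a container factory; the factory configuration is illustrative:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> stoppingFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setCommonErrorHandler(new CommonContainerStoppingErrorHandler());
    return factory;
}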

Delegating Error Handler

The CommonDelegatingErrorHandler can delegate to different error handlers, depending on the exception type. For example, you may wish to invoke a DefaultErrorHandler for most exceptions, or a CommonContainerStoppingErrorHandler for others.

All delegates must share the same compatible properties (ackAfterHandle, seekAfterError, and so on).
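
A minimal sketch, using a hypothetical SomeFatalException to stop the container while all other exceptions use the default handler:

DefaultErrorHandler defaultHandler = new DefaultErrorHandler(new FixedBackOff(1000L, 2L));
CommonDelegatingErrorHandler delegating = new CommonDelegatingErrorHandler(defaultHandler);
delegating.addDelegate(SomeFatalException.class, new CommonContainerStoppingErrorHandler());
factory.setCommonErrorHandler(delegating);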

Logging Error Handler

The CommonLoggingErrorHandler simply logs the exception; with a record listener, the remaining records from the previous poll are passed to the listener. For a batch listener, all the records in the batch are logged.
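
A minimal sketch of wiring it into a container factory:

// log the exception and continue; no seeks are performed
factory.setCommonErrorHandler(new CommonLoggingErrorHandler());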

Using Different Common Error Handlers for Record and Batch Listeners

If you wish to use a different error handling strategy for record and batch listeners, the CommonMixedErrorHandler is provided allowing the configuration of a specific error handler for each listener type.
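
A minimal sketch, with the delegate handlers chosen for illustration only:

factory.setCommonErrorHandler(new CommonMixedErrorHandler(
        new DefaultErrorHandler(new FixedBackOff(1000L, 2L)),  // record listeners
        new CommonContainerStoppingErrorHandler()));           // batch listeners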

Common Error Handler Summary

  • DefaultErrorHandler

  • CommonContainerStoppingErrorHandler

  • CommonDelegatingErrorHandler

  • CommonLoggingErrorHandler

  • CommonMixedErrorHandler

Legacy Error Handlers and Their Replacements

Legacy Error Handler → Replacement

  • LoggingErrorHandler → CommonLoggingErrorHandler

  • BatchLoggingErrorHandler → CommonLoggingErrorHandler

  • ConditionalDelegatingErrorHandler → CommonDelegatingErrorHandler

  • ConditionalDelegatingBatchErrorHandler → CommonDelegatingErrorHandler

  • ContainerStoppingErrorHandler → CommonContainerStoppingErrorHandler

  • ContainerStoppingBatchErrorHandler → CommonContainerStoppingErrorHandler

  • SeekToCurrentErrorHandler → DefaultErrorHandler

  • SeekToCurrentBatchErrorHandler → No replacement; use DefaultErrorHandler with an infinite BackOff.

  • RecoveringBatchErrorHandler → DefaultErrorHandler

  • RetryingBatchErrorHandler → No replacement; use DefaultErrorHandler and throw an exception other than BatchListenerFailedException.

Migrating Custom Legacy Error Handler Implementations to CommonErrorHandler

Refer to the JavaDocs in CommonErrorHandler.

To replace an ErrorHandler or ConsumerAwareErrorHandler implementation, you should implement handleOne() and leave seeksAfterHandle() to return false (default). You should also implement handleOtherException() to handle exceptions that occur outside the scope of record processing (e.g. consumer errors).

To replace a RemainingRecordsErrorHandler implementation, you should implement handleRemaining() and override seeksAfterHandle() to return true (the error handler must perform the necessary seeks). You should also implement handleOtherException() to handle exceptions that occur outside the scope of record processing (e.g. consumer errors).

To replace any BatchErrorHandler implementation, you should implement handleBatch(). You should also implement handleOtherException() to handle exceptions that occur outside the scope of record processing (e.g. consumer errors).

After Rollback Processor

When using transactions, if the listener throws an exception (and an error handler, if present, throws an exception), the transaction is rolled back. By default, any unprocessed records (including the failed record) are re-fetched on the next poll. This is achieved by performing seek operations in the DefaultAfterRollbackProcessor. With a batch listener, the entire batch of records is reprocessed (the container has no knowledge of which record in the batch failed). To modify this behavior, you can configure the listener container with a custom AfterRollbackProcessor. For example, with a record-based listener, you might want to keep track of the failed record and give up after some number of attempts, perhaps by publishing it to a dead-letter topic.

Starting with version 2.2, the DefaultAfterRollbackProcessor can now recover (skip) a record that keeps failing. By default, after ten failures, the failed record is logged (at the ERROR level). You can configure the processor with a custom recoverer (BiConsumer) and maximum failures. Setting the maxFailures property to a negative number causes infinite retries. The following example configures recovery after three tries:

AfterRollbackProcessor<String, String> processor =
    new DefaultAfterRollbackProcessor((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));
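
The processor is then wired into the listener container factory; a minimal sketch, reusing the processor from the previous listing:

ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setAfterRollbackProcessor(processor);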

When you do not use transactions, you can achieve similar functionality by configuring a DefaultErrorHandler. See Container Error Handlers.

Starting with version 3.2, recovery can now skip an entire batch of records that keeps failing. Set ContainerProperties.setBatchRecoverAfterRollback(true) to enable this feature.

By default, recovery is not possible with a batch listener, since the framework has no knowledge of which record in the batch keeps failing. In such cases, the application listener must handle a record that keeps failing.

Starting with version 2.2.5, the DefaultAfterRollbackProcessor can be invoked in a new transaction (started after the failed transaction rolls back). Then, if you are using the DeadLetterPublishingRecoverer to publish a failed record, the processor will send the recovered record’s offset in the original topic/partition to the transaction. To enable this feature, set the commitRecovered and kafkaTemplate properties on the DefaultAfterRollbackProcessor.
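
A sketch of enabling this, assuming the constructor variant that accepts the recoverer, the BackOff, the KafkaOperations, and the commitRecovered flag (kafkaTemplate is assumed to be a transactional KafkaTemplate bean):

AfterRollbackProcessor<String, String> processor = new DefaultAfterRollbackProcessor<>(
        new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(0L, 2L),
        kafkaTemplate, true); // commitRecovered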

If the recoverer fails (throws an exception), the failed record will be included in the seeks. Starting with version 2.5.5, if the recoverer fails, the BackOff will be reset by default and redeliveries will again go through the back offs before recovery is attempted again. With earlier versions, the BackOff was not reset and recovery was re-attempted on the next failure. To revert to the previous behavior, set the processor’s resetStateOnRecoveryFailure property to false.

Starting with version 2.6, you can now provide the processor with a BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> to determine the BackOff to use, based on the failed record and/or the exception:

handler.setBackOffFunction((record, ex) -> { ... });

If the function returns null, the processor’s default BackOff will be used.

Starting with version 2.6.3, set resetStateOnExceptionChange to true and the retry sequence will be restarted (including the selection of a new BackOff, if so configured) if the exception type changes between failures. By default, the exception type is not considered.

Starting with version 2.3.1, similar to the DefaultErrorHandler, the DefaultAfterRollbackProcessor considers certain exceptions to be fatal, and retries are skipped for such exceptions; the recoverer is invoked on the first failure. The exceptions that are considered fatal, by default, are:

  • DeserializationException

  • MessageConversionException

  • ConversionException

  • MethodArgumentResolutionException

  • NoSuchMethodException

  • ClassCastException

since these exceptions are unlikely to be resolved on a retried delivery.

You can add more exception types to the not-retryable category, or completely replace the map of classified exceptions. See the Javadocs for DefaultAfterRollbackProcessor.setClassifications() for more information, as well as those for the spring-retry BinaryExceptionClassifier.

Here is an example that adds IllegalArgumentException to the not-retryable exceptions:

@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
    DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
    processor.addNotRetryableExceptions(IllegalArgumentException.class);
    return processor;
}

See also Delivery Attempts Header.

With current kafka-clients, the container cannot detect whether a ProducerFencedException is caused by a rebalance or if the producer’s transactional.id has been revoked due to a timeout or expiry. Because, in most cases, it is caused by a rebalance, the container does not call the AfterRollbackProcessor (because it’s not appropriate to seek the partitions because we no longer are assigned them). If you ensure the timeout is large enough to process each transaction and periodically perform an "empty" transaction (e.g. via a ListenerContainerIdleEvent) you can avoid fencing due to timeout and expiry. Or, you can set the stopContainerWhenFenced container property to true and the container will stop, avoiding the loss of records. You can consume a ConsumerStoppedEvent and check the Reason property for FENCED to detect this condition. Since the event also has a reference to the container, you can restart the container using this event.

Starting with version 2.7, while waiting for a BackOff interval, the error handler will loop with a short sleep until the desired delay is reached, while checking to see if the container has been stopped, allowing the sleep to exit soon after the stop() rather than causing a delay.

Starting with version 2.7, the processor can be configured with one or more RetryListeners, receiving notifications of retry and recovery progress.

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

}

See the JavaDocs for more information.

Delivery Attempts Header

The following applies to record listeners only, not batch listeners.

Starting with version 2.5, when using an ErrorHandler or AfterRollbackProcessor that implements DeliveryAttemptAware, it is possible to enable the addition of the KafkaHeaders.DELIVERY_ATTEMPT header (kafka_deliveryAttempt) to the record. The value of this header is an incrementing integer starting at 1. When receiving a raw ConsumerRecord<?, ?> the integer is in a byte[4].

int delivery = ByteBuffer.wrap(record.headers()
    .lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
    .getInt();

When using @KafkaListener with the DefaultKafkaHeaderMapper or SimpleKafkaHeaderMapper, it can be obtained by adding @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery as a parameter to the listener method.

To enable population of this header, set the container property deliveryAttemptHeader to true. It is disabled by default to avoid the (small) overhead of looking up the state for each record and adding the header.
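
A minimal sketch; the container property enables the header and the listener receives it (the listener and topic names are illustrative):

factory.getContainerProperties().setDeliveryAttemptHeader(true);

@KafkaListener(id = "attempts", topics = "someTopic")
public void listen(String in, @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery) {
    ...
}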

DefaultErrorHandlerDefaultAfterRollbackProcessor 支持此功能。

The DefaultErrorHandler and DefaultAfterRollbackProcessor support this feature.

Listener Info Header

In some cases, it is useful to be able to know which container a listener is running in.

Starting with version 2.8.4, you can now set the listenerInfo property on the listener container, or set the info attribute on the @KafkaListener annotation. Then, the container will add this in the KafkaHeaders.LISTENER_INFO header to all incoming messages; it can then be used in record interceptors, filters, etc., or in the listener itself.

@KafkaListener(id = "something", topics = "topic", filter = "someFilter",
        info = "this is the something listener")
public void listen(@Payload Thing thing,
        @Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
    ...
}

RecordInterceptorRecordFilterStrategy 实现中使用时,标头在消费者记录中以字节数组的形式出现,使用 KafkaListenerAnnotationBeanPostProcessorcharSet 属性进行转换。

When used in a RecordInterceptor or RecordFilterStrategy implementation, the header is in the consumer record as a byte array, converted using the KafkaListenerAnnotationBeanPostProcessor's charSet property.

The header mappers also convert to String when creating MessageHeaders from the consumer record and never map this header on an outbound record.

For POJO batch listeners, starting with version 2.8.6, the header is copied into each member of the batch and is also available as a single String parameter after conversion.

@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
        info = "info for batch")
public void listen(List<Thing> list,
        @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets,
        @Header(KafkaHeaders.LISTENER_INFO) String info) {
            ...
}

If the batch listener has a filter and the filter results in an empty batch, you will need to add required = false to the @Header parameter because the info is not available for an empty batch.

If you receive List<Message<Thing>> the info is in the KafkaHeaders.LISTENER_INFO header of each Message<?>.

See Batch Listeners for more information about consuming batches.

Publishing Dead-letter Records

You can configure the DefaultErrorHandler and DefaultAfterRollbackProcessor with a record recoverer when the maximum number of failures is reached for a record. The framework provides the DeadLetterPublishingRecoverer, which publishes the failed message to another topic. The recoverer requires a KafkaTemplate<Object, Object>, which is used to send the record. You can also, optionally, configure it with a BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>, which is called to resolve the destination topic and partition.

By default, the dead-letter record is sent to a topic named <originalTopic>.DLT (the original topic name suffixed with .DLT) and to the same partition as the original record. Therefore, when you use the default resolver, the dead-letter topic must have at least as many partitions as the original topic.
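
For example, a sketch that declares the DLT alongside a hypothetical 10-partition original topic:

@Bean
public NewTopic dlt() {
    // at least as many partitions as the original topic
    return TopicBuilder.name("someTopic.DLT")
            .partitions(10)
            .replicas(3)
            .build();
}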

If the returned TopicPartition has a negative partition, the partition is not set in the ProducerRecord, so the partition is selected by Kafka. Starting with version 2.2.4, any ListenerExecutionFailedException (thrown, for example, when an exception is detected in a @KafkaListener method) is enhanced with the groupId property. This allows the destination resolver to use this, in addition to the information in the ConsumerRecord, to select the dead letter topic.

The following example shows how to wire a custom destination resolver:

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
        (r, e) -> {
            if (e instanceof FooException) {
                return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
            }
            else {
                return new TopicPartition(r.topic() + ".other.failures", r.partition());
            }
        });
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));

The record sent to the dead-letter topic is enhanced with the following headers:

  • KafkaHeaders.DLT_EXCEPTION_FQCN: The Exception class name (generally a ListenerExecutionFailedException, but can be others).

  • KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN: The Exception cause class name, if present (since version 2.8).

  • KafkaHeaders.DLT_EXCEPTION_STACKTRACE: The Exception stack trace.

  • KafkaHeaders.DLT_EXCEPTION_MESSAGE: The Exception message.

  • KafkaHeaders.DLT_KEY_EXCEPTION_FQCN: The Exception class name (key deserialization errors only).

  • KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE: The Exception stack trace (key deserialization errors only).

  • KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE: The Exception message (key deserialization errors only).

  • KafkaHeaders.DLT_ORIGINAL_TOPIC: The original topic.

  • KafkaHeaders.DLT_ORIGINAL_PARTITION: The original partition.

  • KafkaHeaders.DLT_ORIGINAL_OFFSET: The original offset.

  • KafkaHeaders.DLT_ORIGINAL_TIMESTAMP: The original timestamp.

  • KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE: The original timestamp type.

  • KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP: The original consumer group that failed to process the record (since version 2.8).

Key exceptions are only caused by DeserializationExceptions, so there is no DLT_KEY_EXCEPTION_CAUSE_FQCN.

There are two mechanisms to add more headers.

  1. Subclass the recoverer and override createProducerRecord() - call super.createProducerRecord() and add more headers.

  2. Provide a BiFunction to receive the consumer record and exception, returning a Headers object; headers from there will be copied to the final producer record; also see Managing Dead Letter Record Headers. Use setHeadersFunction() to set the BiFunction.

The second is simpler to implement but the first has more information available, including the already assembled standard headers.
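
A minimal sketch of the second mechanism; app.failedAt is a hypothetical application header:

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setHeadersFunction((record, ex) -> {
    Headers extra = new RecordHeaders();
    extra.add("app.failedAt", Instant.now().toString().getBytes(StandardCharsets.UTF_8));
    return extra; // copied into the outgoing dead-letter producer record
});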

Starting with version 2.3, when used in conjunction with an ErrorHandlingDeserializer, the publisher will restore the record value(), in the dead-letter producer record, to the original value that failed to be deserialized. Previously, the value() was null and user code had to decode the DeserializationException from the message headers. In addition, you can provide multiple KafkaTemplates to the publisher; this might be needed, for example, if you want to publish the byte[] from a DeserializationException, as well as values, using a different serializer, from records that were deserialized successfully. Here is an example of configuring the publisher with KafkaTemplates that use a String and byte[] serializer:

@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
        KafkaTemplate<?, ?> bytesTemplate) {
    Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
    templates.put(String.class, stringTemplate);
    templates.put(byte[].class, bytesTemplate);
    return new DeadLetterPublishingRecoverer(templates);
}

The publisher uses the map keys to locate a template that is suitable for the value() about to be published. A LinkedHashMap is recommended so that the keys are examined in order.

When publishing null values, and there are multiple templates, the recoverer will look for a template for the Void class; if none is present, the first template from the values().iterator() will be used.

Since 2.7 you can use the setFailIfSendResultIsError method so that an exception is thrown when message publishing fails. You can also set a timeout for the verification of the sender success with setWaitForSendResultTimeout.
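
A minimal sketch of both settings:

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setFailIfSendResultIsError(true);
recoverer.setWaitForSendResultTimeout(Duration.ofSeconds(10));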

If the recoverer fails (throws an exception), the failed record will be included in the seeks. Starting with version 2.5.5, if the recoverer fails, the BackOff will be reset by default and redeliveries will again go through the back offs before recovery is attempted again. With earlier versions, the BackOff was not reset and recovery was re-attempted on the next failure. To revert to the previous behavior, set the error handler’s resetStateOnRecoveryFailure property to false.

Starting with version 2.6.3, set resetStateOnExceptionChange to true and the retry sequence will be restarted (including the selection of a new BackOff, if so configured) if the exception type changes between failures. By default, the exception type is not considered.

Starting with version 2.3, the recoverer can also be used with Kafka Streams - see Recovery from Deserialization Exceptions for more information.

The ErrorHandlingDeserializer adds the deserialization exception(s) in headers ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER and ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER (using Java serialization). By default, these headers are not retained in the message published to the dead letter topic. Starting with version 2.7, if both the key and value fail deserialization, the original values of both are populated in the record sent to the DLT.

If incoming records are dependent on each other, but may arrive out of order, it may be useful to republish a failed record to the tail of the original topic (for some number of times), instead of sending it directly to the dead letter topic. See this Stack Overflow Question for an example.

The following error handler configuration will do exactly that:

@Bean
public CommonErrorHandler eh(KafkaOperations<String, String> template) {
    return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
            (rec, ex) -> {
                org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
                if (retries == null) {
                    retries = new RecordHeader("retries", new byte[] { 1 });
                    rec.headers().add(retries);
                }
                else {
                    retries.value()[0]++;
                }
                return retries.value()[0] > 5
                        ? new TopicPartition("topic.DLT", rec.partition())
                        : new TopicPartition("topic", rec.partition());
            }), new FixedBackOff(0L, 0L));
}

Starting with version 2.7, the recoverer checks that the partition selected by the destination resolver actually exists. If the partition is not present, the partition in the ProducerRecord is set to null, allowing the KafkaProducer to select the partition. You can disable this check by setting the verifyPartition property to false.

Starting with version 3.1, setting the logRecoveryRecord property to true will log the recovery record and exception.
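
A sketch of both properties, assuming setters that follow the property names above:

recoverer.setVerifyPartition(false);  // let Kafka choose the partition without checking it exists
recoverer.setLogRecoveryRecord(true); // log the recovery record and the exception (3.1+)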

Managing Dead Letter Record Headers

Referring to Publishing Dead-letter Records above, the DeadLetterPublishingRecoverer has two properties used to manage headers when those headers already exist (such as when reprocessing a dead letter record that failed, including when using Non-Blocking Retries).

  • appendOriginalHeaders (default true)

  • stripPreviousExceptionHeaders (default true since version 2.8)

Apache Kafka supports multiple headers with the same name; to obtain the "latest" value, you can use headers.lastHeader(headerName); to get an iterator over multiple headers, use headers.headers(headerName).iterator().

When repeatedly republishing a failed record, these headers can grow (and eventually cause publication to fail due to a RecordTooLargeException); this is especially true for the exception headers and particularly for the stack trace headers.

The reason for the two properties is because, while you might want to retain only the last exception information, you might want to retain the history of which topic(s) the record passed through for each failure.

appendOriginalHeaders is applied to all headers named ORIGINAL while stripPreviousExceptionHeaders is applied to all headers named EXCEPTION.
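
A sketch of overriding both defaults:

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setAppendOriginalHeaders(false);          // keep only the first set of ORIGINAL headers
recoverer.setStripPreviousExceptionHeaders(false);  // keep the EXCEPTION headers from every failure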

Starting with version 2.8.4, you can now control which of the standard headers will be added to the output record. See the enum HeadersToAdd for the generic names of the (currently) 10 standard headers that are added by default (these are not the actual header names, just an abstraction; the actual header names are set up by the getHeaderNames() method, which subclasses can override).

To exclude headers, use the excludeHeaders() method; for example, to suppress adding the exception stack trace in a header, use:

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);

In addition, you can completely customize the addition of exception headers by adding an ExceptionHeadersCreator; this also disables all standard exception headers.

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
    kafkaHeaders.add(new RecordHeader(..., ...));
});

Also starting with version 2.8.4, you can now provide multiple headers functions, via the addHeadersFunction method. This allows additional functions to apply, even if another function has already been registered, for example, when using Non-Blocking Retries.

ExponentialBackOffWithMaxRetries Implementation

Spring Framework provides a number of BackOff implementations. By default, the ExponentialBackOff will retry indefinitely; to give up after some number of retry attempts requires calculating the maxElapsedTime. Since version 2.7.3, Spring for Apache Kafka provides the ExponentialBackOffWithMaxRetries which is a subclass that receives the maxRetries property and automatically calculates the maxElapsedTime, which is a little more convenient.

@Bean
DefaultErrorHandler handler() {
    ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
    bo.setInitialInterval(1_000L);
    bo.setMultiplier(2.0);
    bo.setMaxInterval(10_000L);
    return new DefaultErrorHandler(myRecoverer, bo);
}

This will retry after 1, 2, 4, 8, 10, 10 seconds, before calling the recoverer.