Retry and Dead Letter Processing

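This section explains how to move retry and dead-letter handling from the binder to the listener container, including how to define the required @Bean and how to use a BinderCustomizer to set the container customizer. The move is needed when the aggregate of retries and delays would exceed the consumer's max.poll.interval.ms property, when dead letters must be published to a different Kafka cluster, or when retry listeners must be added to the error handler.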

By default, when you configure retry (e.g. maxAttempts) and enableDlq in a consumer binding, these functions are performed within the binder, with no participation by the listener container or Kafka consumer.
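
As a rough illustration of the binder-level setup described above, the fragment below enables retry and a dead letter topic on one consumer binding. The binding name process-in-0 and the group name myGroup are hypothetical, and the values are examples rather than recommendations.

```properties
# Hypothetical binding name (process-in-0) and group (myGroup)
spring.cloud.stream.bindings.process-in-0.group=myGroup
# Retry in the binder: up to 5 delivery attempts
spring.cloud.stream.bindings.process-in-0.consumer.maxAttempts=5
# Publish records that exhaust their retries to a dead letter topic
spring.cloud.stream.kafka.bindings.process-in-0.consumer.enableDlq=true
```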


There are situations where it is preferable to move this functionality to the listener container, such as:

  • The aggregate of retries and delays will exceed the consumer’s max.poll.interval.ms property, potentially causing a partition rebalance.

  • You wish to publish the dead letter to a different Kafka cluster.

  • You wish to add retry listeners to the error handler.

  • …​
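
The first situation comes down to simple arithmetic. The sketch below estimates the total time a record can spend in back-off, assuming exponential back-off in the style of the binding's backOffInitialInterval, backOffMultiplier, and backOffMaxInterval properties, and compares it with Kafka's default max.poll.interval.ms of 300000 ms. RetryBudget and its method are hypothetical helpers written for illustration, not part of any library, and the binder's actual retry timing may differ in detail.

```java
// Hypothetical helper: estimates the aggregate back-off time for one record.
final class RetryBudget {

    /**
     * Sum of the delays between attempts, in milliseconds.
     * Assumes the delay starts at initialMs, is multiplied after each
     * attempt, and is capped at maxMs.
     */
    public static long totalBackOffMs(int maxAttempts, long initialMs,
            double multiplier, long maxMs) {
        long total = 0;
        double next = initialMs;
        // There are (maxAttempts - 1) delays: none before the first attempt.
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            total += Math.min((long) next, maxMs);
            next *= multiplier;
        }
        return total;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000; // Kafka consumer default (5 minutes)

        // 3 attempts, 1 s initial delay, doubled, capped at 10 s: 1000 + 2000
        System.out.println(totalBackOffMs(3, 1_000, 2.0, 10_000)); // prints 3000

        // 10 attempts, 30 s initial delay, doubled, capped at 60 s: 30000 + 8 * 60000
        long longConfig = totalBackOffMs(10, 30_000, 2.0, 60_000);
        System.out.println(longConfig); // prints 510000
        System.out.println(longConfig > maxPollIntervalMs); // prints true
    }
}
```

The second configuration spends more time in back-off than the default poll interval allows, even before any processing time is counted, which is the kind of case where a rebalance becomes likely.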

To configure moving this functionality from the binder to the container, define a @Bean of type ListenerContainerWithDlqAndRetryCustomizer. This interface has the following methods:

/**
 * Configure the container.
 * @param container the container.
 * @param destinationName the destination name.
 * @param group the group.
 * @param dlqDestinationResolver a destination resolver for the dead letter topic (if
 * enableDlq).
 * @param backOff the backOff using retry properties (if configured).
 * @see #retryAndDlqInBinding(String, String)
 */
void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group,
        @Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
        @Nullable BackOff backOff);

/**
 * Return false to move retries and DLQ from the binding to a customized error handler
 * using the retry metadata and/or a {@code DeadLetterPublishingRecoverer} when
 * configured via
 * {@link #configure(AbstractMessageListenerContainer, String, String, BiFunction, BackOff)}.
 * @param destinationName the destination name.
 * @param group the group.
 * @return false to disable retries and DLQ in the binding
 */
default boolean retryAndDlqInBinding(String destinationName, String group) {
    return true;
}

The destination resolver and BackOff are created from the binding properties (if configured). The KafkaTemplate uses configuration from spring.kafka…​. properties. You can then use these to create a custom error handler and dead letter publisher; for example:

@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
    return new ListenerContainerWithDlqAndRetryCustomizer() {

        @Override
        public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
                String group,
                @Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
                @Nullable BackOff backOff) {

            // Only this destination gets container-level retry and dead-letter publishing.
            if (destinationName.equals("topicWithLongTotalRetryConfig")) {
                ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template,
                        dlqDestinationResolver);
                container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
            }
        }

        @Override
        public boolean retryAndDlqInBinding(String destinationName, String group) {
            // false for the customized destination, so the binder does not also retry it
            return !destinationName.contains("topicWithLongTotalRetryConfig");
        }

    };
}


Now a rebalance is a risk only if a single retry delay is greater than the consumer’s max.poll.interval.ms property, rather than the aggregate of all retries and delays.
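
To make the difference concrete, the sketch below computes the largest single delay in an exponential back-off sequence, which is the figure that matters once retries run in the container's error handler. It assumes exponential back-off in the style of the binding's backOffInitialInterval, backOffMultiplier, and backOffMaxInterval properties. LargestDelay is a hypothetical helper written for illustration, not a library class.

```java
// Hypothetical helper: finds the longest single pause between two attempts.
final class LargestDelay {

    /**
     * Largest delay between attempts, in milliseconds, assuming the delay
     * starts at initialMs, is multiplied after each attempt, and is capped at maxMs.
     */
    public static long largestDelayMs(int maxAttempts, long initialMs,
            double multiplier, long maxMs) {
        long largest = 0;
        double next = initialMs;
        // (maxAttempts - 1) delays in total: none before the first attempt.
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            largest = Math.max(largest, Math.min((long) next, maxMs));
            next *= multiplier;
        }
        return largest;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000; // Kafka consumer default (5 minutes)

        // 10 attempts, 30 s initial delay, doubled, capped at 60 s
        long largest = largestDelayMs(10, 30_000, 2.0, 60_000);
        System.out.println(largest); // prints 60000
        System.out.println(largest < maxPollIntervalMs); // prints true
    }
}
```

With these example values the largest single delay (60 s) stays well under the default max.poll.interval.ms of 300 s, even though the nine delays add up to 510 s in total.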

When working with several binders, the 'ListenerContainerWithDlqAndRetryCustomizer' bean gets overridden by the 'DefaultBinderFactory'. For the bean to apply, you need to use a 'BinderCustomizer' to set the container customizer (See [binder-customizer]):

@Bean
public BinderCustomizer binderCustomizer(ListenerContainerWithDlqAndRetryCustomizer containerCustomizer) {
    return (binder, binderName) -> {
        if (binder instanceof KafkaMessageChannelBinder kafkaMessageChannelBinder) {
            // Apply the customizer explicitly to the Kafka binder.
            kafkaMessageChannelBinder.setContainerCustomizer(containerCustomizer);
        }
        else if (binder instanceof KStreamBinder) {
            // ...
        }
        else if (binder instanceof RabbitMessageChannelBinder) {
            // ...
        }
    };
}