Dead-Letter Topic Processing

  • 提供明确的消费者组来启用 DLQ 特性。

  • spring.cloud.stream.kafka.bindings.<binding-name>.consumer.enable-dlq 属性设置为 true,以便接收错误记录。

  • 设置 spring.cloud.stream.bindings.<binding-name>.consumer.max-attempts 属性的值大于 1,以控制重试次数。

  • 启用 DLQ 时,max-attempts 属性将优先考虑重试行为。否则,在禁用 DLQ 时,默认的重试次数设置为 10 次。

  • 应用程序必须自己处理死信,因为它没有提供标准机制。

  • 示例应用程序展示了如何将死信重新路由到原始主题,但在尝试三次后将其移动到停车位主题。

Enabling DLQ

要启用 DLQ,基于 Kafka binder 的应用程序必须通过属性 spring.cloud.stream.bindings.<binding-name>.group 提供消费者组。匿名消费者组(即应用程序没有明确提供组)无法启用 DLQ 特性。

当应用程序希望将错误记录发送到 DLQ 主题时,该应用程序必须启用 DLQ 特性,因为该特性默认情况下并未启用。要启用 DLQ,必须将属性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.enable-dlq 设置为 true。

启用 DLQ 后,在处理过程中发生错误并且基于 spring.cloud.stream.bindings.<binding-name>.consumer.max-attempts 属性耗尽所有重试后,该记录将被发送到 DLQ 主题。

默认情况下,max-attempts 属性设置为三个。当 max-attempts 属性大于 1 且已启用 dlq 时,您将看到重试会遵守 max-attempts 属性。在未启用 dlq(这是默认设置)时,max-attempts 属性与重试处理方式无关。在这种情况下,重试将恢复为 Spring for Apache Kafka 中的容器默认设置,即重试 10 次。如果应用程序希望在禁用 DLQ 时完全禁用重试,则将 max-attempts 属性设置为 1 将不起作用。要在此情况下完全禁用重试,您需要提供 ListenerContainerCustomizer,然后使用合适的 Backoff 设置。示例如下。

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
	return (container, destinationName, group) -> {
		var commonErrorHandler = new DefaultErrorHandler(new FixedBackOff(0L, 0l));
		container.setCommonErrorHandler(commonErrorHandler);
	};
}

这样,将禁用默认容器行为,并且不会尝试进行重试。如上所述,在启用 DLQ 时,binder 设置将具有优先权。

Handling Records in a Dead-Letter Topic

因为该框架无法预料用户希望如何处置死信,所以它不提供任何标准的机制来处理死信。如果死信的原因是临时性的,您可能希望将消息重新路由到原始主题。但是,如果问题是永久性的,则可能导致死循环。本主题中的示例 Spring Boot 应用程序展示了如何将那些消息重新路由到原始主题,但它在尝试三次后将其移动到 “停车位” 主题。该应用程序是另一个从死信主题读取的 spring-cloud-stream 应用程序。当 5 秒内没有收到消息时,它将退出。

这些示例假设原始目标为 so8400out,而消费者组为 so8400

有几种策略需要考虑:

  • 考虑仅在未运行主应用程序时运行重新路由。否则,瞬态错误的重试会很快用尽。

  • 或者,使用两阶段方法:使用此应用程序路由到第三方主题,再使用另一应用程序路由回主主题。

以下代码列表显示了示例应用程序:

application.properties
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400

spring.cloud.stream.bindings.output.destination=so8400out

spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot

spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest

spring.cloud.stream.kafka.binder.headers=x-retries
Application
@SpringBootApplication
public class ReRouteDlqKApplication implements CommandLineRunner {

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) {
        SpringApplication.run(ReRouteDlqKApplication.class, args).close();
    }

    private final AtomicInteger processed = new AtomicInteger();

    @Autowired
    private StreamBridge streamBridge;

    @Bean
    public Function<Message<?>, Message<?>> reRoute() {
        return failed -> {
            processed.incrementAndGet();
            Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
            if (retries == null) {
                System.out.println("First retry for " + failed);
                return MessageBuilder.fromMessage(failed)
                        .setHeader(X_RETRIES_HEADER, 1)
                        .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                                failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                        .build();
            }
            else if (retries < 3) {
                System.out.println("Another retry for " + failed);
                return MessageBuilder.fromMessage(failed)
                        .setHeader(X_RETRIES_HEADER, retries + 1)
                        .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                                failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                        .build();
            }
            else {
                System.out.println("Retries exhausted for " + failed);
                streamBridge.send("parkingLot", MessageBuilder.fromMessage(failed)
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build());
            }
            return null;
        };
    }

    @Override
    public void run(String... args) throws Exception {
        while (true) {
            int count = this.processed.get();
            Thread.sleep(5000);
            if (count == this.processed.get()) {
                System.out.println("Idle, exiting");
                return;
            }
        }
    }
}