Dead-Letter Topic Processing
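-
Provide an explicit consumer group to enable the DLQ feature.
-
Set the spring.cloud.stream.kafka.bindings.<binding-name>.consumer.enable-dlq property to true so that failed records are sent to the DLQ.
-
Set the spring.cloud.stream.bindings.<binding-name>.consumer.max-attempts property to a value greater than 1 to control the number of retries.
-
When DLQ is enabled, the max-attempts property governs retry behavior. When DLQ is disabled, retries fall back to the container default of 10.
-
The application must handle dead-lettered records itself, because the framework provides no standard mechanism for doing so.
-
The sample application shows how to route dead-lettered records back to the original topic and move them to a parking lot topic after three attempts.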
Enabling DLQ
To enable DLQ, a Kafka binder based application must provide a consumer group through the spring.cloud.stream.bindings.<binding-name>.group property. Anonymous consumer groups (that is, where the application does not explicitly provide a group) cannot enable the DLQ feature.
The DLQ feature is not enabled by default. An application that wants to send failed records to a DLQ topic must set the spring.cloud.stream.kafka.bindings.<binding-name>.consumer.enable-dlq property to true.
When DLQ is enabled and processing a record fails, the record is sent to the DLQ topic once all retries allowed by the spring.cloud.stream.bindings.<binding-name>.consumer.max-attempts property are exhausted.
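The flow described above can be sketched in plain Java, independent of Spring. This is only an illustration: the class and member names (RetryThenDlqSketch, process, deadLetters) are hypothetical, the list merely stands in for a DLQ topic, and the binder implements the real behavior through its own retry and error-handling infrastructure. The sketch assumes max-attempts counts the total number of delivery attempts, including the first one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration only; not the binder's actual implementation.
final class RetryThenDlqSketch {

    // Stands in for the DLQ topic: records land here once attempts are exhausted.
    final List<String> deadLetters = new ArrayList<>();

    // Counts every invocation of the handler, including the first delivery.
    int attemptsMade = 0;

    // maxAttempts is the total number of attempts (first try plus retries).
    void process(String record, int maxAttempts, Consumer<String> handler) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            attemptsMade++;
            try {
                handler.accept(record);
                return; // processed successfully, nothing to dead-letter
            }
            catch (RuntimeException ex) {
                // processing failed; loop again until attempts run out
            }
        }
        // all attempts failed, so the record is dead-lettered
        deadLetters.add(record);
    }

    public static void main(String[] args) {
        RetryThenDlqSketch sketch = new RetryThenDlqSketch();
        // A handler that always fails, so the record ends up dead-lettered.
        sketch.process("bad-record", 3, r -> {
            throw new IllegalStateException("simulated failure");
        });
        System.out.println(sketch.attemptsMade + " attempts, dead letters: " + sketch.deadLetters);
    }
}
```

With max-attempts set to 1 in this model, a failing record is dead-lettered after a single attempt, which matches the idea that a value of 1 means no retries.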
By default, the max-attempts property is set to three. When max-attempts is greater than 1 and DLQ is enabled, retries honor the max-attempts property. When DLQ is not enabled (which is the default), the max-attempts property has no bearing on how retries are handled. In that case, retries fall back to the container defaults in Spring for Apache Kafka, which is 10 retries. Consequently, setting max-attempts to 1 does not disable retries when DLQ is disabled. To disable retries completely in that case, provide a ListenerContainerCustomizer and use appropriate BackOff settings. Here is an example.
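import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
    return (container, destinationName, group) -> {
        // FixedBackOff(interval, maxAttempts): zero delay and zero retries.
        var commonErrorHandler = new DefaultErrorHandler(new FixedBackOff(0L, 0L));
        container.setCommonErrorHandler(commonErrorHandler);
    };
}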
With this, the default container behavior is disabled and no retries are attempted. As noted above, when DLQ is enabled, the binder settings take precedence.
Handling Records in a Dead-Letter Topic
Because the framework cannot anticipate how users want to dispose of dead-lettered messages, it does not provide a standard mechanism to handle them. If the reason for the dead-lettering is transient, you may wish to route the messages back to the original topic. However, if the problem is permanent, that could cause an infinite loop. The sample Spring Boot application in this topic shows how to route those messages back to the original topic, but it moves them to a “parking lot” topic after three attempts. The application is another spring-cloud-stream application that reads from the dead-letter topic. It exits when no messages are received for 5 seconds.
The examples assume the original destination is so8400out and the consumer group is so8400.
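Given that destination and group, the dead-letter topic read by the sample is error.so8400out.so8400, which follows the binder's default naming pattern of error.<destination>.<group>. The helper below is a minimal sketch of that pattern in plain Java; the class and method names (DlqTopicNameSketch, defaultDlqName) are hypothetical and not part of any Spring API.

```java
// Hypothetical helper; it only illustrates the default DLQ naming pattern.
final class DlqTopicNameSketch {

    // Builds "error.<destination>.<group>" from the binding's destination and group.
    static String defaultDlqName(String destination, String group) {
        return "error." + destination + "." + group;
    }

    public static void main(String[] args) {
        // Uses the destination and consumer group assumed by the examples.
        System.out.println(defaultDlqName("so8400out", "so8400"));
    }
}
```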
There are a couple of strategies to consider:
-
Consider running the rerouting only when the main application is not running. Otherwise, the retries for transient errors are used up very quickly.
-
Alternatively, use a two-stage approach: Use this application to route to a third topic and another to route from there back to the main topic.
The following code listings show the sample application:
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.headers=x-retries
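import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
public class ReRouteDlqKApplication implements CommandLineRunner {

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) {
        SpringApplication.run(ReRouteDlqKApplication.class, args).close();
    }

    // Number of dead-lettered messages handled so far; used for idle detection.
    private final AtomicInteger processed = new AtomicInteger();

    @Autowired
    private StreamBridge streamBridge;

    // Note: in Spring for Apache Kafka 3.x, KafkaHeaders.RECEIVED_PARTITION_ID
    // is named KafkaHeaders.RECEIVED_PARTITION.
    @Bean
    public Function<Message<?>, Message<?>> reRoute() {
        return failed -> {
            processed.incrementAndGet();
            Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
            if (retries == null) {
                // First time through the DLQ: start counting and route back.
                System.out.println("First retry for " + failed);
                return MessageBuilder.fromMessage(failed)
                        .setHeader(X_RETRIES_HEADER, 1)
                        .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                                failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                        .build();
            }
            else if (retries < 3) {
                // Still under the limit: increment the counter and route back.
                System.out.println("Another retry for " + failed);
                return MessageBuilder.fromMessage(failed)
                        .setHeader(X_RETRIES_HEADER, retries + 1)
                        .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                                failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                        .build();
            }
            else {
                // Limit reached: send to the parking lot instead of the original topic.
                System.out.println("Retries exhausted for " + failed);
                streamBridge.send("parkingLot", MessageBuilder.fromMessage(failed)
                        .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                                failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                        .build());
            }
            // Returning null means nothing is sent to the output binding.
            return null;
        };
    }

    @Override
    public void run(String... args) throws Exception {
        // Exit once no message has been processed during a 5-second window.
        while (true) {
            int count = this.processed.get();
            Thread.sleep(5000);
            if (count == this.processed.get()) {
                System.out.println("Idle, exiting");
                return;
            }
        }
    }
}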
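The routing rule in the listing above can be isolated from the messaging code, which makes it easier to reason about and to unit test. The following is a minimal sketch in plain Java; the class, constant, and method names (ReRouteDecisionSketch, MAX_RETRIES, nextRetryCount) are hypothetical and do not appear in the sample application. An empty result stands for "send to the parking lot".

```java
import java.util.OptionalInt;

// Hypothetical helper that mirrors the x-retries decision in the sample application.
final class ReRouteDecisionSketch {

    // Same limit as the sample: reroute while the x-retries value is below three.
    static final int MAX_RETRIES = 3;

    // Returns the x-retries value to set on the rerouted message,
    // or an empty OptionalInt when the message should go to the parking lot.
    static OptionalInt nextRetryCount(Integer retries) {
        if (retries == null) {
            return OptionalInt.of(1); // header absent: first retry
        }
        if (retries < MAX_RETRIES) {
            return OptionalInt.of(retries + 1); // another retry
        }
        return OptionalInt.empty(); // retries exhausted
    }

    public static void main(String[] args) {
        // Walk a message through successive passes until it is parked.
        Integer header = null;
        while (true) {
            OptionalInt next = nextRetryCount(header);
            if (next.isEmpty()) {
                System.out.println("x-retries=" + header + ": parking lot");
                break;
            }
            System.out.println("x-retries=" + header + ": reroute with x-retries=" + next.getAsInt());
            header = next.getAsInt();
        }
    }
}
```

Keeping the decision in a pure function like this leaves the Function bean responsible only for building and sending messages.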