Retry and Dead Letter Processing
当重试和延迟的总和超过消费者的 max.poll.interval.ms 属性,或者需要将死信发布到不同的 Kafka 集群,或者需要将重试监听器添加到错误处理程序时,就需要进行此移动。
文章提供了如何配置将这些功能从 Binder 移至容器的说明,包括如何定义 @Bean 和如何使用 BinderCustomizer 设置容器自定义器。
默认情况下,当你配置消费者绑定中的重试(例如,maxAttemts
)和 enableDlq
时,这些函数将在 Binder 内执行,而侦听器容器或 Kafka 消费者不会参与。
By default, when you configure retry (e.g. maxAttemts
) and enableDlq
in a consumer binding, these functions are performed within the binder, with no participation by the listener container or Kafka consumer.
在某些情况下,最好将此功能移至侦听器容器,例如:
There are situations where it is preferable to move this functionality to the listener container, such as:
-
The aggregate of retries and delays will exceed the consumer’s
max.poll.interval.ms
property, potentially causing a partition rebalance. -
You wish to publish the dead letter to a different Kafka cluster.
-
You wish to add retry listeners to the error handler.
-
…
要配置将此功能从 Binder 移至容器,请定义类型为 ListenerContainerWithDlqAndRetryCustomizer
的 @Bean
。此接口具有以下方法:
To configure moving this functionality from the binder to the container, define a @Bean
of type ListenerContainerWithDlqAndRetryCustomizer
.
This interface has the following methods:
/**
* Configure the container.
* @param container the container.
* @param destinationName the destination name.
* @param group the group.
* @param dlqDestinationResolver a destination resolver for the dead letter topic (if
* enableDlq).
* @param backOff the backOff using retry properties (if configured).
* @see #retryAndDlqInBinding(String, String)
*/
void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group,
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
@Nullable BackOff backOff);
/**
* Return false to move retries and DLQ from the binding to a customized error handler
* using the retry metadata and/or a {@code DeadLetterPublishingRecoverer} when
* configured via
* {@link #configure(AbstractMessageListenerContainer, String, String, BiFunction, BackOff)}.
* @param destinationName the destination name.
* @param group the group.
* @return false to disable retries and DLQ in the binding
*/
default boolean retryAndDlqInBinding(String destinationName, String group) {
return true;
}
目标解析器和 BackOff
是通过绑定属性(如果已配置)创建的。KafkaTemplate
使用 spring.kafka….
属性中的配置。然后,你可以使用它们创建自定义错误处理程序和死信发布者;例如:
The destination resolver and BackOff
are created from the binding properties (if configured). The KafkaTemplate
uses configuration from spring.kafka….
properties. You can then use these to create a custom error handler and dead letter publisher; for example:
@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
return new ListenerContainerWithDlqAndRetryCustomizer() {
@Override
public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
String group,
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
@Nullable BackOff backOff) {
if (destinationName.equals("topicWithLongTotalRetryConfig")) {
ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template,
dlqDestinationResolver);
container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
}
}
@Override
public boolean retryAndDlqInBinding(String destinationName, String group) {
return !destinationName.contains("topicWithLongTotalRetryConfig");
}
};
}
现在,只需一个重试延迟需要大于消费者的 max.poll.interval.ms
属性。
Now, only a single retry delay needs to be greater than the consumer’s max.poll.interval.ms
property.
与多个绑定一起工作时,“DefaultBinderFactory” 会覆盖 ListenerContainerWithDlqAndRetryCustomizer
bean。要应用 Bean,你需要使用 BinderCustomizer
设置容器自定义器(参见 [binder-customizer]):
When working with several binders, the 'ListenerContainerWithDlqAndRetryCustomizer' bean gets overridden by the 'DefaultBinderFactory'. For the bean to apply, you need to use a 'BinderCustomizer' to set the container customizer (See [binder-customizer]):
@Bean
public BinderCustomizer binderCustomizer(ListenerContainerWithDlqAndRetryCustomizer containerCustomizer) {
return (binder, binderName) -> {
if (binder instanceof KafkaMessageChannelBinder kafkaMessageChannelBinder) {
kafkaMessageChannelBinder.setContainerCustomizer(containerCustomizer);
}
else if (binder instanceof KStreamBinder) {
...
}
else if (binder instanceof RabbitMessageChannelBinder) {
...
}
};
}