Filtering Messages

在某些情况下,例如重新平衡,可能重新发送已处理的消息。该框架无法识别此类消息是否已处理。这是一个应用程序级函数。这称为“ Idempotent Receiver”模式,Spring Integration 提供了一个“ implementation”模式。

In certain scenarios, such as rebalancing, a message that has already been processed may be redelivered. The framework cannot know whether such a message has been processed or not. That is an application-level function. This is known as the Idempotent Receiver pattern and Spring Integration provides an implementation of it.

适用于 Apache Kafka 的 Spring 项目还通过 FilteringMessageListenerAdapter 类提供了一些帮助,该类可以包装您的 MessageListener。此类采用 RecordFilterStrategy 的实现,您可以在其中实现 filter 方法,以表示一条消息是重复的,应该被丢弃。这有一个称为 ackDiscarded 的附加属性,表示该适配器是否应该确认已丢弃的记录。它默认值为 false

The Spring for Apache Kafka project also provides some assistance by means of the FilteringMessageListenerAdapter class, which can wrap your MessageListener. This class takes an implementation of RecordFilterStrategy in which you implement the filter method to signal that a message is a duplicate and should be discarded. This has an additional property called ackDiscarded, which indicates whether the adapter should acknowledge the discarded record. It is false by default.

当您使用 @KafkaListener 时,在容器工厂上设置 RecordFilterStrategy(以及可选的 ackDiscarded),以便侦听器被包装在合适的筛选适配器中。

When you use @KafkaListener, set the RecordFilterStrategy (and optionally ackDiscarded) on the container factory so that the listener is wrapped in the appropriate filtering adapter.

此外,还提供了 FilteringBatchMessageListenerAdapter,供你在使用批处理 message listener 时使用。

In addition, a FilteringBatchMessageListenerAdapter is provided, for when you use a batch message listener.

如果 @KafkaListener 收到 List<ConsumerRecord<?, ?>> 而不是 ConsumerRecords<?, ?>,则将忽略 FilteringBatchMessageListenerAdapter,因为 ConsumerRecords 是不可变的。

The FilteringBatchMessageListenerAdapter is ignored if your @KafkaListener receives a ConsumerRecords<?, ?> instead of List<ConsumerRecord<?, ?>>, because ConsumerRecords is immutable.

从 2.8.4 版开始,你可以使用侦听器注释上的 filter 属性覆盖侦听器容器工厂的默认 RecordFilterStrategy

Starting with version 2.8.4, you can override the listener container factory’s default RecordFilterStrategy by using the filter property on the listener annotations.

@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
    ...
}