Filtering Messages
在某些情况下,例如重新平衡,可能重新发送已处理的消息。该框架无法识别此类消息是否已处理。这是一个应用程序级函数。这称为“ Idempotent Receiver”模式,Spring Integration 提供了一个“ implementation”模式。
In certain scenarios, such as rebalancing, a message that has already been processed may be redelivered. The framework cannot know whether such a message has been processed or not. That is an application-level function. This is known as the Idempotent Receiver pattern and Spring Integration provides an implementation of it.
适用于 Apache Kafka 的 Spring 项目还通过 FilteringMessageListenerAdapter
类提供了一些帮助,该类可以包装您的 MessageListener
。此类采用 RecordFilterStrategy
的实现,您可以在其中实现 filter
方法,以表示一条消息是重复的,应该被丢弃。这有一个称为 ackDiscarded
的附加属性,表示该适配器是否应该确认已丢弃的记录。它默认值为 false
。
The Spring for Apache Kafka project also provides some assistance by means of the FilteringMessageListenerAdapter
class, which can wrap your MessageListener
.
This class takes an implementation of RecordFilterStrategy
in which you implement the filter
method to signal that a message is a duplicate and should be discarded.
This has an additional property called ackDiscarded
, which indicates whether the adapter should acknowledge the discarded record.
It is false
by default.
当您使用 @KafkaListener
时,在容器工厂上设置 RecordFilterStrategy
(以及可选的 ackDiscarded
),以便侦听器被包装在合适的筛选适配器中。
When you use @KafkaListener
, set the RecordFilterStrategy
(and optionally ackDiscarded
) on the container factory so that the listener is wrapped in the appropriate filtering adapter.
此外,还提供了 FilteringBatchMessageListenerAdapter
,供你在使用批处理 message listener 时使用。
In addition, a FilteringBatchMessageListenerAdapter
is provided, for when you use a batch message listener.
如果 @KafkaListener
收到 List<ConsumerRecord<?, ?>>
而不是 ConsumerRecords<?, ?>
,则将忽略 FilteringBatchMessageListenerAdapter
,因为 ConsumerRecords
是不可变的。
The FilteringBatchMessageListenerAdapter
is ignored if your @KafkaListener
receives a ConsumerRecords<?, ?>
instead of List<ConsumerRecord<?, ?>>
, because ConsumerRecords
is immutable.
从 2.8.4 版开始,你可以使用侦听器注释上的 filter
属性覆盖侦听器容器工厂的默认 RecordFilterStrategy
。
Starting with version 2.8.4, you can override the listener container factory’s default RecordFilterStrategy
by using the filter
property on the listener annotations.
@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
...
}