Filtering Messages

在某些情况下,例如重新平衡,可能重新发送已处理的消息。该框架无法识别此类消息是否已处理。这是一个应用程序级函数。这称为“ Idempotent Receiver”模式,Spring Integration 提供了一个“ implementation”模式。

适用于 Apache Kafka 的 Spring 项目还通过 FilteringMessageListenerAdapter 类提供了一些帮助,该类可以包装您的 MessageListener。此类采用 RecordFilterStrategy 的实现,您可以在其中实现 filter 方法,以表示一条消息是重复的,应该被丢弃。这有一个称为 ackDiscarded 的附加属性,表示该适配器是否应该确认已丢弃的记录。它默认值为 false

当您使用 @KafkaListener 时,在容器工厂上设置 RecordFilterStrategy(以及可选的 ackDiscarded),以便侦听器被包装在合适的筛选适配器中。

此外,还提供了 FilteringBatchMessageListenerAdapter,供你在使用批处理 message listener 时使用。

如果 @KafkaListener 收到 List<ConsumerRecord<?, ?>> 而不是 ConsumerRecords<?, ?>,则将忽略 FilteringBatchMessageListenerAdapter,因为 ConsumerRecords 是不可变的。

从 2.8.4 版开始,你可以使用侦听器注释上的 filter 属性覆盖侦听器容器工厂的默认 RecordFilterStrategy

@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
    ...
}