Consuming Batches
从 3.0 版本开始,当 spring.cloud.stream.bindings.<name>.consumer.batch-mode
设置为 true
时,所有通过轮询 Kafka Consumer
接收的记录都将作为 List<?>
呈现给侦听器方法。否则,该方法会每次调用一个记录。批次大小由 Kafka 消费者属性 max.poll.records
、fetch.min.bytes
、fetch.max.wait.ms
控制;请参阅 Kafka 文档了解更多信息。
接收批次时,允许以下类型签名:
List<Person>
Message<List<Person>>
在 List<Person>
的第一个选项中,侦听器不会获得任何消息头。如果使用第二个类型签名 (Message<List<Person>>
),则可以访问这些头;但是,所有头仍采用 Collection
形式。我们来看看下面的示例。
假设 Message
包含一个包含十个 Person
对象的列表。Message
的 MessageHeaders
包含一个头映射,其中键是头名称,值是列表。此列表包含该头在与有效负载列表相同的顺序中的头值。因此,应用程序需要根据有效负载列表的迭代从 MessageHeaders
映射正确访问头。
请注意,消费批处理模式时不允许使用 List<Message<Person>>
形式的类型签名。
从版本 4.0.2
开始,绑定器在以批处理模式消费时支持 DLQ 功能。请记住,当对处于批处理模式的消费者绑定使用 DLQ 时,从前一次轮询接收的所有记录都将传递到 DLQ 主题。
使用批处理模式时,不支持在 Binder 内重试,因此将 maxAttempts
覆盖为 1。你可以配置一个 DefaultErrorHandler
(使用 ListenerContainerCustomizer
)以实现类似于在 Binder 内重试的功能。你还可以使用手动 AckMode
并调用 Ackowledgment.nack(index, sleep)
来提交部分批量的偏移量,并重新传递剩余的记录。有关这些技术的更多信息,请参阅 Spring for Apache Kafka documentation。
在批处理模式下接收 |