Consuming Batches

Starting with version 3.0, when spring.cloud.stream.bindings.<name>.consumer.batch-mode is set to true, all of the records received by polling the Kafka Consumer are presented as a List<?> to the listener method. Otherwise, the method is called with one record at a time. The size of the batch is controlled by the Kafka consumer properties max.poll.records, fetch.min.bytes, and fetch.max.wait.ms; refer to the Kafka documentation for more information.
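
As a minimal sketch of the relevant configuration, assuming a hypothetical functional binding named listen-in-0 (the tuning values are only illustrative, not recommendations):

spring.cloud.stream.bindings.listen-in-0.consumer.batch-mode=true
# pass-through Kafka consumer properties that bound the size of each batch
spring.cloud.stream.kafka.bindings.listen-in-0.consumer.configuration.max.poll.records=500
spring.cloud.stream.kafka.bindings.listen-in-0.consumer.configuration.fetch.min.bytes=1048576
spring.cloud.stream.kafka.bindings.listen-in-0.consumer.configuration.fetch.max.wait.ms=1000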

When receiving batches, the following type signatures are allowed:

List<Person>
Message<List<Person>>
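
For example, a functional batch consumer using the first signature could look like the following sketch (the Person type and the bean name are assumptions for illustration; with this bean name the binding is listen-in-0 by default):

import java.util.List;
import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchListenerConfiguration {

    // Receives the whole batch returned by a single poll as one List; no access to headers.
    @Bean
    public Consumer<List<Person>> listen() {
        return people -> people.forEach(person -> {
            // process each Person in the batch
        });
    }
}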

List<Person> 的第一个选项中,侦听器不会获得任何消息头。如果使用第二个类型签名 (Message<List<Person>>),则可以访问这些头;但是,所有头仍采用 Collection 形式。我们来看看下面的示例。

In the first option of List<Person>, the listener does not get any message headers. If the second type signature (Message<List<Person>>) is used, then the headers can be accessed; however, all of the headers are still in the form of a collection. Let's take the following example.

Assume that the Message contains a list of ten Person objects. The MessageHeaders of the Message contains a map of headers, where each key is a header name and each value is a list. That list holds the header values for the records, in the same order as the payload list. Therefore, it is up to the application to pick the correct header value from the MessageHeaders map by using the same index it is at while iterating over the payload list.
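
A sketch of the header-aware variant, assuming the spring-kafka 3.x header constant KafkaHeaders.RECEIVED_PARTITION (older versions use RECEIVED_PARTITION_ID) and the same hypothetical Person type:

import java.util.List;
import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;

@Configuration
public class BatchHeaderListenerConfiguration {

    @Bean
    public Consumer<Message<List<Person>>> listenWithHeaders() {
        return message -> {
            List<Person> people = message.getPayload();
            // Each header value is itself a List, aligned index-for-index with the payload.
            @SuppressWarnings("unchecked")
            List<Integer> partitions =
                    (List<Integer>) message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION);
            for (int i = 0; i < people.size(); i++) {
                Person person = people.get(i);
                Integer partition = (partitions != null) ? partitions.get(i) : null;
                // process person together with the partition it was received from
            }
        };
    }
}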

Note that type signatures of the form List<Message<Person>> are not allowed when consuming in batch mode.

Starting with version 4.0.2, the binder supports DLQ capabilities when consuming in batch mode. Keep in mind that when using a DLQ on a consumer binding that is in batch mode, all of the records received from the previous poll are delivered to the DLQ topic.
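
A rough sketch of the corresponding properties, with hypothetical binding and topic names (enableDlq and dlqName are the usual Kafka binder consumer properties):

spring.cloud.stream.kafka.bindings.listen-in-0.consumer.enable-dlq=true
spring.cloud.stream.kafka.bindings.listen-in-0.consumer.dlq-name=person-events-dlq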

Retry within the binder is not supported when using batch mode, so maxAttempts is overridden to 1. You can configure a DefaultErrorHandler (using a ListenerContainerCustomizer) to achieve similar functionality to retry in the binder. You can also use a manual AckMode and call Acknowledgment.nack(index, sleep) to commit the offsets for a partial batch and have the remaining records redelivered. Refer to the Spring for Apache Kafka documentation for more information about these techniques.
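
As a sketch of the first technique, a ListenerContainerCustomizer bean can install a DefaultErrorHandler with a back-off; the back-off values below are only illustrative:

import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class BatchErrorHandlerConfiguration {

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
        return (container, destinationName, group) -> {
            // Redeliver the failed batch twice, one second apart, before giving up;
            // by default the final failure is logged (a recoverer such as
            // DeadLetterPublishingRecoverer could be supplied instead).
            container.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
        };
    }
}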

When receiving KafkaNull objects in batch mode, the received list contains a null element for each corresponding KafkaNull object. This is true for both the List<Person> and Message<List<Person>> style type signatures.
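
For example (a sketch reusing the hypothetical Person type and the List<Person> signature), a batch listener that may receive tombstone records should guard against the null elements:

import java.util.List;
import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TombstoneAwareListenerConfiguration {

    @Bean
    public Consumer<List<Person>> listen() {
        return people -> {
            for (Person person : people) {
                if (person == null) {
                    // this position in the batch was a KafkaNull (tombstone) record
                    continue;
                }
                // process the non-null Person
            }
        };
    }
}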