@RabbitListener with Batching

在收到 a batch 消息时,批处理通常由容器执行,并且侦听器每次调用都会收到一条消息。从 2.2 版开始,您可以将侦听器容器工厂和侦听器配置为一次接收全部批处理,只需设置工厂的 batchListener 属性,并将方法有效负载参数设为 ListCollection

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setBatchListener(true);
    return factory;
}

@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
    ...
}

// or

@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
    ...
}

batchListener 属性设置为 true 将自动关闭工厂创建的容器中的 deBatchingEnabled 容器属性(除非 consumerBatchEnabledtrue - 请参见下文)。实际上,去批处理已从容器移动到监听器适配器,并且适配器创建传递给监听器列表。

启用批处理的工厂无法与 multi-method listener一起使用。

此外,从 2.2 版本开始,当一次接收成批消息时,最后一条消息包含一布尔值头,该头设置为 true。可以通过将 @Header(AmqpHeaders.LAST_IN_BATCH) 布尔值最后一个添加到监听器方法来获取此头。该头从 MessageProperties.isLastInBatch() 映射而来。此外,AmqpHeaders.BATCH_SIZE 会填充为所有消息片段中批处理的大小。

此外,已向 SimpleMessageListenerContainer 添加`consumerBatchEnabled` 的新属性。当此值为 true 时,容器会创建一个多达 batchSize 的消息批次;如果没有新消息到达,则在 receiveTimeout 经过时传递部分批次。如果收到生产者创建的批次,则会将其解批并添加到消费者侧的批次中;因此,传递的实际消息数量可能超过 batchSize,后者表示从代理接收到的消息数量。当 consumerBatchEnabled 为 true 时,deBatchingEnabled 必须为 true;容器工厂会强制执行这一要求。

@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory());
    factory.setConsumerTagStrategy(consumerTagStrategy());
    factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
    factory.setBatchSize(2);
    factory.setConsumerBatchEnabled(true);
    return factory;
}

当将 consumerBatchEnabled@RabbitListener 配合使用时:

@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
    ...
}

@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
    ...
}

@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
    ...
}
  • 第一个使用收到的原始未转换 org.springframework.amqp.core.Message 调用。

  • 第二个使用 org.springframework.messaging.Message&lt;?&gt;(有效负载已转换,标头/属性已映射)。

  • 第三个使用已转换的有效负载调用,无权访问标头/属性。

你还可以添加 Channel 参数,通常在使用 MANUAL ack 模式时使用。第三个示例没有对此使用过多,因为你无法访问 delivery_tag 属性。

Spring Boot 为 consumerBatchEnabledbatchSize 提供了一个配置属性,但没有为 batchListener 提供这样的属性。从版本 3.0 开始,在容器工厂中将 consumerBatchEnabled 设置为 true 也会将 batchListener 设置为 true。当 consumerBatchEnabledtrue 时,监听器*必须*为批次监听器。

从版本 3.0 开始,监听器方法可以消费 Collection<?>List<?>