@RabbitListener with Batching
在收到 a batch 消息时,批处理通常由容器执行,并且侦听器每次调用都会收到一条消息。从 2.2 版开始,您可以将侦听器容器工厂和侦听器配置为一次接收全部批处理,只需设置工厂的 batchListener
属性,并将方法有效负载参数设为 List
或 Collection
:
When receiving a a batch of messages, the de-batching is normally performed by the container and the listener is invoked with one message at at time.
Starting with version 2.2, you can configure the listener container factory and listener to receive the entire batch in one call, simply set the factory’s batchListener
property, and make the method payload parameter a List
or Collection
:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setBatchListener(true);
return factory;
}
@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
...
}
// or
@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
...
}
将 batchListener
属性设置为 true 将自动关闭工厂创建的容器中的 deBatchingEnabled
容器属性(除非 consumerBatchEnabled
为 true
- 请参见下文)。实际上,去批处理已从容器移动到监听器适配器,并且适配器创建传递给监听器列表。
Setting the batchListener
property to true automatically turns off the deBatchingEnabled
container property in containers that the factory creates (unless consumerBatchEnabled
is true
- see below). Effectively, the debatching is moved from the container to the listener adapter and the adapter creates the list that is passed to the listener.
启用批处理的工厂无法与 multi-method listener一起使用。
A batch-enabled factory cannot be used with a multi-method listener.
此外,从 2.2 版本开始,当一次接收成批消息时,最后一条消息包含一布尔值头,该头设置为 true
。可以通过将 @Header(AmqpHeaders.LAST_IN_BATCH)
布尔值最后一个添加到监听器方法来获取此头。该头从 MessageProperties.isLastInBatch()
映射而来。此外,AmqpHeaders.BATCH_SIZE
会填充为所有消息片段中批处理的大小。
Also starting with version 2.2. when receiving batched messages one-at-a-time, the last message contains a boolean header set to true
.
This header can be obtained by adding the @Header(AmqpHeaders.LAST_IN_BATCH)
boolean last` parameter to your listener method.
The header is mapped from MessageProperties.isLastInBatch()
.
In addition, AmqpHeaders.BATCH_SIZE
is populated with the size of the batch in every message fragment.
此外,已向 SimpleMessageListenerContainer
添加`consumerBatchEnabled` 的新属性。当此值为 true 时,容器会创建一个多达 batchSize
的消息批次;如果没有新消息到达,则在 receiveTimeout
经过时传递部分批次。如果收到生产者创建的批次,则会将其解批并添加到消费者侧的批次中;因此,传递的实际消息数量可能超过 batchSize
,后者表示从代理接收到的消息数量。当 consumerBatchEnabled
为 true 时,deBatchingEnabled
必须为 true;容器工厂会强制执行这一要求。
In addition, a new property consumerBatchEnabled
has been added to the SimpleMessageListenerContainer
.
When this is true, the container will create a batch of messages, up to batchSize
; a partial batch is delivered if receiveTimeout
elapses with no new messages arriving.
If a producer-created batch is received, it is debatched and added to the consumer-side batch; therefore the actual number of messages delivered may exceed batchSize
, which represents the number of messages received from the broker.
deBatchingEnabled
must be true when consumerBatchEnabled
is true; the container factory will enforce this requirement.
@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setConsumerTagStrategy(consumerTagStrategy());
factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
factory.setBatchSize(2);
factory.setConsumerBatchEnabled(true);
return factory;
}
当将 consumerBatchEnabled
与 @RabbitListener
配合使用时:
When using consumerBatchEnabled
with @RabbitListener
:
@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
...
}
@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
...
}
@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
...
}
-
the first is called with the raw, unconverted
org.springframework.amqp.core.Message
s received. -
the second is called with the
org.springframework.messaging.Message<?>
s with converted payloads and mapped headers/properties. -
the third is called with the converted payloads, with no access to headers/properties.
你还可以添加 Channel
参数,通常在使用 MANUAL
ack 模式时使用。第三个示例没有对此使用过多,因为你无法访问 delivery_tag
属性。
You can also add a Channel
parameter, often used when using MANUAL
ack mode.
This is not very useful with the third example because you don’t have access to the delivery_tag
property.
Spring Boot 为 consumerBatchEnabled
和 batchSize
提供了一个配置属性,但没有为 batchListener
提供这样的属性。从版本 3.0 开始,在容器工厂中将 consumerBatchEnabled
设置为 true
也会将 batchListener
设置为 true
。当 consumerBatchEnabled
为 true
时,监听器*必须*为批次监听器。
Spring Boot provides a configuration property for consumerBatchEnabled
and batchSize
, but not for batchListener
.
Starting with version 3.0, setting consumerBatchEnabled
to true
on the container factory also sets batchListener
to true
.
When consumerBatchEnabled
is true
, the listener must be a batch listener.
从版本 3.0 开始,监听器方法可以消费 Collection<?>
或 List<?>
。
Starting with version 3.0, listener methods can consume Collection<?>
or List<?>
.