Receiving Batched Messages

  • 生产者创建的批处理:如果生产者启用批处理或消息由批处理模板创建,则批次元素将作为侦听器方法的单独调用返回。也可以将它们呈现为 List<>

  • 消费者端批处理:从 3.1 版开始,消费者可以配置为将多个入站消息组装到批次中,并将其作为 List<> 呈现给应用程序。批次中的消息数由 batch-sizereceive-timeout 属性指定。

使用 RabbitMQ Binder 时,消费者绑定处理两类批处理:

Batches Created by Producers

通常,如果生产者绑定具有 batch-enabled=true,(请参见 Rabbit Producer Properties),或消息由 BatchingRabbitTemplate 创建,批处理的元素作为对侦听器方法的单独调用返回。从 3.0 版本开始,如果将 spring.cloud.stream.bindings.<name>.consumer.batch-mode 设置为 true,则任何此类批处理都可以作为 List<?> 呈现给侦听器方法。

Consumer-side Batching

从 3.1 版开始,可以配置使用者将多个入站消息组装到一个批次中,并将该批次作为已转换有效内容的 List<?> 呈现给应用程序。以下简单应用程序演示了如何使用此技术:

spring.cloud.stream.bindings.input-in-0.group=someGroup

spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true

spring.cloud.stream.rabbit.bindings.input-in-0.consumer.enable-batching=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.batch-size=10
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.receive-timeout=200
@SpringBootApplication
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	Consumer<List<Thing>> input() {
		return list -> {
			System.out.println("Received " + list.size());
			list.forEach(thing -> {
				System.out.println(thing);

				// ...

			});
		};
	}

	@Bean
	public ApplicationRunner runner(RabbitTemplate template) {
		return args -> {
			template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value1\"}");
			template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value2\"}");
		};
	}

	public static class Thing {

		private String field;

		public Thing() {
		}

		public Thing(String field) {
			this.field = field;
		}

		public String getField() {
			return this.field;
		}

		public void setField(String field) {
			this.field = field;
		}

		@Override
		public String toString() {
			return "Thing [field=" + this.field + "]";
		}

	}

}
Received 2
Thing [field=value1]
Thing [field=value2]

批次中的消息数由 batch-sizereceive-timeout 属性指定;如果 receive-timeout 在没有新消息的情况下经过一段间隔,则会传送一个“短”批次。

使用者端批量处理仅受 container-type=simple 支持(默认选项)。

如果您希望检查使用者端批处理消息的标头,您应使用`Message<List<?>>`;标头在标头 AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS 中是一个 List<Map<String, Object>>,其中每个有效负载元素的标头在相应的索引中。以下是一个简单的示例:

@SpringBootApplication
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	Consumer<Message<List<Thing>>> input() {
		return msg -> {
			List<Thing> things = msg.getPayload();
			System.out.println("Received " + things.size());
			List<Map<String, Object>> headers =
					(List<Map<String, Object>>) msg.getHeaders().get(AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS);
			for (int i = 0; i < things.size(); i++) {
				System.out.println(things.get(i) + " myHeader=" + headers.get(i).get("myHeader"));

				// ...

			}
		};
	}

	@Bean
	public ApplicationRunner runner(RabbitTemplate template) {
		return args -> {
			template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value1\"}", msg -> {
				msg.getMessageProperties().setHeader("myHeader", "headerValue1");
				return msg;
			});
			template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value2\"}", msg -> {
				msg.getMessageProperties().setHeader("myHeader", "headerValue2");
				return msg;
			});
		};
	}

	public static class Thing {

		private String field;

		public Thing() {
		}

		public Thing(String field) {
			this.field = field;
		}

		public String getfield() {
			return this.field;
		}

		public void setfield(String field) {
			this.field = field;
		}

		@Override
		public String toString() {
			return "Thing [field=" + this.field + "]";
		}

	}

}
Received 2
Thing [field=value1] myHeader=headerValue1
Thing [field=value2] myHeader=headerValue2