Strict Message Ordering
本部分描述了入站消息和出站消息的消息排序。
Inbound
如果需要对入站消息进行严格排序,则必须将入站侦听器容器的 fetchCount
属性配置为 1
。这是因为,如果一条消息发送失败并重新投递,它将在现有预取消息之后到达。从 Spring AMQP 2.0 版开始,fetchCount
默认为 250
,以提高性能。严格的排序要求是以降低性能为代价的。
Outbound
考虑以下集成流程:
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.splitWith(s -> s.delimiters(","))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
假设我们向网关发送消息 A
、B
和 C
。虽然消息 A
、B
、C
按顺序发送的可能性很大,但这并不能保证。这是因为模板会为每次发送操作从缓存中“借用”一个通道,并且不能保证每个消息都会使用同一个通道。一种解决方案是在拆分器之前启动事务,但事务在 RabbitMQ 中开销很大,并且会使性能降低几百倍。
为了更高效地解决此问题,从 5.1 版开始,Spring Integration 提供了 BoundRabbitChannelAdvice
,它是一个 HandleMessageAdvice
。请参阅 Handling Message Advice。当在分隔器之前应用时,可确保在同一通道上执行所有下游操作,并且可以有选择地等待接收到所有已发送消息的发布者确认(如果连接工厂配置为接收确认)。以下示例演示如何使用 BoundRabbitChannelAdvice
:
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.splitWith(s -> s.delimiters(",")
.advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
请注意,建议和出站适配器中都使用了相同的 RabbitTemplate
(实现 RabbitOperations
)。该建议在模板的 invoke
方法中运行下游流程,以便所有操作都在同一个通道上运行。如果提供了可选超时,则在流程完成时,建议将调用 waitForConfirmsOrDie
方法,该方法会在未在指定时间内收到确认时引发异常。
下游流中不得进行线程传递(QueueChannel
、ExecutorChannel
和其他)。