Strict Message Ordering

本部分描述了入站消息和出站消息的消息排序。

This section describes message ordering for inbound and outbound messages.

Inbound

如果需要对入站消息进行严格排序,则必须将入站侦听器容器的 fetchCount 属性配置为 1。这是因为,如果一条消息发送失败并重新投递,它将在现有预取消息之后到达。从 Spring AMQP 2.0 版开始,fetchCount 默认为 250,以提高性能。严格的排序要求是以降低性能为代价的。

If you require strict ordering of inbound messages, you must configure the inbound listener container’s prefetchCount property to 1. This is because, if a message fails and is redelivered, it arrives after existing prefetched messages. Since Spring AMQP version 2.0, the prefetchCount defaults to 250 for improved performance. Strict ordering requirements come at the cost of decreased performance.

Outbound

考虑以下集成流程:

Consider the following integration flow:

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .splitWith(s -> s.delimiters(","))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

假设我们向网关发送消息 ABC。虽然消息 ABC 按顺序发送的可能性很大,但这并不能保证。这是因为模板会为每次发送操作从缓存中“借用”一个通道,并且不能保证每个消息都会使用同一个通道。一种解决方案是在拆分器之前启动事务,但事务在 RabbitMQ 中开销很大,并且会使性能降低几百倍。

Suppose we send messages A, B and C to the gateway. While it is likely that messages A, B, C are sent in order, there is no guarantee. This is because the template “borrows” a channel from the cache for each send operation, and there is no guarantee that the same channel is used for each message. One solution is to start a transaction before the splitter, but transactions are expensive in RabbitMQ and can reduce performance several hundred-fold.

为了更高效地解决此问题,从 5.1 版开始,Spring Integration 提供了 BoundRabbitChannelAdvice,它是一个 HandleMessageAdvice。请参阅 Handling Message Advice。当在分隔器之前应用时,可确保在同一通道上执行所有下游操作,并且可以有选择地等待接收到所有已发送消息的发布者确认(如果连接工厂配置为接收确认)。以下示例演示如何使用 BoundRabbitChannelAdvice

To solve this problem in a more efficient manner, starting with version 5.1, Spring Integration provides the BoundRabbitChannelAdvice which is a HandleMessageAdvice. See Handling Message Advice. When applied before the splitter, it ensures that all downstream operations are performed on the same channel and, optionally, can wait until publisher confirmations for all sent messages are received (if the connection factory is configured for confirmations). The following example shows how to use BoundRabbitChannelAdvice:

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .splitWith(s -> s.delimiters(",")
                    .advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

请注意,建议和出站适配器中都使用了相同的 RabbitTemplate(实现 RabbitOperations)。该建议在模板的 invoke 方法中运行下游流程,以便所有操作都在同一个通道上运行。如果提供了可选超时,则在流程完成时,建议将调用 waitForConfirmsOrDie 方法,该方法会在未在指定时间内收到确认时引发异常。

Notice that the same RabbitTemplate (which implements RabbitOperations) is used in the advice and the outbound adapter. The advice runs the downstream flow within the template’s invoke method so that all operations run on the same channel. If the optional timeout is provided, when the flow completes, the advice calls the waitForConfirmsOrDie method, which throws an exception if the confirmations are not received within the specified time.

下游流中不得进行线程传递(QueueChannelExecutorChannel 和其他)。

There must be no thread hands-off in the downstream flow (QueueChannel, ExecutorChannel, and others).