Thread Barrier

有时,我们需要暂停消息流线程,直至发生其他异步事件。例如,考虑会发布消息到 RabbitMQ 的 HTTP 请求。我们可能希望在 RabbitMQ 代理发出消息已收到的确认之前,不向用户回复。

Sometimes, we need to suspend a message flow thread until some other asynchronous event occurs. For example, consider an HTTP request that publishes a message to RabbitMQ. We might wish to not reply to the user until the RabbitMQ broker has issued an acknowledgment that the message was received.

在 4.2 版本中,Spring Integration 为此目的引入了 <barrier/> 组件。底层 MessageHandlerBarrierMessageHandler。此类还实现了 MessageTriggerAction,其中传递给 trigger() 方法的消息在 handleRequestMessage() 方法(如果存在)中释放对应的线程。

In version 4.2, Spring Integration introduced the <barrier/> component for this purpose. The underlying MessageHandler is the BarrierMessageHandler. This class also implements MessageTriggerAction, in which a message passed to the trigger() method releases a corresponding thread in the handleRequestMessage() method (if present).

暂停的线程和触发线程通过在消息上调用 CorrelationStrategy 关联。将消息发送到 input-channel 时,线程将暂停最多 requestTimeout 毫秒,等待对应的触发消息。默认关联策略使用 IntegrationMessageHeaderAccessor.CORRELATION_ID 标头。当具有相同关联的触发消息到达时,线程将被释放。在释放后发送到 output-channel 的消息是使用 MessageGroupProcessor 构建的。默认情况下,消息是两个有效负载的 Collection<?>,标头通过使用 DefaultAggregatingMessageGroupProcessor 合并。

The suspended thread and trigger thread are correlated by invoking a CorrelationStrategy on the messages. When a message is sent to the input-channel, the thread is suspended for up to requestTimeout milliseconds, waiting for a corresponding trigger message. The default correlation strategy uses the IntegrationMessageHeaderAccessor.CORRELATION_ID header. When a trigger message arrives with the same correlation, the thread is released. The message sent to the output-channel after release is constructed by using a MessageGroupProcessor. By default, the message is a Collection<?> of the two payloads, and the headers are merged by using a DefaultAggregatingMessageGroupProcessor.

如果首先调用 trigger() 方法(或在主线程超时之后),则它将挂起最长 triggerTimeout 时间,等待挂起的消息到达。如果您不想挂起触发器线程,请考虑改为移交给 TaskExecutor,以便改为挂起其线程。

If the trigger() method is invoked first (or after the main thread times out), it is suspended for up to triggerTimeout waiting for the suspending message to arrive. If you do not want to suspend the trigger thread, consider handing off to a TaskExecutor instead so that its thread is suspended instead.

在版本 5.4 之前,仅有一个 timeout 选项用于请求和触发消息,但在某些用例中,最好为这些操作设置不同的超时时间。因此引入了 requestTimeouttriggerTimeout 选项。

Prior version 5.4, there was only one timeout option for both request and trigger messages, but in some use-case it is better to have different timeouts for those actions. Therefore requestTimeout and triggerTimeout options have been introduced.

requires-reply 属性确定如果在触发消息到达之前挂起线程超时后执行的动作。默认情况下,它是 false,这意味着端点返回 null,流结束,线程返回到调用者。当为 true 时,会抛出 ReplyRequiredException

The requires-reply property determines the action to take if the suspended thread times out before the trigger message arrives. By default, it is false, which means the endpoint returns null, the flow ends, and the thread returns to the caller. When true, a ReplyRequiredException is thrown.

你可以通过编程方式调用 trigger() 方法(通过使用名称和“barrier.handler”获取 bean 引用——其中“barrier”是障碍端点的 bean 名称)。或者,你可以配置 <outbound-channel-adapter/> 来触发释放。

You can call the trigger() method programmatically (obtain the bean reference by using the name, barrier.handler — where barrier is the bean name of the barrier endpoint). Alternatively, you can configure an <outbound-channel-adapter/> to trigger the release.

只有单个线程可以挂起,与此相关性相同。相同的关系性可以重复使用多次,但一次只能同时使用。如果第二个线程带有相同的关系性到来,则会抛出一个异常。

Only one thread can be suspended with the same correlation. The same correlation can be used multiple times but only once concurrently. An exception is thrown if a second thread arrives with the same correlation.

以下示例说明如何为关联使用自定义标头:

The following example shows how to use a custom header for correlation:

  • Java

  • XML

@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
    BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
    barrier.setOutputChannel(out());
    barrier.setDiscardChannel(lateTriggerChannel);
    return barrier;
}

@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
    return barrier::trigger;
}
<int:barrier id="barrier1" input-channel="in" output-channel="out"
        correlation-strategy-expression="headers['myHeader']"
        output-processor="myOutputProcessor"
        discard-channel="lateTriggerChannel"
        timeout="10000">
</int:barrier>

<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />

根据哪一个先收到消息,发送消息给“in”的线程或发送消息给“release”的线程都将等待最多十秒,直到收到其他消息。当消息被释放时,“out”通道会接收到一个消息,该消息组合了调用名为“myOutputProcessor”的自定义 MessageGroupProcessor bean 的结果。如果主线程超时而触发器稍后到达,你可以配置一个在稍后的触发器被发送到的丢弃通道。

Depending on which one has a message arrive first, either the thread sending a message to in or the thread sending a message to release waits for up to ten seconds until the other message arrives. When the message is released, the out channel is sent a message that combines the result of invoking the custom MessageGroupProcessor bean, named myOutputProcessor. If the main thread times out and a trigger arrives later, you can configure a discard channel to which the late trigger is sent.

有关此组件的示例,请参见 barrier sample application

For an example of this component, see the barrier sample application.