Thread Barrier

有时,我们需要暂停消息流线程,直至发生其他异步事件。例如,考虑会发布消息到 RabbitMQ 的 HTTP 请求。我们可能希望在 RabbitMQ 代理发出消息已收到的确认之前,不向用户回复。

在 4.2 版本中,Spring Integration 为此目的引入了 <barrier/> 组件。底层 MessageHandlerBarrierMessageHandler。此类还实现了 MessageTriggerAction,其中传递给 trigger() 方法的消息在 handleRequestMessage() 方法(如果存在)中释放对应的线程。

暂停的线程和触发线程通过在消息上调用 CorrelationStrategy 关联。将消息发送到 input-channel 时,线程将暂停最多 requestTimeout 毫秒,等待对应的触发消息。默认关联策略使用 IntegrationMessageHeaderAccessor.CORRELATION_ID 标头。当具有相同关联的触发消息到达时,线程将被释放。在释放后发送到 output-channel 的消息是使用 MessageGroupProcessor 构建的。默认情况下,消息是两个有效负载的 Collection<?>,标头通过使用 DefaultAggregatingMessageGroupProcessor 合并。

如果首先调用 trigger() 方法(或在主线程超时之后),则它将挂起最长 triggerTimeout 时间,等待挂起的消息到达。如果您不想挂起触发器线程,请考虑改为移交给 TaskExecutor,以便改为挂起其线程。

在版本 5.4 之前,仅有一个 timeout 选项用于请求和触发消息,但在某些用例中,最好为这些操作设置不同的超时时间。因此引入了 requestTimeouttriggerTimeout 选项。

requires-reply 属性确定如果在触发消息到达之前挂起线程超时后执行的动作。默认情况下,它是 false,这意味着端点返回 null,流结束,线程返回到调用者。当为 true 时,会抛出 ReplyRequiredException

你可以通过编程方式调用 trigger() 方法(通过使用名称和“barrier.handler”获取 bean 引用——其中“barrier”是障碍端点的 bean 名称)。或者,你可以配置 <outbound-channel-adapter/> 来触发释放。

只有单个线程可以挂起,与此相关性相同。相同的关系性可以重复使用多次,但一次只能同时使用。如果第二个线程带有相同的关系性到来,则会抛出一个异常。

以下示例说明如何为关联使用自定义标头:

  • Java

  • XML

@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
    BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
    barrier.setOutputChannel(out());
    barrier.setDiscardChannel(lateTriggerChannel);
    return barrier;
}

@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
    return barrier::trigger;
}
<int:barrier id="barrier1" input-channel="in" output-channel="out"
        correlation-strategy-expression="headers['myHeader']"
        output-processor="myOutputProcessor"
        discard-channel="lateTriggerChannel"
        timeout="10000">
</int:barrier>

<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />

根据哪一个先收到消息,发送消息给“in”的线程或发送消息给“release”的线程都将等待最多十秒,直到收到其他消息。当消息被释放时,“out”通道会接收到一个消息,该消息组合了调用名为“myOutputProcessor”的自定义 MessageGroupProcessor bean 的结果。如果主线程超时而触发器稍后到达,你可以配置一个在稍后的触发器被发送到的丢弃通道。

有关此组件的示例,请参见 barrier sample application