Scatter-Gather

从 4.1 版本开始,Spring 集成提供了 scatter-gather 企业集成模式的实现。它是一个复合端点,其目标是将消息发送到收件人并汇总结果。正如 Enterprise Integration Patterns 中所述,它是一个适用于诸如 “best quote” 等场景的组件,在其中我们需要向多个供应商请求信息,并决定哪一个供应商为我们提供请求物品的最优惠条件。 以前,可以通过使用离散组件来配置此模式。此增强带来了更方便的配置。 ScatterGatherHandler 是一个请求-响应端点,它结合了 PublishSubscribeChannel(或 RecipientListRouter)和 AggregatingMessageHandler。请求消息发送到 scatter 通道,ScatterGatherHandler 等待聚合器发送到 outputChannel 的答复。

Functionality

`Scatter-Gather`模式建议两个场景:“auction`"和 "`distribution”。在这两种情况下,`aggregation`函数相同,并提供 `AggregatingMessageHandler`可以使用的所有选项。(实际上,`ScatterGatherHandler`只要求一个 `AggregatingMessageHandler`作为构造函数参数。)有关详细信息,请参见 Aggregator

Auction

拍卖 Scatter-Gather 变体使用 “publish-subscribe” 逻辑作为请求消息,其中 “scatter” 通道是带 apply-sequence="true"PublishSubscribeChannel 。但是,此通道可以是任何 MessageChannel 实现(如 ContentEnricher 中的 request-channel ——请参见 Content Enricher )。但是,在这种情况下,您应该为 aggregation 函数创建您自己的自定义 correlationStrategy

Distribution

分发 Scatter-Gather 变体基于 RecipientListRouter(请参阅 xref:router/implementations.adoc#router-implementations-recipientlistrouter[RecipientListRouter),其中包含 RecipientListRouter 的所有可用选项。这是第二个 ScatterGatherHandler 构造函数参数。如果您只想依赖 recipient-list-routeraggregator 的默认 correlationStrategy,则应指定 apply-sequence="true"。否则,您应为 aggregator 提供自定义 correlationStrategy。与 PublishSubscribeChannel 变体(拍卖变体)不同,具有 recipient-list-router selector 选项可根据消息筛选目标供应商。使用 apply-sequence="true",会提供默认的 sequenceSize,并且 aggregator 可以正确释放组。分发选项与拍卖选项互斥。

仅在基于 ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer) 构造函数配置的纯 Java 配置中才需要 applySequence=true,因为框架无法更改外部提供的组件。为了方便起见,从 6.0 版本开始,Scatter-Gather 的 XML 和 Java DSL 会将 applySequence 设置为真。

对于拍卖和分发变体,请求(分散)消息会通过 gatherResultChannel 头进行丰富,以等待来自 aggregator 的回复消息。

默认情况下,所有供应商都应该将其结果发送到 replyChannel 头(通常通过省略最终端点的 “output-channel”)。但是,还提供了 gatherChannel 选项,使供应商可以将其答复发送到该通道进行聚合。

Configuring a Scatter-Gather Endpoint

以下示例演示了 Scatter-Gather 的 bean 定义的 Java 配置:

@Bean
public MessageHandler distributor() {
    RecipientListRouter router = new RecipientListRouter();
    router.setApplySequence(true);
    router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
            distributionChannel3()));
    return router;
}

@Bean
public MessageHandler gatherer() {
	return new AggregatingMessageHandler(
			new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
			new SimpleMessageStore(),
			new HeaderAttributeCorrelationStrategy(
			       IntegrationMessageHeaderAccessor.CORRELATION_ID),
			new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}

@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
	ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
	handler.setOutputChannel(output());
	return handler;
}

在前面的示例中,我们用 “applySequence="true"” 和收件人通道列表配置了 RecipientListRouterdistributor” bean。下一个 bean 是用于 AggregatingMessageHandler。最后,我们将这两个 bean 注入 ScatterGatherHandler bean 定义中,并将其标记为 “@ServiceActivator” 以将分散收集组件连接到集成流。

以下示例演示了如何使用 XML 命名空间配置 <scatter-gather> 端点:

<scatter-gather
		id=""  1
		auto-startup=""  2
		input-channel=""  3
		output-channel=""  4
		scatter-channel=""  5
		gather-channel=""  6
		order=""  7
		phase=""  8
		send-timeout=""  9
		gather-timeout=""  10
		requires-reply="" > 11
			<scatterer/>  12
			<gatherer/>  13
</scatter-gather>
1 端点的 ID。ScatterGatherHandler bean 使用别名 id + '.handler' 注册。RecipientListRouter bean 使用别名 id + '.scatterer' 注册。AggregatingMessageHandler bean 使用别名 id + '.gatherer' 注册。可选。(BeanFactory 生成一个默认 id 值。)
2 生命周期属性表示端点是否应在应用程序上下文初始化期间启动。此外,ScatterGatherHandler 还实现 Lifecycle,并启动并停止 gatherEndpoint,如果提供 gather-channel,则在内部创建 gatherEndpoint。可选。(默认值为 true。)
3 要接收请求消息的通道,以便在 ScatterGatherHandler 中处理它们。必需。
4 ScatterGatherHandler 向其发送聚合结果的通道。可选。(传入消息可以自己在 replyChannel 消息头中指定回复通道)。
5 为拍卖场景发送散射消息的通道。可选。与 &lt;scatterer&gt; 子元素互斥。
6 接收聚合的每个供应商的答复的通道。它在散射消息中用作 replyChannel 头。可选。默认情况下,创建 FixedSubscriberChannel
7 当多个处理程序订阅相同的 DirectChannel 时,此组件的顺序(用于负载平衡目的)。可选。
8 指定端点应启动和停止的阶段。启动顺序从最低到最高,关闭顺序从最高到最低。默认情况下,此值为 Integer.MAX_VALUE,这意味着此容器尽可能晚地启动并尽快停止。可选。
9 output-channel 发送 Message 的答复时要等待的超时时间。默认情况下,send() 阻塞一秒钟。它仅在输出通道有一些“发送”限制时适用,例如具有已满的固定“容量”的 QueueChannel。在这种情况下,会引发 MessageDeliveryExceptionsend-timeoutAbstractSubscribableChannel 实现忽略。对于 group-timeout(-expression),来自计划的已过期任务的 MessageDeliveryException 使此任务重新计划。可选。
10 允许您指定散射-收集等待答复消息多长时间然后再返回。默认情况下,它会等待 30 秒。如果答复超时,则返回“null”。可选。
11 指定是否散射-收集必须返回非空值。此值默认为 true。因此,当底层聚合器在 gather-timeout 之后返回 null 值时,会引发 ReplyRequiredException。需要注意的是,如果 null 有可能,则应指定 gather-timeout 以避免无限期等待。
12 &lt;recipient-list-router&gt; 选项。可选。与 scatter-channel 属性互斥。
13 The <aggregator> options. Required.

Error Handling

由于 Scatter-Gather 是多请求-回复组件,因此错误处理具有一些额外的复杂性。在某些情况下,如果 ReleaseStrategy 允许进程在回复少于请求的情况下完成,则最好只是捕获并忽略下游异常。在其他情况下,当发生错误时,应考虑类似 “`compensation message`" 的内容以从子流返回。

每个异步子流都应当配置一个 errorChannel`标头,以便于从 `MessagePublishingErrorHandler`发送正确的错误消息。否则,将使用通用的错误处理逻辑将错误发送到全局 `errorChannel。有关异步错误处理的更多信息,请参见 Error Handling

同步流可以使用 ExpressionEvaluatingRequestHandlerAdvice 来忽略异常或返回补偿消息。当异常从子流之一抛出到 ScatterGatherHandler 时,它只会重新抛出到上游。这样,所有其他子流将徒劳无功,并且它们的答复将在 ScatterGatherHandler 中被忽略。有时这可能是预期的行为,但在大多数情况下,最好在特定子流中处理错误,而不会影响其他所有子流和收集器中的预期。

从 5.1.3 版本开始,ScatterGatherHandler 提供了 errorChannelName 选项。它会填充到分散消息的 errorChannel 头中,并在发生异步错误时使用,或者可以在常规同步子流中使用以直接发送错误消息。

下面的示例配置演示了通过返回补偿消息来进行异步错误处理:

@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
    return f -> f
            .scatterGather(
                    scatterer -> scatterer
                            .recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
                            .recipientFlow(f2 -> f2
                                    .channel(c -> c.executor(taskExecutor))
                                    .transform(p -> {
                                        throw new RuntimeException("Sub-flow#2");
                                    })),
                    null,
                    s -> s.errorChannel("scatterGatherErrorChannel"));
}

@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
    return MessageBuilder.withPayload(payload.getCause().getCause())
            .copyHeaders(payload.getFailedMessage().getHeaders())
            .build();
}

为了产生正确的回复,我们必须从 MessagingExceptionfailedMessage 复制头(包括 replyChannelerrorChannel),该消息是由 MessagePublishingErrorHandler 发送到 scatterGatherErrorChannel 的。这样,目标异常就会返回到 ScatterGatherHandler 的收集器,以完成回复消息组。分散收集端点之后,此类异常 “payload” 可以在收集器的 MessageGroupProcessor 中过滤掉,或使用其他方式在后端进行处理。

在将分散结果发送到收集器之前,ScatterGatherHandler 会重新设定请求消息头,包括答复和错误通道(如果有的话)。这样一来,即使在分散接收器子流中应用了异步传递,AggregatingMessageHandler 的错误也会传播给调用方。为了成功操作,必须将 gatherResultChanneloriginalReplyChanneloriginalErrorChannel 头部传输回分散接收器子流的回复。在这种情况下,必须为 ScatterGatherHandler 配置一个合理的有界 gatherTimeout。否则,默认情况下,它将永远等待收集器的答复而被阻止。