Scatter-Gather
从 4.1 版本开始,Spring 集成提供了 scatter-gather 企业集成模式的实现。它是一个复合端点,其目标是将消息发送到收件人并汇总结果。正如 Enterprise Integration Patterns 中所述,它是一个适用于诸如 “best quote” 等场景的组件,在其中我们需要向多个供应商请求信息,并决定哪一个供应商为我们提供请求物品的最优惠条件。
以前,可以通过使用离散组件来配置此模式。此增强带来了更方便的配置。
ScatterGatherHandler
是一个请求-响应端点,它结合了 PublishSubscribeChannel
(或 RecipientListRouter
)和 AggregatingMessageHandler
。请求消息发送到 scatter
通道,ScatterGatherHandler
等待聚合器发送到 outputChannel
的答复。
Functionality
`Scatter-Gather`模式建议两个场景:“auction`"和 "`distribution”。在这两种情况下,`aggregation`函数相同,并提供 `AggregatingMessageHandler`可以使用的所有选项。(实际上,`ScatterGatherHandler`只要求一个 `AggregatingMessageHandler`作为构造函数参数。)有关详细信息,请参见 Aggregator。
Auction
拍卖 Scatter-Gather
变体使用 “publish-subscribe” 逻辑作为请求消息,其中 “scatter” 通道是带 apply-sequence="true"
的 PublishSubscribeChannel
。但是,此通道可以是任何 MessageChannel
实现(如 ContentEnricher
中的 request-channel
——请参见 Content Enricher )。但是,在这种情况下,您应该为 aggregation
函数创建您自己的自定义 correlationStrategy
。
Distribution
分发 Scatter-Gather
变体基于 RecipientListRouter
(请参阅 xref:router/implementations.adoc#router-implementations-recipientlistrouter[RecipientListRouter
),其中包含 RecipientListRouter
的所有可用选项。这是第二个 ScatterGatherHandler
构造函数参数。如果您只想依赖 recipient-list-router
和 aggregator
的默认 correlationStrategy
,则应指定 apply-sequence="true"
。否则,您应为 aggregator
提供自定义 correlationStrategy
。与 PublishSubscribeChannel
变体(拍卖变体)不同,具有 recipient-list-router
selector
选项可根据消息筛选目标供应商。使用 apply-sequence="true"
,会提供默认的 sequenceSize
,并且 aggregator
可以正确释放组。分发选项与拍卖选项互斥。
仅在基于 |
对于拍卖和分发变体,请求(分散)消息会通过 gatherResultChannel
头进行丰富,以等待来自 aggregator
的回复消息。
默认情况下,所有供应商都应该将其结果发送到 replyChannel
头(通常通过省略最终端点的 “output-channel
”)。但是,还提供了 gatherChannel
选项,使供应商可以将其答复发送到该通道进行聚合。
Configuring a Scatter-Gather Endpoint
以下示例演示了 Scatter-Gather
的 bean 定义的 Java 配置:
@Bean
public MessageHandler distributor() {
RecipientListRouter router = new RecipientListRouter();
router.setApplySequence(true);
router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
distributionChannel3()));
return router;
}
@Bean
public MessageHandler gatherer() {
return new AggregatingMessageHandler(
new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
new SimpleMessageStore(),
new HeaderAttributeCorrelationStrategy(
IntegrationMessageHeaderAccessor.CORRELATION_ID),
new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}
@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
handler.setOutputChannel(output());
return handler;
}
在前面的示例中,我们用 “applySequence="true"
” 和收件人通道列表配置了 RecipientListRouter
“distributor
” bean。下一个 bean 是用于 AggregatingMessageHandler
。最后,我们将这两个 bean 注入 ScatterGatherHandler
bean 定义中,并将其标记为 “@ServiceActivator
” 以将分散收集组件连接到集成流。
以下示例演示了如何使用 XML 命名空间配置 <scatter-gather>
端点:
<scatter-gather
id="" 1
auto-startup="" 2
input-channel="" 3
output-channel="" 4
scatter-channel="" 5
gather-channel="" 6
order="" 7
phase="" 8
send-timeout="" 9
gather-timeout="" 10
requires-reply="" > 11
<scatterer/> 12
<gatherer/> 13
</scatter-gather>
1 | 端点的 ID。ScatterGatherHandler bean 使用别名 id + '.handler' 注册。RecipientListRouter bean 使用别名 id + '.scatterer' 注册。AggregatingMessageHandler bean 使用别名 id + '.gatherer' 注册。可选。(BeanFactory 生成一个默认 id 值。) |
2 | 生命周期属性表示端点是否应在应用程序上下文初始化期间启动。此外,ScatterGatherHandler 还实现 Lifecycle ,并启动并停止 gatherEndpoint ,如果提供 gather-channel ,则在内部创建 gatherEndpoint 。可选。(默认值为 true 。) |
3 | 要接收请求消息的通道,以便在 ScatterGatherHandler 中处理它们。必需。 |
4 | ScatterGatherHandler 向其发送聚合结果的通道。可选。(传入消息可以自己在 replyChannel 消息头中指定回复通道)。 |
5 | 为拍卖场景发送散射消息的通道。可选。与 <scatterer> 子元素互斥。 |
6 | 接收聚合的每个供应商的答复的通道。它在散射消息中用作 replyChannel 头。可选。默认情况下,创建 FixedSubscriberChannel 。 |
7 | 当多个处理程序订阅相同的 DirectChannel 时,此组件的顺序(用于负载平衡目的)。可选。 |
8 | 指定端点应启动和停止的阶段。启动顺序从最低到最高,关闭顺序从最高到最低。默认情况下,此值为 Integer.MAX_VALUE ,这意味着此容器尽可能晚地启动并尽快停止。可选。 |
9 | 向 output-channel 发送 Message 的答复时要等待的超时时间。默认情况下,send() 阻塞一秒钟。它仅在输出通道有一些“发送”限制时适用,例如具有已满的固定“容量”的 QueueChannel 。在这种情况下,会引发 MessageDeliveryException 。send-timeout 对 AbstractSubscribableChannel 实现忽略。对于 group-timeout(-expression) ,来自计划的已过期任务的 MessageDeliveryException 使此任务重新计划。可选。 |
10 | 允许您指定散射-收集等待答复消息多长时间然后再返回。默认情况下,它会等待 30 秒。如果答复超时,则返回“null”。可选。 |
11 | 指定是否散射-收集必须返回非空值。此值默认为 true 。因此,当底层聚合器在 gather-timeout 之后返回 null 值时,会引发 ReplyRequiredException 。需要注意的是,如果 null 有可能,则应指定 gather-timeout 以避免无限期等待。 |
12 | <recipient-list-router> 选项。可选。与 scatter-channel 属性互斥。 |
13 | The <aggregator> options.
Required. |
Error Handling
由于 Scatter-Gather 是多请求-回复组件,因此错误处理具有一些额外的复杂性。在某些情况下,如果 ReleaseStrategy
允许进程在回复少于请求的情况下完成,则最好只是捕获并忽略下游异常。在其他情况下,当发生错误时,应考虑类似 “`compensation message`" 的内容以从子流返回。
每个异步子流都应当配置一个 errorChannel`标头,以便于从 `MessagePublishingErrorHandler`发送正确的错误消息。否则,将使用通用的错误处理逻辑将错误发送到全局 `errorChannel
。有关异步错误处理的更多信息,请参见 Error Handling。
同步流可以使用 ExpressionEvaluatingRequestHandlerAdvice
来忽略异常或返回补偿消息。当异常从子流之一抛出到 ScatterGatherHandler
时,它只会重新抛出到上游。这样,所有其他子流将徒劳无功,并且它们的答复将在 ScatterGatherHandler
中被忽略。有时这可能是预期的行为,但在大多数情况下,最好在特定子流中处理错误,而不会影响其他所有子流和收集器中的预期。
从 5.1.3 版本开始,ScatterGatherHandler
提供了 errorChannelName
选项。它会填充到分散消息的 errorChannel
头中,并在发生异步错误时使用,或者可以在常规同步子流中使用以直接发送错误消息。
下面的示例配置演示了通过返回补偿消息来进行异步错误处理:
@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
return f -> f
.scatterGather(
scatterer -> scatterer
.recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
.recipientFlow(f2 -> f2
.channel(c -> c.executor(taskExecutor))
.transform(p -> {
throw new RuntimeException("Sub-flow#2");
})),
null,
s -> s.errorChannel("scatterGatherErrorChannel"));
}
@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
return MessageBuilder.withPayload(payload.getCause().getCause())
.copyHeaders(payload.getFailedMessage().getHeaders())
.build();
}
为了产生正确的回复,我们必须从 MessagingException
的 failedMessage
复制头(包括 replyChannel
和 errorChannel
),该消息是由 MessagePublishingErrorHandler
发送到 scatterGatherErrorChannel
的。这样,目标异常就会返回到 ScatterGatherHandler
的收集器,以完成回复消息组。分散收集端点之后,此类异常 “payload
” 可以在收集器的 MessageGroupProcessor
中过滤掉,或使用其他方式在后端进行处理。
在将分散结果发送到收集器之前, |