Scatter-Gather
从 4.1 版本开始,Spring 集成提供了 scatter-gather 企业集成模式的实现。它是一个复合端点,其目标是将消息发送到收件人并汇总结果。正如 Enterprise Integration Patterns 中所述,它是一个适用于诸如 “best quote” 等场景的组件,在其中我们需要向多个供应商请求信息,并决定哪一个供应商为我们提供请求物品的最优惠条件。
Starting with version 4.1, Spring Integration provides an implementation of the scatter-gather enterprise integration pattern. It is a compound endpoint for which the goal is to send a message to the recipients and aggregate the results. As noted in Enterprise Integration Patterns, it is a component for scenarios such as “best quote”, where we need to request information from several suppliers and decide which one provides us with the best term for the requested item.
以前,可以通过使用离散组件来配置此模式。此增强带来了更方便的配置。
Previously, the pattern could be configured by using discrete components. This enhancement brings more convenient configuration.
ScatterGatherHandler
是一个请求-响应端点,它结合了 PublishSubscribeChannel
(或 RecipientListRouter
)和 AggregatingMessageHandler
。请求消息发送到 scatter
通道,ScatterGatherHandler
等待聚合器发送到 outputChannel
的答复。
The ScatterGatherHandler
is a request-reply endpoint that combines a PublishSubscribeChannel
(or a RecipientListRouter
) and an AggregatingMessageHandler
.
The request message is sent to the scatter
channel, and the ScatterGatherHandler
waits for the reply that the aggregator sends to the outputChannel
.
Functionality
`Scatter-Gather`模式建议两个场景:“auction`"和 "`distribution”。在这两种情况下,`aggregation`函数相同,并提供 `AggregatingMessageHandler`可以使用的所有选项。(实际上,`ScatterGatherHandler`只要求一个 `AggregatingMessageHandler`作为构造函数参数。)有关详细信息,请参见 Aggregator。
The Scatter-Gather
pattern suggests two scenarios: “auction” and “distribution”.
In both cases, the aggregation
function is the same and provides all the options available for the AggregatingMessageHandler
.
(Actually, the ScatterGatherHandler
requires only an AggregatingMessageHandler
as a constructor argument.)
See Aggregator for more information.
Auction
拍卖 Scatter-Gather
变体使用 “publish-subscribe” 逻辑作为请求消息,其中 “scatter” 通道是带 apply-sequence="true"
的 PublishSubscribeChannel
。但是,此通道可以是任何 MessageChannel
实现(如 ContentEnricher
中的 request-channel
——请参见 Content Enricher )。但是,在这种情况下,您应该为 aggregation
函数创建您自己的自定义 correlationStrategy
。
The auction Scatter-Gather
variant uses “publish-subscribe” logic for the request message, where the “scatter” channel is a PublishSubscribeChannel
with apply-sequence="true"
.
However, this channel can be any MessageChannel
implementation (as is the case with the request-channel
in the ContentEnricher
— see Content Enricher).
However, in this case, you should create your own custom correlationStrategy
for the aggregation
function.
Distribution
分发 Scatter-Gather
变体基于 RecipientListRouter
(请参阅 xref:router/implementations.adoc#router-implementations-recipientlistrouter[RecipientListRouter
),其中包含 RecipientListRouter
的所有可用选项。这是第二个 ScatterGatherHandler
构造函数参数。如果您只想依赖 recipient-list-router
和 aggregator
的默认 correlationStrategy
,则应指定 apply-sequence="true"
。否则,您应为 aggregator
提供自定义 correlationStrategy
。与 PublishSubscribeChannel
变体(拍卖变体)不同,具有 recipient-list-router
selector
选项可根据消息筛选目标供应商。使用 apply-sequence="true"
,会提供默认的 sequenceSize
,并且 aggregator
可以正确释放组。分发选项与拍卖选项互斥。
The distribution Scatter-Gather
variant is based on the RecipientListRouter
(see RecipientListRouter
) with all available options for the RecipientListRouter
.
This is the second ScatterGatherHandler
constructor argument.
If you want to rely on only the default correlationStrategy
for the recipient-list-router
and the aggregator
, you should specify apply-sequence="true"
.
Otherwise, you should supply a custom correlationStrategy
for the aggregator
.
Unlike the PublishSubscribeChannel
variant (the auction variant), having a recipient-list-router
selector
option lets filter target suppliers based on the message.
With apply-sequence="true"
, the default sequenceSize
is supplied, and the aggregator
can release the group correctly.
The distribution option is mutually exclusive with the auction option.
仅在基于 |
The |
对于拍卖和分发变体,请求(分散)消息会通过 gatherResultChannel
头进行丰富,以等待来自 aggregator
的回复消息。
For both the auction and the distribution variants, the request (scatter) message is enriched with the gatherResultChannel
header to wait for a reply message from the aggregator
.
默认情况下,所有供应商都应该将其结果发送到 replyChannel
头(通常通过省略最终端点的 “output-channel
”)。但是,还提供了 gatherChannel
选项,使供应商可以将其答复发送到该通道进行聚合。
By default, all suppliers should send their result to the replyChannel
header (usually by omitting the output-channel
from the ultimate endpoint).
However, the gatherChannel
option is also provided, letting suppliers send their reply to that channel for the aggregation.
Configuring a Scatter-Gather Endpoint
以下示例演示了 Scatter-Gather
的 bean 定义的 Java 配置:
The following example shows Java configuration for the bean definition for Scatter-Gather
:
@Bean
public MessageHandler distributor() {
RecipientListRouter router = new RecipientListRouter();
router.setApplySequence(true);
router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
distributionChannel3()));
return router;
}
@Bean
public MessageHandler gatherer() {
return new AggregatingMessageHandler(
new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
new SimpleMessageStore(),
new HeaderAttributeCorrelationStrategy(
IntegrationMessageHeaderAccessor.CORRELATION_ID),
new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}
@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
handler.setOutputChannel(output());
return handler;
}
在前面的示例中,我们用 “applySequence="true"
” 和收件人通道列表配置了 RecipientListRouter
“distributor
” bean。下一个 bean 是用于 AggregatingMessageHandler
。最后,我们将这两个 bean 注入 ScatterGatherHandler
bean 定义中,并将其标记为 “@ServiceActivator
” 以将分散收集组件连接到集成流。
In the preceding example, we configure the RecipientListRouter
distributor
bean with applySequence="true"
and the list of recipient channels.
The next bean is for an AggregatingMessageHandler
.
Finally, we inject both those beans into the ScatterGatherHandler
bean definition and mark it as a @ServiceActivator
to wire the scatter-gather component into the integration flow.
以下示例演示了如何使用 XML 命名空间配置 <scatter-gather>
端点:
The following example shows how to configure the <scatter-gather>
endpoint by using the XML namespace:
<scatter-gather
id="" 1
auto-startup="" 2
input-channel="" 3
output-channel="" 4
scatter-channel="" 5
gather-channel="" 6
order="" 7
phase="" 8
send-timeout="" 9
gather-timeout="" 10
requires-reply="" > 11
<scatterer/> 12
<gatherer/> 13
</scatter-gather>
1 | The id of the endpoint.
The ScatterGatherHandler bean is registered with an alias of id + '.handler' .
The RecipientListRouter bean is registered with an alias of id + '.scatterer' .
The AggregatingMessageHandler bean is registered with an alias of id + '.gatherer' .
Optional.
(The BeanFactory generates a default id value.) |
2 | Lifecycle attribute signaling whether the endpoint should be started during application context initialization.
In addition, the ScatterGatherHandler also implements Lifecycle and starts and stops gatherEndpoint , which is created internally if a gather-channel is provided.
Optional.
(The default is true .) |
3 | The channel on which to receive request messages to handle them in the ScatterGatherHandler .
Required. |
4 | The channel to which the ScatterGatherHandler sends the aggregation results.
Optional.
(Incoming messages can specify a reply channel themselves in the replyChannel message header). |
5 | The channel to which to send the scatter message for the auction scenario.
Optional.
Mutually exclusive with the <scatterer> sub-element. |
6 | The channel on which to receive replies from each supplier for the aggregation.
It is used as the replyChannel header in the scatter message.
Optional.
By default, the FixedSubscriberChannel is created. |
7 | The order of this component when more than one handler is subscribed to the same DirectChannel (use for load balancing purposes).
Optional. |
8 | Specifies the phase in which the endpoint should be started and stopped.
The startup order proceeds from lowest to highest, and the shutdown order is from highest to lowest.
By default, this value is Integer.MAX_VALUE , meaning that this container starts as late as possible and stops as soon as possible.
Optional. |
9 | The timeout interval to wait when sending a reply Message to the output-channel .
By default, the send() blocks for one second.
It applies only if the output channel has some 'sending' limitations — for example, a QueueChannel with a fixed 'capacity' that is full.
In this case, a MessageDeliveryException is thrown.
The send-timeout is ignored for AbstractSubscribableChannel implementations.
For group-timeout(-expression) , the MessageDeliveryException from the scheduled expired task leads this task to be rescheduled.
Optional. |
10 | Lets you specify how long the scatter-gather waits for the reply message before returning.
By default, it waits for 30 seconds.
'null' is returned if the reply times out.
Optional. |
11 | Specifies whether the scatter-gather must return a non-null value.
This value is true by default.
Consequently, a ReplyRequiredException is thrown when the underlying aggregator returns a null value after gather-timeout .
Note, if null is a possibility, the gather-timeout should be specified to avoid an indefinite wait. |
12 | The <recipient-list-router> options.
Optional.
Mutually exclusive with scatter-channel attribute. |
13 | The <aggregator> options.
Required. |
Error Handling
由于 Scatter-Gather 是多请求-回复组件,因此错误处理具有一些额外的复杂性。在某些情况下,如果 ReleaseStrategy
允许进程在回复少于请求的情况下完成,则最好只是捕获并忽略下游异常。在其他情况下,当发生错误时,应考虑类似 “`compensation message`" 的内容以从子流返回。
Since Scatter-Gather is a multi request-reply component, error handling has some extra complexity.
In some cases, it is better to just catch and ignore downstream exceptions if the ReleaseStrategy
allows the process to finish with fewer replies than requests.
In other cases something like a “compensation message” should be considered for returning from sub-flow, when an error happens.
每个异步子流都应当配置一个 errorChannel`标头,以便于从 `MessagePublishingErrorHandler`发送正确的错误消息。否则,将使用通用的错误处理逻辑将错误发送到全局 `errorChannel
。有关异步错误处理的更多信息,请参见 Error Handling。
Every async sub-flow should be configured with a errorChannel
header for the proper error message sending from the MessagePublishingErrorHandler
.
Otherwise, an error will be sent to the global errorChannel
with the common error handling logic.
See Error Handling for more information about async error processing.
同步流可以使用 ExpressionEvaluatingRequestHandlerAdvice
来忽略异常或返回补偿消息。当异常从子流之一抛出到 ScatterGatherHandler
时,它只会重新抛出到上游。这样,所有其他子流将徒劳无功,并且它们的答复将在 ScatterGatherHandler
中被忽略。有时这可能是预期的行为,但在大多数情况下,最好在特定子流中处理错误,而不会影响其他所有子流和收集器中的预期。
Synchronous flows may use an ExpressionEvaluatingRequestHandlerAdvice
for ignoring the exception or returning a compensation message.
When an exception is thrown from one of the sub-flows to the ScatterGatherHandler
, it is just re-thrown to upstream.
This way all other sub-flows will work for nothing and their replies are going to be ignored in the ScatterGatherHandler
.
This might be an expected behavior sometimes, but in most cases it would be better to handle the error in the particular sub-flow without impacting all others and the expectations in the gatherer.
从 5.1.3 版本开始,ScatterGatherHandler
提供了 errorChannelName
选项。它会填充到分散消息的 errorChannel
头中,并在发生异步错误时使用,或者可以在常规同步子流中使用以直接发送错误消息。
Starting with version 5.1.3, the ScatterGatherHandler
is supplied with the errorChannelName
option.
It is populated to the errorChannel
header of the scatter message and is used when an async error happens or can be used in the regular synchronous sub-flow for directly sending an error message.
下面的示例配置演示了通过返回补偿消息来进行异步错误处理:
The sample configuration below demonstrates async error handling by returning a compensation message:
@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
return f -> f
.scatterGather(
scatterer -> scatterer
.recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
.recipientFlow(f2 -> f2
.channel(c -> c.executor(taskExecutor))
.transform(p -> {
throw new RuntimeException("Sub-flow#2");
})),
null,
s -> s.errorChannel("scatterGatherErrorChannel"));
}
@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
return MessageBuilder.withPayload(payload.getCause().getCause())
.copyHeaders(payload.getFailedMessage().getHeaders())
.build();
}
为了产生正确的回复,我们必须从 MessagingException
的 failedMessage
复制头(包括 replyChannel
和 errorChannel
),该消息是由 MessagePublishingErrorHandler
发送到 scatterGatherErrorChannel
的。这样,目标异常就会返回到 ScatterGatherHandler
的收集器,以完成回复消息组。分散收集端点之后,此类异常 “payload
” 可以在收集器的 MessageGroupProcessor
中过滤掉,或使用其他方式在后端进行处理。
To produce a proper reply, we have to copy headers (including replyChannel
and errorChannel
) from the failedMessage
of the MessagingException
that has been sent to the scatterGatherErrorChannel
by the MessagePublishingErrorHandler
.
This way the target exception is returned to the gatherer of the ScatterGatherHandler
for reply messages group completion.
Such an exception payload
can be filtered out in the MessageGroupProcessor
of the gatherer or processed other way downstream, after the scatter-gather endpoint.
在将分散结果发送到收集器之前, |
Before sending scattering results to the gatherer, |