Aggregators and Resequencers

从概念上看,AggregatorSplitter 相反。它将一系列单独的消息聚合到单个消息中,并且势必更加复杂。默认情况下,聚合器返回包含来自传入消息的有效负载集合的消息。此规则也适用于 Resequencer。以下示例显示了分隔符-聚合器模式的规范示例:

An Aggregator is conceptually the opposite of a Splitter. It aggregates a sequence of individual messages into a single message and is necessarily more complex. By default, an aggregator returns a message that contains a collection of payloads from incoming messages. The same rules are applied for the Resequencer. The following example shows a canonical example of the splitter-aggregator pattern:

@Bean
public IntegrationFlow splitAggregateFlow() {
    return IntegrationFlow.from("splitAggregateInput")
            .split()
            .channel(MessageChannels.executor(this.taskExecutor()))
            .resequence()
            .aggregate()
            .get();
}

split() 方法将列表拆分为单独的消息,并将它们发送到 ExecutorChannelresequence() 方法按消息头中找到的序列详细信息对消息重新排序。aggregate() 方法收集这些消息。

The split() method splits the list into individual messages and sends them to the ExecutorChannel. The resequence() method reorders messages by sequence details found in the message headers. The aggregate() method collects those messages.

但是,通过指定发布策略和关联策略等信息,可以更改默认行为。考虑以下示例:

However, you can change the default behavior by specifying a release strategy and correlation strategy, among other things. Consider the following example:

.aggregate(a ->
        a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
            .releaseStrategy(g -> g.size() > 10)
            .messageStore(messageStore()))

前一个示例关联具有 myCorrelationKey 头的消息,并且在积累至少十个消息时发布消息。

The preceding example correlates messages that have myCorrelationKey headers and releases the messages once at least ten have been accumulated.

对于 resequence() EIP 方法,提供了类似的 lambda 配置。

Similar lambda configurations are provided for the resequence() EIP method.