Aggregators and Resequencers

从概念上看,AggregatorSplitter 相反。它将一系列单独的消息聚合到单个消息中,并且势必更加复杂。默认情况下,聚合器返回包含来自传入消息的有效负载集合的消息。此规则也适用于 Resequencer。以下示例显示了分隔符-聚合器模式的规范示例:

@Bean
public IntegrationFlow splitAggregateFlow() {
    return IntegrationFlow.from("splitAggregateInput")
            .split()
            .channel(MessageChannels.executor(this.taskExecutor()))
            .resequence()
            .aggregate()
            .get();
}

split() 方法将列表拆分为单独的消息,并将它们发送到 ExecutorChannelresequence() 方法按消息头中找到的序列详细信息对消息重新排序。aggregate() 方法收集这些消息。

但是,通过指定发布策略和关联策略等信息,可以更改默认行为。考虑以下示例:

.aggregate(a ->
        a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
            .releaseStrategy(g -> g.size() > 10)
            .messageStore(messageStore()))

前一个示例关联具有 myCorrelationKey 头的消息,并且在积累至少十个消息时发布消息。

对于 resequence() EIP 方法,提供了类似的 lambda 配置。