File Aggregator
从版本 5.5 开始,引入了“FileAggregator
”来涵盖在启用 START/END 标记时“FileSplitter
”使用案例的另一面。为了方便,“FileAggregator
”实现了所有三个序列详细信息策略:
-
具有 `FileHeaders.FILENAME`属性的 `HeaderAttributeCorrelationStrategy`用于关联键计算。当在 `FileSplitter`上启用标记时,它不会填充序列详情标头,因为开始/结束标记消息也包含在序列大小中。`FileHeaders.FILENAME`仍会为发出的每一行(包括开始/结束标记消息)填充。
-
FileMarkerReleaseStrategy
- 检查组中的FileSplitter.FileMarker.Mark.END`消息,然后将 `FileHeaders.LINE_COUNT`标头值与组大小减去 `2
- `FileSplitter.FileMarker`实例进行比较。它还实现在 `GroupConditionProvider`中使用的 `AbstractCorrelatingMessageHandler`的一个便利 `conditionSupplier`联系人。有关更多信息,请参见 Message Group Condition。 -
`FileAggregatingMessageGroupProcessor`只从组中移除 `FileSplitter.FileMarker`消息并收集其余消息到列表有效负载中以进行产生。
以下清单展示了配置“FileAggregator
”的可能方法:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow fileSplitterAggregatorFlow(TaskExecutor taskExecutor) {
return f -> f
.split(Files.splitter()
.markers()
.firstLineAsHeader("firstLine"))
.channel(c -> c.executor(taskExecutor))
.filter(payload -> !(payload instanceof FileSplitter.FileMarker),
e -> e.discardChannel("aggregatorChannel"))
.<String, String>transform(String::toUpperCase)
.channel("aggregatorChannel")
.aggregate(new FileAggregator())
.channel(c -> c.queue("resultChannel"));
}
@Bean
fun fileSplitterAggregatorFlow(taskExecutor: TaskExecutor?) =
integrationFlow {
split(Files.splitter().markers().firstLineAsHeader("firstLine"))
channel { executor(taskExecutor) }
filter<Any>({ it !is FileMarker }) { discardChannel("aggregatorChannel") }
transform(String::toUpperCase)
channel("aggregatorChannel")
aggregate(FileAggregator())
channel { queue("resultChannel") }
}
@serviceActivator(inputChannel="toAggregateFile")
@Bean
public AggregatorFactoryBean fileAggregator() {
AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
aggregator.setProcessorBean(new FileAggregator());
aggregator.setOutputChannel(outputChannel);
return aggregator;
}
<int:chain input-channel="input" output-channel="output">
<int-file:splitter markers="true"/>
<int:aggregator>
<bean class="org.springframework.integration.file.aggregator.FileAggregator"/>
</int:aggregator>
</int:chain>
如果“FileAggregator
”的默认行为不能满足目标逻辑,建议使用各个策略配置聚合器端点。有关详细信息,请参阅“FileAggregator
”JavaDocs。