File Splitter

在 4.1.2 版本中添加了 FileSplitter,并在 4.2 版本中添加了其命名空间支持。FileSplitter`根据`BufferedReader.readLine()`将文本文件拆分为单独的行。默认情况下,分隔符使用`Iterator`将行作为从文件中读取时的一次一行发出。将`iterator`属性设置为`false`会使其在将所有行作为消息发出之前将所有行读入内存。其一个用例可能是,您希望在发送包含任何行的消息之前检测文件上的 I/O 错误。但是,这只适用于相对较短的文件。 入站有效负载可以是`FileStringFile`路径)、`InputStream`或`Reader。其他有效负载类型保持不变。 以下清单显示了配置`FileSplitter`的可能方法:

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@SpringBootApplication
public class FileSplitterApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileSplitterApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow fileSplitterFlow() {
        return IntegrationFlow
            .from(Files.inboundAdapter(tmpDir.getRoot())
                 .filter(new ChainFileListFilter<File>()
                        .addFilter(new AcceptOnceFileListFilter<>())
                        .addFilter(new ExpressionFileListFilter<>(
                             new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
            .split(Files.splitter()
                     .markers()
                     .charset(StandardCharsets.US_ASCII)
                     .firstLineAsHeader("fileHeader")
                     .applySequence(true))
            .channel(c -> c.queue("fileSplittingResultChannel"))
            .get();
    }

}
@Bean
fun fileSplitterFlow() =
    integrationFlow(
        Files.inboundAdapter(tmpDir.getRoot())
            .filter(
                ChainFileListFilter<File?>()
                    .addFilter(AcceptOnceFileListFilter())
                    .addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
            )
    ) {
        split(
            Files.splitter()
                .markers()
                .charset(StandardCharsets.US_ASCII)
                .firstLineAsHeader("fileHeader")
                .applySequence(true)
        )
        channel { queue("fileSplittingResultChannel") }
    }
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
    FileSplitter splitter = new FileSplitter(true, true);
    splitter.setApplySequence(true);
    splitter.setOutputChannel(outputChannel);
    return splitter;
}
<int-file:splitter id="splitter" 1
    iterator=""                  2
    markers=""                   3
    markers-json=""              4
    apply-sequence=""            5
    requires-reply=""            6
    charset=""                   7
    first-line-as-header=""      8
    input-channel=""             9
    output-channel=""            10
    send-timeout=""              11
    auto-startup=""              12
    order=""                     13
    phase="" />                  14
1 分隔符 Bean 名称。
2 设为 true(默认值)以使用迭代器,或设为 `false`以在发送行之前将文件载入内存中。
3 true`设为发送文件数据之前和之后发出文件起始标记消息和文件结束标记消息。标记是带有 `FileSplitter.FileMarker`有效负载(在 `mark`属性中带有 `START`和 `END`值)的消息。在对下游流中的文件进行顺序处理时,可以使用标记,其中会过滤某些行。它们让下游处理知道某个文件已完成处理。另外,一个包含 `START`或 `END`的 `file_marker`标头被添加到这些消息中。`END`标记包括行计数。如果文件为空,则仅发出 `START`和 `END`标记,其中 `lineCount`为 `0。默认值为 false。当 true`时,`apply-sequence`的默认值为 `false。另请参见 markers-json(下一个属性)。
4 markers`为真时,将其设为 `true`以使 `FileMarker`对象被转换为 JSON 字符串。(使用下面的 `SimpleJsonSerializer)。
5 设为 false`以禁用在消息中包含 `sequenceSize`和 `sequenceNumber`标头。默认值为 `true,除非 markers`为 `true。当 true`和 `markers`为 `true`时,标记被包含在序列中。当 `true`和 `iterator`为 `true`时,`sequenceSize`标头被设为 `0,因为大小未知。
6 设为 true`以导致在文件内没有行时抛出一个 `RequiresReplyException。默认值为 false
7 设置在将文本数据读入 `String`有效负载时要使用的字符集名称。默认值为平台字符集。
8 作为标头传递的第一行的标头名称,余下行发出的消息中作为标头。自版本 5.0 起。
9 设置用于发送消息到拆分器的输入通道。
10 设置消息发送到的输出通道。
11 设置发送超时。仅适用在 output-channel`可以阻止的情况下,例如完全 `QueueChannel
12 设为 false`以禁用在刷新上下文时自动启动拆分器。默认值为 `true
13 如果 input-channel`为 `<publish-subscribe-channel/>,则设置此端点的顺序。
14 设置拆分器的启动阶段(当 `auto-startup`为 `true`时使用)。

FileSplitter 还会将基于文本的 InputStream 拆分为多行。从 4.3 版开始,当与使用 stream 选项检索文件的 FTP 或 SFTP 流入通道适配器或 FTP 或 SFTP 出站网关一起使用时,当文件完全使用完毕后,拆分器会自动关闭支持流的会话。有关这些功能的更多信息,请参阅 FTP Streaming Inbound Channel AdapterSFTP Streaming Inbound Channel Adapter 以及 FTP Outbound GatewaySFTP Outbound Gateway。 使用 Java 配置时,可以使用其他构造函数,如下例所示:

public FileSplitter(boolean iterator, boolean markers, boolean markersJson)

当`markersJson`为真时,标记表示为 JSON 字符串(使用`SimpleJsonSerializer`)。 5.0 版本引入了`firstLineAsHeader`选项,用于指定内容的第一行是一个标题(例如 CSV 文件中的列名)。传递给此属性的参数是标头名称,第一行是作为标头在为其余行发出的消息中携带的。此行不包含在序列标题(如果`applySequence`为真)中,也不包含在与`FileMarker.END`关联的`lineCount`中。注意:从 5.5 版开始,lineCount`也被作为`FileHeaders.LINE_COUNT`包含到`FileMarker.END`消息的标题中,因为`FileMarker`可以序列化为 JSON。如果文件仅包含标题行,则该文件将被视为为空文件,因此,在拆分期间仅发出`FileMarker`实例(如果启用了标记——否则,不发出任何消息)。默认情况下(如果没有设置标题名称),则第一行被视为数据并成为第一个发出消息的有效负载。 如果你需要关于从文件内容中提取标题的更复杂的逻辑(不是第一行,不是行的全部内容,不是一个特定标题,等等),请考虑使用 `FileSplitter 之前的 header enricher。请注意,已移至标题的行可能会被过滤掉,不会进行普通的处理流程。

Idempotent Downstream Processing a Split File

apply-sequence 为真时,拆分器会在 SEQUENCE_NUMBER 标头中添加行号(当 markers 为真时,标记会被视作行)。行号可与 Idempotent Receiver 一起使用,以避免在重新启动后重新处理行。

例如:

@Bean
public ConcurrentMetadataStore store() {
    return new ZookeeperMetadataStore();
}

@Bean
public MetadataStoreSelector selector() {
    return new MetadataStoreSelector(
            message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
                    .getAbsolutePath(),
            message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
                    .toString(),
            store())
                    .compareValues(
                            (oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    return new IdempotentReceiverInterceptor(selector());
}

@Bean
public IntegrationFlow flow() {
    ...
    .split(new FileSplitter())
    ...
    .handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
    ...
}