FTP Inbound Channel Adapter

FTP 输入信道适配器是一个连接到 FTP 服务器并监听远程目录事件(例如,创建新文件)的特殊侦听器,它会在远程目录事件发生时启动文件传输。以下示例显示如何配置 inbound-channel-adapter

<int-ftp:inbound-channel-adapter id="ftpInbound"
    channel="ftpChannel"
    session-factory="ftpSessionFactory"
    auto-create-local-directory="true"
    delete-remote-files="true"
    filename-pattern="*.txt"
    remote-directory="some/remote/path"
    remote-file-separator="/"
    preserve-timestamp="true"
    local-filename-generator-expression="#this.toUpperCase() + '.a'"
    scanner="myDirScanner"
    local-filter="myFilter"
    temporary-file-suffix=".writing"
    max-fetch-size="-1"
    local-directory=".">
    <int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>

正如前面的配置所示,你可以通过使用 inbound-channel-adapter 元素来配置一个 FTP 入站通道适配器,同时还为各种属性(如 local-directoryfilename-pattern(它基于简单的模式匹配,而非正则表达式)和 session-factory 的引用)提供值。 默认情况下,传输的文件与原始文件的名称相同。如果你想覆盖此行为,则你可以设置 local-filename-generator-expression 属性,它允许你提供一个 SpEL 表达式,以生成本地文件的名称。与出站网关和适配器(SpEL 评估上下文的根对象是 Message)不同,此入站适配器在评估时尚未获得信息,因为它最终会以传输文件为它的负载生成信息。因此,SpEL 评估上下文的根对象是远程文件的原始名称(String)。 入站通道适配器首先针对本地目录检索 File`对象,然后根据轮询器配置发送每个文件。从 5.0 版本开始,现在可以限制在需要进行新文件检索时从 FTP 服务器获取的文件数量。如果目标文件非常大,或者在具有持久性文件列表过滤器的群集系统中运行,此时会非常有有益,如后面所述。出于此目的,请使用 `max-fetch-size。负值(默认值)表示无限制,并将检索所有匹配的文件。有关详细信息,请参见 Inbound Channel Adapters: Controlling Remote File Fetching。从 5.0 版本开始,还可以通过设置 scanner`属性向 `inbound-channel-adapter`提供自定义 `DirectoryScanner`实现。 从 Spring Integration 3.0 开始,你可以指定 `preserve-timestamp 属性(其默认值为 false)。当为 true 时,本地文件的修改时间戳将设置为从服务器检索到的值。否则,它将被设置为当前时间。 从版本 4.2 开始,你可以指定 remote-directory-expression 而不是 remote-directory,从而让你可以在每次轮询时动态确定目录(例如,remote-directory-expression="@myBean.determineRemoteDir()")。 从版本 4.3 开始,你可以省略 remote-directoryremote-directory-expression 属性。它们的默认值为 null。在这种情况下,根据 FTP 协议,客户端工作目录被用作默认远程目录。 有时,基于使用 filename-pattern 属性指定简单模式的文件筛选可能不够用。如果出现这种情况,你可以使用 filename-regex 属性指定正则表达式(例如,filename-regex=".*\.test$")。此外,如果你需要完全控制,你可以使用 filter 属性并提供对 o.s.i.file.filters.FileListFilter(筛选文件列表的策略接口)的任何自定义实现的引用。此筛选器确定要检索哪些远程文件。你还可以通过使用 CompositeFileListFilter 将基于模式的筛选器与其他筛选器(例如 AcceptOnceFileListFilter,以避免同步以前获取过的文件)结合使用。 AcceptOnceFileListFilter`在内存中存储其状态。如果您希望在系统重启后状态仍然存在,请考虑改用 `FtpPersistentAcceptOnceFileListFilter。此过滤器将已接受的文件名存储在 MetadataStore`策略的实例中(请参见 Metadata Store)。此过滤器根据文件名和远程修改时间进行匹配。 从版本 4.0 开始,此筛选器需要 `ConcurrentMetadataStore。当与共享数据存储(例如带有 RedisMetadataStoreRedis)配合使用时,它可以使多个应用程序或服务器实例共享筛选器键。 从版本 5.0 开始,默认情况下将具有内存中 SimpleMetadataStoreFtpPersistentAcceptOnceFileListFilter 应用于 FtpInboundFileSynchronizer。此筛选器还与 XML 配置中的 regexpattern 选项一起应用,以及与 Java DSL 中的 FtpInboundChannelAdapterSpec 一起应用。任何其他用例都可以使用 CompositeFileListFilter(或 ChainFileListFilter)进行管理。 前面的讨论是指检索文件之前的筛选。检索到文件后,将对文件系统上的文件应用额外的筛选。默认情况下,这是一个 AcceptOnceFileListFilter,如前所述,它将在内存中保留状态并且不考虑文件的修改时间。除非你的应用程序在处理后删除文件,否则该适配器将在应用程序重启后默认重新处理磁盘上的文件。 此外,如果你配置 filter 以使用 FtpPersistentAcceptOnceFileListFilter 并且远程文件时间戳发生更改(导致重新获取它),则默认的本地筛选器不会让此新文件被处理。 有关此过滤器的详细信息以及如何使用它的信息,请参见 Remote Persistent File List Filters。 您可以使用 local-filter`属性配置本地文件系统过滤器的行为。从 4.3.8 版本开始,默认情况下已配置 `FileSystemPersistentAcceptOnceFileListFilter。此过滤器将已接受的文件名和修改时间戳存储在 MetadataStore`策略的实例中(请参见 Metadata Store),并检测对本地文件修改时间的更改。默认的 `MetadataStore`是 `SimpleMetadataStore,它在内存中存储状态。 从版本 4.1.5 开始,这些筛选器有一个新属性(flushOnUpdate),它会导致它们在每次更新时刷新元数据存储(如果存储实现 Flushable)。

此外,如果您使用分布式 MetadataStore(如 Redis),您可以拥有相同适配器或应用程序的多个实例,并确保每个文件只被处理一次。

实际的本地筛选器是一个 CompositeFileListFilter,其中包含提供的筛选器和阻止处理正在下载的文件(基于 temporary-file-suffix)的模式筛选器。文件使用此后缀下载(默认为 .writing),并且在传输完成后,文件将重命名为其最终名称,使其对筛选器“可见”。 remote-file-separator 属性允许你配置一个文件分隔符字符,以便在默认斜杠“/”不适用于你的特定环境时使用。 有关这些属性的更多详细信息,请参阅 @{50}。 你同样应该了解,FTP 入站通道适配器是一个轮询消费者。因此,你必须配置一个轮询器(通过使用全局默认值或本地子元素)。一旦文件被传输,带有 java.io.File 有效载荷的消息就会生成并发送到由 channel 属性标识的通道。 从版本 6.2 开始,你可以使用 FtpLastModifiedFileListFilter 根据上次修改策略过滤 FTP 文件。此过滤器可以使用 age 属性进行配置,以便此值较旧的文件才会被过滤器通过。默认情况下,age 为 60 秒,但你应该选择一个足够大的值,以避免过早地处理文件(例如,由于网络问题)。请查看其 JavaDoc 以获取更多信息。

More on File Filtering and Incomplete Files

有时,在受监控的(远程)目录中刚刚出现的文件是不完整的。通常,这样的文件是用一个临时扩展名(例如 somefile.txt.writing)写入的,然后在写入过程完成后重命名。在大多数情况下,你只对完整的文件感兴趣,并且希望仅过滤完整的文件。要处理这些情况,可以使用 filename-patternfilename-regexfilter 属性提供的过滤支持。以下示例使用自定义过滤器实现:

<int-ftp:inbound-channel-adapter
    channel="ftpChannel"
    session-factory="ftpSessionFactory"
    filter="customFilter"
    local-directory="file:/my_transfers">
    remote-directory="some/remote/path"
    <int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>

<bean id="customFilter" class="org.example.CustomFilter"/>

Poller Configuration Notes for the Inbound FTP Adapter

入站 FTP 适配器的工作包含两项任务:

  1. 与远程服务器通信,以便将文件从远程目录传输到本地目录。

  2. 对于每个已传输的文件,生成一条带有该文件作为有效负载的消息,并将其发送到由“channel”属性标识的通道。这就是为什么它们被称为“通道适配器”而不仅仅是“适配器”。此类适配器的主要工作是生成一条消息,以发送到消息通道。从本质上讲,如果您的本地目录已有一个或多个文件,那么第二项任务会优先以这种方式进行,它首先会从这些文件生成消息。仅在处理完所有本地文件后才会发起远程通信以检索更多文件。

此外,在轮询器上配置触发器时,你应密切注意 max-messages-per-poll 属性。对于所有 SourcePollingChannelAdapter 实例(包括 FTP),其默认值为 1。这意味着,一旦处理了一个文件,它就会按照触发器配置所确定的下一个执行时间等待。如果你恰好有一个或多个文件位于 local-directory 中,它会处理这些文件,然后再与远程 FTP 服务器通信。此外,如果 max-messages-per-poll 设置为 1(默认值),它每次只处理一个文件,间隔由触发器定义,实际上充当“一个轮询 === 一个文件”。

对于典型文件传输用例,你很可能希望得到相反的行为:为每个轮询处理所有可能的文件,然后才等待下一个轮询。如果是这种情况,请将 max-messages-per-poll 设置为 -1。然后,在每个轮询中,适配器会尽可能多地尝试生成消息。换句话说,它处理本地目录中的所有内容,然后连接到远程目录,将所有可用于在本地处理的内容传输过去。只有在这种情况下,轮询操作才被认为已完成,并且轮询器才会等待下一个执行时间。

你还可以将 max-messages-per-poll 值设置为一个正值,表示每次轮询从文件中创建的消息的上限。例如,值为 10 意味着每次轮询它尝试处理不超过十个文件。

Recovering from Failures

了解适配器的架构非常重要。有一个用于获取文件的 File Synchronizer 和一个用于为每个同步文件发出消息的 FileReadingMessageSource。如前所述,有两个过滤器参与其中。filter 属性(和模式)指远程(FTP)文件列表,以避免获取已经获取的文件。FileReadingMessageSource 使用 local-filter 来确定哪些文件要作为消息发送。

同步器列出远程文件并查询其过滤器。然后传输这些文件。如果在文件传输期间发生 IO 错误,则将已经添加到过滤器中的任何文件删除,以便它们有资格在下次轮询时重新获取。仅当过滤器实现 ReversibleFileListFilter(例如 AcceptOnceFileListFilter)时,这一点才适用。

如果在同步文件后,在处理文件的下游流上发生错误,则不会对过滤器进行自动回滚,因此默认情况下不会重新处理失败的文件。

如果希望在发生故障后重新处理此类文件,可以使用类似于以下内容的配置来促进从过滤器中删除失败的文件:

<int-ftp:inbound-channel-adapter id="ftpAdapter"
        session-factory="ftpSessionFactory"
        channel="requestChannel"
        remote-directory-expression="'/ftpSource'"
        local-directory="file:myLocalDir"
        auto-create-local-directory="true"
        filename-pattern="*.txt">
    <int:poller fixed-rate="1000">
        <int:transactional synchronization-factory="syncFactory" />
    </int:poller>
</int-ftp:inbound-channel-adapter>

<bean id="acceptOnceFilter"
    class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>

<bean id="transactionManager"
    class="org.springframework.integration.transaction.PseudoTransactionManager" />

前面的配置适用于任何 ResettableFileListFilter

从 5.0 版本开始,入站通道适配器可以在本地构建与生成的本地文件名对应的子目录。它也可以是远程子路径。为了能够根据层次结构支持递归读取本地目录以进行修改,您现在可以供应一个带有基于 Files.walk()`算法的新 `RecursiveDirectoryScanner`的内部 `FileReadingMessageSource。有关更多信息,请参阅 AbstractInboundFileSynchronizingMessageSource.setScanner()。此外,您现在可以通过使用 setUseWatchService()`选项,将 `AbstractInboundFileSynchronizingMessageSource`切换到基于 `WatchService`的 `DirectoryScanner。它还配置了所有 WatchEventType`实例,以响应本地目录中的任何修改。前面显示的重新处理示例基于 `FileReadingMessageSource.WatchServiceDirectoryScanner`的内置功能,以便在从本地目录删除文件 (`StandardWatchEventKinds.ENTRY_DELETE) 时执行 ResettableFileListFilter.remove()。有关更多信息,请参阅 WatchServiceDirectoryScanner

Configuring with Java Configuration

以下 Spring Boot 应用程序展示了如何使用 Java 配置配置入站适配器的示例:

@SpringBootApplication
public class FtpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
        sf.setHost("localhost");
        sf.setPort(port);
        sf.setUsername("foo");
        sf.setPassword("foo");
        sf.setTestSession(true);
        return new CachingSessionFactory<FTPFile>(sf);
    }

    @Bean
    public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
        FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory());
        fileSynchronizer.setDeleteRemoteFiles(false);
        fileSynchronizer.setRemoteDirectory("foo");
        fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
        return fileSynchronizer;
    }

    @Bean
    @InboundChannelAdapter(channel = "ftpChannel", poller = @Poller(fixedDelay = "5000"))
    public MessageSource<File> ftpMessageSource() {
        FtpInboundFileSynchronizingMessageSource source =
                new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
        source.setLocalDirectory(new File("ftp-inbound"));
        source.setAutoCreateLocalDirectory(true);
        source.setLocalFilter(new AcceptOnceFileListFilter<File>());
        source.setMaxFetchSize(1);
        return source;
    }

    @Bean
    @ServiceActivator(inputChannel = "ftpChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

Configuring with the Java DSL

以下 Spring Boot 应用程序展示了如何使用 Java DSL 配置入站适配器的示例:

@SpringBootApplication
public class FtpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow ftpInboundFlow() {
        return IntegrationFlow
            .from(Ftp.inboundAdapter(this.ftpSessionFactory)
                    .preserveTimestamp(true)
                    .remoteDirectory("foo")
                    .regexFilter(".*\\.txt$")
                    .localFilename(f -> f.toUpperCase() + ".a")
                    .localDirectory(new File("d:\\ftp_files")),
                e -> e.id("ftpInboundAdapter")
                    .autoStartup(true)
                    .poller(Pollers.fixedDelay(5000)))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
    }
}

Dealing With Incomplete Data

请参阅 @{66}。

提供 @{67} 来过滤远程系统中没有相应标记文件的远程文件。请参阅 @{68}(并浏览到父类)以获取配置信息。