FTP Inbound Channel Adapter
FTP 输入信道适配器是一个连接到 FTP 服务器并监听远程目录事件(例如,创建新文件)的特殊侦听器,它会在远程目录事件发生时启动文件传输。以下示例显示如何配置 inbound-channel-adapter
:
<int-ftp:inbound-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="ftpSessionFactory"
auto-create-local-directory="true"
delete-remote-files="true"
filename-pattern="*.txt"
remote-directory="some/remote/path"
remote-file-separator="/"
preserve-timestamp="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
local-directory=".">
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
正如前面的配置所示,你可以通过使用 inbound-channel-adapter
元素来配置一个 FTP 入站通道适配器,同时还为各种属性(如 local-directory
、filename-pattern
(它基于简单的模式匹配,而非正则表达式)和 session-factory
的引用)提供值。
默认情况下,传输的文件与原始文件的名称相同。如果你想覆盖此行为,则你可以设置 local-filename-generator-expression
属性,它允许你提供一个 SpEL 表达式,以生成本地文件的名称。与出站网关和适配器(SpEL 评估上下文的根对象是 Message
)不同,此入站适配器在评估时尚未获得信息,因为它最终会以传输文件为它的负载生成信息。因此,SpEL 评估上下文的根对象是远程文件的原始名称(String
)。
入站通道适配器首先针对本地目录检索 File`对象,然后根据轮询器配置发送每个文件。从 5.0 版本开始,现在可以限制在需要进行新文件检索时从 FTP 服务器获取的文件数量。如果目标文件非常大,或者在具有持久性文件列表过滤器的群集系统中运行,此时会非常有有益,如后面所述。出于此目的,请使用 `max-fetch-size
。负值(默认值)表示无限制,并将检索所有匹配的文件。有关详细信息,请参见 Inbound Channel Adapters: Controlling Remote File Fetching。从 5.0 版本开始,还可以通过设置 scanner`属性向 `inbound-channel-adapter`提供自定义 `DirectoryScanner`实现。
从 Spring Integration 3.0 开始,你可以指定 `preserve-timestamp
属性(其默认值为 false
)。当为 true
时,本地文件的修改时间戳将设置为从服务器检索到的值。否则,它将被设置为当前时间。
从版本 4.2 开始,你可以指定 remote-directory-expression
而不是 remote-directory
,从而让你可以在每次轮询时动态确定目录(例如,remote-directory-expression="@myBean.determineRemoteDir()"
)。
从版本 4.3 开始,你可以省略 remote-directory
和 remote-directory-expression
属性。它们的默认值为 null
。在这种情况下,根据 FTP 协议,客户端工作目录被用作默认远程目录。
有时,基于使用 filename-pattern
属性指定简单模式的文件筛选可能不够用。如果出现这种情况,你可以使用 filename-regex
属性指定正则表达式(例如,filename-regex=".*\.test$"
)。此外,如果你需要完全控制,你可以使用 filter
属性并提供对 o.s.i.file.filters.FileListFilter
(筛选文件列表的策略接口)的任何自定义实现的引用。此筛选器确定要检索哪些远程文件。你还可以通过使用 CompositeFileListFilter
将基于模式的筛选器与其他筛选器(例如 AcceptOnceFileListFilter
,以避免同步以前获取过的文件)结合使用。
AcceptOnceFileListFilter`在内存中存储其状态。如果您希望在系统重启后状态仍然存在,请考虑改用 `FtpPersistentAcceptOnceFileListFilter
。此过滤器将已接受的文件名存储在 MetadataStore`策略的实例中(请参见 Metadata Store)。此过滤器根据文件名和远程修改时间进行匹配。
从版本 4.0 开始,此筛选器需要 `ConcurrentMetadataStore
。当与共享数据存储(例如带有 RedisMetadataStore
的 Redis
)配合使用时,它可以使多个应用程序或服务器实例共享筛选器键。
从版本 5.0 开始,默认情况下将具有内存中 SimpleMetadataStore
的 FtpPersistentAcceptOnceFileListFilter
应用于 FtpInboundFileSynchronizer
。此筛选器还与 XML 配置中的 regex
或 pattern
选项一起应用,以及与 Java DSL 中的 FtpInboundChannelAdapterSpec
一起应用。任何其他用例都可以使用 CompositeFileListFilter
(或 ChainFileListFilter
)进行管理。
前面的讨论是指检索文件之前的筛选。检索到文件后,将对文件系统上的文件应用额外的筛选。默认情况下,这是一个 AcceptOnceFileListFilter
,如前所述,它将在内存中保留状态并且不考虑文件的修改时间。除非你的应用程序在处理后删除文件,否则该适配器将在应用程序重启后默认重新处理磁盘上的文件。
此外,如果你配置 filter
以使用 FtpPersistentAcceptOnceFileListFilter
并且远程文件时间戳发生更改(导致重新获取它),则默认的本地筛选器不会让此新文件被处理。
有关此过滤器的详细信息以及如何使用它的信息,请参见 Remote Persistent File List Filters。
您可以使用 local-filter`属性配置本地文件系统过滤器的行为。从 4.3.8 版本开始,默认情况下已配置 `FileSystemPersistentAcceptOnceFileListFilter
。此过滤器将已接受的文件名和修改时间戳存储在 MetadataStore`策略的实例中(请参见 Metadata Store),并检测对本地文件修改时间的更改。默认的 `MetadataStore`是 `SimpleMetadataStore
,它在内存中存储状态。
从版本 4.1.5 开始,这些筛选器有一个新属性(flushOnUpdate
),它会导致它们在每次更新时刷新元数据存储(如果存储实现 Flushable
)。
此外,如果您使用分布式 MetadataStore
(如 Redis),您可以拥有相同适配器或应用程序的多个实例,并确保每个文件只被处理一次。
实际的本地筛选器是一个 CompositeFileListFilter
,其中包含提供的筛选器和阻止处理正在下载的文件(基于 temporary-file-suffix
)的模式筛选器。文件使用此后缀下载(默认为 .writing
),并且在传输完成后,文件将重命名为其最终名称,使其对筛选器“可见”。
remote-file-separator
属性允许你配置一个文件分隔符字符,以便在默认斜杠“/”不适用于你的特定环境时使用。
有关这些属性的更多详细信息,请参阅 @{50}。
你同样应该了解,FTP 入站通道适配器是一个轮询消费者。因此,你必须配置一个轮询器(通过使用全局默认值或本地子元素)。一旦文件被传输,带有 java.io.File
有效载荷的消息就会生成并发送到由 channel
属性标识的通道。
从版本 6.2 开始,你可以使用 FtpLastModifiedFileListFilter
根据上次修改策略过滤 FTP 文件。此过滤器可以使用 age
属性进行配置,以便此值较旧的文件才会被过滤器通过。默认情况下,age
为 60 秒,但你应该选择一个足够大的值,以避免过早地处理文件(例如,由于网络问题)。请查看其 JavaDoc 以获取更多信息。
More on File Filtering and Incomplete Files
有时,在受监控的(远程)目录中刚刚出现的文件是不完整的。通常,这样的文件是用一个临时扩展名(例如 somefile.txt.writing
)写入的,然后在写入过程完成后重命名。在大多数情况下,你只对完整的文件感兴趣,并且希望仅过滤完整的文件。要处理这些情况,可以使用 filename-pattern
、filename-regex
和 filter
属性提供的过滤支持。以下示例使用自定义过滤器实现:
<int-ftp:inbound-channel-adapter
channel="ftpChannel"
session-factory="ftpSessionFactory"
filter="customFilter"
local-directory="file:/my_transfers">
remote-directory="some/remote/path"
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
<bean id="customFilter" class="org.example.CustomFilter"/>
Poller Configuration Notes for the Inbound FTP Adapter
入站 FTP 适配器的工作包含两项任务:
-
与远程服务器通信,以便将文件从远程目录传输到本地目录。
-
对于每个已传输的文件,生成一条带有该文件作为有效负载的消息,并将其发送到由“channel”属性标识的通道。这就是为什么它们被称为“通道适配器”而不仅仅是“适配器”。此类适配器的主要工作是生成一条消息,以发送到消息通道。从本质上讲,如果您的本地目录已有一个或多个文件,那么第二项任务会优先以这种方式进行,它首先会从这些文件生成消息。仅在处理完所有本地文件后才会发起远程通信以检索更多文件。
此外,在轮询器上配置触发器时,你应密切注意 max-messages-per-poll
属性。对于所有 SourcePollingChannelAdapter
实例(包括 FTP),其默认值为 1
。这意味着,一旦处理了一个文件,它就会按照触发器配置所确定的下一个执行时间等待。如果你恰好有一个或多个文件位于 local-directory
中,它会处理这些文件,然后再与远程 FTP 服务器通信。此外,如果 max-messages-per-poll
设置为 1
(默认值),它每次只处理一个文件,间隔由触发器定义,实际上充当“一个轮询 === 一个文件
”。
对于典型文件传输用例,你很可能希望得到相反的行为:为每个轮询处理所有可能的文件,然后才等待下一个轮询。如果是这种情况,请将 max-messages-per-poll
设置为 -1。然后,在每个轮询中,适配器会尽可能多地尝试生成消息。换句话说,它处理本地目录中的所有内容,然后连接到远程目录,将所有可用于在本地处理的内容传输过去。只有在这种情况下,轮询操作才被认为已完成,并且轮询器才会等待下一个执行时间。
你还可以将 max-messages-per-poll
值设置为一个正值,表示每次轮询从文件中创建的消息的上限。例如,值为 10
意味着每次轮询它尝试处理不超过十个文件。
Recovering from Failures
了解适配器的架构非常重要。有一个用于获取文件的 File Synchronizer 和一个用于为每个同步文件发出消息的 FileReadingMessageSource
。如前所述,有两个过滤器参与其中。filter
属性(和模式)指远程(FTP)文件列表,以避免获取已经获取的文件。FileReadingMessageSource
使用 local-filter
来确定哪些文件要作为消息发送。
同步器列出远程文件并查询其过滤器。然后传输这些文件。如果在文件传输期间发生 IO 错误,则将已经添加到过滤器中的任何文件删除,以便它们有资格在下次轮询时重新获取。仅当过滤器实现 ReversibleFileListFilter
(例如 AcceptOnceFileListFilter
)时,这一点才适用。
如果在同步文件后,在处理文件的下游流上发生错误,则不会对过滤器进行自动回滚,因此默认情况下不会重新处理失败的文件。
如果希望在发生故障后重新处理此类文件,可以使用类似于以下内容的配置来促进从过滤器中删除失败的文件:
<int-ftp:inbound-channel-adapter id="ftpAdapter"
session-factory="ftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/ftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-ftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />
前面的配置适用于任何 ResettableFileListFilter
。
从 5.0 版本开始,入站通道适配器可以在本地构建与生成的本地文件名对应的子目录。它也可以是远程子路径。为了能够根据层次结构支持递归读取本地目录以进行修改,您现在可以供应一个带有基于 Files.walk()`算法的新 `RecursiveDirectoryScanner`的内部 `FileReadingMessageSource
。有关更多信息,请参阅 AbstractInboundFileSynchronizingMessageSource.setScanner()
。此外,您现在可以通过使用 setUseWatchService()`选项,将 `AbstractInboundFileSynchronizingMessageSource`切换到基于 `WatchService`的 `DirectoryScanner
。它还配置了所有 WatchEventType`实例,以响应本地目录中的任何修改。前面显示的重新处理示例基于 `FileReadingMessageSource.WatchServiceDirectoryScanner`的内置功能,以便在从本地目录删除文件 (`StandardWatchEventKinds.ENTRY_DELETE
) 时执行 ResettableFileListFilter.remove()
。有关更多信息,请参阅 WatchServiceDirectoryScanner
。
Configuring with Java Configuration
以下 Spring Boot 应用程序展示了如何使用 Java 配置配置入站适配器的示例:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "ftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> ftpMessageSource() {
FtpInboundFileSynchronizingMessageSource source =
new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
source.setLocalDirectory(new File("ftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
Configuring with the Java DSL
以下 Spring Boot 应用程序展示了如何使用 Java DSL 配置入站适配器的示例:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlow
.from(Ftp.inboundAdapter(this.ftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilename(f -> f.toUpperCase() + ".a")
.localDirectory(new File("d:\\ftp_files")),
e -> e.id("ftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}