SFTP Inbound Channel Adapter
SFTP 入站通道适配器是一个特殊的侦听器,它连接到服务器并侦听远程目录事件(例如创建新文件),此时它会启动文件传输。以下示例显示如何配置 SFTP 入站通道适配器:
The SFTP inbound channel adapter is a special listener that connects to the server and listens for the remote directory events (such as a new file being created), at which point it initiates a file transfer. The following example shows how to configure an SFTP inbound channel adapter:
<int-sftp:inbound-channel-adapter id="sftpAdapterAutoCreate"
session-factory="sftpSessionFactory"
channel="requestChannel"
filename-pattern="*.txt"
remote-directory="/foo/bar"
preserve-timestamp="true"
local-directory="file:target/foo"
auto-create-local-directory="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
delete-remote-files="false">
<int:poller fixed-rate="1000"/>
</int-sftp:inbound-channel-adapter>
前面的配置示例显示了如何为各个属性提供值,包括:
The preceding configuration example shows how to provide values for various attributes, including the following:
-
local-directory
: The location to which files are going to be transferred -
remote-directory
: The remote source directory from which files are going to be transferred -
session-factory
: A reference to the bean we configured earlier
默认情况下,传输的文件与原始文件同名。如果要覆盖此行为,您可以设置 local-filename-generator-expression
属性,它允许您提供一个 SpEL 表达式来生成本地文件的文件名。与出站网关和适配器(SpEL 评估上下文的根对象是 消息
)不同,在评估时,此入站适配器还未有消息,因为这是它最终使用传输的文件作为其有效负载生成的内容。因此,SpEL 评估上下文的根对象是远程文件的原始名称(一个 字符串
)。
By default, the transferred file carries the same name as the original file.
If you want to override this behavior, you can set the local-filename-generator-expression
attribute, which lets you provide a SpEL expression to generate the name of the local file.
Unlike outbound gateways and adapters, where the root object of the SpEL evaluation context is a Message
, this inbound adapter does not yet have the message at the time of evaluation, since that is what it ultimately generates with the transferred file as its payload.
Consequently, the root object of the SpEL evaluation context is the original name of the remote file (a String
).
入站通道适配器首先将文件检索到本地目录,然后根据轮询配置发送每个文件。从5.0版本开始,您可以在需要新的文件检索时限制从SFTP服务器获取的文件数量。当目标文件较大或在具有持久文件列表筛选器(稍后在本部分中讨论)的群集系统中运行时,这可能很有用。为此,使用`max-fetch-size`。负值(默认值)表示没有限制,并且检索所有匹配的文件。有关更多信息,请参见Inbound Channel Adapters: Controlling Remote File Fetching。从5.0版本开始,您还可以通过设置`scanner`属性向`inbound-channel-adapter`提供自定义`DirectoryScanner`实现。
The inbound channel adapter first retrieves the file to a local directory and then emits each file according to the poller configuration.
Starting with version 5.0, you can limit the number of files fetched from the SFTP server when new file retrievals are needed.
This can be beneficial when the target files are large or when running in a clustered system with a persistent file list filter, discussed later in this section.
Use max-fetch-size
for this purpose.
A negative value (the default) means no limit and all matching files are retrieved.
See Inbound Channel Adapters: Controlling Remote File Fetching for more information.
Since version 5.0, you can also provide a custom DirectoryScanner
implementation to the inbound-channel-adapter
by setting the scanner
attribute.
从 Spring Integration 3.0 开始,您可以指定 preserve-timestamp
属性(默认值为 false
)。当为 true
时,本地文件的修改时间戳将设置为从服务器检索到的值。否则,将被设置为当前时间。
Starting with Spring Integration 3.0, you can specify the preserve-timestamp
attribute (the default is false
).
When true
, the local file’s modified timestamp is set to the value retrieved from the server.
Otherwise, it is set to the current time.
从 4.2 版本开始,你可以指定 remote-directory-expression
而不是 remote-directory
,这样你就可以动态确定每次轮询的目录——例如,remote-directory-expression="@myBean.determineRemoteDir()"
。
Starting with version 4.2, you can specify remote-directory-expression
instead of remote-directory
, which lets you dynamically determine the directory on each poll — for example, remote-directory-expression="@myBean.determineRemoteDir()"
.
有时,基于通过 filename-pattern
属性指定的简单模式的文件筛选可能不够用。如果是这种情况,你可以使用 filename-regex
属性来指定一个正则表达式(例如,filename-regex=".*\.test$"
)。如果你需要完全控制,你可以使用 filter
属性来提供对 org.springframework.integration.file.filters.FileListFilter
的自定义实现的引用,它是一个用于筛选文件列表的策略接口。这个过滤器会确定要检索哪些远程文件。你还可以通过使用 CompositeFileListFilter
,将基于模式的过滤器与其他过滤器(例如 AcceptOnceFileListFilter
)结合起来,从而避免同步以前已获取的文件。
Sometimes, file filtering based on the simple pattern specified via filename-pattern
attribute might not suffice.
If this is the case, you can use the filename-regex
attribute to specify a regular expression (for example, filename-regex=".*\.test$"
).
If you need complete control, you can use the filter
attribute to provide a reference to a custom implementation of the org.springframework.integration.file.filters.FileListFilter
, which is a strategy interface for filtering a list of files.
This filter determines which remote files are retrieved.
You can also combine a pattern-based filter with other filters (such as an AcceptOnceFileListFilter
, to avoid synchronizing files that have previously been fetched) by using a CompositeFileListFilter
.
AcceptOnceFileListFilter`将它自己的状态存储在内存中。如果您希望状态在系统重启后仍然存在,请考虑使用`SftpPersistentAcceptOnceFileListFilter
。此过滤器将已接受的文件名存储在`MetadataStore`策略的实例中(参见Metadata Store)。此过滤器根据文件名和远程修改时间进行匹配。
The AcceptOnceFileListFilter
stores its state in memory.
If you wish the state to survive a system restart, consider using the SftpPersistentAcceptOnceFileListFilter
instead.
This filter stores the accepted file names in an instance of the MetadataStore
strategy (see Metadata Store).
This filter matches on the filename and the remote modified time.
从 4.0 版本开始,此过滤器需要一个 ConcurrentMetadataStore
。当与共享数据存储(例如使用 RedisMetadataStore
的 Redis
)一起使用时,这可以使筛选密钥在多个应用程序或服务器实例之间共享。
Since version 4.0, this filter requires a ConcurrentMetadataStore
.
When used with a shared data store (such as Redis
with the RedisMetadataStore
), this lets filter keys be shared across multiple application or server instances.
从 5.0 版本开始,默认情况下 SftpInboundFileSynchronizer
使用带有内存中 SimpleMetadataStore
的 SftpPersistentAcceptOnceFileListFilter
。此过滤器也与 XML 配置中的 regex
或 pattern
选项一起应用,以及通过 Java DSL 中的 SftpInboundChannelAdapterSpec
应用。你可以使用 CompositeFileListFilter
(或 ChainFileListFilter
)处理任何其他用例。
Starting with version 5.0, the SftpPersistentAcceptOnceFileListFilter
with an in-memory SimpleMetadataStore
is applied by default for the SftpInboundFileSynchronizer
.
This filter is also applied, together with the regex
or pattern
option in the XML configuration, as well as through SftpInboundChannelAdapterSpec
in Java DSL.
You can handle any other use-cases by using CompositeFileListFilter
(or ChainFileListFilter
).
上述讨论涉及在检索文件之前对其进行筛选。检索文件后,会将其他过滤器应用到文件系统上的文件。默认情况下,这是一个`AcceptOnceFileListFilter`,如本部分所述,它将状态保留在内存中,并且不考虑文件的修改时间。除非你的应用程序在处理后删除文件,否则该适配器在应用程序重新启动后默认会重新处理磁盘上的文件。
The above discussion refers to filtering the files before retrieving them. Once the files have been retrieved, an additional filter is applied to the files on the file system. By default, this is an`AcceptOnceFileListFilter`, which, as discussed in this section, retains state in memory and does not consider the file’s modified time. Unless your application removes files after processing, the adapter re-processes the files on disk by default after an application restart.
另外,如果你将 filter
配置为使用 SftpPersistentAcceptOnceFileListFilter
并且远程文件时间戳发生更改(导致重新获取),则默认本地过滤器不允许处理此新文件。
Also, if you configure the filter
to use a SftpPersistentAcceptOnceFileListFilter
and the remote file timestamp changes (causing it to be re-fetched), the default local filter does not allow this new file to be processed.
有关此过滤器的详细信息以及如何使用它的信息,请参见 Remote Persistent File List Filters。
For more information about this filter, and how it is used, see Remote Persistent File List Filters.
您可以使用`local-filter`属性来配置本地文件系统筛选器的行为。从4.3.8版本开始,默认情况下配置`FileSystemPersistentAcceptOnceFileListFilter`。此过滤器将已接受的文件名和修改时间戳存储在`MetadataStore`策略的实例中(参见Metadata Store),并检测本地文件修改时间的更改。默认的`MetadataStore`是将状态存储在内存中的`SimpleMetadataStore`。
You can use the local-filter
attribute to configure the behavior of the local file system filter.
Starting with version 4.3.8, a FileSystemPersistentAcceptOnceFileListFilter
is configured by default.
This filter stores the accepted file names and modified timestamp in an instance of the MetadataStore
strategy (see Metadata Store) and detects changes to the local file modified time.
The default MetadataStore
is a SimpleMetadataStore
that stores state in memory.
从 4.1.5 版本开始,这些过滤器都有一个名为 flushOnUpdate
的新属性,它导致它们在每次更新时刷新元数据存储(如果存储实现 Flushable
)。
Since version 4.1.5, these filters have a new property called flushOnUpdate
, which causes them to flush the
metadata store on every update (if the store implements Flushable
).
此外,如果您使用分布式 MetadataStore
(例如 Redis Metadata Store),您可以拥有相同适配器或应用程序的多个实例,并确保只有一个实例处理一个文件。
Further, if you use a distributed MetadataStore
(such as Redis Metadata Store), you can have multiple instances of the same adapter or application and be sure that one and only one instance processes a file.
实际的本地过滤器是一个包含所提供的过滤器和一个模式过滤器的 CompositeFileListFilter
,该模式过滤器防止处理正在下载中的文件(基于 temporary-file-suffix
)。文件使用此后缀下载(默认值为 .writing
),并且在传输完成后,文件将重命名为其最终名称,使其对过滤器“可见”。
The actual local filter is a CompositeFileListFilter
that contains the supplied filter and a pattern filter that prevents processing files that are in the process of being downloaded (based on the temporary-file-suffix
).
Files are downloaded with this suffix (the default is .writing
), and the files are renamed to their final names when the transfer is complete, making them 'visible' to the filter.
有关这些属性的更多详细信息,请参见 schema。
See the schema for more detail on these attributes.
SFTP 入站通道适配器是轮询使用者。因此,你必须配置轮询器(全局默认值或本地元素)。一旦文件被传输到本地目录,就会生成一个 java.io.File
作为其有效负载类型的消息,并将其发送到由 channel
属性标识的通道。
SFTP inbound channel adapter is a polling consumer.
Therefore, you must configure a poller (either a global default or a local element).
Once the file has been transferred to a local directory, a message with java.io.File
as its payload type is generated and sent to the channel identified by the channel
attribute.
从 6.2 版本开始,你可以使用 SftpLastModifiedFileListFilter
基于最后修改策略筛选 SFTP 文件。该过滤器可以配置为 age
属性,以便过滤器仅传递比此值更旧的文件。年龄默认为 60 秒,但你应该选择足够大的年龄,以避免过早提取文件(例如,由于网络故障)。请查看其 Javadoc 以了解更多信息。
Starting with version 6.2, you can filter SFTP files based on last-modified strategy using SftpLastModifiedFileListFilter
.
This filter can be configured with an age
property so that only files older than this value are passed by the filter.
The age defaults to 60 seconds, but you should choose an age that is large enough to avoid picking up a file early (due to, say, network glitches).
Look into its Javadoc for more information.
More on File Filtering and Large Files
有时,监视的(远程)目录中刚刚出现的文件并不完整。通常,此类文件会用一些临时扩展名编写(例如命名为 something.txt.writing
文件上的 .writing
),然后在写入过程完成后重命名。在大多数情况下,开发人员只对完整的文件感兴趣,并且只想过滤这些文件。要处理这些场景,你可以使用 filename-pattern
、filename-regex
和 filter
属性提供的筛选支持。如果你需要一个自定义过滤器实现,你可以通过设置 filter
属性来在适配器中包含一个引用。以下示例演示如何做到这一点:
Sometimes, a file that just appeared in the monitored (remote) directory is not complete.
Typically, such a file is written with some temporary extension (such as .writing
on a file named something.txt.writing
) and then renamed after the writing process completes.
In most cases, developers are interested only in files that are complete and would like to filter only those files.
To handle these scenarios, you can use the filtering support provided by the filename-pattern
, filename-regex
, and filter
attributes.
If you need a custom filter implementation, you can include a reference in your adapter by setting the filter
attribute.
The following example shows how to do so:
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="receiveChannel"
session-factory="sftpSessionFactory"
filter="customFilter"
local-directory="file:/local-test-dir"
remote-directory="/remote-test-dir">
<int:poller fixed-rate="1000" max-messages-per-poll="10" task-executor="executor"/>
</int-sftp:inbound-channel-adapter>
<bean id="customFilter" class="org.foo.CustomFilter"/>
Recovering from Failures
您应该了解适配器的体系结构。文件同步器检索文件,而`FileReadingMessageSource`为每个同步文件发送消息。作为discussed earlier,涉及两个过滤器。`filter`属性(和模式)是指远程(SFTP)文件列表,以避免获取已经获取的文件。`FileReadingMessageSource`使用`local-filter`来确定将哪些文件作为消息发送。
You should understand the architecture of the adapter.
A file synchronizer fetches the files, and a FileReadingMessageSource
emits a message for each synchronized file.
As discussed earlier, two filters are involved.
The filter
attribute (and patterns) refers to the remote (SFTP) file list, to avoid fetching files that have already been fetched.
the FileReadingMessageSource
uses the local-filter
to determine which files are to be sent as messages.
同步器列出远程文件并查询其过滤器。然后传输这些文件。如果在文件传输过程中发生 IO 错误,则会删除任何已添加到筛选器的文件,以便它们有资格在下次轮询时重新获取。这仅适用于过滤器实现了 ReversibleFileListFilter
的情况(例如 AcceptOnceFileListFilter
)。
The synchronizer lists the remote files and consults its filter.
The files are then transferred.
If an IO error occurs during file transfer, any files that have already been added to the filter are removed so that they are eligible to be re-fetched on the next poll.
This applies only if the filter implements ReversibleFileListFilter
(such as the AcceptOnceFileListFilter
).
如果在同步文件后,在处理文件的下游流上发生错误,则不会对过滤器进行自动回滚,因此默认情况下不会重新处理失败的文件。
If, after synchronizing the files, an error occurs on the downstream flow processing a file, no automatic rollback of the filter occurs, so the failed file is not reprocessed by default.
如果你希望在发生故障后重新处理此类文件,则可以使用类似于以下配置来促进从过滤器中删除失败的文件:
If you wish to reprocess such files after a failure, you can use a configuration similar to the following to facilitate the removal of the failed file from the filter:
<int-sftp:inbound-channel-adapter id="sftpAdapter"
session-factory="sftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/sftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-sftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />
前面的配置适用于任何 ResettableFileListFilter
。
The preceding configuration works for any ResettableFileListFilter
.
从 5.0 版开始,传入通道适配器可以根据生成的文件名在本地建立子目录。那也可以是一个远端的子路径。为了能够根据层次结构支持递归读取本地目录的修改,您现在可以提供带有基于 Files.walk()
算法的新 RecursiveDirectoryScanner
的 FileReadingMessageSource
内部。有关详细信息,请参见 AbstractInboundFileSynchronizingMessageSource.setScanner()
。此外,您现在可以使用 setUseWatchService()
选项,将 AbstractInboundFileSynchronizingMessageSource
切换到基于 WatchService
的 DirectoryScanner
。它还针对所有 WatchEventType
实例进行配置,以对本地目录的任何修改做出反应。先前展示的重新处理示例基于 FileReadingMessageSource.WatchServiceDirectoryScanner
的内置功能,当文件从本地目录中删除(StandardWatchEventKinds.ENTRY_DELETE
)时,FileReadingMessageSource.WatchServiceDirectoryScanner
会使用 ResettableFileListFilter.remove()
。有关详细信息,请参见 WatchServiceDirectoryScanner
。
Starting with version 5.0, the inbound channel adapter can build sub-directories locally, according to the generated local file name.
That can be a remote sub-path as well.
To be able to read a local directory recursively for modification according to the hierarchy support, you can now supply an internal FileReadingMessageSource
with a new RecursiveDirectoryScanner
based on the Files.walk()
algorithm.
See AbstractInboundFileSynchronizingMessageSource.setScanner()
for more information.
Also, you can now switch the AbstractInboundFileSynchronizingMessageSource
to the WatchService
-based DirectoryScanner
by using setUseWatchService()
option.
It is also configured for all the WatchEventType
instances to react for any modifications in local directory.
The reprocessing sample shown earlier is based on the built-in functionality of the FileReadingMessageSource.WatchServiceDirectoryScanner
, which uses ResettableFileListFilter.remove()
when the file is deleted (StandardWatchEventKinds.ENTRY_DELETE
) from the local directory.
See WatchServiceDirectoryScanner
for more information.
Configuring with Java Configuration
以下 Spring Boot 应用程序展示了如何使用 Java 配置入站适配器的示例:
The following Spring Boot application shows an example of how to configure the inbound adapter with Java:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost("localhost");
factory.setPort(port);
factory.setUser("foo");
factory.setPassword("foo");
factory.setAllowUnknownKeys(true);
factory.setTestSession(true);
return new CachingSessionFactory<>(factory);
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
source.setLocalDirectory(new File("sftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
Configuring with the Java DSL
以下 Spring Boot 应用程序展示了如何使用 Java DSL 配置入站适配器的示例:
The following Spring Boot application shows an example of how to configure the inbound adapter with the Java DSL:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow sftpInboundFlow() {
return IntegrationFlow
.from(Sftp.inboundAdapter(this.sftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilenameExpression("#this.toUpperCase() + '.a'")
.localDirectory(new File("sftp-inbound")),
e -> e.id("sftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}