FTP Streaming Inbound Channel Adapter
版本 4.3 引入了流入站通道适配器。此适配器会生成带有 InputStream`类型负载的消息,允许获取文件而无需写入本地文件系统。由于会话保持打开状态,因此消耗应用程序负责在文件消耗完毕后关闭会话。会话是在 `closeableResource`头(`IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE
)中提供的。FileSplitter`和 `StreamTransformer`等标准框架组件会自动关闭会话。有关这些组件的更多信息,请参阅 File Splitter和 Stream Transformer。以下示例显示了如何配置 `inbound-streaming-channel-adapter
:
Version 4.3 introduced the streaming inbound channel adapter.
This adapter produces message with payloads of type InputStream
, letting files be fetched without writing to the
local file system.
Since the session remains open, the consuming application is responsible for closing the session when the file has been
consumed.
The session is provided in the closeableResource
header (IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE
).
Standard framework components, such as the FileSplitter
and StreamTransformer
, automatically close the session.
See File Splitter and Stream Transformer for more information about these components.
The following example shows how to configure an inbound-streaming-channel-adapter
:
<int-ftp:inbound-streaming-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="sessionFactory"
filename-pattern="*.txt"
filename-regex=".*\.txt"
filter="filter"
filter-expression="@myFilterBean.check(#root)"
remote-file-separator="/"
comparator="comparator"
max-fetch-size="1"
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000" />
</int-ftp:inbound-streaming-channel-adapter>
只允许 filename-pattern
、filename-regex
、filter
或 filter-expression
中的一个。
Only one of filename-pattern
, filename-regex
, filter
, or filter-expression
is allowed.
从 5.0 版本开始,默认情况下 FtpStreamingMessageSource
适配器会基于内存中的 SimpleMetadataStore
来防止 FtpPersistentAcceptOnceFileListFilter
的远程文件出现重复。默认情况下,此过滤器也会与文件名模式(或正则表达)一起应用。如果您需要允许重复,则可以使用 AcceptAllFileListFilter
。任何其他用例都可以由 CompositeFileListFilter
(或 ChainFileListFilter
)来处理。Java 配置(later in the document)展示了一种在处理后移除远程文件以避免重复的技术。
Starting with version 5.0, by default, the FtpStreamingMessageSource
adapter prevents duplicates for remote files with FtpPersistentAcceptOnceFileListFilter
based on the in-memory SimpleMetadataStore
.
By default, this filter is also applied with the filename pattern (or regex).
If you need to allow duplicates, you can use AcceptAllFileListFilter
.
Any other use cases can be handled by CompositeFileListFilter
(or ChainFileListFilter
).
The Java configuration (later in the document) shows one technique to remove the remote file after processing to avoid duplicates.
有关 FtpPersistentAcceptOnceFileListFilter
的详细信息以及如何使用它,请参见 Remote Persistent File List Filters。
For more information about the FtpPersistentAcceptOnceFileListFilter
, and how it is used, see Remote Persistent File List Filters.
使用 max-fetch-size
属性来限制每次轮询获取的文件数(如果有必要)。在集群环境中运行时,将其设置为 1
并使用持久过滤器。有关详细信息,请参阅 Inbound Channel Adapters: Controlling Remote File Fetching。
Use the max-fetch-size
attribute to limit the number of files fetched on each poll when a fetch is necessary.
Set it to 1
and use a persistent filter when running in a clustered environment.
See Inbound Channel Adapters: Controlling Remote File Fetching for more information.
适配器分别将远程目录和文件名放在 FileHeaders.REMOTE_DIRECTORY
和 FileHeaders.REMOTE_FILE
标头中。从 5.0 版开始,FileHeaders.REMOTE_FILE_INFO
标头提供其他远程文件信息(默认情况下以 JSON 表示)。如果你将 FtpStreamingMessageSource
上的 fileInfoJson
属性设置为 false
,标头将包含一个 FtpFileInfo
对象。该 FtpFileInfo
对象由基础 Apache Net 库提供,可以使用 FtpFileInfo.getFileInfo()
方法访问它。使用 XML 配置时,fileInfoJson
属性不可用,但你可以通过将 FtpStreamingMessageSource
注入到你的某个配置类中来设置该属性。另请参阅 Remote File Information。
The adapter puts the remote directory and file name in the FileHeaders.REMOTE_DIRECTORY
and FileHeaders.REMOTE_FILE
headers, respectively.
Starting with version 5.0, the FileHeaders.REMOTE_FILE_INFO
header provides additional remote file information (represented in JSON by default).
If you set the fileInfoJson
property on the FtpStreamingMessageSource
to false
, the header contains an FtpFileInfo
object.
The FTPFile
object provided by the underlying Apache Net library can be accessed by using the FtpFileInfo.getFileInfo()
method.
The fileInfoJson
property is not available when you use XML configuration, but you can set it by injecting the FtpStreamingMessageSource
into one of your configuration classes.
See also Remote File Information.
从版本 5.1 开始,comparator
的泛型类型为 FTPFile
。以前,它是 AbstractFileInfo<FTPFile>
。这是因为现在在过滤和应用 maxFetch
之前,排序在处理中执行得更早。
Starting with version 5.1, the generic type of the comparator
is FTPFile
.
Previously, it was AbstractFileInfo<FTPFile>
.
This is because the sort is now performed earlier in the processing, before filtering and applying maxFetch
.
Configuring with Java Configuration
以下 Spring Boot 应用展示了如何通过 Java 配置来配置入站适配器的一个示例:
The following Spring Boot application shows an example of how to configure the inbound adapter with Java configuration:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource<InputStream> ftpMessageSource() {
FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template());
messageSource.setRemoteDirectory("ftpSource/");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}
@Bean
public FtpRemoteFileTemplate template() {
return new FtpRemoteFileTemplate(ftpSessionFactory());
}
@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}
}
请注意,在此示例中,转换器下游的消息处理程序有一个“建议”,可在处理后移除远程文件。
Notice that, in this example, the message handler downstream of the transformer has an advice
that removes the remote file after processing.