SFTP Streaming Inbound Channel Adapter

Version 4.3 introduced the streaming inbound channel adapter. This adapter produces messages with payloads of type InputStream, letting you fetch files without writing to the local file system. Since the session remains open, the consuming application is responsible for closing the session when the file has been consumed. The session is provided in the closeableResource header (IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE). Standard framework components, such as the FileSplitter and StreamTransformer, automatically close the session. See File Splitter and Stream Transformer for more information about these components. The following example shows how to configure an SFTP streaming inbound channel adapter:

<int-sftp:inbound-streaming-channel-adapter id="ftpInbound"
            channel="ftpChannel"
            session-factory="sessionFactory"
            filename-pattern="*.txt"
            filename-regex=".*\.txt"
            filter="filter"
            filter-expression="@myFilterBean.check(#root)"
            remote-file-separator="/"
            comparator="comparator"
            max-fetch-size="1"
            remote-directory-expression="'foo/bar'">
        <int:poller fixed-rate="1000" />
</int-sftp:inbound-streaming-channel-adapter>

You can use only one of filename-pattern, filename-regex, filter, or filter-expression.
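
The responsibility for closing the session, described earlier, can be illustrated with a minimal plain-Java sketch. It is not framework code: the Map stands in for the message headers, the class and method names are hypothetical, and only the header name closeableResource comes from the text above. In a real flow the payload and the header would come from the message received on the adapter's channel.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical consumer; the Map is an assumed stand-in for message headers.
final class StreamingPayloadConsumer {

    // Header name as given in the text above.
    static final String CLOSEABLE_RESOURCE = "closeableResource";

    /** Reads the whole payload as UTF-8, then closes the stream and the session. */
    static String consume(InputStream payload, Map<String, Object> headers) {
        Object resource = headers.get(CLOSEABLE_RESOURCE);
        try (InputStream in = payload) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        finally {
            // Runs whether or not reading succeeded, so the session is not leaked.
            close(resource);
        }
    }

    private static void close(Object resource) {
        if (resource instanceof Closeable) {
            try {
                ((Closeable) resource).close();
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }
}
```

Closing in a finally block means the session is released even when reading the stream fails, which matters because the adapter itself leaves the session open.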

Starting with version 5.0, by default, the SftpStreamingMessageSource adapter prevents duplicates for remote files by using SftpPersistentAcceptOnceFileListFilter based on the in-memory SimpleMetadataStore. By default, this filter is also applied together with the filename pattern (or regex). If you need to allow duplicates, you can use the AcceptAllFileListFilter. You can handle any other use cases by using CompositeFileListFilter (or ChainFileListFilter). The Java configuration shown later demonstrates one technique for removing the remote file after processing, which avoids duplicates.

For more information about the SftpPersistentAcceptOnceFileListFilter, and how it is used, see Remote Persistent File List Filters.
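
To make the accept-once idea concrete, here is a minimal plain-Java sketch of a filter backed by an in-memory store. It is an illustration under stated assumptions, not the framework's implementation: the AcceptOnceSketch and RemoteEntry names are hypothetical, and the rule that a file is passed again when its modification time changes is an assumption about how such a filter typically behaves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of an accept-once filter with an in-memory store.
final class AcceptOnceSketch {

    /** Hypothetical stand-in for a remote directory entry. */
    static final class RemoteEntry {
        final String name;
        final long modified;

        RemoteEntry(String name, long modified) {
            this.name = name;
            this.modified = modified;
        }
    }

    // In-memory store: file name -> modification time seen on the last listing.
    private final Map<String, Long> store = new ConcurrentHashMap<>();

    /** Returns only entries that are new or whose modification time changed. */
    List<RemoteEntry> filter(List<RemoteEntry> listing) {
        List<RemoteEntry> accepted = new ArrayList<>();
        for (RemoteEntry entry : listing) {
            Long previous = store.put(entry.name, entry.modified);
            if (previous == null || previous.longValue() != entry.modified) {
                accepted.add(entry);
            }
        }
        return accepted;
    }
}
```

Because the store in this sketch lives in memory, its state is lost on restart and is not shared between instances, which is why a persistent store is the usual choice in a clustered environment.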

You can use the max-fetch-size attribute to limit the number of files fetched on each poll when a fetch is necessary. Set it to 1 and use a persistent filter when running in a clustered environment. See Inbound Channel Adapters: Controlling Remote File Fetching for more information.

The adapter puts the remote directory and the file name in headers (FileHeaders.REMOTE_DIRECTORY and FileHeaders.REMOTE_FILE, respectively). Starting with version 5.0, the FileHeaders.REMOTE_FILE_INFO header provides additional remote file information (in JSON). If you set the fileInfoJson property on the SftpStreamingMessageSource to false, the header contains an SftpFileInfo object. You can access the SftpClient.DirEntry object provided by the underlying SftpClient by using the SftpFileInfo.getFileInfo() method. The fileInfoJson property is not available when you use XML configuration, but you can set it by injecting the SftpStreamingMessageSource into one of your configuration classes. See also Remote File Information.
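
One possible way to perform that injection is sketched below. It assumes that exactly one SftpStreamingMessageSource bean exists in the application context, so that injection by type is unambiguous. The class name StreamingSourceConfig and the method name are hypothetical.

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.sftp.inbound.SftpStreamingMessageSource;

// Hypothetical configuration class; assumes a single streaming source bean.
@Configuration
public class StreamingSourceConfig {

    // Called by Spring with the message source created from the XML definition.
    @Autowired
    public void configureSource(SftpStreamingMessageSource source) {
        // Put an SftpFileInfo object in the header instead of a JSON string.
        source.setFileInfoJson(false);
    }
}
```

If the context defines more than one streaming source, injection by type becomes ambiguous and a qualifier would be needed.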

Configuring with Java Configuration

The following Spring Boot application shows an example of how to configure the inbound adapter with Java:

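import java.io.InputStream;

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.file.filter.AcceptAllFileListFilter;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.integration.sftp.inbound.SftpStreamingMessageSource;
import org.springframework.integration.sftp.session.SftpRemoteFileTemplate;
import org.springframework.integration.transformer.StreamTransformer;
import org.springframework.messaging.MessageHandler;

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            // web(false) was replaced by web(WebApplicationType) in Spring Boot 2.0.
            .web(WebApplicationType.NONE)
            .run(args);
    }

    // Streams each remote file as an InputStream payload to the "stream" channel.
    @Bean
    @InboundChannelAdapter(channel = "stream")
    public MessageSource<InputStream> ftpMessageSource() {
        SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
        messageSource.setRemoteDirectory("sftpSource/");
        // Duplicates are allowed here because the advice below removes processed files.
        messageSource.setFilter(new AcceptAllFileListFilter<>());
        messageSource.setMaxFetchSize(1);
        return messageSource;
    }

    // Converts the stream to a UTF-8 String and closes the session.
    @Bean
    @Transformer(inputChannel = "stream", outputChannel = "data")
    public org.springframework.integration.transformer.Transformer transformer() {
        return new StreamTransformer("UTF-8");
    }

    // sftpSessionFactory() is not shown in this example; it must be defined
    // elsewhere and return the session factory for the SFTP server.
    @Bean
    public SftpRemoteFileTemplate template() {
        return new SftpRemoteFileTemplate(sftpSessionFactory());
    }

    @ServiceActivator(inputChannel = "data", adviceChain = "after")
    @Bean
    public MessageHandler handle() {
        return System.out::println;
    }

    // Removes the remote file after the handler completes successfully.
    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice after() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpression(
                "@template.remove(headers['file_remoteDirectory'] + '/' + headers['file_remoteFile'])");
        advice.setPropagateEvaluationFailures(true);
        return advice;
    }

}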

Notice that, in this example, the message handler downstream of the transformer has an advice that removes the remote file after processing.