Reading Files

A FileReadingMessageSource can be used to consume files from the filesystem. This is an implementation of MessageSource that creates messages from a file system directory. The following example shows how to configure a FileReadingMessageSource:

<bean id="pollableFileSource"
    class="org.springframework.integration.file.FileReadingMessageSource"
    p:directory="${input.directory}"/>

To prevent creating messages for certain files, you can supply a FileListFilter. By default, we use the following filters:

  • IgnoreHiddenFileListFilter

  • AcceptOnceFileListFilter

The IgnoreHiddenFileListFilter ensures that hidden files are not processed. Note that the exact definition of hidden is system-dependent. For example, on UNIX-based systems, a file beginning with a period character is considered to be hidden. Microsoft Windows, on the other hand, has a dedicated file attribute to indicate hidden files.
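
The platform dependence described above is visible directly in the JDK, which such a filter can delegate to. The sketch below contrasts `File.isHidden()` (which applies the OS rule) with a name-based check that reproduces only the UNIX convention; this is an illustration, not Spring's implementation.

```java
import java.io.File;

public class HiddenFiles {

    // UNIX convention only: a leading period marks the file as hidden.
    // On Windows, hidden status is a file attribute instead, which
    // File.isHidden() consults.
    public static boolean unixStyleHidden(String fileName) {
        return fileName.startsWith(".");
    }

    public static void main(String[] args) {
        File f = new File(".profile");
        System.out.println("OS says hidden: " + f.isHidden()
                + ", UNIX-style name check: " + unixStyleHidden(f.getName()));
    }
}
```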

Version 4.2 introduced the IgnoreHiddenFileListFilter. In prior versions, hidden files were included. With the default configuration, the IgnoreHiddenFileListFilter is triggered first, followed by the AcceptOnceFileListFilter.

The AcceptOnceFileListFilter ensures files are picked up only once from the directory.

The AcceptOnceFileListFilter stores its state in memory. If you wish the state to survive a system restart, you can use the FileSystemPersistentAcceptOnceFileListFilter. This filter stores the accepted file names in a MetadataStore implementation (see Metadata Store). This filter matches on the filename and modified time.
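
The accept-once semantics can be sketched with a few lines of plain Java: a file (keyed here by name, for brevity) passes the filter only the first time it is seen. This is an in-memory illustration of the idea, not the framework class, which operates on `File` instances and offers the persistent variant described above.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class AcceptOnceSketch {

    private final Set<String> seen = new LinkedHashSet<>();

    public List<String> filter(List<String> candidates) {
        List<String> accepted = new ArrayList<>();
        for (String name : candidates) {
            if (seen.add(name)) { // add() returns false for already-seen names
                accepted.add(name);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        AcceptOnceSketch filter = new AcceptOnceSketch();
        System.out.println(filter.filter(List.of("a.txt", "b.txt"))); // [a.txt, b.txt]
        System.out.println(filter.filter(List.of("a.txt", "c.txt"))); // [c.txt]
    }
}
```

Because the `seen` set lives only in memory, the state is lost on restart; the persistent variant externalizes exactly this set into a `MetadataStore`.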

Since version 4.0, this filter requires a ConcurrentMetadataStore. When used with a shared data store (such as Redis with the RedisMetadataStore), it lets filter keys be shared across multiple application instances or across a network file share being used by multiple servers.

Since version 4.1.5, this filter has a new property (flushOnUpdate), which causes it to flush the metadata store on every update (if the store implements Flushable).

The persistent file list filters now have a boolean property forRecursion. Setting this property to true also sets alwaysAcceptDirectories, which means that recursive operations on the outbound gateways (ls and mget) will now always traverse the full directory tree each time. This solves a problem where changes deep in the directory tree were not detected. In addition, forRecursion=true causes the full path to files to be used as the metadata store keys; this solves a problem where the filter did not work properly if a file with the same name appeared multiple times in different directories. IMPORTANT: This means that existing keys in a persistent metadata store will not be found for files beneath the top-level directory. For this reason, the property is false by default; this may change in a future release.

The following example configures a FileReadingMessageSource with a filter:

<bean id="pollableFileSource"
    class="org.springframework.integration.file.FileReadingMessageSource"
    p:directory="${input.directory}"
    p:filter-ref="customFilterBean"/>

A common problem with reading files is that a file may be detected before it is ready (that is, some other process may still be writing the file). The default AcceptOnceFileListFilter does not prevent this. In most cases, this can be prevented if the file-writing process renames each file as soon as it is ready for reading. A filename-pattern or filename-regex filter that accepts only files that are ready (perhaps based on a known suffix), composed with the default AcceptOnceFileListFilter, allows for this situation. The CompositeFileListFilter enables the composition, as the following example shows:

<bean id="pollableFileSource"
    class="org.springframework.integration.file.FileReadingMessageSource"
    p:directory="${input.directory}"
    p:filter-ref="compositeFilter"/>

<bean id="compositeFilter"
    class="org.springframework.integration.file.filters.CompositeFileListFilter">
    <constructor-arg>
        <list>
            <bean class="org.springframework.integration.file.filters.AcceptOnceFileListFilter"/>
            <bean class="org.springframework.integration.file.filters.RegexPatternFileListFilter">
                <constructor-arg value="^test.*$"/>
            </bean>
        </list>
    </constructor-arg>
</bean>

If it is not possible to create the file with a temporary name and rename to the final name, Spring Integration provides another alternative. Version 4.2 added the LastModifiedFileListFilter. This filter can be configured with an age property so that only files older than this value are passed by the filter. The age defaults to 60 seconds, but you should choose an age that is large enough to avoid picking up a file early (due to, say, network glitches). The following example shows how to configure a LastModifiedFileListFilter:

<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
    <property name="age" value="120" />
</bean>

Starting with version 4.3.7, a ChainFileListFilter (an extension of CompositeFileListFilter) has been introduced to allow scenarios where subsequent filters should see only the result of the previous filter. (With the CompositeFileListFilter, all filters see all the files, but it passes only files that have passed all filters). An example of where the new behavior is required is a combination of LastModifiedFileListFilter and AcceptOnceFileListFilter, when we do not wish to accept the file until some amount of time has elapsed. With the CompositeFileListFilter, the AcceptOnceFileListFilter sees all the files on the first pass and never accepts them again, so a file rejected by the other filter on that pass can never be emitted later. The CompositeFileListFilter approach is useful when a pattern filter is combined with a custom filter that looks for a secondary file to indicate that a file transfer is complete. The pattern filter might pass only the primary file (such as something.txt), but the “done” filter needs to see whether (for example) something.done is present.

Say we have files a.txt, a.done, and b.txt.

The pattern filter passes only a.txt and b.txt, while the “done” filter sees all three files and passes only a.txt. The final result of the composite filter is that only a.txt is released.

With the ChainFileListFilter, if any filter in the chain returns an empty list, the remaining filters are not invoked.
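
The two composition semantics can be contrasted with a small sketch, using string lists and list-to-list "filters" in place of FileListFilter. "Composite" lets every filter see the full candidate list and keeps the intersection of what they accept; "chain" feeds each filter only the previous filter's output. The difference only matters for stateful filters, such as the accept-once filter modeled here; this is an illustration of the semantics, not the framework classes.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.UnaryOperator;

public class FilterComposition {

    // Composite semantics: every filter sees the ORIGINAL list; the
    // result is the intersection of all accepted lists.
    public static List<String> composite(List<String> files, List<UnaryOperator<List<String>>> filters) {
        List<String> result = new ArrayList<>(files);
        for (UnaryOperator<List<String>> filter : filters) {
            result.retainAll(filter.apply(files));
        }
        return result;
    }

    // Chain semantics: each filter sees only the previous filter's output.
    public static List<String> chain(List<String> files, List<UnaryOperator<List<String>>> filters) {
        List<String> current = files;
        for (UnaryOperator<List<String>> filter : filters) {
            current = filter.apply(current);
        }
        return current;
    }

    /** Stateful accept-once: a name passes only the first time this filter sees it. */
    public static UnaryOperator<List<String>> acceptOnce() {
        Set<String> seen = new HashSet<>();
        return files -> files.stream().filter(seen::add).toList();
    }

    public static UnaryOperator<List<String>> rejectAll() {
        return files -> List.of(); // stands in for a "too young" age filter
    }

    public static UnaryOperator<List<String>> passAll() {
        return files -> files; // the same age filter once the file is old enough
    }
}
```

With composite semantics, the accept-once filter "burns" a file even on the poll where the age filter rejects it, so the file is never emitted; with chain semantics, the accept-once filter never sees the file until the age filter passes it.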

Version 5.0 introduced an ExpressionFileListFilter, which evaluates a SpEL expression against a file as the root object of the evaluation context. For this purpose, all the XML components for file handling (local and remote) now provide a filter-expression option alongside the existing filter attribute, as the following example shows:

<int-file:inbound-channel-adapter
        directory="${inputdir}"
        filter-expression="name matches '.text'"
        auto-startup="false"/>

Version 5.0.5 introduced the DiscardAwareFileListFilter implementations, which have an interest in rejected files. For this purpose, such a filter implementation should be supplied with a callback through addDiscardCallback(Consumer<File>). In the framework, this functionality is used by the FileReadingMessageSource.WatchServiceDirectoryScanner in combination with the LastModifiedFileListFilter. Unlike the regular DirectoryScanner, the WatchService provides files for processing according to events on the target file system. At the moment of polling an internal queue with those files, the LastModifiedFileListFilter may discard them because they are too young relative to its configured age, and the file would then be lost to future consideration. The discard callback hook lets us retain the file in the internal queue so that it can be checked against the age in subsequent polls. The CompositeFileListFilter also implements DiscardAwareFileListFilter and propagates the discard callback to all of its DiscardAwareFileListFilter delegates.

Since CompositeFileListFilter matches the files against all delegates, the discardCallback may be called several times for the same file.

Starting with version 5.1, the FileReadingMessageSource doesn’t check a directory for existence and doesn’t create it until its start() is called (typically via the wrapping SourcePollingChannelAdapter). Previously, there was no simple way to prevent an operating system permission error when referencing the directory, for example from tests, or when permissions are applied later.

Message Headers

Starting with version 5.0, the FileReadingMessageSource (in addition to the payload as a polled File) populates the following headers to the outbound Message:

  • FileHeaders.FILENAME: The File.getName() of the file to send. Can be used for subsequent rename or copy logic.

  • FileHeaders.ORIGINAL_FILE: The File object itself. Typically, this header is populated automatically by framework components (such as splitters or transformers) when we lose the original File object. However, for consistency and convenience with any other custom use cases, this header can be useful to get access to the original file.

  • FileHeaders.RELATIVE_PATH: A new header introduced to represent the part of the file path relative to the root directory of the scan. This header can be useful when the requirement is to restore a source directory hierarchy elsewhere. For this purpose, the DefaultFileNameGenerator (see Generating File Names) can be configured to use this header.

Directory Scanning and Polling

The FileReadingMessageSource does not produce messages for files from the directory immediately. It uses an internal queue for 'eligible files' returned by the scanner. The scanEachPoll option is used to ensure that the internal queue is refreshed with the latest input directory content on each poll. By default (scanEachPoll = false), the FileReadingMessageSource empties its queue before scanning the directory again. This default behavior is particularly useful to reduce scans of large numbers of files in a directory. However, in cases where custom ordering is required, it is important to consider the effects of setting this flag to true. The order in which files are processed may not be as expected. By default, files in the queue are processed in their natural (path) order. New files added by a scan, even when the queue already has files, are inserted in the appropriate position to maintain that natural order. To customize the order, the FileReadingMessageSource can accept a Comparator<File> as a constructor argument. It is used by the internal PriorityBlockingQueue to reorder its content according to the business requirements. Therefore, to process files in a specific order, you should provide a comparator to the FileReadingMessageSource rather than ordering the list produced by a custom DirectoryScanner.
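
The queue-plus-comparator behavior can be sketched with the JDK class the text names. The comparator below orders files by name length (an arbitrary stand-in for a business rule); this illustrates how a PriorityBlockingQueue reorders entries, not the internals of FileReadingMessageSource.

```java
import java.io.File;
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

public class QueueOrdering {

    // Shortest file name first; ties broken by natural (name) order.
    public static PriorityBlockingQueue<File> byNameLength() {
        return new PriorityBlockingQueue<>(11,
                Comparator.comparingInt((File f) -> f.getName().length())
                          .thenComparing(File::getName));
    }

    public static void main(String[] args) {
        PriorityBlockingQueue<File> queue = byNameLength();
        queue.add(new File("bbbb.txt"));
        queue.add(new File("a.txt"));
        System.out.println(queue.poll().getName()); // a.txt
        System.out.println(queue.poll().getName()); // bbbb.txt
    }
}
```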

Version 5.0 introduced the RecursiveDirectoryScanner to perform file tree visiting. The implementation is based on the Files.walk(Path start, int maxDepth, FileVisitOption... options) functionality. The root directory (the DirectoryScanner.listFiles(File) argument) is excluded from the result. The inclusion or exclusion of all other sub-directories is based on the target FileListFilter implementation. For example, the SimplePatternFileListFilter filters out directories by default. See AbstractDirectoryAwareFileListFilter and its implementations for more information.
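
The JDK mechanism named above can be sketched directly: Files.walk traverses the tree, and the root itself is dropped from the result, mirroring the scanner's contract of excluding the root directory. The file names and temp directory here are invented for the illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Stream;

public class RecursiveListing {

    public static List<Path> listTree(Path root, int maxDepth) throws IOException {
        try (Stream<Path> walk = Files.walk(root, maxDepth)) {
            return walk.filter(p -> !p.equals(root)) // exclude the root directory itself
                       .sorted()
                       .toList();
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("scan");
        Files.createFile(root.resolve("a.txt"));
        Path sub = Files.createDirectories(root.resolve("sub"));
        Files.createFile(sub.resolve("b.txt"));
        listTree(root, Integer.MAX_VALUE).forEach(System.out::println);
    }
}
```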

Starting with version 5.5, the FileInboundChannelAdapterSpec of the Java DSL has a convenient recursive(boolean) option to use a RecursiveDirectoryScanner in the target FileReadingMessageSource instead of the default one.

Namespace Support

The configuration for file reading can be simplified by using the file-specific namespace. To do so, use the following template:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-file="http://www.springframework.org/schema/integration/file"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/file
    https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>

Within this namespace, you can define the FileReadingMessageSource and wrap it in an inbound channel adapter, as follows:

<int-file:inbound-channel-adapter id="filesIn1"
    directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>

<int-file:inbound-channel-adapter id="filesIn2"
    directory="file:${input.directory}"
    filter="customFilterBean" />

<int-file:inbound-channel-adapter id="filesIn3"
    directory="file:${input.directory}"
    filename-pattern="test*" />

<int-file:inbound-channel-adapter id="filesIn4"
    directory="file:${input.directory}"
    filename-regex="test[0-9]+\.txt" />

The first channel adapter example relies on the default FileListFilter implementations:

  • IgnoreHiddenFileListFilter (do not process hidden files)

  • AcceptOnceFileListFilter (prevent duplication)

Therefore, you can also leave off the prevent-duplicates and ignore-hidden attributes, as they are true by default.

Spring Integration 4.2 introduced the ignore-hidden attribute. In prior versions, hidden files were included.

The second channel adapter example uses a custom filter, the third uses the filename-pattern attribute to add an AntPathMatcher based filter, and the fourth uses the filename-regex attribute to add a regular expression pattern-based filter to the FileReadingMessageSource. The filename-pattern and filename-regex attributes are each mutually exclusive with the regular filter reference attribute. However, you can use the filter attribute to reference an instance of CompositeFileListFilter that combines any number of filters, including one or more pattern-based filters to fit your particular needs.

When multiple processes read from the same directory, you may want to lock files to prevent them from being picked up concurrently. To do so, you can use a FileLocker. There is a java.nio-based implementation available, but it is also possible to implement your own locking scheme. The nio locker can be injected as follows:

<int-file:inbound-channel-adapter id="filesIn"
    directory="file:${input.directory}" prevent-duplicates="true">
    <int-file:nio-locker/>
</int-file:inbound-channel-adapter>

You can configure a custom locker as follows:

<int-file:inbound-channel-adapter id="filesIn"
    directory="file:${input.directory}" prevent-duplicates="true">
    <int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>

When a file inbound adapter is configured with a locker, it takes responsibility for acquiring a lock before the file is allowed to be received. It does not assume the responsibility to unlock the file. If you have processed the file and keep the locks hanging around, you have a memory leak. If this is a problem, you should call FileLocker.unlock(File file) yourself at the appropriate time.
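
The java.nio primitive behind the nio locker can be sketched as follows: FileChannel.tryLock() takes an exclusive lock on the file, or returns null (or throws, within the same JVM) when another holder already owns it. As the text warns, the caller is responsible for releasing the lock; here try-with-resources does so immediately. This is the raw JDK API, not Spring's FileLocker contract.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class NioLockSketch {

    // Returns true if an exclusive lock could be acquired; the lock and
    // channel are released by try-with-resources on exit.
    public static boolean lockAndRelease(Path file) throws IOException {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE);
             FileLock lock = channel.tryLock()) {
            return lock != null;
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("locked", ".txt");
        System.out.println("acquired: " + lockAndRelease(file));
    }
}
```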

When filtering and locking files is not enough, you might need to control the way files are listed entirely. To implement this type of requirement, you can use an implementation of DirectoryScanner. This scanner lets you determine exactly what files are listed in each poll. This is also the interface that Spring Integration uses internally to wire FileListFilter instances and FileLocker to the FileReadingMessageSource. You can inject a custom DirectoryScanner into the <int-file:inbound-channel-adapter/> on the scanner attribute, as the following example shows:

<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
     scanner="customDirectoryScanner"/>

Doing so gives you full freedom to choose the ordering, listing, and locking strategies.

It is also important to understand that filters (including patterns, regex, prevent-duplicates, and others) and locker instances are actually used by the scanner. Any of these attributes set on the adapter are subsequently injected into the internal scanner. For the case of an external scanner, all filter and locker attributes are prohibited on the FileReadingMessageSource. They must be specified (if required) on that custom DirectoryScanner. In other words, if you inject a scanner into the FileReadingMessageSource, you should supply filter and locker on that scanner, not on the FileReadingMessageSource.

By default, the DefaultDirectoryScanner uses an IgnoreHiddenFileListFilter and an AcceptOnceFileListFilter. To prevent their use, you can configure your own filter (such as AcceptAllFileListFilter) or even set it to null.

WatchServiceDirectoryScanner

The FileReadingMessageSource.WatchServiceDirectoryScanner relies on file-system events when new files are added to the directory. During initialization, the directory is registered to generate events. The initial file list is also built during initialization. While walking the directory tree, any subdirectories encountered are also registered to generate events. On the first poll, the initial file list from walking the directory is returned. On subsequent polls, files from new creation events are returned. If a new subdirectory is added, its creation event is used to walk the new subtree to find existing files and register any new subdirectories found.
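
The registration step described above maps onto the JDK WatchService API, sketched below: a directory is registered with a WatchService for the event kinds of interest. Event draining and tree walking are omitted; this shows the raw JDK call, not the Spring scanner.

```java
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;

public class WatchRegistration {

    // Register a directory for create/modify/delete events; the returned
    // WatchKey is later polled for accumulated events.
    public static WatchKey register(Path dir, WatchService watchService) throws IOException {
        return dir.register(watchService,
                StandardWatchEventKinds.ENTRY_CREATE,
                StandardWatchEventKinds.ENTRY_MODIFY,
                StandardWatchEventKinds.ENTRY_DELETE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("watched");
        try (WatchService watchService = FileSystems.getDefault().newWatchService()) {
            System.out.println("registered: " + register(dir, watchService).isValid());
        }
    }
}
```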

There is an issue with WatchKey when its internal events queue is not drained by the program as quickly as the directory modification events occur. If the queue size is exceeded, a StandardWatchEventKinds.OVERFLOW is emitted to indicate that some file system events may be lost. In this case, the root directory is re-scanned completely. To avoid duplicates, consider using an appropriate FileListFilter (such as the AcceptOnceFileListFilter) or removing files when processing is complete.

The WatchServiceDirectoryScanner can be enabled through the FileReadingMessageSource.use-watch-service option, which is mutually exclusive with the scanner option. An internal FileReadingMessageSource.WatchServiceDirectoryScanner instance is populated for the provided directory.

In addition, the WatchService polling logic can now track the StandardWatchEventKinds.ENTRY_MODIFY and StandardWatchEventKinds.ENTRY_DELETE events.

If you need to track the modification of existing files as well as new files, you should implement the ENTRY_MODIFY events logic in the FileListFilter. Otherwise, the files from those events are treated the same way.

The ResettableFileListFilter implementations pick up the ENTRY_DELETE events. Consequently, their files are provided for the remove() operation. When this event is enabled, filters such as the AcceptOnceFileListFilter have the file removed. As a result, if a file with the same name appears again, it passes the filter and is sent as a message.

For this purpose, the watch-events property (FileReadingMessageSource.setWatchEvents(WatchEventType…​ watchEvents)) has been introduced. (WatchEventType is a public inner enumeration in FileReadingMessageSource.) With such an option, we can use one downstream flow logic for new files and use some other logic for modified files. The following example shows how to configure different logic for create and modify events in the same directory:

It is worth mentioning that the ENTRY_DELETE event is involved in the rename operation of a sub-directory of the watched directory. More specifically, the ENTRY_DELETE event, which is related to the previous directory name, precedes the ENTRY_CREATE event, which notifies about the new (renamed) directory. On some operating systems (such as Windows), the ENTRY_DELETE event has to be registered to deal with that situation. Otherwise, renaming a watched sub-directory in File Explorer could result in new files not being detected in that sub-directory.

<int-file:inbound-channel-adapter id="newFiles"
     directory="${input.directory}"
     use-watch-service="true"/>

<int-file:inbound-channel-adapter id="modifiedFiles"
     directory="${input.directory}"
     use-watch-service="true"
     filter="acceptAllFilter"
     watch-events="MODIFY"/> <!-- The default is CREATE. -->

Starting with version 6.1, the FileReadingMessageSource exposes two new WatchService-related options:

  • watchMaxDepth - an argument for the Files.walkFileTree(Path start, Set<FileVisitOption> options, int maxDepth, FileVisitor<? super Path> visitor) API;

  • watchDirPredicate - a Predicate<Path> to test if a directory in the scanned tree should be walked and registered with the WatchService and the configured watch event kinds.

Limiting Memory Consumption

You can use a HeadDirectoryScanner to limit the number of files retained in memory. This can be useful when scanning large directories. With XML configuration, this is enabled by setting the queue-size property on the inbound channel adapter.

Prior to version 4.2, this setting was incompatible with the use of any other filters. Any other filters (including prevent-duplicates="true") overwrote the filter used to limit the size.

The use of a HeadDirectoryScanner is incompatible with an AcceptOnceFileListFilter. Since all filters are consulted during the poll decision, the AcceptOnceFileListFilter does not know that other filters might be temporarily filtering files. Even if files that were previously filtered by the HeadDirectoryScanner.HeadFilter are now available, the AcceptOnceFileListFilter filters them.

Generally, instead of using an AcceptOnceFileListFilter in this case, you should remove the processed files so that the previously filtered files are available on a future poll.

Configuring with Java Configuration

The following Spring Boot application shows an example of how to configure the inbound adapter with Java configuration:

@SpringBootApplication
public class FileReadingJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileReadingJavaApplication.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Bean
    public MessageChannel fileInputChannel() {
        return new DirectChannel();
    }

    @Bean
    @InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
    public MessageSource<File> fileReadingMessageSource() {
         FileReadingMessageSource source = new FileReadingMessageSource();
         source.setDirectory(new File(INBOUND_PATH));
         source.setFilter(new SimplePatternFileListFilter("*.txt"));
         return source;
    }

    @Bean
    @Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
    public FileToStringTransformer fileToStringTransformer() {
        return new FileToStringTransformer();
    }

}

Configuring with the Java DSL

The following Spring Boot application shows an example of how to configure the inbound adapter with the Java DSL:

@SpringBootApplication
public class FileReadingJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileReadingJavaApplication.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Bean
    public IntegrationFlow fileReadingFlow() {
         return IntegrationFlow
                  .from(Files.inboundAdapter(new File(INBOUND_PATH))
                              .patternFilter("*.txt"),
                          e -> e.poller(Pollers.fixedDelay(1000)))
                  .transform(Files.toStringTransformer())
                  .channel("processFileChannel")
                  .get();
    }

}

'tail'ing Files

Another popular use case is to get 'lines' from the end (or tail) of a file, capturing new lines when they are added. Two implementations are provided. The first, OSDelegatingFileTailingMessageProducer, uses the native tail command (on operating systems that have one). This is generally the most efficient implementation on those platforms. For operating systems that do not have a tail command, the second implementation, ApacheCommonsFileTailingMessageProducer, uses the Apache commons-io Tailer class.
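
What a tailer does between polls can be sketched in a few lines: remember the last read position and return only the lines appended since. Neither of the two Spring producers works exactly like this (the native one delegates to the OS tail command, and the Apache one handles rotation and truncation); this is only a minimal illustration of the idea.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class TailSketch {

    private long position; // byte offset of the last line we have read

    // Returns the lines appended to the file since the previous call.
    public List<String> readNewLines(Path file) throws IOException {
        List<String> lines = new ArrayList<>();
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            raf.seek(position);
            String line;
            while ((line = raf.readLine()) != null) {
                lines.add(line);
            }
            position = raf.getFilePointer();
        }
        return lines;
    }
}
```

A production tailer must also detect truncation and rotation (the position becoming larger than the file) and reset accordingly, which is exactly the kind of event the producers publish as ApplicationEvent instances.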

In both cases, file system events, such as files being unavailable and other events, are published as ApplicationEvent instances by using the normal Spring event publishing mechanism. Examples of such events include the following:

[message=tail: cannot open '/tmp/somefile' for reading:
               No such file or directory, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has become inaccessible:
               No such file or directory, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has appeared;
               following end of new file, file=/tmp/somefile]

The sequence of events shown in the preceding example might occur, for example, when a file is rotated.

Starting with version 5.0, a FileTailingIdleEvent is emitted when there is no data in the file during idleEventInterval. The following example shows what such an event looks like:

[message=Idle timeout, file=/tmp/somefile] [idle time=5438]

Not all platforms that support a tail command provide these status messages.

Messages emitted from these endpoints have the following headers:

  • FileHeaders.ORIGINAL_FILE: The File object

  • FileHeaders.FILENAME: The file name (File.getName())

In versions prior to version 5.0, the FileHeaders.FILENAME header contained a string representation of the file’s absolute path. You can now obtain that string representation by calling getAbsolutePath() on the original file header.
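To make the header change concrete, the following sketch shows a hypothetical handler method that extracts both headers from a tailed message (the class and method names are illustrative; channel wiring is omitted):

```java
import java.io.File;

import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.Message;

public class TailMessageHandler {

    public void handle(Message<String> message) {
        // The original File object
        File original = message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class);
        // The bare file name (File.getName())
        String fileName = (String) message.getHeaders().get(FileHeaders.FILENAME);
        // Pre-5.0, FILENAME held this value; now derive it from the File:
        String absolutePath = original.getAbsolutePath();
    }

}
```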

The following example creates a native adapter with the default options ('-F -n 0', meaning to follow the file name from the current end).

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	task-executor="exec"
	file="/tmp/foo"/>

The following example creates a native adapter with '-F -n +0' options (meaning follow the file name, emitting all existing lines).

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	native-options="-F -n +0"
	task-executor="exec"
	file-delay="10000"
	file="/tmp/foo"/>

If the tail command fails (on some platforms, a missing file causes the tail to fail, even with -F specified), the command is retried every 10 seconds.

By default, native adapters capture from standard output and send the content as messages. They also capture from standard error to raise events. Starting with version 4.3.6, you can discard the standard error events by setting enable-status-reader to false, as the following example shows:

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	enable-status-reader="false"
	task-executor="exec"
	file="/tmp/foo"/>

In the following example, idle-event-interval is set to 5000, meaning that, if no lines are written for five seconds, a FileTailingIdleEvent is triggered every five seconds:

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	idle-event-interval="5000"
	task-executor="exec"
	file="/tmp/somefile"/>

This can be useful when you need to stop the adapter.
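For example, a listener along these lines could stop the adapter once it goes idle. This is a sketch: the bean wiring is an assumption, and it assumes FileTailingIdleEvent is the nested event type published by FileTailingMessageProducerSupport:

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.integration.file.tail.FileTailingMessageProducerSupport.FileTailingIdleEvent;
import org.springframework.integration.file.tail.OSDelegatingFileTailingMessageProducer;
import org.springframework.stereotype.Component;

@Component
public class TailIdleStopper {

    @Autowired
    private OSDelegatingFileTailingMessageProducer adapter;

    @EventListener
    public void onIdle(FileTailingIdleEvent event) {
        // No lines were written for the configured idle-event-interval
        adapter.stop();
    }

}
```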

The following example creates an Apache commons-io Tailer adapter that examines the file for new lines every two seconds and checks for existence of a missing file every ten seconds:

<int-file:tail-inbound-channel-adapter id="apache"
	channel="input"
	task-executor="exec"
	file="/tmp/bar"
	delay="2000"
	end="false"             1
	reopen="true"           2
	file-delay="10000"/>
1 The file is tailed from the beginning (end="false") instead of the end (which is the default).
2 The file is reopened for each chunk (the default is to keep the file open).

Specifying the delay, end or reopen attributes forces the use of the Apache commons-io adapter and makes the native-options attribute unavailable.

Dealing With Incomplete Data

A common problem in file-transfer scenarios is how to determine that the transfer is complete so that you do not start reading an incomplete file. A common technique to solve this problem is to write the file with a temporary name and then atomically rename it to the final name. This technique, together with a filter that masks the temporary file from being picked up by the consumer, provides a robust solution. This technique is used by Spring Integration components that write files (locally or remotely). By default, they append .writing to the file name and remove it when the transfer is complete.
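On the consuming side, the temporary name can be masked with a filter chain. The following configuration sketch (the directory path and regular expression are illustrative) hides in-progress .writing files while keeping the default hidden-file and accept-once behavior:

```java
import java.io.File;

import org.springframework.context.annotation.Bean;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
import org.springframework.integration.file.filters.ChainFileListFilter;
import org.springframework.integration.file.filters.IgnoreHiddenFileListFilter;
import org.springframework.integration.file.filters.RegexPatternFileListFilter;

public class InProgressFilterConfig {

    @Bean
    public FileReadingMessageSource pollableFileSource() {
        ChainFileListFilter<File> chain = new ChainFileListFilter<>();
        chain.addFilter(new IgnoreHiddenFileListFilter());
        // Accept only names that do NOT end in ".writing"
        chain.addFilter(new RegexPatternFileListFilter("^(?!.*\\.writing$).*$"));
        chain.addFilter(new AcceptOnceFileListFilter<>());
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(new File("/tmp/in"));
        source.setFilter(chain);
        return source;
    }

}
```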

Another common technique is to write a second “marker” file to indicate that the file transfer is complete. In this scenario, you should not consider somefile.txt (for example) to be available for use until somefile.txt.complete is also present. Spring Integration version 5.0 introduced new filters to support this mechanism. Implementations are provided for the file system (FileSystemMarkerFilePresentFileListFilter), FTP and SFTP. They are configurable such that the marker file can have any name, although it is usually related to the file being transferred. See the Javadoc for more information.
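The marker-file filter wraps another filter and passes a file only when its marker is present. The following sketch assumes a constructor variant that takes the delegate filter and a marker suffix; verify the exact signatures and default suffix against the FileSystemMarkerFilePresentFileListFilter Javadoc:

```java
import java.io.File;

import org.springframework.context.annotation.Bean;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.filters.FileSystemMarkerFilePresentFileListFilter;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;

public class MarkerFilterConfig {

    @Bean
    public FileReadingMessageSource markerAwareSource() {
        // somefile.txt is accepted only once somefile.txt.complete exists
        FileSystemMarkerFilePresentFileListFilter filter =
                new FileSystemMarkerFilePresentFileListFilter(
                        new SimplePatternFileListFilter("*.txt"), ".complete");
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(new File("/tmp/in"));
        source.setFilter(filter);
        return source;
    }

}
```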