Reading Files
A FileReadingMessageSource can be used to consume files from the filesystem.
This is an implementation of MessageSource that creates messages from a file system directory.
The following example shows how to configure a FileReadingMessageSource:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:directory="${input.directory}"/>
To prevent creating messages for certain files, you can supply a FileListFilter.
By default, we use the following filters:
- IgnoreHiddenFileListFilter
- AcceptOnceFileListFilter
The IgnoreHiddenFileListFilter ensures that hidden files are not processed.
Note that the exact definition of hidden is system-dependent.
For example, on UNIX-based systems, a file beginning with a period character is considered to be hidden.
Microsoft Windows, on the other hand, has a dedicated file attribute to indicate hidden files.
Version 4.2 introduced the IgnoreHiddenFileListFilter.
In prior versions, hidden files were included.
With the default configuration, the IgnoreHiddenFileListFilter is triggered first, followed by the AcceptOnceFileListFilter.
The AcceptOnceFileListFilter ensures files are picked up only once from the directory.
The persistent file list filters now have a boolean property forRecursion.
Setting this property to true also sets alwaysAcceptDirectories, which means that the recursive operation on the outbound gateways (ls and mget) will now always traverse the full directory tree each time.
This is to solve a problem where changes deep in the directory tree were not detected.
In addition, forRecursion=true causes the full path to files to be used as the metadata store keys; this solves a problem where the filter did not work properly if a file with the same name appears multiple times in different directories.
IMPORTANT: This means that existing keys in a persistent metadata store will not be found for files beneath the top level directory.
For this reason, the property is false by default; this may change in a future release.
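For Java configuration, a persistent filter can be switched into this mode programmatically. The following listing is a minimal sketch; it assumes the conventional setForRecursion setter for the forRecursion property described above and uses an in-memory SimpleMetadataStore purely for illustration:

import org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter;
import org.springframework.integration.metadata.SimpleMetadataStore;

public class RecursiveFilterSketch {

    public FileSystemPersistentAcceptOnceFileListFilter recursionAwareFilter() {
        // In-memory store for illustration only; a shared store would normally be used.
        FileSystemPersistentAcceptOnceFileListFilter filter =
                new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "files-");
        // Assumed setter for the forRecursion property: full file paths become the
        // metadata store keys and directories are always accepted during recursion.
        filter.setForRecursion(true);
        return filter;
    }

}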
The following example configures a FileReadingMessageSource with a filter:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="customFilterBean"/>
A common problem with reading files is that a file may be detected before it is ready (that is, some other process may still be writing the file).
The default AcceptOnceFileListFilter does not prevent this.
In most cases, this can be prevented if the file-writing process renames each file as soon as it is ready for reading.
A filename-pattern or filename-regex filter that accepts only files that are ready (perhaps based on a known suffix), composed with the default AcceptOnceFileListFilter, allows for this situation.
The CompositeFileListFilter enables the composition, as the following example shows:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="compositeFilter"/>
<bean id="compositeFilter"
class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
<bean class="o.s.i.file.filters.RegexPatternFileListFilter">
<constructor-arg value="^test.*$"/>
</bean>
</list>
</constructor-arg>
</bean>
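The same composition can also be declared in Java. The following is a minimal sketch (the bean method name is arbitrary) that uses the CompositeFileListFilter constructor accepting a collection of delegate filters:

import java.io.File;
import java.util.Arrays;
import java.util.List;

import org.springframework.context.annotation.Bean;
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
import org.springframework.integration.file.filters.CompositeFileListFilter;
import org.springframework.integration.file.filters.FileListFilter;
import org.springframework.integration.file.filters.RegexPatternFileListFilter;

public class CompositeFilterSketch {

    @Bean
    public CompositeFileListFilter<File> compositeFilter() {
        // A file must match the regex and must not have been seen before.
        List<FileListFilter<File>> delegates = Arrays.asList(
                new AcceptOnceFileListFilter<>(),
                new RegexPatternFileListFilter("^test.*$"));
        return new CompositeFileListFilter<>(delegates);
    }

}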
If it is not possible to create the file with a temporary name and rename to the final name, Spring Integration provides another alternative.
Version 4.2 added the LastModifiedFileListFilter.
This filter can be configured with an age property so that only files older than this value are passed by the filter.
The age defaults to 60 seconds, but you should choose an age that is large enough to avoid picking up a file early (due to, say, network glitches).
The following example shows how to configure a LastModifiedFileListFilter:
<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
<property name="age" value="120" />
</bean>
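The equivalent filter can be declared as a Java bean. The following is a minimal sketch, assuming the age value is given in seconds, as in the XML example above:

import org.springframework.context.annotation.Bean;
import org.springframework.integration.file.filters.LastModifiedFileListFilter;

public class LastModifiedFilterSketch {

    @Bean
    public LastModifiedFileListFilter lastModifiedFilter() {
        LastModifiedFileListFilter filter = new LastModifiedFileListFilter();
        filter.setAge(120); // only files at least 120 seconds old are passed
        return filter;
    }

}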
Starting with version 4.3.7, a ChainFileListFilter (an extension of CompositeFileListFilter) has been introduced to allow scenarios where subsequent filters should see only the result of the previous filter.
(With the CompositeFileListFilter, all filters see all the files, but it passes only files that have passed all filters.)
An example of where the new behavior is required is a combination of LastModifiedFileListFilter and AcceptOnceFileListFilter, when we do not wish to accept the file until some amount of time has elapsed.
With the CompositeFileListFilter, since the AcceptOnceFileListFilter sees all the files on the first pass, it does not pass them later when the other filter does.
The CompositeFileListFilter approach is useful when a pattern filter is combined with a custom filter that looks for a secondary file to indicate that file transfer is complete.
The pattern filter might pass only the primary file (such as something.txt), but the “done” filter needs to see whether (for example) something.done is present.
Say we have files a.txt, a.done, and b.txt.
The pattern filter passes only a.txt and b.txt, while the “done” filter sees all three files and passes only a.txt.
The final result of the composite filter is that only a.txt is released.
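The following is a minimal Java sketch of the ChainFileListFilter combination described above, chaining a LastModifiedFileListFilter in front of an AcceptOnceFileListFilter so that a file is remembered as seen only once it is old enough:

import java.io.File;

import org.springframework.context.annotation.Bean;
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
import org.springframework.integration.file.filters.ChainFileListFilter;
import org.springframework.integration.file.filters.LastModifiedFileListFilter;

public class ChainFilterSketch {

    @Bean
    public ChainFileListFilter<File> chainFilter() {
        LastModifiedFileListFilter lastModified = new LastModifiedFileListFilter();
        lastModified.setAge(60); // files younger than 60 seconds are not passed on

        ChainFileListFilter<File> chain = new ChainFileListFilter<>();
        // The AcceptOnceFileListFilter only ever sees files that survived the age check.
        chain.addFilter(lastModified);
        chain.addFilter(new AcceptOnceFileListFilter<>());
        return chain;
    }

}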
Version 5.0 introduced an ExpressionFileListFilter to execute a SpEL expression against a file as a context evaluation root object.
For this purpose, all the XML components for file handling (local and remote), along with an existing filter attribute, have been supplied with the filter-expression option, as the following example shows:
<int-file:inbound-channel-adapter
directory="${inputdir}"
filter-expression="name matches '.text'"
auto-startup="false"/>
Version 5.0.5 introduced the DiscardAwareFileListFilter implementations that have an interest in rejected files.
For this purpose, such a filter implementation should be supplied with a callback through addDiscardCallback(Consumer<File>).
In the framework, this functionality is used from the FileReadingMessageSource.WatchServiceDirectoryScanner, in combination with LastModifiedFileListFilter.
Unlike the regular DirectoryScanner, the WatchService provides files for processing according to the events on the target file system.
At the moment of polling an internal queue with those files, the LastModifiedFileListFilter may discard them because they are too young relative to its configured age.
Therefore, we lose the file for future possible considerations.
The discard callback hook lets us retain the file in the internal queue so that it is available to be checked against the age in subsequent polls.
The CompositeFileListFilter also implements a DiscardAwareFileListFilter and populates a discard callback to all its DiscardAwareFileListFilter delegates.
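The following is a minimal sketch of wiring a discard callback onto a LastModifiedFileListFilter, for example to log files that were rejected for being too young (the logging is purely illustrative):

import org.springframework.context.annotation.Bean;
import org.springframework.integration.file.filters.LastModifiedFileListFilter;

public class DiscardCallbackSketch {

    @Bean
    public LastModifiedFileListFilter discardAwareFilter() {
        LastModifiedFileListFilter filter = new LastModifiedFileListFilter();
        filter.setAge(60);
        // DiscardAwareFileListFilter callback: invoked for each file the filter rejects.
        filter.addDiscardCallback(file ->
                System.out.println("Not old enough yet: " + file.getAbsolutePath()));
        return filter;
    }

}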
Starting with version 5.1, the FileReadingMessageSource doesn’t check a directory for existence and doesn’t create it until its start() is called (typically via wrapping SourcePollingChannelAdapter).
Previously, there was no simple way to prevent an operating system permissions error when referencing the directory, for example from tests, or when permissions are applied later.
Message Headers
Starting with version 5.0, the FileReadingMessageSource (in addition to the payload as a polled File) populates the following headers on the outbound Message (a short usage sketch follows the list):
- FileHeaders.FILENAME: The File.getName() of the file to send. Can be used for subsequent rename or copy logic.
- FileHeaders.ORIGINAL_FILE: The File object itself. Typically, this header is populated automatically by framework components (such as splitters or transformers) when we lose the original File object. However, for consistency and convenience with any other custom use cases, this header can be useful to get access to the original file.
- FileHeaders.RELATIVE_PATH: A new header introduced to represent the part of the file path relative to the root directory for the scan. This header can be useful when the requirement is to restore a source directory hierarchy in the other places. For this purpose, the DefaultFileNameGenerator (see Generating File Names) can be configured to use this header.
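The following is a minimal sketch of reading these headers downstream, for example in a service activator (the method and channel names are illustrative):

import java.io.File;

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.Message;

public class FileHeadersUsageSketch {

    @ServiceActivator(inputChannel = "fileInputChannel")
    public void handle(Message<File> message) {
        String fileName = (String) message.getHeaders().get(FileHeaders.FILENAME);
        File original = (File) message.getHeaders().get(FileHeaders.ORIGINAL_FILE);
        String relativePath = (String) message.getHeaders().get(FileHeaders.RELATIVE_PATH);
        // relativePath could be used, for example, to recreate the source directory hierarchy elsewhere.
        System.out.println(fileName + " (" + relativePath + ") from " + original);
    }

}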
Directory Scanning and Polling
The FileReadingMessageSource does not produce messages for files from the directory immediately.
It uses an internal queue for 'eligible files' returned by the scanner.
The scanEachPoll option is used to ensure that the internal queue is refreshed with the latest input directory content on each poll.
By default (scanEachPoll = false), the FileReadingMessageSource empties its queue before scanning the directory again.
This default behavior is particularly useful to reduce scans of large numbers of files in a directory.
However, in cases where custom ordering is required, it is important to consider the effects of setting this flag to true.
The order in which files are processed may not be as expected.
By default, files in the queue are processed in their natural (path) order.
New files added by a scan, even when the queue already has files, are inserted in the appropriate position to maintain that natural order.
To customize the order, the FileReadingMessageSource can accept a Comparator<File> as a constructor argument.
It is used by the internal (PriorityBlockingQueue) to reorder its content according to the business requirements.
Therefore, to process files in a specific order, you should provide a comparator to the FileReadingMessageSource rather than ordering the list produced by a custom DirectoryScanner.
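The following is a minimal sketch of supplying such a comparator through the constructor, for example to release the oldest files first (the ordering choice, channel name, and directory are illustrative):

import java.io.File;
import java.util.Comparator;

import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.file.FileReadingMessageSource;

public class OrderedFileSourceSketch {

    @Bean
    @InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
    public MessageSource<File> orderedFileSource() {
        // Oldest files (by last-modified time) are released from the internal queue first.
        FileReadingMessageSource source =
                new FileReadingMessageSource(Comparator.comparingLong(File::lastModified));
        source.setDirectory(new File("/tmp/in"));
        return source;
    }

}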
Version 5.0 introduced RecursiveDirectoryScanner to perform file tree visiting.
The implementation is based on the Files.walk(Path start, int maxDepth, FileVisitOption… options) functionality.
The root directory (DirectoryScanner.listFiles(File)) argument is excluded from the result.
All other sub-directory inclusions and exclusions are based on the target FileListFilter implementation.
For example, the SimplePatternFileListFilter filters out directories by default.
See AbstractDirectoryAwareFileListFilter and its implementations for more information.
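The following is a minimal sketch of plugging a RecursiveDirectoryScanner into a FileReadingMessageSource (the directory and pattern are illustrative):

import java.io.File;

import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.RecursiveDirectoryScanner;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;

public class RecursiveScannerSketch {

    public FileReadingMessageSource recursiveFileSource() {
        RecursiveDirectoryScanner scanner = new RecursiveDirectoryScanner();
        // The filter is applied to every entry encountered while walking the tree.
        scanner.setFilter(new SimplePatternFileListFilter("*.txt"));

        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(new File("/tmp/in"));
        source.setScanner(scanner);
        return source;
    }

}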
Namespace Support
The configuration for file reading can be simplified by using the file-specific namespace. To do so, use the following template:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>
Within this namespace, you can reduce the FileReadingMessageSource and wrap it in an inbound Channel Adapter, as follows:
<int-file:inbound-channel-adapter id="filesIn1"
directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>
<int-file:inbound-channel-adapter id="filesIn2"
directory="file:${input.directory}"
filter="customFilterBean" />
<int-file:inbound-channel-adapter id="filesIn3"
directory="file:${input.directory}"
filename-pattern="test*" />
<int-file:inbound-channel-adapter id="filesIn4"
directory="file:${input.directory}"
filename-regex="test[0-9]+\.txt" />
The first channel adapter example relies on the default FileListFilter implementations:
- IgnoreHiddenFileListFilter (do not process hidden files)
- AcceptOnceFileListFilter (prevent duplication)
Therefore, you can also leave off the prevent-duplicates and ignore-hidden attributes, as they are true by default.
Spring Integration 4.2 introduced the ignore-hidden attribute.
In prior versions, hidden files were included.
The second channel adapter example uses a custom filter, the third uses the filename-pattern attribute to add an AntPathMatcher-based filter, and the fourth uses the filename-regex attribute to add a regular expression pattern-based filter to the FileReadingMessageSource.
The filename-pattern and filename-regex attributes are each mutually exclusive with the regular filter reference attribute.
However, you can use the filter attribute to reference an instance of CompositeFileListFilter that combines any number of filters, including one or more pattern-based filters, to fit your particular needs.
When multiple processes read from the same directory, you may want to lock files to prevent them from being picked up concurrently.
To do so, you can use a FileLocker.
There is a java.nio-based implementation available, but it is also possible to implement your own locking scheme.
The nio locker can be injected as follows:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:nio-locker/>
</int-file:inbound-channel-adapter>
You can configure a custom locker as follows:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
When a file inbound adapter is configured with a locker, it takes responsibility for acquiring a lock before the file is allowed to be received.
It does not assume the responsibility to unlock the file.
If you have processed the file and keep the locks hanging around, you have a memory leak.
If this is a problem, you should call FileLocker.unlock(File file) yourself when appropriate.
When filtering and locking files is not enough, you might need to control the way files are listed entirely.
To implement this type of requirement, you can use an implementation of DirectoryScanner.
This scanner lets you determine exactly what files are listed in each poll.
This is also the interface that Spring Integration uses internally to wire FileListFilter instances and FileLocker to the FileReadingMessageSource.
You can inject a custom DirectoryScanner into the <int-file:inbound-channel-adapter/> on the scanner attribute, as the following example shows:
<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
scanner="customDirectoryScanner"/>
Doing so gives you full freedom to choose the ordering, listing, and locking strategies.
It is also important to understand that filters (including patterns, regex, prevent-duplicates, and others) and locker instances are actually used by the scanner.
Any of these attributes set on the adapter are subsequently injected into the internal scanner.
For the case of an external scanner, all filter and locker attributes are prohibited on the FileReadingMessageSource.
They must be specified (if required) on that custom DirectoryScanner.
In other words, if you inject a scanner into the FileReadingMessageSource, you should supply filter and locker on that scanner, not on the FileReadingMessageSource.
WatchServiceDirectoryScanner
The FileReadingMessageSource.WatchServiceDirectoryScanner relies on file-system events when new files are added to the directory.
During initialization, the directory is registered to generate events.
The initial file list is also built during initialization.
While walking the directory tree, any subdirectories encountered are also registered to generate events.
On the first poll, the initial file list from walking the directory is returned.
On subsequent polls, files from new creation events are returned.
If a new subdirectory is added, its creation event is used to walk the new subtree to find existing files and register any new subdirectories found.
The WatchServiceDirectoryScanner can be enabled through the FileReadingMessageSource.use-watch-service option, which is mutually exclusive with the scanner option.
An internal FileReadingMessageSource.WatchServiceDirectoryScanner instance is populated for the provided directory.
In addition, now the WatchService polling logic can track the StandardWatchEventKinds.ENTRY_MODIFY and StandardWatchEventKinds.ENTRY_DELETE.
If you need to track the modification of existing files as well as new files, you should implement the ENTRY_MODIFY events logic in the FileListFilter.
Otherwise, the files from those events are treated the same way.
The ResettableFileListFilter implementations pick up the ENTRY_DELETE events.
Consequently, their files are provided for the remove() operation.
When this event is enabled, filters such as the AcceptOnceFileListFilter have the file removed.
As a result, if a file with the same name appears, it passes the filter and is sent as a message.
For this purpose, the watch-events property (FileReadingMessageSource.setWatchEvents(WatchEventType… watchEvents)) has been introduced.
(WatchEventType is a public inner enumeration in FileReadingMessageSource.)
With such an option, we can use one downstream flow logic for new files and use some other logic for modified files.
It is worth mentioning that the ENTRY_DELETE event is involved in the rename operation of a sub-directory of the watched directory.
More specifically, the ENTRY_DELETE event, which is related to the previous directory name, precedes the ENTRY_CREATE event, which notifies about the new (renamed) directory.
On some operating systems (such as Windows), the ENTRY_DELETE event has to be registered to deal with that situation.
Otherwise, renaming a watched sub-directory in File Explorer could result in new files not being detected in that sub-directory.
The following example shows how to configure different logic for create and modify events in the same directory:
<int-file:inbound-channel-adapter id="newFiles"
directory="${input.directory}"
use-watch-service="true"/>
<int-file:inbound-channel-adapter id="modifiedFiles"
directory="${input.directory}"
use-watch-service="true"
filter="acceptAllFilter"
watch-events="MODIFY"/> <!-- The default is CREATE. -->
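The following is a minimal Java-configuration sketch of the same idea, using the setters named above (the channel and directory names are illustrative; as in the XML example, a permissive filter would typically also be set for a MODIFY-only flow):

import java.io.File;

import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.file.FileReadingMessageSource;

public class WatchServiceSketch {

    @Bean
    @InboundChannelAdapter(value = "modifiedFilesChannel", poller = @Poller(fixedDelay = "1000"))
    public MessageSource<File> modifiedFilesSource() {
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(new File("/tmp/in"));
        source.setUseWatchService(true);
        // Only MODIFY events reach this flow; CREATE is the default.
        source.setWatchEvents(FileReadingMessageSource.WatchEventType.MODIFY);
        return source;
    }

}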
Starting with version 6.1, the FileReadingMessageSource exposes two new WatchService-related options (a usage sketch follows the list):
- watchMaxDepth - an argument for the Files.walkFileTree(Path root, Set attributes, int maxDepth, FileVisitor visitor) API;
- watchDirPredicate - a Predicate<Path> to test if a directory in the scanned tree should be walked and registered with the WatchService and the configured watch event kinds.
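The following is a minimal sketch, assuming the conventional setter names (setWatchMaxDepth and setWatchDirPredicate) for the two options listed above:

import java.io.File;

import org.springframework.integration.file.FileReadingMessageSource;

public class WatchOptionsSketch {

    public FileReadingMessageSource watchLimitedSource() {
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(new File("/tmp/in"));
        source.setUseWatchService(true);
        // Assumed setters for the version 6.1 options described above.
        source.setWatchMaxDepth(2); // walk at most two directory levels below the root
        source.setWatchDirPredicate(dir -> !dir.getFileName().toString().startsWith(".")); // skip dot-directories
        return source;
    }

}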
Limiting Memory Consumption
You can use a HeadDirectoryScanner to limit the number of files retained in memory.
This can be useful when scanning large directories.
With XML configuration, this is enabled by setting the queue-size property on the inbound channel adapter.
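The following is a minimal Java sketch of the equivalent, using a HeadDirectoryScanner directly (the limit of 10 files and the directory are illustrative):

import java.io.File;

import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.HeadDirectoryScanner;

public class HeadScannerSketch {

    public FileReadingMessageSource limitedFileSource() {
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(new File("/tmp/in"));
        // Keep at most 10 files in memory per scan (the queue-size equivalent).
        source.setScanner(new HeadDirectoryScanner(10));
        return source;
    }

}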
Prior to version 4.2, this setting was incompatible with the use of any other filters.
Any other filters (including prevent-duplicates="true") overwrote the filter used to limit the size.
Configuring with Java Configuration
The following Spring Boot application shows an example of how to configure the inbound adapter with Java configuration:
@SpringBootApplication
public class FileReadingJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileReadingJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public MessageChannel fileInputChannel() {
        return new DirectChannel();
    }

    @Bean
    @InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
    public MessageSource<File> fileReadingMessageSource() {
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(new File(INBOUND_PATH));
        source.setFilter(new SimplePatternFileListFilter("*.txt"));
        return source;
    }

    @Bean
    @Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
    public FileToStringTransformer fileToStringTransformer() {
        return new FileToStringTransformer();
    }

}
Configuring with the Java DSL
The following Spring Boot application shows an example of how to configure the inbound adapter with the Java DSL:
@SpringBootApplication
public class FileReadingJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileReadingJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow fileReadingFlow() {
        return IntegrationFlow
                .from(Files.inboundAdapter(new File(INBOUND_PATH))
                            .patternFilter("*.txt"),
                        e -> e.poller(Pollers.fixedDelay(1000)))
                .transform(Files.toStringTransformer())
                .channel("processFileChannel")
                .get();
    }

}
'tail'ing Files
Another popular use case is to get 'lines' from the end (or tail) of a file, capturing new lines when they are added.
Two implementations are provided.
The first, OSDelegatingFileTailingMessageProducer, uses the native tail command (on operating systems that have one).
This is generally the most efficient implementation on those platforms.
For operating systems that do not have a tail command, the second implementation, ApacheCommonsFileTailingMessageProducer, uses the Apache commons-io Tailer class.
In both cases, file system events, such as files being unavailable and other events, are published as ApplicationEvent instances by using the normal Spring event publishing mechanism.
Examples of such events include the following:
[message=tail: cannot open '/tmp/somefile' for reading:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become inaccessible:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has appeared;
following end of new file, file=/tmp/somefile]
The sequence of events shown in the preceding example might occur, for example, when a file is rotated.
Starting with version 5.0, a FileTailingIdleEvent is emitted when there is no data in the file during idleEventInterval.
The following example shows what such an event looks like:
[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
Messages emitted from these endpoints have the following headers:
- FileHeaders.ORIGINAL_FILE: The File object
- FileHeaders.FILENAME: The file name (File.getName())
The following example creates a native adapter with the default options ('-F -n 0', meaning to follow the file name from the current end).
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
task-executor="exec"
file="/tmp/foo"/>
The following example creates a native adapter with '-F -n +0' options (meaning follow the file name, emitting all existing lines).
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
native-options="-F -n +0"
task-executor="exec"
file-delay="10000"
file="/tmp/foo"/>
If the tail command fails (on some platforms, a missing file causes the tail to fail, even with -F specified), the command is retried every 10 seconds.
By default, native adapters capture from standard output and send the content as messages.
They also capture from standard error to raise events.
Starting with version 4.3.6, you can discard the standard error events by setting the enable-status-reader to false, as the following example shows:
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
enable-status-reader="false"
task-executor="exec"
file="/tmp/foo"/>
In the following example, idleEventInterval is set to 5000, meaning that, if no lines are written for five seconds, a FileTailingIdleEvent is triggered every five seconds:
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
idle-event-interval="5000"
task-executor="exec"
file="/tmp/somefile"/>
This can be useful when you need to stop the adapter.
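For example, the following is a minimal sketch of an event listener that stops the tail adapter when the idle event arrives; the bean name native matches the adapter id above, and the import location of FileTailingIdleEvent (nested in FileTailingMessageProducerSupport) is an assumption:

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.Lifecycle;
import org.springframework.context.event.EventListener;
import org.springframework.integration.file.tail.FileTailingMessageProducerSupport.FileTailingIdleEvent;
import org.springframework.stereotype.Component;

@Component
public class TailIdleListenerSketch {

    private final Lifecycle tailAdapter;

    // "native" matches the tail adapter id from the XML examples above.
    public TailIdleListenerSketch(@Qualifier("native") Lifecycle tailAdapter) {
        this.tailAdapter = tailAdapter;
    }

    @EventListener
    public void onIdle(FileTailingIdleEvent event) {
        // No new lines within the configured idle-event-interval: stop tailing.
        this.tailAdapter.stop();
    }

}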
The following example creates an Apache commons-io Tailer adapter that examines the file for new lines every two seconds and checks for existence of a missing file every ten seconds:
<int-file:tail-inbound-channel-adapter id="apache"
channel="input"
task-executor="exec"
file="/tmp/bar"
delay="2000"
end="false" 1
reopen="true" 2
file-delay="10000"/>
1 - The file is tailed from the beginning (end="false") instead of the end (which is the default).
2 - The file is reopened for each chunk (the default is to keep the file open).
Specifying the delay, end, or reopen attributes forces the use of the Apache commons-io adapter and makes the native-options attribute unavailable.
Dealing With Incomplete Data
A common problem in file-transfer scenarios is how to determine that the transfer is complete so that you do not start reading an incomplete file.
A common technique to solve this problem is to write the file with a temporary name and then atomically rename it to the final name.
This technique, together with a filter that masks the temporary file from being picked up by the consumer, provides a robust solution.
This technique is used by Spring Integration components that write files (locally or remotely).
By default, they append .writing to the file name and remove it when the transfer is complete.
Another common technique is to write a second “marker” file to indicate that the file transfer is complete.
In this scenario, you should not consider somefile.txt (for example) to be available for use until somefile.txt.complete is also present.
Spring Integration version 5.0 introduced new filters to support this mechanism.
Implementations are provided for the file system (FileSystemMarkerFilePresentFileListFilter), FTP and SFTP.
They are configurable such that the marker file can have any name, although it is usually related to the file being transferred.
See the Javadoc for more information.
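The following is a hedged sketch of using the file-system variant together with a pattern filter; it assumes the FileSystemMarkerFilePresentFileListFilter constructor that wraps another filter and uses the default marker suffix (consult the Javadoc mentioned above for the exact options):

import java.io.File;

import org.springframework.context.annotation.Bean;
import org.springframework.integration.file.filters.FileListFilter;
import org.springframework.integration.file.filters.FileSystemMarkerFilePresentFileListFilter;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;

public class MarkerFileFilterSketch {

    @Bean
    public FileListFilter<File> markerAwareFilter() {
        // Assumption: *.txt files are passed only once the corresponding marker file
        // (with the filter's default suffix) is present in the same directory.
        return new FileSystemMarkerFilePresentFileListFilter(
                new SimplePatternFileListFilter("*.txt"));
    }

}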