Reading Files
可以使用 FileReadingMessageSource
从文件系统使用文件。这是 MessageSource
的一个实现,它根据文件系统目录创建消息。以下示例展示了如何配置 FileReadingMessageSource
:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:directory="${input.directory}"/>
为防止为某些文件创建消息,可以提供 FileListFilter
。默认情况下,我们使用以下过滤器:
-
IgnoreHiddenFileListFilter
-
AcceptOnceFileListFilter
IgnoreHiddenFileListFilter
确保不处理隐藏文件。请注意,隐藏的确切定义取决于系统。例如,在基于 UNIX 的系统上,以句点字符开头的文件被视为隐藏文件。而 Microsoft Windows 则具有专用文件属性来指示隐藏文件。
版本 4.2 引入了 IgnoreHiddenFileListFilter
。在早期版本中,包括了隐藏文件。在默认配置下,先触发 IgnoreHiddenFileListFilter
,然后触发 AcceptOnceFileListFilter
。
AcceptOnceFileListFilter
确保仅从目录中获取一次文件。
|
持久的过滤文件列表现在有一个布尔属性 forRecursion
。将此属性设置为 true
,还将设置 alwaysAcceptDirectories
,这意味着出站网关(ls
和 mget
)上的递归操作现在将始终在每次遍历完整目录树。这是为了解决目录树中深处更改未被检测到的问题。此外,forRecursion=true
会导致使用文件的完整路径作为元数据存储键;这解决了在不同目录中多次出现具有相同名称的文件时过滤器无法正常工作的问题。重要提示:这意味着无法在顶级目录下的文件找到持久元数据存储中的现有键。因此,该属性默认为 false
;这可能会在未来版本中更改。
以下示例配置了带有筛选器的 FileReadingMessageSource
:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="customFilterBean"/>
读取文件的一个常见问题是,文件可能在准备好之前被检测到(即,其他一些进程可能还在写入文件)。默认 AcceptOnceFileListFilter
无法防止这种情况。在大多数情况下,如果文件写入进程在可以读取文件后立即重命名每个文件,可以防止这种情况。一个仅接受准备好文件的 filename-pattern
或 filename-regex
过滤器(可能基于已知后缀),与默认 AcceptOnceFileListFilter
组成,允许这种情况。CompositeFileListFilter
允许组合,如下面的示例所示:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="compositeFilter"/>
<bean id="compositeFilter"
class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
<bean class="o.s.i.file.filters.RegexPatternFileListFilter">
<constructor-arg value="^test.*$"/>
</bean>
</list>
</constructor-arg>
</bean>
如果无法使用临时名称创建文件并重命名为最终名称,Spring Integration 提供了另一种选择。4.2 版本添加了 LastModifiedFileListFilter
。此过滤器可以配置带有 age
属性,以便仅传递比此值旧的文件。age 的默认值是 60 秒,但你应选择一个足够大的 age,以避免过早获取文件(例如,由于网络故障)。以下示例展示了如何配置 LastModifiedFileListFilter
:
<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
<property name="age" value="120" />
</bean>
从版本 4.3.7 开始,引入了 ChainFileListFilter
(CompositeFileListFilter
的扩展)以允许后续过滤器应仅查看前一个过滤器的结果的情况。(使用 CompositeFileListFilter
,所有过滤器都查看所有文件,但它仅传递所有过滤器都已通过的文件)。需要新行为的一个示例是 LastModifiedFileListFilter
和 AcceptOnceFileListFilter
的组合,当我们不希望在经过一段时间之前接受文件时。由于使用 CompositeFileListFilter
,AcceptOnceFileListFilter
在第一次通过时看到所有文件,因此当另一个过滤器看到文件时,它不会在稍后传递文件。当模式 filter 与自定义 filter 组合时,CompositeFileListFilter
方法非常有用,后者查找辅助文件以表明文件传输已完成。模式 filter 可能仅传递主文件(例如 something.txt
),但“done
”filter 需要查看是否存在(例如)something.done
。
假设我们的文件是 a.txt
、a.done
和 b.txt
。
模式 filter 仅传递 a.txt
和 b.txt
,而“done
”filter 查看所有三个文件并仅传递 a.txt
。复合 filter 的最终结果是仅释放 a.txt
。
使用 |
5.0 版本引入了 ExpressionFileListFilter
,可以针对作为上下文评估根对象的文件执行 SpEL 表达式。为此,所有用于处理文件的 XML 组件(本地和远程)连同现有的 filter
属性一起都已提供了 filter-expression
选项,如下例所示:
<int-file:inbound-channel-adapter
directory="${inputdir}"
filter-expression="name matches '.text'"
auto-startup="false"/>
5.0.5 版本引入了对拒绝的文件感兴趣的 DiscardAwareFileListFilter
实施。为此,应通过 addDiscardCallback(Consumer<File>)
向此类过滤器实施提供回调。在框架中,此功能与 FileReadingMessageSource.WatchServiceDirectoryScanner
配合使用,并与 LastModifiedFileListFilter
配合使用。与常规的 DirectoryScanner
不同,WatchService
根据目标文件系统上的事件提供用于处理的文件。在使用这些文件对内部队列进行轮询时,LastModifiedFileListFilter
可能忽略它们,因为它们相对于其配置的 age
太新。因此,我们将丢失用于未来可能考虑的文件。丢弃回调挂钩让我们可以在内部队列中保留文件,以便供其在后续轮询中与 age
进行检查。CompositeFileListFilter
还实现了 DiscardAwareFileListFilter
,并将丢弃回调填充到其所有 DiscardAwareFileListFilter
委托方。
由于 |
从版本 5.1 开始,FileReadingMessageSource
不会检查目录是否存在,也不会在调用其 start()
之前创建目录(通常通过包装 SourcePollingChannelAdapter
)。以前,没有简单的方法来防止在引用目录时(例如,从测试中或在稍后应用权限时)出现操作系统权限错误。
Message Headers
从版本 5.0 开始,FileReadingMessageSource
(除了作为轮询 File
的 payload
)向出站 Message
填充以下标头:
-
FileHeaders.FILENAME
:要发送的文件的File.getName()
。可以用于后续的重命名或复制逻辑。 -
FileHeaders.ORIGINAL_FILE
:File
对象本身。当我们丢失原始File
对象时,此标头通常由框架组件(如 splitters 或 transformers)自动填充。不过,出于任何其他自定义用例的一致性和便利性,此标头可用于获取原始文件。 -
FileHeaders.RELATIVE_PATH
:引入一个新标头来表示对扫描的根目录的相对文件路径部分。当要求在其他位置还原源目录层次结构时,此标头可能很有用。为此,可以将DefaultFileNameGenerator
(参见 "`Generating File Names) 配置为使用此标头。
Directory Scanning and Polling
FileReadingMessageSource
不会立即生成目录中的文件的消息。它使用内部队列来存储 scanner
返回的“合格文件”。scanEachPoll
选项用于确保在每次轮询中使用目录的最新输入内容刷新内部队列。默认情况下(scanEachPoll = false
),FileReadingMessageSource
在再次扫描目录之前会清空其队列。此默认行为对于减少扫描目录中的大量文件很有用。但是,在需要自定义排序的情况下,务必考虑将此标志设置为 true
的影响。处理文件时的顺序可能不是预期中的顺序。默认情况下,队列中的文件以其自然(path
)顺序处理。通过扫描添加的新文件,即使队列中已有文件,也会插入到适当的位置,以保持其自然顺序。要自定义顺序,FileReadingMessageSource
可以接受 Comparator<File>
作为构造函数参数。内部(PriorityBlockingQueue
)使用它根据业务要求重新排序其内容。因此,要按特定顺序处理文件,你应向 FileReadingMessageSource
提供比较器,而不是对自定义 DirectoryScanner
生成的列表进行排序。
5.0 版引入了 RecursiveDirectoryScanner
以执行文件树访问。该实现基于 Files.walk(Path start, int maxDepth, FileVisitOption… options)
功能。根目录(DirectoryScanner.listFiles(File)
)参数从结果中排除。所有其他子目录的包含和排除都基于目标 FileListFilter
实现。例如,SimplePatternFileListFilter
默认情况下会过滤出目录。请参阅 AbstractDirectoryAwareFileListFilter
及其实现以获取更多信息。
从版本 5.5 开始,Java DSL 的 |
Namespace Support
可以通过使用特定于文件的命名空间来简化文件读取的配置。为此,请使用以下模板:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>
在此命名空间中,可以缩减 FileReadingMessageSource
,并将其包装到入站通道适配器中,如下所示:
<int-file:inbound-channel-adapter id="filesIn1"
directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>
<int-file:inbound-channel-adapter id="filesIn2"
directory="file:${input.directory}"
filter="customFilterBean" />
<int-file:inbound-channel-adapter id="filesIn3"
directory="file:${input.directory}"
filename-pattern="test*" />
<int-file:inbound-channel-adapter id="filesIn4"
directory="file:${input.directory}"
filename-regex="test[0-9]+\.txt" />
第一个通道适配器示例依赖于默认的 FileListFilter
实施:
-
IgnoreHiddenFileListFilter
(不处理隐藏的文件) -
AcceptOnceFileListFilter
(prevent duplication)
因此,还可以省略 prevent-duplicates
和 ignore-hidden
属性,因为它们默认值为 true
。
Spring Integration 4.2 引入了 ignore-hidden
属性。在以前的版本中,包括隐藏文件。
第二个通道适配器示例使用自定义 filter,第三个使用 filename-pattern
属性添加基于 AntPathMatcher
的 filter,第四个使用 filename-regex
属性向 FileReadingMessageSource
添加基于正则表达式的 pattern filter。filename-pattern
和 filename-regex
属性都与常规的 filter
引用属性互斥。但是,可以使用 filter
属性引用 CompositeFileListFilter
的实例,它结合了任意数量的 filter,包括一个或多个基于 pattern 的 filter,以满足你的特定需求。
当多个进程从同一目录读取时,可能需要锁定文件以防止同时获取它们。为此,可以使用 FileLocker
。有一个基于 java.nio
的可用实施,但也可以实施自己的锁定方案。可以如下所示注入 nio
锁柜:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:nio-locker/>
</int-file:inbound-channel-adapter>
可以如下所示配置自定义锁柜:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
当文件入站适配器与储物柜配置一起使用时,在允许接收文件之前,它会负责获取锁。它不承担解锁文件的责任。如果您已处理文件并让锁处于挂起状态,则您会有一个内存泄漏。如果这是一个问题,您应该在适当的时候自己调用 |
如果过滤和锁定文件不够,可能需要完全控制文件列出的方式。要实施此类要求,可以使用 DirectoryScanner
的实施。此 scanner 让你确切确定每次轮询列出哪些文件。此接口也是 Spring Integration 在内部用于将 FileListFilter
实例和 FileLocker
连接到 FileReadingMessageSource
。可以将自定义 DirectoryScanner
注入到 int-file:inbound-channel-adapter/>
中的 scanner
属性,如下例所示:
<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
scanner="customDirectoryScanner"/>
这样做使您可以完全自由地选择订购、列出和锁定策略。
理解用于实际 scanner
筛选器(包括 patterns
、regex
、prevent-duplicates
和其他)和 locker
实例也非常重要。在适配器上设置的任何这些属性随后都会注入内部 scanner
。对于外部 scanner
,所有筛选器和锁定器属性在 FileReadingMessageSource
上均被禁止。如果需要,必须在自定义 DirectoryScanner
上指定它们。换句话说,如果您将 scanner
注入 FileReadingMessageSource
,则应在 scanner
上提供 filter
和 locker
,而不在 FileReadingMessageSource
上提供。
默认情况下, |
WatchServiceDirectoryScanner
当新文件添加到目录时,FileReadingMessageSource.WatchServiceDirectoryScanner
依赖文件系统事件。在初始化期间,将会注册该目录以生成事件。初始文件列表也会在初始化期间构建。在遍历目录树时,遇到的任何子目录也会被注册以生成事件。在第一次轮询中,将返回通过遍历目录得到的初始文件列表。在随后的轮询中,将返回来自新创建事件的文件。如果添加了新的子目录,则其创建事件将用于遍历新子树以查找现有文件和注册找到的任何新子目录。
当其内部事件 |
可以通过 FileReadingMessageSource.use-watch-service
选项启用 WatchServiceDirectoryScanner
,该选项与 scanner
选项互斥。对于提供的 directory
,将填充一个内部 FileReadingMessageSource.WatchServiceDirectoryScanner
实例。
此外,现在 WatchService
轮询逻辑可以跟踪 StandardWatchEventKinds.ENTRY_MODIFY
和 StandardWatchEventKinds.ENTRY_DELETE
。
如果您需要跟踪现有文件和新文件的修改,则应该在 FileListFilter
中实现 ENTRY_MODIFY
事件逻辑。否则,来自那些事件的文件将以相同的方式处理。
ResettableFileListFilter
实现会接收 ENTRY_DELETE
事件。因此,其文件会被提供给 remove()
操作。当启用此事件时,诸如 AcceptOnceFileListFilter
之类的筛选器会移除该文件。结果是,如果出现文件名的文件,则它将通过筛选器并作为消息发送。
为此,引入了 watch-events
属性(FileReadingMessageSource.setWatchEvents(WatchEventType… watchEvents)
)。(WatchEventType
是 FileReadingMessageSource
中的一个公共内部枚举。)使用这样的选项,我们可以对新文件使用一个下游流逻辑,对修改后的文件使用其他逻辑。以下示例展示了如何在同一个目录中为创建和修改事件配置不同的逻辑:
值得一提的是,ENTRY_DELETE
事件涉及受监视目录的子目录的重命名操作。更具体地说,与先前的目录名相关的 ENTRY_DELETE
事件先于 ENTRY_CREATE
事件,后者通知新(重命名)目录。在某些操作系统(如 Windows)上,ENTRY_DELETE
事件必须被注册才能处理这种情况。否则,在文件资源管理器中重命名受监视的子目录可能会导致未在该子目录中检测到新文件。
<int-file:inbound-channel-adapter id="newFiles"
directory="${input.directory}"
use-watch-service="true"/>
<int-file:inbound-channel-adapter id="modifiedFiles"
directory="${input.directory}"
use-watch-service="true"
filter="acceptAllFilter"
watch-events="MODIFY"/> <!-- The default is CREATE. -->
从 6.1 版开始,FileReadingMessageSource
公开了两个新的与 WatchService
相关的选项:
-
watchMaxDepth
-Files.walkFileTree(Path root, Set attributes, int maxDepth, FileVisitor visitor)
API 的参数; -
watchDirPredicate
- 一个Predicate<Path>
,用于测试扫描树中的目录是否应使用WatchService
和已配置的监视事件类型中进行遍历和注册。
Limiting Memory Consumption
您可以使用 HeadDirectoryScanner
来限制保留在内存中的文件数量。当扫描大型目录时,这会很有用。使用 XML 配置时,通过设置入站通道适配器上的 queue-size
属性即可启用此功能。
在 4.2 版之前,此设置与任何其他筛选器同时使用时不兼容。任何其他筛选器(包括 prevent-duplicates="true"
)会覆盖用于限制大小的筛选器。
|
Configuring with Java Configuration
以下 Spring Boot 应用程序显示了如何使用 Java 配置配置出站适配器的示例:
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel fileInputChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File(INBOUND_PATH));
source.setFilter(new SimplePatternFileListFilter("*.txt"));
return source;
}
@Bean
@Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
public FileToStringTransformer fileToStringTransformer() {
return new FileToStringTransformer();
}
}
Configuring with the Java DSL
以下 Spring Boot 应用程序展示了一个如何使用 Java DSL 配置出站适配器的示例:
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileReadingFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(new File(INBOUND_PATH))
.patternFilter("*.txt"),
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(Files.toStringTransformer())
.channel("processFileChannel")
.get();
}
}
'tail’ing Files
另一个常见的用例是从文件的末尾(或尾部)获取“行”,在添加新行时捕获新行。提供了两种实现。第一个,OSDelegatingFileTailingMessageProducer
,使用本机 tail
命令(在拥有该命令的操作系统上)。这通常是在这些平台上最有效的实现。对于没有 tail
命令的操作系统,第二个实现 ApacheCommonsFileTailingMessageProducer
,使用 Apache commons-io
Tailer
类。
在这两种情况下,文件系统事件(例如文件不可用和其他事件)都会使用普通的 Spring 事件发布机制发布为 ApplicationEvent
实例。此类事件的示例包括以下内容:
[message=tail: cannot open '/tmp/somefile' for reading:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become inaccessible:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has appeared;
following end of new file, file=/tmp/somefile]
前面示例中显示的事件序列可能会发生,例如当文件被轮换时。
从 5.0 版本开始,当 idleEventInterval
期间文件中没有数据时,将发出 FileTailingIdleEvent
。以下示例展示了这样的事件看起来是什么样子:
[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
并非所有支持 |
从这些端点发出的消息具有以下标头:
-
FileHeaders.ORIGINAL_FILE
: TheFile
object -
FileHeaders.FILENAME
:文件名(File.getName()
)
在 5.0 之前的版本中, |
以下示例使用默认选项创建了一个本地适配器(“-F -n 0”,表示从当前末尾关注文件名)。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
task-executor="exec"
file="/tmp/foo"/>
以下示例使用“-F -n +0”选项创建一个本地适配器(也就是说,遵循文件名并发出所有现有行)。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
native-options="-F -n +0"
task-executor="exec"
file-delay=10000
file="/tmp/foo"/>
如果`tail`命令失败(在某些平台上,即使指定了`-F`,文件丢失也会导致`tail`失败),则每 10 秒重试一次该命令。
默认情况下,本地适配器从标准输出中捕获并以消息形式发送内容。它们还从标准错误输出中捕获内容以引发事件。从 4.3.6 版本开始,您可以通过将`enable-status-reader`设置为`false`来丢弃标准错误事件,如下例所示:
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
enable-status-reader="false"
task-executor="exec"
file="/tmp/foo"/>
在以下示例中,IdleEventInterval`设置为`5000
,这意味着,如果没有写入任何行超过五秒,则每五秒触发一次`FileTailingIdleEvent`:
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
idle-event-interval="5000"
task-executor="exec"
file="/tmp/somefile"/>
在需要停止适配器时,这可能很有用。
以下示例创建一个 Apache commons-io
`Tailer`适配器,该适配器每两秒检查一次文件中是否有新行,并每十秒检查一次某个缺失文件是否存在:
<int-file:tail-inbound-channel-adapter id="apache"
channel="input"
task-executor="exec"
file="/tmp/bar"
delay="2000"
end="false" 1
reopen="true" 2
file-delay="10000"/>
1 | 该文件从开头开始进行监控(end="false" ),而不是从结尾(默认)开始进行监控。 |
2 | 每个区块文件被重新打开(默认是保持文件打开)。 |
指定 delay
、end
或 reopen
属性会强制使用 Apache commons-io
适配器,并让 native-options
属性不可用。