File Splitter
The `FileSplitter` was added in version 4.1.2, and its namespace support was added in version 4.2.
The `FileSplitter` splits text files into individual lines, based on `BufferedReader.readLine()`.
By default, the splitter uses an `Iterator` to emit lines one at a time as they are read from the file.
Setting the `iterator` property to `false` causes it to read all the lines into memory before emitting them as messages.
One use case for this is detecting I/O errors on the file before sending any messages that contain lines.
However, it is only practical for relatively short files.
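The difference between the two modes can be illustrated with plain JDK classes. The following is a minimal sketch under stated assumptions, not the `FileSplitter` implementation: the class `LineReadingSketch` and its methods `readAll` and `iterate` are hypothetical names introduced only for this illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical illustration only; this is not the FileSplitter source.
final class LineReadingSketch {

    // Eager mode (comparable to iterator=false): every line is read first,
    // so an I/O error surfaces before any line is handed to the caller.
    static List<String> readAll(Reader source) {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    // Lazy mode (comparable to iterator=true): a line is read only when requested,
    // so memory use stays flat but an I/O error can occur after lines were consumed.
    static Iterator<String> iterate(Reader source) {
        BufferedReader reader = new BufferedReader(source);
        return new Iterator<String>() {
            private String next;
            private boolean done;

            @Override
            public boolean hasNext() {
                if (next == null && !done) {
                    try {
                        next = reader.readLine();
                        if (next == null) {
                            done = true;
                            reader.close();
                        }
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                }
                return next != null;
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                String line = next;
                next = null;
                return line;
            }
        };
    }
}
```

In the eager variant the whole file must fit in memory, which is why that mode suits only relatively short files.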
Inbound payloads can be `File`, `String` (a `File` path), `InputStream`, or `Reader`.
Other payload types are emitted unchanged.
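The payload handling described above can be pictured as a type dispatch. The sketch below uses only JDK classes and is an assumption about the general shape of such logic, not the library's code; `PayloadDispatchSketch` and `split` are hypothetical names.

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; this is not the FileSplitter source.
final class PayloadDispatchSketch {

    // Returns a List<String> of lines for supported payloads,
    // or the payload itself (unchanged) for any other type.
    static Object split(Object payload, Charset charset) {
        try {
            Reader reader;
            if (payload instanceof String) {
                // A String payload is interpreted as a file path.
                reader = Files.newBufferedReader(new File((String) payload).toPath(), charset);
            } else if (payload instanceof File) {
                reader = Files.newBufferedReader(((File) payload).toPath(), charset);
            } else if (payload instanceof InputStream) {
                reader = new InputStreamReader((InputStream) payload, charset);
            } else if (payload instanceof Reader) {
                reader = (Reader) payload;
            } else {
                return payload;
            }
            List<String> lines = new ArrayList<>();
            try (BufferedReader buffered = new BufferedReader(reader)) {
                String line;
                while ((line = buffered.readLine()) != null) {
                    lines.add(line);
                }
            }
            return lines;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```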
The following listing shows possible ways to configure a `FileSplitter`:
- Java DSL
- Kotlin DSL
- Java
- XML
@SpringBootApplication
public class FileSplitterApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileSplitterApplication.class)
            // Run without a web server; web(boolean) was replaced by web(WebApplicationType) in Spring Boot 2.0
            .web(WebApplicationType.NONE)
            .run(args);
    }

    // tmpDir is assumed to be defined elsewhere (for example, a temporary folder holder)
    @Bean
    public IntegrationFlow fileSplitterFlow() {
        return IntegrationFlow
            .from(Files.inboundAdapter(tmpDir.getRoot())
                .filter(new ChainFileListFilter<File>()
                    .addFilter(new AcceptOnceFileListFilter<>())
                    .addFilter(new ExpressionFileListFilter<>(
                        new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
            .split(Files.splitter()
                .markers()
                .charset(StandardCharsets.US_ASCII)
                .firstLineAsHeader("fileHeader")
                .applySequence(true))
            .channel(c -> c.queue("fileSplittingResultChannel"))
            .get();
    }

}
@Bean
fun fileSplitterFlow() =
    integrationFlow(
        Files.inboundAdapter(tmpDir.getRoot())
            .filter(
                ChainFileListFilter<File?>()
                    .addFilter(AcceptOnceFileListFilter())
                    .addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
            )
    ) {
        split(
            Files.splitter()
                .markers()
                .charset(StandardCharsets.US_ASCII)
                .firstLineAsHeader("fileHeader")
                .applySequence(true)
        )
        channel { queue("fileSplittingResultChannel") }
    }
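@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
    // Arguments: iterator = true, markers = true
    FileSplitter splitter = new FileSplitter(true, true);
    splitter.setApplySequence(true);
    // outputChannel is assumed to be a MessageChannel defined elsewhere in the configuration
    splitter.setOutputChannel(outputChannel);
    return splitter;
}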
<int-file:splitter id="splitter"    (1)
    iterator=""                     (2)
    markers=""                      (3)
    markers-json=""                 (4)
    apply-sequence=""               (5)
    requires-reply=""               (6)
    charset=""                      (7)
    first-line-as-header=""         (8)
    input-channel=""                (9)
    output-channel=""               (10)
    send-timeout=""                 (11)
    auto-startup=""                 (12)
    order=""                        (13)
    phase="" />                     (14)
1 | The bean name of the splitter. |
2 | Set to `true` (the default) to use an iterator or `false` to load the file into memory before sending lines. |
3 | Set to `true` to emit start-of-file and end-of-file marker messages before and after the file data. Markers are messages with `FileSplitter.FileMarker` payloads (with `START` and `END` values in the `mark` property). You might use markers when sequentially processing files in a downstream flow where some lines are filtered. They enable the downstream processing to know when a file has been completely processed. In addition, a `file_marker` header that contains `START` or `END` is added to these messages. The `END` marker includes a line count. If the file is empty, only `START` and `END` markers are emitted, with `0` as the `lineCount`. The default is `false`. When `true`, `apply-sequence` is `false` by default. See also `markers-json` (the next attribute). |
4 | When `markers` is `true`, set this to `true` to have the `FileMarker` objects converted to a JSON string. (Uses a `SimpleJsonSerializer` underneath.) |
5 | Set to `false` to disable the inclusion of `sequenceSize` and `sequenceNumber` headers in messages. The default is `true`, unless `markers` is `true`. When `true` and `markers` is `true`, the markers are included in the sequencing. When `true` and `iterator` is `true`, the `sequenceSize` header is set to `0`, because the size is unknown. |
6 | Set to `true` to cause a `RequiresReplyException` to be thrown if there are no lines in the file. The default is `false`. |
7 | Set the charset name to be used when reading text data into `String` payloads. The default is the platform charset. |
8 | The header name for the first line to be carried as a header in the messages emitted for the remaining lines. Since version 5.0. |
9 | Set the input channel used to send messages to the splitter. |
10 | Set the output channel to which messages are sent. |
11 | Set the send timeout. Only applies if the `output-channel` can block, such as a full `QueueChannel`. |
12 | Set to `false` to disable automatically starting the splitter when the context is refreshed. The default is `true`. |
13 | Set the order of this endpoint if the `input-channel` is a `<publish-subscribe-channel/>`. |
14 | Set the startup phase for the splitter (used when `auto-startup` is `true`). |
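The ordering of marker and line messages described for the `markers` attribute can be simulated with plain Java. This is a hypothetical sketch of the emitted sequence as the text describes it, not the splitter's implementation; `MarkerOrderSketch`, `emit`, and the string form of the markers are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the message order; not the FileSplitter source.
final class MarkerOrderSketch {

    // Returns the payloads a downstream consumer would see, rendered as plain strings.
    static List<String> emit(List<String> lines, boolean markers) {
        List<String> out = new ArrayList<>();
        if (markers) {
            out.add("START");
        }
        out.addAll(lines);
        if (markers) {
            // The END marker carries the number of lines that were emitted.
            out.add("END lineCount=" + lines.size());
        }
        return out;
    }
}
```

For an empty file with markers enabled, the sketch yields only the two markers, with a line count of 0, which matches the behavior stated for attribute 3.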
The `FileSplitter` also splits any text-based `InputStream` into lines.
Starting with version 4.3, when used in conjunction with an FTP or SFTP streaming inbound channel adapter or an FTP or SFTP outbound gateway that uses the `stream` option to retrieve a file, the splitter automatically closes the session that supports the stream when the file is completely consumed.
See FTP Streaming Inbound Channel Adapter and SFTP Streaming Inbound Channel Adapter as well as FTP Outbound Gateway and SFTP Outbound Gateway for more information about these facilities.
When using Java configuration, an additional constructor is available, as the following example shows:
public FileSplitter(boolean iterator, boolean markers, boolean markersJson)
When `markersJson` is true, the markers are represented as a JSON string (using a `SimpleJsonSerializer`).
Version 5.0 introduced the `firstLineAsHeader` option to specify that the first line of content is a header (such as column names in a CSV file).
The argument passed to this property is the header name under which the first line is carried as a header in the messages emitted for the remaining lines.
This line is not included in the sequence header (if `applySequence` is true) nor in the `lineCount` associated with `FileMarker.END`.
NOTE: Starting with version 5.5, the `lineCount` is also included as a `FileHeaders.LINE_COUNT` header in the `FileMarker.END` message, since the `FileMarker` could be serialized into JSON.
If a file contains only the header line, the file is treated as empty and, therefore, only `FileMarker` instances are emitted during splitting (if markers are enabled; otherwise, no messages are emitted).
By default (if no header name is set), the first line is considered to be data and becomes the payload of the first emitted message.
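The effect of `firstLineAsHeader` on the emitted messages can be sketched without the framework. The code below is a simplified model under stated assumptions, not the library's code: `FirstLineAsHeaderSketch`, `LineMessage`, and `split` are hypothetical names, and headers are modeled as a plain `Map`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical model of the behavior described above; not the FileSplitter source.
final class FirstLineAsHeaderSketch {

    // A minimal stand-in for a message: one line of payload plus headers.
    static final class LineMessage {
        final String payload;
        final Map<String, String> headers;

        LineMessage(String payload, Map<String, String> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // headerName == null models the default: the first line is ordinary data.
    static List<LineMessage> split(List<String> lines, String headerName) {
        List<LineMessage> out = new ArrayList<>();
        if (headerName == null) {
            for (String line : lines) {
                out.add(new LineMessage(line, Collections.emptyMap()));
            }
            return out;
        }
        if (lines.isEmpty()) {
            return out;
        }
        // The first line travels as a header on every remaining line and is not emitted itself.
        Map<String, String> headers = Collections.singletonMap(headerName, lines.get(0));
        for (String line : lines.subList(1, lines.size())) {
            out.add(new LineMessage(line, headers));
        }
        return out;
    }
}
```

A file that contains only the header line produces an empty result in this model, mirroring the "treated as empty" rule.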
If you need more complex logic for extracting headers from the file content (not the first line, not the whole content of the line, not one particular header, and so on), consider using a header enricher ahead of the `FileSplitter`.
Note that the lines that have been moved to the headers might be filtered downstream from the normal content process.
Idempotent Downstream Processing a Split File
When `apply-sequence` is true, the splitter adds the line number in the `SEQUENCE_NUMBER` header (when `markers` is true, the markers are counted as lines).
The line number can be used with an Idempotent Receiver to avoid reprocessing lines after a restart.
For example:
@Bean
public ConcurrentMetadataStore store() {
    return new ZookeeperMetadataStore();
}

@Bean
public MetadataStoreSelector selector() {
    return new MetadataStoreSelector(
            message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
                    .getAbsolutePath(),
            message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
                    .toString(),
            store())
        .compareValues(
                (oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    return new IdempotentReceiverInterceptor(selector());
}

@Bean
public IntegrationFlow flow() {
    ...
        .split(new FileSplitter())
    ...
        .handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
    ...
}
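The configuration above keys the metadata store by file path, stores the last sequence number, and uses the `compareValues` predicate to let a line through only when its number is greater than the stored one. The following is a minimal in-memory sketch of that comparison, assuming this reading of the predicate; `LineDedupSketch` and `accept` are hypothetical names, and a `ConcurrentHashMap` stands in for the real metadata store.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory model of the idea; not the MetadataStoreSelector source.
final class LineDedupSketch {

    // Maps a file's absolute path to the highest line number processed so far.
    private final ConcurrentMap<String, Integer> lastSeen = new ConcurrentHashMap<>();

    // Returns true when the line is new (and records it), false when it was already processed.
    boolean accept(String fileKey, int sequenceNumber) {
        boolean[] accepted = {false};
        // compute() runs atomically per key, so concurrent callers cannot both accept the same line.
        lastSeen.compute(fileKey, (key, old) -> {
            if (old == null || old < sequenceNumber) {
                accepted[0] = true;
                return sequenceNumber;
            }
            return old;
        });
        return accepted[0];
    }
}
```

Because only the highest number per file is kept, this approach relies on lines of a file being processed in order; a persistent store is what allows the check to survive a restart.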