Stream Support
在许多情况下,应用程序数据是从流获取的。不建议将对流的引用作为消息有效负载发送到使用者。相反,将从输入流读取的数据创建消息,消息有效负载将逐个写入输出流。 你需要将此依赖项包含在你的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
<version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-stream:{project-version}"
Reading from Streams
Spring Integration 提供了两个流适配器。ByteStreamReadingMessageSource
和 CharacterStreamReadingMessageSource
都实现了 MessageSource
。在通道适配器元素中配置其中一个,可以配置轮询周期,消息总线可以自动检测和调度它们。字节流版本需要一个 InputStream
,字符流版本需要一个 Reader
作为单个构造函数参数。ByteStreamReadingMessageSource
还接受 bytesPerMessage
属性,以确定它尝试写入每个 Message
的字节数。默认值为 1024
。以下示例创建了一个输入流,创建每个包含 2048 个字节的消息:
<bean class="org.springframework.integration.stream.ByteStreamReadingMessageSource">
<constructor-arg ref="someInputStream"/>
<property name="bytesPerMessage" value="2048"/>
</bean>
<bean class="org.springframework.integration.stream.CharacterStreamReadingMessageSource">
<constructor-arg ref="someReader"/>
</bean>
CharacterStreamReadingMessageSource
将读取器包装在一个 BufferedReader
中(如果它不是一个 BufferedReader
)。您可以在第二个构造函数参数中设置缓冲读取器使用的缓冲区大小。从版本 5.0 开始,第三个构造函数参数(blockToDetectEOF
)控制 CharacterStreamReadingMessageSource
的行为。当为 false
(默认值)时,receive()
方法检查读取器是否为 ready()
,如果不是,则返回 null。在这种情况下,不会检测到 EOF(文件结束)。当为 true
时,receive()
方法会阻塞,直到有数据可用或在基础流上检测到 EOF。检测到 EOF 时,会发布 StreamClosedEvent
(应用程序事件)。您可以使用实现了 ApplicationListener<StreamClosedEvent>
的 Bean 来使用这个事件。
为促进 EOF 检测,轮询线程在 |
轮询在检测到 EOF 后继续在每次轮询中发布事件。应用程序侦听器可以停止适配器以防止这种情况发生。事件在轮询线程上发布。停止适配器会导致线程中断。如果你打算在停止适配器后执行一些可中断任务,那么你必须在其他线程上执行 stop()
或为这些下游活动使用其他线程。请注意,发送到 QueueChannel
是可中断的,所以,如果你想要从此侦听器发送消息,那么请在停止适配器之前执行此操作。
这促进了“管道
”或将数据重定向到 stdin
,如下面的两个示例所示:
cat myfile.txt | java -jar my.jar
java -jar my.jar < foo.txt
此方法让应用程序在管道关闭时停止。
提供了四个方便的工厂方法:
public static final CharacterStreamReadingMessageSource stdin() { ... }
public static final CharacterStreamReadingMessageSource stdin(String charsetName) { ... }
public static final CharacterStreamReadingMessageSource stdinPipe() { ... }
public static final CharacterStreamReadingMessageSource stdinPipe(String charsetName) { ... }
Writing to Streams
对于目标流,可以使用以下两个实现中的任一个:ByteStreamWritingMessageHandler
或 CharacterStreamWritingMessageHandler
。每个实现都需要一个构造函数参数(OutputStream
用于字节流或 Writer
用于字符流),并且每个实现都提供一个添加可选的“bufferSize”的第二个构造函数。由于这两个实现最终都实现了 MessageHandler
接口,因此可以从 channel-adapter
配置中引用它们,如 Channel Adapter 中所述。
<bean class="org.springframework.integration.stream.ByteStreamWritingMessageHandler">
<constructor-arg ref="someOutputStream"/>
<constructor-arg value="1024"/>
</bean>
<bean class="org.springframework.integration.stream.CharacterStreamWritingMessageHandler">
<constructor-arg ref="someWriter"/>
</bean>
Stream Namespace Support
Spring Integration 定义了一个命名空间来减少流相关通道适配器所需的配置。使用它需要以下模式位置:
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration/stream
https://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">
以下代码片段显示了支持的不同配置选项,用于配置入站通道适配器:
<int-stream:stdin-channel-adapter id="adapterWithDefaultCharset"/>
<int-stream:stdin-channel-adapter id="adapterWithProvidedCharset" charset="UTF-8"/>
从 5.0 版本开始,可以设置 detect-eof
属性,它设置 blockToDetectEOF
属性。有关详细信息,请参见 Reading from Streams。
要配置出站通道适配器,您也可以使用命名空间支持。以下示例显示出站通道适配器的不同配置:
<int-stream:stdout-channel-adapter id="stdoutAdapterWithDefaultCharset"
channel="testChannel"/>
<int-stream:stdout-channel-adapter id="stdoutAdapterWithProvidedCharset" charset="UTF-8"
channel="testChannel"/>
<int-stream:stderr-channel-adapter id="stderrAdapter" channel="testChannel"/>
<int-stream:stdout-channel-adapter id="newlineAdapter" append-newline="true"
channel="testChannel"/>