Splitter

分隔器是负责将消息分割成多个部分并将生成的消息发送以独立处理的组件。通常情况下,它们是一个流水线中的上游生产者,该流水线包括一个聚合器。

Programming Model

用于执行拆分的 API 由一个基类 AbstractMessageSplitter 组成。它是一个 MessageHandler 实现,封装了拆分器的常用功能,例如填充产生的消息中的适当消息标头 (CORRELATION_IDSEQUENCE_SIZESEQUENCE_NUMBER)。此填充操作可追踪消息及其处理结果(在典型情况下,这些标头将复制到由各个转换端点产生的消息中)。例如,由 composed message processor 使用值。

以下示例显示 AbstractMessageSplitter 的摘录:

public abstract class AbstractMessageSplitter
    extends AbstractReplyProducingMessageConsumer {
    ...
    protected abstract Object splitMessage(Message<?> message);

}

要在应用程序中实施特定的分隔器,可以扩展 AbstractMessageSplitter 并实现 splitMessage 方法,其中包含用于分隔消息的逻辑。返回值可以是以下选项之一:

  • 一个 Collection 或一个消息数组或一个 Iterable (或 Iterator) 来迭代消息。在这种情况下,消息将作为消息发送(在填充 CORRELATION_IDSEQUENCE_SIZESEQUENCE_NUMBER 之后)。使用这种方法可以为您提供更多控制,例如填充自定义消息头作为拆分过程的一部分。

  • 一个 Collection 或一个非消息对象数组或一个 Iterable (或 Iterator) 来迭代非消息对象。它与前一种情况类似,除了每个集合元素用作消息有效负载。使用这种方法可以让您专注于域对象,而无需考虑消息传递系统,并产生更容易测试的代码。

  • 一个 Message 或非消息对象(但不是集合或数组)。它与前面几种情况类似,除了会发出一个消息。

在 Spring Integration 中,任何 POJO 都可以实现分隔算法,前提是它定义了一个接受单个参数并具有返回值的方法。在这种情况下,方法的返回值将按照前面所述进行解释。输入参数可以是 Message 或简单的 POJO。在后一种情况下,分隔器会接收传入消息的有效负载。我们推荐这种方法,因为它将代码与 Spring Integration API 解耦,并且通常更容易测试。

Iterators

从 4.1 版开始,AbstractMessageSplitter 支持 Iterator 类型,用于要分隔的 value。请注意,对于 Iterator(或 Iterable),我们无法访问底层项目的数量并且 SEQUENCE_SIZE 标头设置为 0。这意味着 releaseStrategy 的默认 SequenceSizeReleaseStrategy 将不起作用,并且来自分隔器的 CORRELATION_ID 的组将不会被释放;它将保持为“未完成”。在这种情况下,你应使用适当的自定义 ReleaseStrategy 或依赖于 send-partial-result-on-expiry 以及 group-timeoutMessageGroupStoreReaper

从 5.0 版开始,AbstractMessageSplitter 提供 protected obtainSizeIfPossible() 方法以允许确定 IterableIterator 对象的大小(如果可能)。例如 XPathMessageSplitter 可以确定底层 NodeList 对象的大小。从 5.0.9 版开始,此方法也正确返回 com.fasterxml.jackson.core.TreeNode 的大小。

Iterator 对象非常有用,可避免在分隔前需要构建整个内存集合。例如,当底层项目是由使用迭代或流的一些外部系统(例如数据库或 FTP MGET)填充时。

Stream and Flux

从 5.0 版开始,AbstractMessageSplitter 支持 Java Stream 和 Reactive Streams Publisher 类型,用于要分隔的 value。在这种情况下,目标 Iterator 构建在其迭代功能上。

此外,如果分隔器的输出通道是 ReactiveStreamsSubscribableChannel 实例,则 AbstractMessageSplitter 会生成 Flux 结果而不是 Iterator,并且输出通道订阅此 Flux 以便对下游流需求进行基于反压的分隔。

从 5.2 版开始,分隔器支持 discardChannel 选项,用于发送已解析为容器空(集合、数组、流、Flux 等)的分隔函数的请求消息。在这种情况下,没有可迭代的项目可发送到 outputChannelnull 分隔结果仍然作为流结束指示符。

Configuring a Splitter with Java, Groovy and Kotlin DSLs

一个基于 Message 的简单分隔器示例及其与 DSL 配置的可迭代有效负载:

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

@Bean
public IntegrationFlow someFlow() {
    return f -> f.split(Message.class, Message::getPayload);
}
@Bean
fun someFlow() =
    integrationFlow {
        split<Message<*>> { it.payload }
    }
@Bean
someFlow() {
    integrationFlow {
        splitWith {
		    expectedType Message<?>
		    function { it.payload }
        }
    }
}

请参阅各个章节中有关 DSL 的更多信息:

Configuring a Splitter with XML

分隔器可以通过 XML 按照如下方式进行配置:

<int:channel id="inputChannel"/>

<int:splitter id="splitter"           1
  ref="splitterBean"                  2
  method="split"                      3
  input-channel="inputChannel"        4
  output-channel="outputChannel"      5
  discard-channel="discardChannel" /> 6

<int:channel id="outputChannel"/>

<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
1 分离器的 ID 是可选的。
2 对应用程序上下文中定义的 bean 的引用。Bean 必须实现拆分逻辑,如前面部分中所述。可选。如果未提供对 bean 的引用,则假定到达 input-channel 上的消息有效负载是 java.util.Collection 的实现,并且将默认拆分逻辑应用于集合,将每个单独元素合并到一条消息中并将其发送到 output-channel
3 (在 bean 上定义)实现拆分逻辑的方法。可选。
4 分隔器的输入通道。必填。
5 分隔器将传入消息拆分的发送结果的通道。可选(因为传入消息可以自己指定回复通道)。
6 在拆分结果为空的情况下发送请求消息的通道。可选(它们将停止,就像 null 结果)。

如果可以在其他 <splitter> 定义中引用自定义分隔器实现,我们建议使用 ref 属性。但是,如果自定义分隔器处理程序实现应限定在一个 <splitter> 的定义,可以配置内部 bean 定义,如下例所示:

<int:splitter id="testSplitter" input-channel="inChannel" method="split"
                output-channel="outChannel">
  <beans:bean class="org.foo.TestSplitter"/>
</int:splitter>

不允许在同一个 <int:splitter> 配置中同时使用 ref 属性和内部处理程序定义,因为它会创建一个二义条件并导致引发异常。

如果 ref 属性引用扩展 AbstractMessageProducingHandler(例如框架本身提供的拆分器)的 Bean,则配置会通过将输出通道直接注入到处理程序中而得到优化。在这种情况下,每个 ref 都必须是单独的 Bean 实例(或 prototype 作用域的 Bean)或使用内部 <bean/> 配置类型。但是,此优化仅在您在拆分器 XML 定义中未提供任何拆分器特定属性时才会应用。如果您无意中从多个 Bean 引用了相同的消息处理程序,您会获得配置异常。

Configuring a Splitter with Annotations

@Splitter 注解适用于期望 Message 类型或消息有效负载类型的函数,并且方法的返回值应是任意类型的 Collection。如果返回值不是实际的 Message 对象,则每个项目都封装在一个 Message 中,作为 Message 的有效负载。每个生成 Message 发送到为在 @Splitter 上定义端点的指定输出通道。

以下示例展示如何使用 @Splitter 注解配置分隔器:

@Splitter
List<LineItem> extractItems(Order order) {
    return order.getItems()
}