Splitter

A splitter is a component that partitions a message into several parts and sends the resulting messages to be processed independently. Splitters are very often upstream producers in a pipeline that includes an aggregator.

Programming Model

The API for performing splitting consists of one base class, AbstractMessageSplitter. It is a MessageHandler implementation that encapsulates features common to splitters, such as populating the appropriate message headers (CORRELATION_ID, SEQUENCE_SIZE, and SEQUENCE_NUMBER) on the messages that are produced. Populating these headers makes it possible to track the messages and the results of their processing (in a typical scenario, these headers get copied to the messages that are produced by the various transforming endpoints). The values can then be used, for example, by a composed message processor.
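
The header population described above can be pictured with a small plain-Java model. This is only a sketch under stated assumptions, not the framework's implementation: SequenceHeaderSketch and PartMessage are hypothetical names, the header keys are written as plain strings for illustration, and the numbering is assumed to start at 1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Illustration only: a plain-Java model of sequence-header population.
// SequenceHeaderSketch and PartMessage are hypothetical, not Spring Integration types.
final class SequenceHeaderSketch {

    /** Hypothetical stand-in for a message: a payload plus a header map. */
    static final class PartMessage {
        final Object payload;
        final Map<String, Object> headers;

        PartMessage(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    /** Wraps each part in a PartMessage that carries the three sequence headers. */
    static List<PartMessage> split(UUID requestId, List<?> parts) {
        List<PartMessage> result = new ArrayList<>();
        int sequenceNumber = 0;
        for (Object part : parts) {
            Map<String, Object> headers = new LinkedHashMap<>();
            headers.put("correlationId", requestId);         // shared by all parts of one request
            headers.put("sequenceSize", parts.size());       // total number of parts
            headers.put("sequenceNumber", ++sequenceNumber); // position of this part (assumed 1-based)
            result.add(new PartMessage(part, headers));
        }
        return result;
    }

    public static void main(String[] args) {
        for (PartMessage m : split(UUID.randomUUID(), Arrays.asList("a", "b", "c"))) {
            System.out.println(m.payload + " " + m.headers);
        }
    }
}
```

With these three values on every part, a downstream component can tell which request a part belongs to and whether all parts have arrived.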

The following example shows an excerpt from AbstractMessageSplitter:

public abstract class AbstractMessageSplitter
    extends AbstractReplyProducingMessageHandler {
    ...
    protected abstract Object splitMessage(Message<?> message);
}

To implement a specific splitter in an application, you can extend AbstractMessageSplitter and implement the splitMessage method, which contains logic for splitting the messages. The return value can be one of the following:

  • A Collection or an array of messages or an Iterable (or Iterator) that iterates over messages. In this case, the messages are sent as they are (after the CORRELATION_ID, SEQUENCE_SIZE, and SEQUENCE_NUMBER headers are populated). This approach gives you more control, for example, to populate custom message headers as part of the splitting process.

  • A Collection or an array of non-message objects or an Iterable (or Iterator) that iterates over non-message objects. It works like the prior case, except that each collection element is used as a message payload. Using this approach lets you focus on the domain objects without having to consider the messaging system and produces code that is easier to test.

  • A Message or non-message object (but not a collection or an array). It works like the previous cases, except that a single message is sent out.

In Spring Integration, any POJO can implement the splitting algorithm, provided that it defines a method that accepts a single argument and has a return value. In this case, the return value of the method is interpreted as described earlier. The input argument might either be a Message or a simple POJO. In the latter case, the splitter receives the payload of the incoming message. We recommend this approach, because it decouples the code from the Spring Integration API and is typically easier to test.
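
As a minimal sketch of the POJO approach, the following class takes the payload as a String and returns a List of non-message objects. The class name CsvLineSplitter and the comma-separated payload format are assumptions made for illustration. Note that the class does not reference any Spring Integration type.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical POJO splitter: no Spring Integration types appear in this class.
final class CsvLineSplitter {

    /** Splits a comma-separated payload into trimmed, non-empty items. */
    public List<String> split(String payload) {
        List<String> items = new ArrayList<>();
        for (String token : payload.split(",")) {
            String trimmed = token.trim();
            if (!trimmed.isEmpty()) {
                items.add(trimmed); // each element would become one message payload
            }
        }
        return items;
    }

    public static void main(String[] args) {
        // prints [apple, pear, plum]
        System.out.println(new CsvLineSplitter().split("apple, pear, ,plum"));
    }
}
```

Because the method is ordinary Java, it can be unit tested without a messaging infrastructure.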

Iterators

Starting with version 4.1, the AbstractMessageSplitter supports the Iterator type for the value to split. Note that, in the case of an Iterator (or Iterable), the number of underlying items is not known in advance, so the SEQUENCE_SIZE header is set to 0. This means that the default SequenceSizeReleaseStrategy of an <aggregator> does not work, and the group for the CORRELATION_ID from the splitter is never released; it remains incomplete. In this case, you should use an appropriate custom ReleaseStrategy or rely on send-partial-result-on-expiry together with group-timeout or a MessageGroupStoreReaper.

Starting with version 5.0, the AbstractMessageSplitter provides protected obtainSizeIfPossible() methods that determine the size of Iterable and Iterator objects when that is possible. For example, XPathMessageSplitter can determine the size of the underlying NodeList object. Starting with version 5.0.9, this method also properly returns the size of a com.fasterxml.jackson.core.TreeNode.

An Iterator object is useful for avoiding the need to build an entire collection in memory before splitting, for example, when the underlying items are populated from some external system (such as a database or FTP MGET) by using iteration or streams.
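
The following sketch shows one way a POJO splitting method might return an Iterator that reads its items lazily. The class name LazyLineSplitter and the choice of a BufferedReader as the input are assumptions for illustration; in a real flow the source could be a database cursor or a remote file listing.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical POJO splitter that pulls lines on demand instead of collecting them first.
final class LazyLineSplitter {

    /** Returns an Iterator that reads from the reader one line ahead of the caller. */
    public Iterator<String> split(final BufferedReader reader) {
        return new Iterator<String>() {
            private String next = readLine(); // look-ahead: null means end of input

            private String readLine() {
                try {
                    return reader.readLine();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }

            @Override
            public boolean hasNext() {
                return next != null;
            }

            @Override
            public String next() {
                if (next == null) {
                    throw new NoSuchElementException();
                }
                String current = next;
                next = readLine(); // fetch the following line only now
                return current;
            }
        };
    }

    public static void main(String[] args) {
        Iterator<String> lines =
                new LazyLineSplitter().split(new BufferedReader(new StringReader("a\nb\nc")));
        while (lines.hasNext()) {
            System.out.println(lines.next());
        }
    }
}
```

Only one line is held in memory beyond the one being processed, which is the property that makes the Iterator return type attractive for large inputs.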

Stream and Flux

Starting with version 5.0, the AbstractMessageSplitter supports the Java Stream and Reactive Streams Publisher types for the value to split. In this case, the target Iterator is built on their iteration functionality.

In addition, if the splitter’s output channel is an instance of a ReactiveStreamsSubscribableChannel, the AbstractMessageSplitter produces a Flux result instead of an Iterator, and the output channel is subscribed to this Flux for back-pressure-based splitting on downstream flow demand.
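
As a small illustration of the Java Stream case, a POJO splitting method might return a Stream directly. The class name WordStreamSplitter and the whitespace-separated payload are assumptions for this sketch.

```java
import java.util.Arrays;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical POJO splitter whose split method returns a java.util.stream.Stream.
final class WordStreamSplitter {

    /** Splits the payload on whitespace and returns the words as a Stream. */
    public Stream<String> split(String payload) {
        return Arrays.stream(payload.trim().split("\\s+"))
                .filter(word -> !word.isEmpty()); // a blank payload yields an empty Stream
    }

    public static void main(String[] args) {
        // prints [to, be, or]
        System.out.println(new WordStreamSplitter().split(" to be  or ").collect(Collectors.toList()));
    }
}
```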

Starting with version 5.2, the splitter supports a discardChannel option for sending those request messages for which the split function has returned an empty container (collection, array, stream, Flux, and so on). In this case, there are no items to iterate over and send to the outputChannel. A null splitting result remains an end-of-flow indicator.
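
The three possible outcomes can be summarized with a plain-Java decision sketch. This models the rules only as they are stated in this section and is not the framework's code: SplitResultRoutingSketch, Route, and routeFor are hypothetical names, and only the Collection case is shown.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

// Illustration only: models where a request ends up depending on the split result.
// All names here are hypothetical and not part of Spring Integration.
final class SplitResultRoutingSketch {

    enum Route { OUTPUT, DISCARD, STOP }

    static Route routeFor(Collection<?> splitResult, boolean discardChannelConfigured) {
        if (splitResult == null) {
            return Route.STOP; // null remains the end-of-flow indicator
        }
        if (splitResult.isEmpty()) {
            // nothing to iterate: the request goes to the discard channel if one is set
            return discardChannelConfigured ? Route.DISCARD : Route.STOP;
        }
        return Route.OUTPUT; // each item is sent to the output channel
    }

    public static void main(String[] args) {
        System.out.println(routeFor(Arrays.asList("a", "b"), true));  // OUTPUT
        System.out.println(routeFor(Collections.emptyList(), true));  // DISCARD
        System.out.println(routeFor(Collections.emptyList(), false)); // STOP
        System.out.println(routeFor(null, true));                     // STOP
    }
}
```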

Configuring a Splitter with Java, Groovy and Kotlin DSLs

The following example shows a simple splitter, based on a Message and its iterable payload, configured with each DSL:

Java DSL:

@Bean
public IntegrationFlow someFlow() {
    return f -> f.split(Message.class, Message::getPayload);
}

Kotlin DSL:

@Bean
fun someFlow() =
    integrationFlow {
        split<Message<*>> { it.payload }
    }

Groovy DSL:

@Bean
someFlow() {
    integrationFlow {
        splitWith {
            expectedType Message<?>
            function { it.payload }
        }
    }
}

See the respective chapters for more information about the DSLs.

Configuring a Splitter with XML

A splitter can be configured through XML as follows:

<int:channel id="inputChannel"/>

<int:splitter id="splitter"           1
  ref="splitterBean"                  2
  method="split"                      3
  input-channel="inputChannel"        4
  output-channel="outputChannel"      5
  discard-channel="discardChannel" /> 6

<int:channel id="outputChannel"/>

<beans:bean id="splitterBean" class="sample.PojoSplitter"/>

1 The ID of the splitter. Optional.
2 A reference to a bean defined in the application context. The bean must implement the splitting logic, as described in the earlier section. Optional. If a reference to a bean is not provided, the payload of the message that arrives on the input-channel is assumed to be an implementation of java.util.Collection, and the default splitting logic is applied to the collection: each element is wrapped in a message and sent to the output-channel.
3 The method (defined on the bean) that implements the splitting logic. Optional.
4 The input channel of the splitter. Required.
5 The channel to which the splitter sends the results of splitting the incoming message. Optional (because incoming messages can specify a reply channel themselves).
6 The channel to which the request message is sent when the splitting result is empty. Optional (if it is not set, processing of the request message stops, as in the case of a null result).

We recommend using a ref attribute if the custom splitter implementation can be referenced in other <splitter> definitions. However, if the custom splitter handler implementation should be scoped to a single <splitter> definition, you can configure an inner bean definition, as the following example shows:

<int:splitter id="testSplitter" input-channel="inChannel" method="split"
                output-channel="outChannel">
  <beans:bean class="org.foo.TestSplitter"/>
</int:splitter>

Using both a ref attribute and an inner handler definition in the same <int:splitter> configuration is not allowed, as it creates an ambiguous condition and results in an exception being thrown.

If the ref attribute references a bean that extends AbstractMessageProducingHandler (such as splitters provided by the framework itself), the configuration is optimized by injecting the output channel into the handler directly. In this case, each ref must be a separate bean instance (or a prototype-scoped bean) or use the inner <bean/> configuration type. However, this optimization applies only if you do not provide any splitter-specific attributes in the splitter XML definition. If you inadvertently reference the same message handler from multiple beans, you get a configuration exception.

Configuring a Splitter with Annotations

The @Splitter annotation is applicable to methods that expect either the Message type or the message payload type. The return value of the method should be a Collection of any type. If the returned values are not actual Message objects, each item is wrapped in a Message as its payload. Each resulting Message is sent to the designated output channel for the endpoint on which the @Splitter is defined.

The following example shows how to configure a splitter by using the @Splitter annotation:

@Splitter
List<LineItem> extractItems(Order order) {
    return order.getItems();
}