Splitter
A splitter is a component that partitions a message into several parts and sends the resulting messages to be processed independently. Very often, splitters are upstream producers in a pipeline that includes an aggregator.
Programming Model
The API for performing splitting consists of one base class, AbstractMessageSplitter.
It is a MessageHandler implementation that encapsulates features common to splitters, such as filling in the appropriate message headers (CORRELATION_ID, SEQUENCE_SIZE, and SEQUENCE_NUMBER) on the messages that are produced.
Populating these headers makes it possible to track the messages and the results of their processing (in a typical scenario, these headers get copied to the messages that are produced by the various transforming endpoints).
The values can then be used, for example, by a composed message processor.
The following example shows an excerpt from AbstractMessageSplitter:
public abstract class AbstractMessageSplitter
        extends AbstractReplyProducingMessageConsumer {
    ...
    protected abstract Object splitMessage(Message<?> message);

}
To implement a specific splitter in an application, you can extend AbstractMessageSplitter and implement the splitMessage method, which contains the logic for splitting the messages.
The return value can be one of the following:
- A Collection or an array of messages, or an Iterable (or Iterator) that iterates over messages. In this case, the messages are sent as they are (after the CORRELATION_ID, SEQUENCE_SIZE, and SEQUENCE_NUMBER headers are populated). This approach gives you more control, for example, to populate custom message headers as part of the splitting process.
- A Collection or an array of non-message objects, or an Iterable (or Iterator) that iterates over non-message objects. It works like the prior case, except that each collection element is used as a message payload. This approach lets you focus on the domain objects without having to consider the messaging system, and it produces code that is easier to test.
- A Message or non-message object (but not a collection or an array). It works like the previous cases, except that a single message is sent out.
In Spring Integration, any POJO can implement the splitting algorithm, provided that it defines a method that accepts a single argument and has a return value.
In this case, the return value of the method is interpreted as described earlier.
The input argument can be either a Message or a simple POJO.
In the latter case, the splitter receives the payload of the incoming message.
We recommend this approach, because it decouples the code from the Spring Integration API and is typically easier to test.
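As an illustration of the POJO approach, the following minimal sketch splits a comma-separated String payload into one element per token. The class name CsvLineSplitter and the method name split are hypothetical and chosen only for this example; nothing in it depends on Spring Integration types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical POJO splitter: it references no Spring Integration types.
class CsvLineSplitter {

    // Accepts the message payload (a comma-separated string) and returns one
    // element per non-blank token. Each element would become a message payload.
    public List<String> split(String payload) {
        List<String> parts = new ArrayList<>();
        for (String token : payload.split(",")) {
            String trimmed = token.trim();
            if (!trimmed.isEmpty()) {
                parts.add(trimmed);
            }
        }
        return parts;
    }
}
```

Because the class is plain Java, it can be unit tested by calling split directly. A bean of this type could then be referenced from a splitter definition together with the name of its splitting method.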
Iterators
Starting with version 4.1, the AbstractMessageSplitter supports the Iterator type for the value to split.
Note that, in the case of an Iterator (or Iterable), the number of underlying items is not known in advance, so the SEQUENCE_SIZE header is set to 0.
This means that the default SequenceSizeReleaseStrategy of an <aggregator> does not work, and the group for the CORRELATION_ID from the splitter is never released; it remains incomplete.
In this case, you should use an appropriate custom ReleaseStrategy or rely on send-partial-result-on-expiry together with group-timeout or a MessageGroupStoreReaper.
Starting with version 5.0, the AbstractMessageSplitter provides protected obtainSizeIfPossible() methods that determine the size of Iterable and Iterator objects when that is possible.
For example, XPathMessageSplitter can determine the size of the underlying NodeList object.
Starting with version 5.0.9, this method also properly returns the size of a com.fasterxml.jackson.core.TreeNode.
An Iterator object is useful for avoiding the need to build an entire collection in memory before splitting, for example, when the underlying items are populated from an external system (such as a database or FTP MGET) by using iteration or streams.
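To make the memory argument concrete, here is a minimal sketch of a POJO splitting method that returns an Iterator and reads its input lazily. The class name LineIteratorSplitter and the choice of a java.io.Reader payload are assumptions made for illustration only. The iterator reads one line ahead, so the full input is never held in memory.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical POJO splitter that yields lines lazily instead of
// collecting them into a list first.
class LineIteratorSplitter {

    public Iterator<String> split(Reader payload) {
        BufferedReader reader = new BufferedReader(payload);
        return new Iterator<String>() {

            // Holds the next line to hand out; null means end of input.
            private String next = readLine();

            private String readLine() {
                try {
                    return reader.readLine();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }

            @Override
            public boolean hasNext() {
                return next != null;
            }

            @Override
            public String next() {
                if (next == null) {
                    throw new NoSuchElementException();
                }
                String current = next;
                next = readLine(); // read only one line ahead
                return current;
            }
        };
    }
}
```

Since the number of lines is not known up front, a result like this is exactly the case in which the sequence size cannot be determined, so the aggregation caveats described above apply.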
Stream and Flux
Starting with version 5.0, the AbstractMessageSplitter supports the Java Stream and Reactive Streams Publisher types for the value to split.
In this case, the target Iterator is built on their iteration functionality.
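A splitting method that returns a Java Stream could look like the following minimal sketch. The class name WordStreamSplitter and the whitespace-based tokenization are hypothetical and serve only to show the shape of such a method.

```java
import java.util.Arrays;
import java.util.stream.Stream;

// Hypothetical POJO splitter that returns a Stream of words.
class WordStreamSplitter {

    // Splits the payload on runs of whitespace and drops empty tokens
    // (a leading space would otherwise produce an empty first token).
    public Stream<String> split(String payload) {
        return Arrays.stream(payload.split("\\s+"))
                .filter(word -> !word.isEmpty());
    }
}
```

As with any Stream, the filter step is evaluated only when the result is iterated, which is the iteration functionality the splitter builds on.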
In addition, if the splitter's output channel is an instance of ReactiveStreamsSubscribableChannel, the AbstractMessageSplitter produces a Flux result instead of an Iterator, and the output channel is subscribed to this Flux for back-pressure-based splitting on downstream flow demand.
Starting with version 5.2, the splitter supports a discardChannel option for sending those request messages for which the split function has returned an empty container (collection, array, stream, Flux, and so on).
In this case, there is no item to iterate over and send to the outputChannel.
A null splitting result remains an end-of-flow indicator.
Configuring a Splitter with Java, Groovy and Kotlin DSLs
The following example shows a simple splitter, based on a Message and its iterable payload, configured with each DSL:
Java DSL

@Bean
public IntegrationFlow someFlow() {
    return f -> f.split(Message.class, Message::getPayload);
}

Kotlin DSL

@Bean
fun someFlow() =
    integrationFlow {
        split<Message<*>> { it.payload }
    }

Groovy DSL

@Bean
someFlow() {
    integrationFlow {
        splitWith {
            expectedType Message<?>
            function { it.payload }
        }
    }
}
See more information about the DSLs in the respective chapters.
Configuring a Splitter with XML
A splitter can be configured through XML as follows:
<int:channel id="inputChannel"/>

<int:splitter id="splitter"                  (1)
    ref="splitterBean"                       (2)
    method="split"                           (3)
    input-channel="inputChannel"             (4)
    output-channel="outputChannel"           (5)
    discard-channel="discardChannel" />      (6)

<int:channel id="outputChannel"/>

<beans:bean id="splitterBean" class="sample.PojoSplitter"/>

(1) The ID of the splitter. Optional.
(2) A reference to a bean defined in the application context. The bean must implement the splitting logic, as described in the earlier section. Optional. If a reference to a bean is not provided, the payload of the message that arrives on the input-channel is assumed to be an implementation of java.util.Collection, and the default splitting logic is applied to the collection: each individual element is wrapped in a message and sent to the output-channel.
(3) The method (defined on the bean) that implements the splitting logic. Optional.
(4) The input channel of the splitter. Required.
(5) The channel to which the splitter sends the results of splitting the incoming message. Optional (because incoming messages can specify a reply channel themselves).
(6) The channel to which the request message is sent when the splitting result is empty. Optional (if it is not set, the flow stops, as it does for a null result).
We recommend using a ref attribute if the custom splitter implementation can be referenced in other <splitter> definitions.
However, if the custom splitter handler implementation should be scoped to a single <splitter> definition, you can configure an inner bean definition, as the following example shows:
<int:splitter id="testSplitter" input-channel="inChannel" method="split"
              output-channel="outChannel">
    <beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
Using both a ref attribute and an inner bean definition in the same <int:splitter> configuration is not allowed.
If the ref attribute references a bean that extends AbstractMessageProducingHandler (such as the splitters provided by the framework itself), the configuration is optimized by injecting the output channel into the handler directly.
In this case, each ref must point to a separate bean instance (or a prototype-scoped bean), or you must use the inner <bean/> configuration type.
However, this optimization applies only if you do not provide any splitter-specific attributes in the splitter XML definition.
If you inadvertently reference the same message handler from multiple beans, you get a configuration exception.
Configuring a Splitter with Annotations
The @Splitter annotation is applicable to methods that expect either the Message type or the message payload type, and the return value of the method should be a Collection of any type.
If the returned values are not actual Message objects, each item is wrapped in a Message as its payload.
Each resulting Message is sent to the designated output channel for the endpoint on which the @Splitter is defined.
The following example shows how to configure a splitter by using the @Splitter annotation:
@Splitter
List<LineItem> extractItems(Order order) {
    return order.getItems();
}
See also Advising Endpoints Using Annotations and File Splitter.