Aggregator
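The core logic of an aggregator consists of a correlation strategy and a release strategy: the correlation strategy determines how messages are grouped together, and the release strategy determines when all the messages in a group are released for aggregation.
Note that an aggregator is a stateful component: it maintains the state of message groups in a MessageGroupStore. You can use a MessageGroupStoreReaper to expire groups periodically, or trigger expiry manually when the application shuts down.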
Basically a mirror image of the splitter, the aggregator is a type of message handler that receives multiple messages and combines them into a single message. In fact, an aggregator is often a downstream consumer in a pipeline that includes a splitter.
Technically, the aggregator is more complex than a splitter, because it is stateful.
It must hold the messages to be aggregated and determine when the complete group of messages is ready to be aggregated.
To do so, it requires a MessageStore.
Functionality
The aggregator combines a group of related messages, by correlating and storing them, until the group is deemed to be complete. At that point, the aggregator creates a single message by processing the whole group and sends the aggregated message as output.
Implementing an aggregator requires providing the logic to perform the aggregation (that is, the creation of a single message from many). Two related concepts are correlation and release.
Correlation determines how messages are grouped for aggregation.
In Spring Integration, correlation is based by default on the IntegrationMessageHeaderAccessor.CORRELATION_ID message header.
Messages with the same IntegrationMessageHeaderAccessor.CORRELATION_ID are grouped together.
However, you can customize the correlation strategy to allow other ways of specifying how the messages should be grouped together.
To do so, you can implement a CorrelationStrategy (covered later in this chapter).
To determine the point at which a group of messages is ready to be processed, a ReleaseStrategy is consulted.
The default release strategy for the aggregator releases a group when all messages included in a sequence are present, based on the IntegrationMessageHeaderAccessor.SEQUENCE_SIZE header.
You can override this default strategy by providing a reference to a custom ReleaseStrategy implementation.
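The interplay of correlation and release can be sketched with the JDK alone. The following is a minimal model, not Spring Integration code: the `MiniAggregator` and `Msg` names are hypothetical, the map stands in for the message store, and the release rule assumes each message carries the expected group size.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, JDK-only model of an aggregator; not a Spring Integration class.
class MiniAggregator {

    // Minimal stand-in for a message: correlation key, expected group size, payload.
    static final class Msg {
        final String correlationId;
        final int sequenceSize;
        final String payload;

        Msg(String correlationId, int sequenceSize, String payload) {
            this.correlationId = correlationId;
            this.sequenceSize = sequenceSize;
            this.payload = payload;
        }
    }

    // Plays the role of the message store: groups are held here until released.
    private final Map<String, List<Msg>> groups = new HashMap<>();

    // Returns the aggregated payloads once the group is complete, otherwise empty.
    Optional<List<String>> handle(Msg msg) {
        // Correlation: messages with the same correlation id land in the same group.
        List<Msg> group = groups.computeIfAbsent(msg.correlationId, k -> new ArrayList<>());
        group.add(msg);

        // Release: the group is complete once it holds sequenceSize messages.
        if (group.size() < msg.sequenceSize) {
            return Optional.empty();
        }

        // Aggregation: combine the group's payloads into a single result.
        groups.remove(msg.correlationId);
        List<String> payloads = new ArrayList<>();
        for (Msg m : group) {
            payloads.add(m.payload);
        }
        return Optional.of(payloads);
    }
}
```

In this sketch, messages for different correlation ids accumulate independently, and only the message that completes its group yields a result.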
Programming Model
The Aggregation API consists of a number of classes:
- The MessageGroupProcessor interface and its implementations: MethodInvokingMessageGroupProcessor and ExpressionEvaluatingMessageGroupProcessor
- The ReleaseStrategy interface and its default implementation: SimpleSequenceSizeReleaseStrategy
- The CorrelationStrategy interface and its default implementation: HeaderAttributeCorrelationStrategy
AggregatingMessageHandler
The AggregatingMessageHandler (a subclass of AbstractCorrelatingMessageHandler) is a MessageHandler implementation that encapsulates the common functionality of an aggregator (and other correlating use cases), which is as follows:
- Correlating messages into a group to be aggregated
- Maintaining those messages in a MessageStore until the group can be released
- Deciding when the group can be released
- Aggregating the released group into a single message
- Recognizing and responding to an expired group
The responsibility for deciding how the messages should be grouped together is delegated to a CorrelationStrategy instance.
The responsibility for deciding whether the message group can be released is delegated to a ReleaseStrategy instance.
The following listing shows a brief highlight of the base AbstractAggregatingMessageGroupProcessor (the responsibility for implementing the aggregatePayloads method is left to the developer):
public abstract class AbstractAggregatingMessageGroupProcessor
        implements MessageGroupProcessor {
    protected Map<String, Object> aggregateHeaders(MessageGroup group) {
        // default implementation exists
    }
    protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);
}
See DefaultAggregatingMessageGroupProcessor, ExpressionEvaluatingMessageGroupProcessor, and MethodInvokingMessageGroupProcessor as out-of-the-box implementations of the AbstractAggregatingMessageGroupProcessor.
Starting with version 5.2, a Function<MessageGroup, Map<String, Object>> strategy is available for the AbstractAggregatingMessageGroupProcessor to merge and compute (aggregate) headers for an output message.
The DefaultAggregateHeadersFunction implementation is available with logic that returns all headers that have no conflicts among the group; an absent header on one or more messages within the group is not considered a conflict.
Conflicting headers are omitted.
Along with the newly introduced DelegatingMessageGroupProcessor, this function is used for any arbitrary (non-AbstractAggregatingMessageGroupProcessor) MessageGroupProcessor implementation.
Essentially, the framework injects a provided function into an AbstractAggregatingMessageGroupProcessor instance and wraps all other implementations into a DelegatingMessageGroupProcessor.
The difference in logic between the AbstractAggregatingMessageGroupProcessor and the DelegatingMessageGroupProcessor is that the latter does not compute headers in advance, before calling the delegate strategy, and does not invoke the function if the delegate returns a Message or AbstractIntegrationMessageBuilder.
In that case, the framework assumes that the target implementation has taken care of producing a proper set of headers populated into the returned result.
The Function<MessageGroup, Map<String, Object>> strategy is available as the headers-function reference attribute for XML configuration, as the AggregatorSpec.headersFunction() option for the Java DSL, and as AggregatorFactoryBean.setHeadersFunction() for plain Java configuration.
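The "no conflicts" rule described above can be illustrated with a small JDK-only sketch. It is not the framework's DefaultAggregateHeadersFunction: the `HeaderMergeSketch` class and its method are hypothetical, and each message is reduced to a plain map of its headers.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical illustration of merging headers while omitting conflicting ones.
class HeaderMergeSketch {

    static Map<String, Object> mergeNonConflicting(List<Map<String, Object>> headersPerMessage) {
        Map<String, Object> merged = new HashMap<>();
        Set<String> conflicts = new HashSet<>();
        for (Map<String, Object> headers : headersPerMessage) {
            for (Map.Entry<String, Object> entry : headers.entrySet()) {
                String name = entry.getKey();
                if (conflicts.contains(name)) {
                    continue; // already known to conflict, stays omitted
                }
                if (!merged.containsKey(name)) {
                    // First occurrence; a header absent from other messages is not a conflict.
                    merged.put(name, entry.getValue());
                }
                else if (!Objects.equals(merged.get(name), entry.getValue())) {
                    // Same name with a different value: omit the header entirely.
                    merged.remove(name);
                    conflicts.add(name);
                }
            }
        }
        return merged;
    }
}
```

For two messages with headers {type=order, id=1} and {type=order, id=2, region=eu}, this sketch keeps type and region and drops id.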
The CorrelationStrategy is owned by the AbstractCorrelatingMessageHandler and has a default value based on the IntegrationMessageHeaderAccessor.CORRELATION_ID message header, as the following example shows:
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
        CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
    ...
    this.correlationStrategy = correlationStrategy == null ?
        new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
    this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
    ...
}
As for the actual processing of the message group, the default implementation is the DefaultAggregatingMessageGroupProcessor.
It creates a single Message whose payload is a List of the payloads received for a given group.
This works well for simple scatter-gather implementations with a splitter, a publish-subscribe channel, or a recipient list router upstream.
When using a publish-subscribe channel or a recipient list router in this type of scenario, be sure to enable the |
When implementing a specific aggregator strategy for an application, you can extend AbstractAggregatingMessageGroupProcessor and implement the aggregatePayloads method.
However, there are better solutions, less coupled to the API, for implementing the aggregation logic, which can be configured either through XML or through annotations.
In general, any POJO can implement the aggregation algorithm if it provides a method that accepts a single java.util.List as an argument (parameterized lists are supported as well).
This method is invoked for aggregating messages as follows:
- If the argument is a java.util.Collection<T> and the parameter type T is assignable to Message, the whole list of messages accumulated for aggregation is sent to the aggregator.
- If the argument is a non-parameterized java.util.Collection or the parameter type is not assignable to Message, the method receives the payloads of the accumulated messages.
- If the return type is not assignable to Message, it is treated as the payload for a Message that is automatically created by the framework.
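As a minimal sketch of such a POJO (the class and method names are hypothetical, and no framework types are involved), the following method takes a parameterized list whose element type is not Message, so under the rules above it would receive payloads, and its String return value would become the payload of the output message:

```java
import java.util.List;

// Hypothetical aggregating POJO; the names are illustrative only.
class OrderLineAggregator {

    // The element type (String) is not assignable to Message, so the method
    // would be handed the payloads of the accumulated messages.
    public String aggregate(List<String> lines) {
        // The returned String is not a Message, so it would be used as the
        // payload of a message created by the framework.
        return String.join(",", lines);
    }
}
```

Because the class has no dependency on the messaging API, it can be unit tested by calling aggregate() directly.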
In the interest of code simplicity and promoting best practices such as low coupling, testability, and others, the preferred way of implementing the aggregation logic is through a POJO and using the XML or annotation support for configuring it in the application.
Starting with version 5.3, after processing a message group, an AbstractCorrelatingMessageHandler performs a MessageBuilder.popSequenceDetails() message headers modification for the proper splitter-aggregator scenario with several nested levels.
It is done only if the message group release result is not a collection of messages.
If the result is a collection of messages, the target MessageGroupProcessor is responsible for the MessageBuilder.popSequenceDetails() call while building those messages.
If the MessageGroupProcessor returns a Message, MessageBuilder.popSequenceDetails() is performed on the output message only if its sequenceDetails match those of the first message in the group.
(Previously, this was done only if a plain payload or an AbstractIntegrationMessageBuilder was returned from the MessageGroupProcessor.)
This functionality can be controlled by a new popSequence boolean property, so the MessageBuilder.popSequenceDetails() can be disabled in some scenarios when correlation details have not been populated by the standard splitter.
This property, essentially, undoes what has been done by the nearest upstream applySequence = true in the AbstractMessageSplitter.
See Splitter for more information.
The SimpleMessageGroup.getMessages() method returns an unmodifiableCollection.
Therefore, if an aggregating POJO method has a Collection<Message> parameter, the argument passed in is exactly that Collection instance and, when you use a SimpleMessageStore for the aggregator, that original Collection<Message> is cleared after releasing the group.
Consequently, the Collection<Message> variable in the POJO is cleared too, if it is passed out of the aggregator.
If you wish to simply release that collection as-is for further processing, you must build a new Collection (for example, new ArrayList<Message>(messages)).
Starting with version 4.3, the framework no longer copies the messages to a new collection, to avoid undesired extra object creation.
Prior to version 4.2, it was not possible to provide a MessageGroupProcessor by using XML configuration.
Only POJO methods could be used for aggregation.
Now, if the framework detects that the referenced (or inner) bean implements MessageProcessor, it is used as the aggregator's output processor.
If you wish to release a collection of objects from a custom MessageGroupProcessor as the payload of a message, your class should extend AbstractAggregatingMessageGroupProcessor and implement aggregatePayloads().
Also, since version 4.2, a SimpleMessageGroupProcessor is provided.
It returns the collection of messages from the group, which, as indicated earlier, causes the released messages to be sent individually.
This lets the aggregator work as a message barrier, where arriving messages are held until the release strategy fires and the group is released as a sequence of individual messages.
Starting with version 6.0, the splitting behavior described above works only if the group processor is a SimpleMessageGroupProcessor.
Otherwise, with any other MessageGroupProcessor implementation that returns a Collection<Message>, only a single reply message is emitted with the whole collection of messages as its payload.
Such logic is dictated by the canonical purpose of an aggregator: collect request messages by some key and produce a single grouped message.
ReleaseStrategy
The ReleaseStrategy interface is defined as follows:
public interface ReleaseStrategy {
    boolean canRelease(MessageGroup group);
}
In general, any POJO can implement the completion decision logic if it provides a method that accepts a single java.util.List as an argument (parameterized lists are supported as well) and returns a boolean value.
This method is invoked after the arrival of each new message, to decide whether the group is complete or not, as follows:
- If the argument is a java.util.List<T> and the parameter type T is assignable to Message, the whole list of messages accumulated in the group is sent to the method.
- If the argument is a non-parameterized java.util.List or the parameter type is not assignable to Message, the method receives the payloads of the accumulated messages.
- The method must return true if the message group is ready for aggregation or false otherwise.
The following example shows how to use the @ReleaseStrategy annotation for a List of type Message:
public class MyReleaseStrategy {
    @ReleaseStrategy
    public boolean canMessagesBeReleased(List<Message<?>> messages) {...}
}
The following example shows how to use the @ReleaseStrategy annotation for a List of type String:
public class MyReleaseStrategy {
    @ReleaseStrategy
    public boolean canMessagesBeReleased(List<String> payloads) {...}
}
Based on the signatures in the preceding two examples, the POJO-based release strategy is passed a Collection of not-yet-released messages (if you need access to the whole Message) or a Collection of payload objects (if the type parameter is anything other than Message).
This satisfies the majority of use cases.
However, if, for some reason, you need to access the full MessageGroup, you should provide an implementation of the ReleaseStrategy interface.
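A payload-based release decision might look like the following sketch. The class name, the threshold, and the idea of summing integer payloads are assumptions made for illustration; the @ReleaseStrategy annotation is left out so that the sketch compiles with the JDK alone.

```java
import java.util.List;

// Hypothetical POJO release strategy: release once the accumulated
// payloads (treated here as weights) reach a threshold.
class TotalWeightReleaseStrategy {

    private static final int THRESHOLD = 100; // illustrative value

    // In an application, this method would carry the @ReleaseStrategy annotation.
    // The element type is not Message, so the method would receive payloads.
    public boolean canRelease(List<Integer> weights) {
        int total = 0;
        for (int weight : weights) {
            total += weight;
        }
        return total >= THRESHOLD;
    }
}
```

Note that a method like this recomputes the sum on every invocation, which matters for large groups because the release strategy runs after each arriving message.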
When handling potentially large groups, you should understand how these methods are invoked, because the release strategy may be invoked multiple times before the group is released.
The most efficient is an implementation of ReleaseStrategy, because the aggregator can invoke it directly.
The second most efficient is a POJO method with a Collection<Message<?>> parameter type.
The least efficient is a POJO method with a Collection<Something> type.
The framework has to copy the payloads from the messages in the group into a new collection (and possibly attempt conversion on the payloads to Something) every time the release strategy is called.
Using Collection<?> avoids the conversion but still requires creating the new Collection.
For these reasons, for large groups, we recommend that you implement ReleaseStrategy.
When the group is released for aggregation, all its not-yet-released messages are processed and removed from the group.
If the group is also complete (that is, if all messages from a sequence have arrived or if there is no sequence defined), then the group is marked as complete.
Any new messages for this group are sent to the discard channel (if defined).
Setting expire-groups-upon-completion to true (the default is false) removes the entire group, and any new messages (with the same correlation ID as the removed group) form a new group.
You can release partial sequences by using a MessageGroupStoreReaper together with send-partial-result-on-expiry being set to true.
To facilitate discarding of late-arriving messages, the aggregator must maintain state about the group after it has been released.
This can eventually cause out-of-memory conditions.
To avoid such situations, you should consider configuring a MessageGroupStoreReaper to remove the group metadata.
The expiry parameters should be set to expire groups once a point has been reached after which late messages are not expected to arrive.
For information about configuring a reaper, see Managing State in an Aggregator: MessageGroupStore.
Spring Integration provides an implementation for ReleaseStrategy: SimpleSequenceSizeReleaseStrategy.
This implementation consults the SEQUENCE_NUMBER and SEQUENCE_SIZE headers of each arriving message to decide when a message group is complete and ready to be aggregated.
As shown earlier, it is also the default strategy.
Before version 5.0, the default release strategy was |
If you are aggregating large groups and you need neither to release partial groups nor to detect or reject duplicate sequences, consider using the SimpleSequenceSizeReleaseStrategy instead. It is much more efficient for these use cases, and it is the default since version 5.0 when partial group release is not specified.
Aggregating Large Groups
The 4.3 release changed the default Collection for messages in a SimpleMessageGroup to HashSet (it was previously a BlockingQueue).
The queue was expensive when removing individual messages from large groups (an O(n) linear scan was required).
Although removal from a hash set is generally much faster, it can be expensive for large messages, because the hash has to be calculated on both inserts and removes.
If you have messages that are expensive to hash, consider using some other collection type.
As discussed in Using MessageGroupFactory, a SimpleMessageGroupFactory is provided so that you can select the Collection that best suits your needs.
You can also provide your own factory implementation to create some other Collection<Message<?>>.
The following example shows how to configure an aggregator with the previous implementation and a SimpleSequenceSizeReleaseStrategy:
<int:aggregator input-channel="aggregate"
    output-channel="out" message-store="store" release-strategy="releaser" />
<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
    <property name="messageGroupFactory">
        <bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
            <constructor-arg value="BLOCKING_QUEUE"/>
        </bean>
    </property>
</bean>
<bean id="releaser" class="org.springframework.integration.aggregator.SimpleSequenceSizeReleaseStrategy" />
If the filter endpoint is involved in the flow upstream of an aggregator, the sequence size release strategy (fixed or based on the |
Correlation Strategy
The CorrelationStrategy interface is defined as follows:
public interface CorrelationStrategy {
    Object getCorrelationKey(Message<?> message);
}
The method returns an Object that represents the correlation key used for associating the message with a message group.
The key must satisfy the criteria used for a key in a Map with respect to the implementation of equals() and hashCode().
In general, any POJO can implement the correlation logic, and the rules for mapping a message to a method's argument (or arguments) are the same as for a ServiceActivator (including support for @Header annotations).
The method must return a value, and the value must not be null.
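When the correlation key combines several values, a small value class with consistent equals() and hashCode() meets the Map-key requirement. The following sketch is hypothetical throughout (class names, method name, and the choice of customer id plus region); in an application the two parameters would be bound from the message, for example through @Header annotations, which are omitted here to keep the sketch JDK-only.

```java
import java.util.Objects;

// Hypothetical correlation POJO returning a composite, non-null key.
class OrderCorrelation {

    // Value class usable as a Map key: equals() and hashCode() agree.
    static final class CustomerRegionKey {
        private final String customerId;
        private final String region;

        CustomerRegionKey(String customerId, String region) {
            // Rejecting nulls keeps the key itself well defined.
            this.customerId = Objects.requireNonNull(customerId);
            this.region = Objects.requireNonNull(region);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof CustomerRegionKey)) {
                return false;
            }
            CustomerRegionKey other = (CustomerRegionKey) o;
            return customerId.equals(other.customerId) && region.equals(other.region);
        }

        @Override
        public int hashCode() {
            return Objects.hash(customerId, region);
        }
    }

    // Messages with the same customer id and region would share a group.
    public Object correlate(String customerId, String region) {
        return new CustomerRegionKey(customerId, region);
    }
}
```

Two calls with equal arguments return keys that are equal and hash alike, so they would resolve to the same message group.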
Spring Integration provides an implementation for CorrelationStrategy: HeaderAttributeCorrelationStrategy.
This implementation returns the value of one of the message headers (whose name is specified by a constructor argument) as the correlation key.
By default, the correlation strategy is a HeaderAttributeCorrelationStrategy that returns the value of the CORRELATION_ID header attribute.
If you have a custom header name you would like to use for correlation, you can configure it on an instance of HeaderAttributeCorrelationStrategy and provide that as a reference for the aggregator's correlation strategy.
Lock Registry
Changes to groups are thread safe.
So, when you send messages for the same correlation ID concurrently, only one of them is processed in the aggregator at a time, making it effectively single-threaded per message group.
A LockRegistry is used to obtain a lock for the resolved correlation ID.
A DefaultLockRegistry is used by default (in-memory).
For synchronizing updates across servers where a shared MessageGroupStore is being used, you must configure a shared lock registry.
Avoiding Deadlocks
As discussed above, when message groups are mutated (messages added or released) a lock is held.
Consider the following flow:
...->aggregator1-> ... ->aggregator2-> ...
If there are multiple threads, and the aggregators share a common lock registry, it is possible to get a deadlock.
This causes hung threads, and jstack <pid> might present a result such as:
Found one Java-level deadlock:
=============================
"t2":
waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t1"
"t1":
waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t2"
There are several ways to avoid this problem:
- Ensure each aggregator has its own lock registry (this can be a shared registry across application instances, but two or more aggregators in the flow must each have a distinct registry).
- Use an ExecutorChannel or QueueChannel as the output channel of the aggregator so that the downstream flow runs on a new thread.
- Starting with version 5.1.1, set the releaseLockBeforeSend aggregator property to true.
This problem can also be caused if, for some reason, the output of a single aggregator is eventually routed back to the same aggregator. Of course, the first solution above does not apply in this case.
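The effect of giving each aggregator its own registry can be seen in a small JDK-only model. `KeyedLockRegistry` is a hypothetical class that hands out one ReentrantLock per key; it is an assumption made for illustration and does not describe how any Spring Integration LockRegistry is implemented.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical registry: one lock per key, created on first use.
class KeyedLockRegistry {

    private final Map<Object, Lock> locks = new ConcurrentHashMap<>();

    // The same key always resolves to the same lock within one registry.
    Lock obtain(Object key) {
        return locks.computeIfAbsent(key, k -> new ReentrantLock());
    }
}
```

With one shared registry, aggregator1 and aggregator2 resolve a given correlation key to the same lock, so a thread that holds the lock for key A upstream and needs key B downstream can block against another thread that holds B and needs A. With one registry per aggregator, the same key maps to two independent locks, and the downstream aggregator never waits on a lock held upstream.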
Configuring an Aggregator in Java DSL
See Aggregators and Resequencers for how to configure an aggregator in Java DSL.
Configuring an Aggregator with XML
Spring Integration supports the configuration of an aggregator with XML through the <aggregator/> element.
The following example shows an aggregator configuration:
<int:channel id="inputChannel"/>
<int:aggregator id="myAggregator" 1
auto-startup="true" 2
input-channel="inputChannel" 3
output-channel="outputChannel" 4
discard-channel="throwAwayChannel" 5
message-store="persistentMessageStore" 6
order="1" 7
send-partial-result-on-expiry="false" 8
send-timeout="1000" 9
correlation-strategy="correlationStrategyBean" 10
correlation-strategy-method="correlate" 11
correlation-strategy-expression="headers['foo']" 12
ref="aggregatorBean" 13
method="aggregate" 14
release-strategy="releaseStrategyBean" 15
release-strategy-method="release" 16
release-strategy-expression="size() == 5" 17
expire-groups-upon-completion="false" 18
empty-group-min-timeout="60000" 19
lock-registry="lockRegistry" 20
group-timeout="60000" 21
group-timeout-expression="size() ge 2 ? 100 : -1" 22
expire-groups-upon-timeout="true" 23
scheduler="taskScheduler" > 24
<int:expire-transactional/> 25
<int:expire-advice-chain/> 26
</int:aggregator>
<int:channel id="outputChannel"/>
<int:channel id="throwAwayChannel"/>
<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
<constructor-arg ref="dataSource"/>
</bean>
<bean id="aggregatorBean" class="sample.PojoAggregator"/>
<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>
<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
1 | The id of the aggregator is optional. |
2 | Lifecycle attribute signaling whether the aggregator should be started during application context startup. Optional (the default is 'true'). |
3 | The channel from which the aggregator receives messages. Required. |
4 | The channel to which the aggregator sends the aggregation results. Optional (because incoming messages can themselves specify a reply channel in the 'replyChannel' message header). |
5 | The channel to which the aggregator sends the messages that timed out (if send-partial-result-on-expiry is false).
Optional. |
6 | A reference to a MessageGroupStore used to store groups of messages under their correlation key until they are complete.
Optional.
By default, it is a volatile in-memory store.
See Message Store for more information. |
7 | The order of this aggregator when more than one handler is subscribed to the same DirectChannel (use for load-balancing purposes).
Optional. |
8 | Indicates that expired messages should be aggregated and sent to the 'output-channel' or 'replyChannel' once their containing MessageGroup is expired (see MessageGroupStore.expireMessageGroups(long)).
One way of expiring a MessageGroup is by configuring a MessageGroupStoreReaper.
However, you can alternatively expire a MessageGroup by calling MessageGroupStore.expireMessageGroups(timeout).
You can accomplish that through a Control Bus operation or, if you have a reference to the MessageGroupStore instance, by invoking expireMessageGroups(timeout).
Otherwise, by itself, this attribute does nothing.
It serves only as an indicator of whether to discard or send to the output or reply channel any messages that are still in the MessageGroup that is about to be expired.
Optional (the default is false).
NOTE: This attribute might more properly be called send-partial-result-on-timeout, because the group may not actually expire if expire-groups-upon-timeout is set to false. |
9 | The timeout interval to wait when sending a reply Message to the output-channel or discard-channel.
Defaults to 30 seconds.
It is applied only if the output channel has some 'sending' limitations, such as a QueueChannel with a fixed 'capacity'.
In this case, a MessageDeliveryException is thrown.
For AbstractSubscribableChannel implementations, the send-timeout is ignored.
For group-timeout(-expression), the MessageDeliveryException from the scheduled expiring task leads this task to be rescheduled.
Optional. |
10 | A reference to a bean that implements the message correlation (grouping) algorithm.
The bean can be an implementation of the CorrelationStrategy interface or a POJO.
In the latter case, the correlation-strategy-method attribute must be defined as well.
Optional (by default, the aggregator uses the IntegrationMessageHeaderAccessor.CORRELATION_ID header). |
11 | A method defined on the bean referenced by correlation-strategy.
It implements the correlation decision algorithm.
Optional, with restrictions (correlation-strategy must be present). |
12 | A SpEL expression representing the correlation strategy.
Example: "headers['something']".
Only one of correlation-strategy or correlation-strategy-expression is allowed. |
13 | A reference to a bean defined in the application context. The bean must implement the aggregation logic, as described earlier. Optional (by default, the list of aggregated messages becomes a payload of the output message). |
14 | A method defined on the bean referenced by the ref attribute.
It implements the message aggregation algorithm.
Optional (it depends on the ref attribute being defined). |
15 | A reference to a bean that implements the release strategy.
The bean can be an implementation of the ReleaseStrategy interface or a POJO.
In the latter case, the release-strategy-method attribute must be defined as well.
Optional (by default, the aggregator uses the IntegrationMessageHeaderAccessor.SEQUENCE_SIZE header attribute). |
16 | A method defined on the bean referenced by the release-strategy attribute.
It implements the completion decision algorithm.
Optional, with restrictions (release-strategy must be present). |
17 | A SpEL expression representing the release strategy.
The root object for the expression is a MessageGroup.
Example: "size() == 5".
Only one of release-strategy or release-strategy-expression is allowed. |
18 | When set to true (the default is false), completed groups are removed from the message store, letting subsequent messages with the same correlation form a new group.
The default behavior is to send messages with the same correlation as a completed group to the discard-channel. |
19 | Applies only if a MessageGroupStoreReaper is configured for the MessageStore of the <aggregator>.
By default, when a MessageGroupStoreReaper is configured to expire partial groups, empty groups are also removed.
Empty groups exist after a group is normally released.
The empty groups enable the detection and discarding of late-arriving messages.
If you wish to expire empty groups on a longer schedule than expiring partial groups, set this property.
Empty groups are then not removed from the MessageStore until they have not been modified for at least this number of milliseconds.
Note that the actual time to expire an empty group is also affected by the reaper’s timeout property, and it could be as much as this value plus the timeout. |
20 | A reference to a org.springframework.integration.util.LockRegistry bean.
It is used to obtain a Lock based on the groupId for concurrent operations on the MessageGroup.
By default, an internal DefaultLockRegistry is used.
Use of a distributed LockRegistry , such as the ZookeeperLockRegistry , ensures only one instance of the aggregator can operate on a group concurrently.
See Redis Lock Registry or Zookeeper Lock Registry for more information. |
21 | A timeout (in milliseconds) to force the MessageGroup to complete when the ReleaseStrategy does not release the group when the current message arrives.
This attribute provides a built-in time-based release strategy for the aggregator when there is a need to emit a partial result (or discard the group) if a new message does not arrive for the MessageGroup within the timeout, which counts from the time the last message arrived.
To set up a timeout that counts from the time the MessageGroup was created, see the group-timeout-expression information.
When a new message arrives at the aggregator, any existing ScheduledFuture<?> for its MessageGroup is canceled.
If the ReleaseStrategy returns false (meaning do not release) and groupTimeout > 0 , a new task is scheduled to expire the group.
We do not advise setting this attribute to zero (or a negative value).
Doing so effectively disables the aggregator, because every message group is immediately completed.
You can, however, conditionally set it to zero (or a negative value) by using an expression.
See group-timeout-expression for information.
The action taken during the completion depends on the ReleaseStrategy and the send-partial-result-on-expiry attribute.
See Aggregator and Group Timeout for more information.
It is mutually exclusive with 'group-timeout-expression' attribute. |
22 | The SpEL expression that evaluates to a groupTimeout with the MessageGroup as the #root evaluation context object.
Used for scheduling the MessageGroup to be forced complete.
If the expression evaluates to null , the completion is not scheduled.
If it evaluates to zero, the group is completed immediately on the current thread.
In effect, this provides a dynamic group-timeout property.
As an example, if you wish to forcibly complete a MessageGroup after 10 seconds have elapsed since the time the group was created you might consider using the following SpEL expression: timestamp + 10000 - T(System).currentTimeMillis() where timestamp is provided by MessageGroup.getTimestamp() as the MessageGroup here is the #root evaluation context object.
Bear in mind however that the group creation time might differ from the time of the first arrived message depending on other group expiration properties' configuration.
See group-timeout for more information.
Mutually exclusive with 'group-timeout' attribute. |
23 | When a group is completed due to a timeout (or by a MessageGroupStoreReaper ), the group is expired (completely removed) by default.
Late arriving messages start a new group.
You can set this to false to complete the group but have its metadata remain so that late arriving messages are discarded.
Empty groups can be expired later using a MessageGroupStoreReaper together with the empty-group-min-timeout attribute.
It defaults to 'true'. |
24 | A TaskScheduler bean reference to schedule the MessageGroup to be forced complete if no new message arrives for the MessageGroup within the groupTimeout .
If not provided, the default scheduler (taskScheduler) registered in the ApplicationContext (ThreadPoolTaskScheduler) is used.
This attribute does not apply if group-timeout or group-timeout-expression is not specified. |
25 | Since version 4.1.
It lets a transaction be started for the forceComplete operation.
It is initiated from a group-timeout(-expression) or by a MessageGroupStoreReaper and is not applied to the normal add , release , and discard operations.
Only this sub-element or <expire-advice-chain/> is allowed. |
26 | Since version 4.1.
It allows the configuration of any Advice for the forceComplete operation.
It is initiated from a group-timeout(-expression) or by a MessageGroupStoreReaper and is not applied to the normal add , release , and discard operations.
Only this sub-element or <expire-transactional/> is allowed.
A transaction Advice can also be configured here by using the Spring tx namespace. |
There are two attributes related to expiring (completely removing) groups.
When a group is expired, there is no record of it, and, if a new message arrives with the same correlation, a new group is started.
When a group is completed (without expiry), the empty group remains and late-arriving messages are discarded.
Empty groups can be removed later by using a MessageGroupStoreReaper in combination with the empty-group-min-timeout attribute.
expire-groups-upon-completion relates to “normal” completion when the ReleaseStrategy releases the group.
This defaults to false.
If a group is not completed normally but is released or discarded because of a timeout, the group is normally expired.
Since version 4.1, you can control this behavior by using expire-groups-upon-timeout.
It defaults to true for backwards compatibility.
When a group is timed out, the ReleaseStrategy is given one more opportunity to release the group.
If it does so and expire-groups-upon-timeout is false, expiration is controlled by expire-groups-upon-completion.
If the group is not released by the release strategy during timeout, expiration is controlled by expire-groups-upon-timeout.
Timed-out groups are either discarded or partially released (based on send-partial-result-on-expiry).
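The expiry rules for timed-out groups can be restated as a small decision function. The following is only a sketch in plain Java: the class TimeoutExpiryRule and its method are hypothetical names, not part of Spring Integration, and the branch for a group that is released by the strategy while expire-groups-upon-timeout is true is an assumption, because the text above does not spell that case out.

```java
// Hypothetical helper that restates the expiry rules above in plain Java.
// It models the prose only and is not Spring Integration code.
final class TimeoutExpiryRule {

    private TimeoutExpiryRule() {
    }

    /**
     * Returns true if a timed-out group is expired (completely removed),
     * false if its empty shell remains so that late messages are discarded.
     */
    static boolean groupRemoved(boolean releasedByStrategy,
            boolean expireGroupsUponCompletion,
            boolean expireGroupsUponTimeout) {
        if (!releasedByStrategy) {
            // Not released during timeout: expire-groups-upon-timeout decides.
            return expireGroupsUponTimeout;
        }
        if (!expireGroupsUponTimeout) {
            // Released during timeout: expire-groups-upon-completion decides.
            return expireGroupsUponCompletion;
        }
        // Assumption: with expire-groups-upon-timeout set to true,
        // the group is removed even when the strategy released it.
        return true;
    }
}
```

With the defaults (expire-groups-upon-completion false, expire-groups-upon-timeout true), this sketch reports every timed-out group as removed, which matches the statement that such groups are normally expired.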
Since version 5.0, empty groups are also scheduled for removal after empty-group-min-timeout.
If expireGroupsUponCompletion == false and minimumTimeoutForEmptyGroups > 0, the task to remove the group is scheduled when a normal or partial sequence release happens.
Starting with version 5.4, the aggregator (and resequencer) can be configured to expire orphaned groups (groups in a persistent message store that might not otherwise be released).
The expireTimeout (if greater than 0) indicates that groups older than this value in the store should be purged.
The purgeOrphanedGroups() method is called on startup and, together with the provided expireDuration, periodically within a scheduled task.
This method can also be called externally at any time.
The expiration logic is fully delegated to the forceComplete(MessageGroup) functionality, according to the expiration options mentioned above.
Such a periodic purge is useful when a message store needs to be cleaned of old groups that are no longer going to be released by the regular message arrival logic.
In most cases, this happens after an application restart when a persistent message group store is used.
The functionality is similar to the MessageGroupStoreReaper with a scheduled task, but it provides a convenient way to deal with old groups within specific components when using a group timeout instead of a reaper.
The MessageGroupStore must be provided exclusively for the current correlation endpoint.
Otherwise, one aggregator may purge groups from another.
With the aggregator, groups expired using this technique are either discarded or released as a partial group, depending on the expireGroupsUponCompletion property.
We generally recommend using a ref attribute if a custom aggregator handler implementation may be referenced in other <aggregator> definitions.
However, if a custom aggregator implementation is used by only a single <aggregator> definition, you can use an inner bean definition (starting with version 1.0.3) to configure the aggregation POJO within the <aggregator> element, as the following example shows:
<aggregator input-channel="input" method="add" output-channel="output">
    <beans:bean class="org.foo.PojoAggregator"/>
</aggregator>
Using both a |
The following example shows an implementation of the aggregator bean:
public class PojoAggregator {

    // Sums the payloads of all messages in the released group.
    public Long add(List<Long> results) {
        long total = 0L;
        for (long partialResult : results) {
            total += partialResult;
        }
        return total;
    }
}
An implementation of the release strategy bean for the preceding example might be as follows:
public class PojoReleaseStrategy {
    ...
    // Releases the group once the sum of its numbers reaches maxValue.
    public boolean canRelease(List<Long> numbers) {
        long sum = 0L;
        for (long number : numbers) {
            sum += number;
        }
        return sum >= maxValue;
    }
}
Wherever it makes sense to do so, the release strategy method and the aggregator method can be combined into a single bean. |
An implementation of the correlation strategy bean for the example above might be as follows:
public class PojoCorrelationStrategy {
    ...
    // Numbers that share the same last digit end up in the same group.
    public Long groupNumbersByLastDigit(Long number) {
        return number % 10;
    }
}
The aggregator in the preceding example would group numbers by some criterion (in this case, the remainder after dividing by ten) and hold the group until the sum of the numbers provided by the payloads reaches or exceeds a certain value.
Wherever it makes sense to do so, the release strategy method, the correlation strategy method, and the aggregator method can be combined in a single bean. (Actually, all of them or any two of them can be combined.) |
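To see how the three methods cooperate, the following sketch combines them in one plain Java class and adds a small driver method, handle(), that imitates what an aggregator does for each incoming number. This is an illustration under assumptions, not framework code: LastDigitSumAggregator and handle() are hypothetical names, maxValue is passed in through the constructor because the original listing elides it, and the group is dropped on release (comparable to expire-groups-upon-completion set to true) to keep the example short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone sketch: plain Java, no Spring Integration types.
class LastDigitSumAggregator {

    // Assumed release threshold (elided in the listings above).
    private final long maxValue;

    // In-memory stand-in for a message group store, keyed by correlation key.
    private final Map<Long, List<Long>> groups = new HashMap<>();

    LastDigitSumAggregator(long maxValue) {
        this.maxValue = maxValue;
    }

    // Correlation strategy: numbers with the same last digit share a group.
    Long groupNumbersByLastDigit(Long number) {
        return number % 10;
    }

    // Release strategy: the group is complete once its sum reaches maxValue.
    boolean canRelease(List<Long> numbers) {
        long sum = 0L;
        for (long number : numbers) {
            sum += number;
        }
        return sum >= maxValue;
    }

    // Aggregation: the output is the sum of the group.
    Long add(List<Long> results) {
        long total = 0L;
        for (long partialResult : results) {
            total += partialResult;
        }
        return total;
    }

    // Driver: correlate, store, check for release, and aggregate.
    // Returns the aggregated value on release, or null while the group is held.
    Long handle(Long number) {
        Long key = groupNumbersByLastDigit(number);
        List<Long> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(number);
        if (canRelease(group)) {
            groups.remove(key);
            return add(group);
        }
        return null;
    }
}
```

With a threshold of 30, the numbers 11 and 21 land in the same group (last digit 1), and the second of them triggers a release with the value 32, while 12 is held in a separate group.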
Aggregators and Spring Expression Language (SpEL)
Since Spring Integration 2.0, you can handle the various strategies (correlation, release, and aggregation) with SpEL, which we recommend if the logic behind such a strategy is relatively simple.
Suppose you have a legacy component that was designed to receive an array of objects.
We know that the default aggregation behavior assembles all aggregated messages into a List.
Now we have two problems.
First, we need to extract individual messages from the list.
Second, we need to extract the payload of each message and assemble the array of objects.
The following example solves both problems:
public String[] processRelease(List<Message<String>> messages) {
    // Collect the payload of every message in the group.
    List<String> stringList = new ArrayList<>();
    for (Message<String> message : messages) {
        stringList.add(message.getPayload());
    }
    return stringList.toArray(new String[0]);
}
However, with SpEL, such a requirement can be handled relatively easily with a one-line expression, which spares you from writing a custom class and configuring it as a bean. The following example shows how to do so:
<int:aggregator input-channel="aggChannel"
output-channel="replyChannel"
expression="#this.![payload].toArray()"/>
In the preceding configuration, we use a collection projection expression to assemble a new collection from the payloads of all the messages in the list and then transform it to an array, thus achieving the same result as the earlier Java code.
You can apply the same expression-based approach when dealing with custom release and correlation strategies.
Instead of defining a bean for a custom CorrelationStrategy in the correlation-strategy attribute, you can implement your simple correlation logic as a SpEL expression and configure it in the correlation-strategy-expression attribute, as the following example shows:
correlation-strategy-expression="payload.person.id"
In the preceding example, we assume that the payload has a person attribute with an id, which is used to correlate messages.
Likewise, for the ReleaseStrategy, you can implement your release logic as a SpEL expression and configure it in the release-strategy-expression attribute.
The root object of the evaluation context is the MessageGroup itself.
The List of messages can be referenced by using the messages property of the group within the expression.
In releases prior to version 5.0, the root object was the collection of |
release-strategy-expression="!messages.?[payload==5].empty"
In the preceding example, the root object of the SpEL evaluation context is the MessageGroup itself, and you are stating that, as soon as there is a message with a payload of 5 in this group, the group should be released.
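For readers less familiar with SpEL collection selection, the same release condition can be written in plain Java. This is only a rough equivalent under assumptions: PayloadFiveRelease is a hypothetical class, the list of payloads stands in for the payloads of the messages in the group, and Objects.equals() matches only an Integer 5, whereas SpEL applies its own comparison rules for numbers.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical plain-Java counterpart of "!messages.?[payload==5].empty".
class PayloadFiveRelease {

    // 'payloads' stands in for the payloads of the messages in the group.
    static boolean canRelease(List<?> payloads) {
        // Select payloads equal to 5 and release if the selection is not empty.
        return payloads.stream().anyMatch(payload -> Objects.equals(payload, 5));
    }

    public static void main(String[] args) {
        System.out.println(canRelease(Arrays.asList(1, 2)));    // no payload of 5 yet
        System.out.println(canRelease(Arrays.asList(1, 2, 5))); // a payload of 5 arrived
    }
}
```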
Aggregator and Group Timeout
Starting with version 4.0, two new mutually exclusive attributes have been introduced: group-timeout and group-timeout-expression.
See Configuring an Aggregator with XML.
In some cases, you may need to emit the aggregator result (or discard the group) after a timeout if the ReleaseStrategy does not release when the current message arrives.
For this purpose, the groupTimeout option lets you schedule the MessageGroup to be forcibly completed, as the following example shows:
<aggregator input-channel="input" output-channel="output"
send-partial-result-on-expiry="true"
group-timeout-expression="size() ge 2 ? 10000 : -1"
release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>
With this example, the normal release is possible if the aggregator receives the last message in sequence, as defined by the release-strategy-expression.
If that specific message does not arrive, the groupTimeout forces the group to complete after ten seconds, as long as the group contains at least two messages.
The result of forcing the group to complete depends on the ReleaseStrategy and the send-partial-result-on-expiry attribute.
First, the release strategy is consulted again to see if a normal release is to be made.
Although the group has not changed, the ReleaseStrategy can decide to release the group at this time.
If the release strategy still does not release the group, it is expired.
If send-partial-result-on-expiry is true, existing messages in the (partial) MessageGroup are released as a normal aggregator reply message to the output-channel.
Otherwise, the group is discarded.
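The order of these checks can be summarized in a few lines of plain Java. The sketch below only restates the paragraph above: ForcedCompletionOutcome and ForcedCompletion are hypothetical names, and the two boolean parameters stand for the result of consulting the ReleaseStrategy again and for the send-partial-result-on-expiry setting.

```java
// Hypothetical model of the forced-completion steps described above.
enum ForcedCompletionOutcome {
    NORMAL_RELEASE, PARTIAL_RELEASE, DISCARD
}

final class ForcedCompletion {

    private ForcedCompletion() {
    }

    static ForcedCompletionOutcome decide(boolean releaseStrategyReleases,
            boolean sendPartialResultOnExpiry) {
        if (releaseStrategyReleases) {
            // The strategy is consulted first and may still release normally.
            return ForcedCompletionOutcome.NORMAL_RELEASE;
        }
        // The group is expired: emit what is there, or discard it.
        return sendPartialResultOnExpiry
                ? ForcedCompletionOutcome.PARTIAL_RELEASE
                : ForcedCompletionOutcome.DISCARD;
    }
}
```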
There is a difference between groupTimeout behavior and the MessageGroupStoreReaper (see Configuring an Aggregator with XML).
The reaper periodically initiates forced completion for all MessageGroup instances in the MessageGroupStore.
The groupTimeout does it for each MessageGroup individually if a new message does not arrive during the groupTimeout.
Also, the reaper can be used to remove empty groups (empty groups are retained in order to discard late messages if expire-groups-upon-completion is false).
Starting with version 5.5, the groupTimeoutExpression can be evaluated to a java.util.Date instance.
This can be useful when the scheduled task moment should be determined from the group creation time (MessageGroup.getTimestamp()) instead of from the arrival of the current message, which is the reference point when groupTimeoutExpression is evaluated to a long:
group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"
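The difference between the two result types comes down to which moment the offset is added to. The following plain Java sketch works through the arithmetic; GroupTimeoutSchedule and its methods are hypothetical names used only for illustration, and all times are epoch milliseconds chosen for the example.

```java
import java.util.Date;

// Hypothetical arithmetic behind the two kinds of group timeout results.
final class GroupTimeoutSchedule {

    private GroupTimeoutSchedule() {
    }

    // long result: a delay that counts from the arrival of the current message.
    static Date fromDelay(long messageArrivalMillis, long delayMillis) {
        return new Date(messageArrivalMillis + delayMillis);
    }

    // Date result: an absolute moment, here derived from the group timestamp,
    // as in "new java.util.Date(timestamp + 200)".
    static Date fromGroupTimestamp(long groupTimestampMillis, long offsetMillis) {
        return new Date(groupTimestampMillis + offsetMillis);
    }

    public static void main(String[] args) {
        long groupCreated = 1_000L; // first message creates the group
        long secondArrival = 1_150L; // second message arrives 150 ms later

        // A long of 200 moves the deadline with every new message.
        System.out.println(fromDelay(secondArrival, 200L).getTime());
        // A Date based on the group timestamp stays fixed.
        System.out.println(fromGroupTimestamp(groupCreated, 200L).getTime());
    }
}
```

In this example, the relative delay yields a deadline of 1350, while the Date-based expression keeps the deadline at 1200 no matter how many messages arrive in between.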
Configuring an Aggregator with Annotations
The following example shows an aggregator configured with annotations:
public class Waiter {
    ...

    @Aggregator  // (1)
    public Delivery aggregatingMethod(List<OrderItem> items) {
        ...
    }

    @ReleaseStrategy  // (2)
    public boolean releaseChecker(List<Message<?>> messages) {
        ...
    }

    @CorrelationStrategy  // (3)
    public String correlateBy(OrderItem item) {
        ...
    }
}
1 | An annotation indicating that this method should be used as an aggregator. It must be specified if this class is used as an aggregator. |
2 | An annotation indicating that this method is used as the release strategy of an aggregator.
If not present on any method, the aggregator uses the SimpleSequenceSizeReleaseStrategy . |
3 | An annotation indicating that this method should be used as the correlation strategy of an aggregator.
If no correlation strategy is indicated, the aggregator uses the HeaderAttributeCorrelationStrategy based on CORRELATION_ID . |
All of the configuration options provided by the XML element are also available for the @Aggregator annotation.
The aggregator can be either referenced explicitly from XML or, if the @MessageEndpoint annotation is defined on the class, detected automatically through classpath scanning.
Annotation configuration (@Aggregator and others) for the Aggregator component covers only simple use cases, where most default options are sufficient.
If you need more control over those options when using annotation configuration, consider using a @Bean definition for the AggregatingMessageHandler and mark its @Bean method with @ServiceActivator, as the following example shows:
@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
    AggregatingMessageHandler aggregator =
            new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
                    jdbcMessageGroupStore);
    aggregator.setOutputChannel(resultsChannel());
    aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
    aggregator.setTaskScheduler(this.taskScheduler);
    return aggregator;
}
See Programming Model and Annotations on @Bean Methods for more information.
Starting with version 4.2, the |
Managing State in an Aggregator: MessageGroupStore
The aggregator (like some other patterns in Spring Integration) is a stateful pattern that requires decisions to be made based on a group of messages that have arrived over a period of time, all with the same correlation key.
The design of the interfaces in the stateful patterns (such as ReleaseStrategy) is driven by the principle that the components (whether defined by the framework or by a user) should be able to remain stateless.
All state is carried by the MessageGroup, and its management is delegated to the MessageGroupStore.
The MessageGroupStore interface is defined as follows:
public interface MessageGroupStore {

    int getMessageCountForAllMessageGroups();

    int getMarkedMessageCountForAllMessageGroups();

    int getMessageGroupCount();

    MessageGroup getMessageGroup(Object groupId);

    MessageGroup addMessageToGroup(Object groupId, Message<?> message);

    MessageGroup markMessageGroup(MessageGroup group);

    MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);

    MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);

    void removeMessageGroup(Object groupId);

    void registerMessageGroupExpiryCallback(MessageGroupCallback callback);

    int expireMessageGroups(long timeout);
}
For more information, see the Javadoc.
The MessageGroupStore accumulates state information in MessageGroups while waiting for a release strategy to be triggered, and that event might never happen.
So, to prevent stale messages from lingering, and to give volatile stores a hook for cleaning up when the application shuts down, the MessageGroupStore lets you register callbacks to apply to its MessageGroups when they expire.
The interface is very straightforward, as the following listing shows:
public interface MessageGroupCallback {

    void execute(MessageGroupStore messageGroupStore, MessageGroup group);
}
The callback has direct access to the store and the message group so that it can manage the persistent state (for example, by entirely removing the group from the store).
The MessageGroupStore maintains a list of these callbacks, which it applies, on demand, to all message groups whose timestamps are earlier than a time supplied as a parameter (see the registerMessageGroupExpiryCallback(..) and expireMessageGroups(..) methods, described earlier).
It is important not to use the same MessageGroupStore instance in different aggregator components when you intend to rely on the expireMessageGroups functionality.
Every AbstractCorrelatingMessageHandler registers its own MessageGroupCallback based on the forceComplete() callback.
As a result, a group that is due for expiration may be completed or discarded by the wrong aggregator.
Starting with version 5.0.10, the AbstractCorrelatingMessageHandler uses a UniqueExpiryCallback for the callback that it registers in the MessageGroupStore.
The MessageGroupStore, in turn, checks for the presence of an instance of this class and logs an error with an appropriate message if one is already present in the callbacks set.
This way, the framework disallows using the same MessageGroupStore instance in different aggregators or resequencers, to avoid the side effect of expiring groups that were not created by the particular correlation handler.
You can call the expireMessageGroups method with a timeout value.
Any message group older than the current time minus this value is expired and has the callbacks applied.
Thus, it is the user of the store that defines what is meant by message group “expiry”.
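The interplay of timestamps, the timeout value, and the registered callbacks can be illustrated with a toy store. This is a minimal sketch under assumptions and not the framework implementation: SimpleGroupStore and its methods are hypothetical, groups are reduced to an ID and a timestamp, and the current time is passed in explicitly so that the behavior is easy to follow.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical toy store: each group is only an ID with a timestamp.
class SimpleGroupStore {

    private final Map<Object, Long> groupTimestamps = new LinkedHashMap<>();

    private final List<BiConsumer<SimpleGroupStore, Object>> callbacks = new ArrayList<>();

    void createGroup(Object groupId, long timestampMillis) {
        groupTimestamps.put(groupId, timestampMillis);
    }

    void removeGroup(Object groupId) {
        groupTimestamps.remove(groupId);
    }

    boolean contains(Object groupId) {
        return groupTimestamps.containsKey(groupId);
    }

    void registerExpiryCallback(BiConsumer<SimpleGroupStore, Object> callback) {
        callbacks.add(callback);
    }

    // Applies every callback to each group older than (now - timeout)
    // and returns the number of expired groups.
    int expireGroups(long timeoutMillis, long nowMillis) {
        long threshold = nowMillis - timeoutMillis;
        int expired = 0;
        // Iterate over a copy of the keys so that callbacks may remove groups.
        for (Object groupId : new ArrayList<>(groupTimestamps.keySet())) {
            Long timestamp = groupTimestamps.get(groupId);
            if (timestamp != null && timestamp < threshold) {
                expired++;
                for (BiConsumer<SimpleGroupStore, Object> callback : callbacks) {
                    callback.accept(this, groupId);
                }
            }
        }
        return expired;
    }
}
```

Note that the store itself does not remove anything: what "expiry" means is decided by the callbacks, for example one that calls removeGroup() for the expired group.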
As a convenience for users, Spring Integration provides a wrapper for the message expiry in the form of a MessageGroupStoreReaper, as the following example shows:
<bean id="reaper" class="org...MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore"/>
<property name="timeout" value="30000"/>
</bean>
<task:scheduled-tasks scheduler="scheduler">
<task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>
The reaper is a Runnable.
In the preceding example, the message group store’s expire method is called every ten seconds.
The timeout itself is 30 seconds.
It is important to understand that the 'timeout' property of |
In addition to the reaper, the expiry callbacks are invoked when the application shuts down, through a lifecycle callback in the AbstractCorrelatingMessageHandler.
The AbstractCorrelatingMessageHandler registers its own expiry callback, and this is the link with the boolean flag send-partial-result-on-expiry in the XML configuration of the aggregator.
If the flag is set to true, then, when the expiry callback is invoked, any unmarked messages in groups that are not yet released can be sent on to the output channel.
Since the MessageGroupStoreReaper is called from a scheduled task and may result in the production of a message (depending on the sendPartialResultOnExpiry option) to a downstream integration flow, it is recommended to supply a custom TaskScheduler with a MessagePublishingErrorHandler to handle exceptions through an errorChannel, as might be expected with the regular aggregator release functionality.
The same logic applies to the group timeout functionality, which also relies on a TaskScheduler.
See Error Handling for more information.
When a shared MessageStore is used for different correlation endpoints, you must configure a proper CorrelationStrategy to ensure uniqueness for group IDs.
Otherwise, unexpected behavior may happen when one correlation endpoint releases or expires messages from others.
Messages with the same correlation key are stored in the same message group.
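One simple way to keep group IDs unique across endpoints is to namespace the correlation key with something specific to each endpoint. The sketch below shows the idea with plain java.util.function types; NamespacedCorrelation, namespaced(), and the endpoint names are hypothetical, and in a real application the same logic would live inside a CorrelationStrategy implementation or a correlation expression.

```java
import java.util.function.Function;

// Hypothetical helper that prefixes correlation keys with an endpoint name.
final class NamespacedCorrelation {

    private NamespacedCorrelation() {
    }

    // Wraps a key-extracting function so that its keys become unique per endpoint.
    static <T> Function<T, String> namespaced(String endpointName, Function<T, ?> delegate) {
        return input -> endpointName + ":" + delegate.apply(input);
    }

    public static void main(String[] args) {
        Function<Long, String> orders = NamespacedCorrelation.<Long>namespaced("orders", n -> n % 10);
        Function<Long, String> invoices = NamespacedCorrelation.<Long>namespaced("invoices", n -> n % 10);

        // The same payload now maps to different group IDs in a shared store.
        System.out.println(orders.apply(37L));
        System.out.println(invoices.apply(37L));
    }
}
```

Here the same payload yields the keys orders:7 and invoices:7, so two endpoints sharing one store no longer operate on each other's groups.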
Some MessageStore implementations allow using the same physical resources by partitioning the data.
For example, the JdbcMessageStore has a region property, and the MongoDbMessageStore has a collectionName property.
For more information about the MessageStore interface and its implementations, see Message Store.
Flux Aggregator
Version 5.2 introduced the FluxAggregatorMessageHandler component.
It is based on the Project Reactor Flux.groupBy() and Flux.window() operators.
Incoming messages are emitted into the FluxSink initiated by Flux.create() in the constructor of this component.
If the outputChannel is not provided, or it is not an instance of ReactiveStreamsSubscribableChannel, the subscription to the main Flux is done from the Lifecycle.start() implementation.
Otherwise, it is postponed to the subscription done by the ReactiveStreamsSubscribableChannel implementation.
The messages are grouped by Flux.groupBy(), which uses a CorrelationStrategy to obtain the group key.
By default, the IntegrationMessageHeaderAccessor.CORRELATION_ID header of the message is consulted.
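The grouping step described above can be pictured with a small plain-Java sketch. It does not use Reactor or Spring Integration: the Msg class, the "correlationId" header name, and the group method are hypothetical stand-ins. It only shows the general idea of deriving a group key through a pluggable strategy, with a default that reads a correlation header.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model only; it mimics the idea of grouping by a correlation key.
final class GroupBySketch {

    // Hypothetical header name used only in this sketch.
    static final String CORRELATION_HEADER = "correlationId";

    // Hypothetical minimal message: a payload plus a header map.
    static final class Msg {
        final String payload;
        final Map<String, Object> headers;

        Msg(String payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Convenience factory that sets only the correlation header.
    static Msg msg(String payload, Object correlationId) {
        return new Msg(payload, Map.of(CORRELATION_HEADER, correlationId));
    }

    // Default strategy in this sketch: read the correlation header.
    static final Function<Msg, Object> DEFAULT_STRATEGY =
            message -> message.headers.get(CORRELATION_HEADER);

    // Groups payloads by the key the strategy returns, keeping arrival order.
    static Map<Object, List<String>> group(List<Msg> messages, Function<Msg, Object> strategy) {
        Map<Object, List<String>> groups = new LinkedHashMap<>();
        for (Msg message : messages) {
            groups.computeIfAbsent(strategy.apply(message), k -> new ArrayList<>())
                    .add(message.payload);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Msg> messages = List.of(msg("a1", "A"), msg("b1", "B"), msg("a2", "A"));
        System.out.println(group(messages, DEFAULT_STRATEGY));
    }
}
```

A custom strategy is just another function from message to key, for example one that derives the key from the payload instead of a header.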
By default, every closed window is released as a Flux in the payload of the message to produce.
This message contains all the headers from the first message in the window.
The Flux in the output message payload must be subscribed to and processed downstream.
This logic can be customized (or superseded) by the setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>) configuration option of the FluxAggregatorMessageHandler.
For example, if we would like to have a List of payloads in the final message, we can configure a Flux.collectList() like this:
fluxAggregatorMessageHandler.setCombineFunction(
        (messageFlux) ->
                messageFlux
                        .map(Message::getPayload)
                        .collectList()
                        .map(GenericMessage::new));
The FluxAggregatorMessageHandler provides several options for selecting an appropriate window strategy:
- setBoundaryTrigger(Predicate<Message<?>>) - is propagated to the Flux.windowUntil() operator. See its JavaDocs for more information. Takes precedence over all other window options.
- setWindowSize(int) and setWindowSizeFunction(Function<Message<?>, Integer>) - are propagated to Flux.window(int) or windowTimeout(int, Duration). By default, the window size is calculated from the first message in the group and its IntegrationMessageHeaderAccessor.SEQUENCE_SIZE header.
- setWindowTimespan(Duration) - is propagated to Flux.window(Duration) or windowTimeout(int, Duration), depending on the window size configuration.
- setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>) - a function to apply a transformation to the grouped fluxes for any custom window operation not covered by the exposed options.
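To make the difference between a size-based window and a boundary-trigger window more concrete, here is a plain-Java sketch over in-memory lists. It does not use Reactor, and the method names are hypothetical. It assumes that a boundary trigger closes the current window and that the triggering element is included in that window, which is one reading of the Flux.windowUntil() JavaDocs. It does not model timeouts or any Reactor edge cases such as trailing empty windows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of two windowing ideas; not the Reactor operators themselves.
final class WindowSketch {

    // Cuts the input into consecutive windows of at most 'size' elements.
    static <T> List<List<T>> windowBySize(List<T> items, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> windows = new ArrayList<>();
        for (int start = 0; start < items.size(); start += size) {
            int end = Math.min(start + size, items.size());
            windows.add(new ArrayList<>(items.subList(start, end)));
        }
        return windows;
    }

    // Closes the current window each time the trigger matches; the matching
    // element stays in the window it closes (an assumption of this sketch).
    static <T> List<List<T>> windowUntil(List<T> items, Predicate<? super T> boundaryTrigger) {
        List<List<T>> windows = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : items) {
            current.add(item);
            if (boundaryTrigger.test(item)) {
                windows.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            // Remaining elements form a final, unterminated window.
            windows.add(current);
        }
        return windows;
    }

    public static void main(String[] args) {
        System.out.println(windowBySize(List.of(1, 2, 3, 4, 5), 2));
        System.out.println(windowUntil(List.of("a", "b", "END", "c"), "END"::equals));
    }
}
```

A size-based window fits when the group size is known up front (for example from a sequence-size header), while a boundary trigger fits when a marker message signals the end of a group.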
Since this component is a MessageHandler implementation, it can simply be used as a @Bean definition together with a @ServiceActivator messaging annotation.
With the Java DSL, it can be used from the .handle() EIP method.
The sample below demonstrates how we can register an IntegrationFlow at runtime and how a FluxAggregatorMessageHandler can be correlated with a splitter upstream:
IntegrationFlow fluxFlow =
        (flow) -> flow
                .split()
                .channel(MessageChannels.flux())
                .handle(new FluxAggregatorMessageHandler());

IntegrationFlowContext.IntegrationFlowRegistration registration =
        this.integrationFlowContext.registration(fluxFlow)
                .register();

Flux<Message<?>> window =
        registration.getMessagingTemplate()
                .convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);
Condition on the Message Group
Starting with version 5.5, an AbstractCorrelatingMessageHandler (including its Java and XML DSLs) exposes a groupConditionSupplier option that takes a BiFunction<Message<?>, String, String> implementation.
This function is applied to each message added to the group, and the resulting condition sentence is stored in the group for future consideration.
The ReleaseStrategy may consult this condition instead of iterating over all the messages in the group.
See the GroupConditionProvider JavaDocs and Message Group Condition for more information.
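The following plain-Java sketch illustrates the idea of a group condition. It does not use Spring Integration types: Msg, Group, and canRelease are hypothetical stand-ins, and the sketch assumes that the second argument of the function is the condition already stored on the group. The condition here records the expected group size from the first message, so the release check only compares two values and never walks the stored messages.

```java
import java.util.function.BiFunction;

// Plain-Java model only; it mimics a condition kept alongside a message group.
final class GroupConditionSketch {

    // Hypothetical minimal message carrying the expected size of its group.
    static final class Msg {
        final String payload;
        final int expectedSize;

        Msg(String payload, int expectedSize) {
            this.payload = payload;
            this.expectedSize = expectedSize;
        }
    }

    // Hypothetical group that tracks only a message count and a condition string.
    static final class Group {
        private int size;
        private String condition;

        // Called for every message added to the group; updates the stored condition.
        void add(Msg message, BiFunction<Msg, String, String> conditionSupplier) {
            this.size++;
            this.condition = conditionSupplier.apply(message, this.condition);
        }

        int size() {
            return this.size;
        }

        String condition() {
            return this.condition;
        }
    }

    // Keeps an existing condition; otherwise derives it from the incoming message.
    static final BiFunction<Msg, String, String> EXPECTED_SIZE_CONDITION =
            (message, existing) ->
                    existing != null ? existing : String.valueOf(message.expectedSize);

    // Release check that consults the condition instead of iterating over messages.
    static boolean canRelease(Group group) {
        return group.condition() != null
                && group.size() >= Integer.parseInt(group.condition());
    }

    public static void main(String[] args) {
        Group group = new Group();
        group.add(new Msg("first", 2), EXPECTED_SIZE_CONDITION);
        System.out.println(canRelease(group));
        group.add(new Msg("second", 2), EXPECTED_SIZE_CONDITION);
        System.out.println(canRelease(group));
    }
}
```

The benefit grows with group size and with stores where reading every message is expensive, since the decision depends only on the stored condition and the group size.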
See also File Aggregator.