Aggregator
Aggregator 的核心逻辑是关联策略和释放策略:关联策略确定如何将消息分组在一起,而释放策略确定何时释放组中的所有消息进行聚合。
注意,Aggregator 是一种有状态的组件,它在 MessageGroupStore 中维护消息组的状态。可以使用 MessageGroupStoreReaper 定期过期组或在应用程序关闭时手动触发过期。
Aggregator 与 Splitter 的架构基本相反,它是一种能够接收多条消息,并将它们组合成一条单独消息的消息处理器。事实上,Aggregator 通常是包含 Splitter 的管道中的下游使用者。
从技术上讲,Aggregator 比 Splitter 更复杂,因为它是有状态的。它必须保存要汇总的消息,并确定何时准备好对完整的消息组进行汇总。为此,它需要一个 MessageStore
。
Functionality
Aggregator 通过关联和存储相关消息组,将它们组合在一起,直到该组被认为是完整的。此时,Aggregator 通过处理整个组来创建一条单独的消息,并将汇总的消息作为输出发送。
实现 Aggregator 需要提供执行聚合的逻辑(即,从多个消息创建单个消息)。相关概念包括关联和释放。
关联确定了消息如何分组以进行聚合。在 Spring Integration 中,关联默认基于 IntegrationMessageHeaderAccessor.CORRELATION_ID
消息头进行。具有相同 IntegrationMessageHeaderAccessor.CORRELATION_ID
的消息被分组在一起。但是,您可以自定义关联策略,以允许使用其他方式指定应如何将消息分组在一起。为此,您可以实现一个 CorrelationStrategy
(将在本章后面介绍)。
要确定一组消息准备好处理的点,将咨询 ReleaseStrategy
。聚合器的默认释放策略在所有包含在序列中的消息都存在时释放一个组,这基于 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
头。您可以通过提供对自定义 ReleaseStrategy
实现的引用来覆盖此默认策略。
Programming Model
聚合 API 由许多类组成:
-
接口
MessageGroupProcessor`及其子类:`MethodInvokingAggregatingMessageGroupProcessor`和 `ExpressionEvaluatingMessageGroupProcessor
-
ReleaseStrategy`接口及其默认实现:`SimpleSequenceSizeReleaseStrategy
-
CorrelationStrategy`接口及其默认实现:`HeaderAttributeCorrelationStrategy
AggregatingMessageHandler
AggregatingMessageHandler
(AbstractCorrelatingMessageHandler
的子类)是一个 MessageHandler
实现,封装了一个聚合器(和其他关联用例)的常用功能,如下所示:
-
将消息关联到要聚合的组中
-
在 `MessageStore`中维护这些消息,直至能够释放该组
-
决定何时释放该组
-
将已释放的组聚合到单个消息中
-
识别已过期的组并对此做出响应
决定如何将消息分组在一起的职责被委派给 CorrelationStrategy
实例。决定是否可以释放消息组的职责被委派给 ReleaseStrategy
实例。
以下清单简要介绍了基础的 AbstractAggregatingMessageGroupProcessor
(实现 aggregatePayloads
方法的职责留给开发人员):
public abstract class AbstractAggregatingMessageGroupProcessor
implements MessageGroupProcessor {
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
// default implementation exists
}
protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);
}
参见 DefaultAggregatingMessageGroupProcessor
、ExpressionEvaluatingMessageGroupProcessor
和 MethodInvokingMessageGroupProcessor
作为 AbstractAggregatingMessageGroupProcessor
的开箱即用实现。
从 5.2 版开始,Function<MessageGroup, Map<String, Object>>
策略可用于 AbstractAggregatingMessageGroupProcessor
,以合并和计算输出消息的头。DefaultAggregateHeadersFunction
实现可用,其逻辑返回组中没有冲突的所有头;组中一条或多条消息上不存在的头不被视为冲突。有冲突的头将被省略。此功能与新引入的 DelegatingMessageGroupProcessor
一起,可用于任何任意(非 AbstractAggregatingMessageGroupProcessor
)MessageGroupProcessor
实现。从本质上说,框架将提供的函数注入到 AbstractAggregatingMessageGroupProcessor
实例中,并将所有其他实现包装到 DelegatingMessageGroupProcessor
中。AbstractAggregatingMessageGroupProcessor
和 DelegatingMessageGroupProcessor
之间的逻辑差异在于,后者在调用委托策略之前不会提前计算标头,并且如果委托返回 Message
或 AbstractIntegrationMessageBuilder
,则不会调用函数。在这种情况下,该框架假定目标实现已负责生成一组适当的头并填充到返回结果中。Function<MessageGroup, Map<String, Object>>
策略可用作 XML 配置的 headers-function
引用属性,可作为 Java DSL 的 AggregatorSpec.headersFunction()
选项,也可作为普通 Java 配置的 AggregatorFactoryBean.setHeadersFunction()
。
CorrelationStrategy
由 AbstractCorrelatingMessageHandler
所有,并且具有基于 IntegrationMessageHeaderAccessor.CORRELATION_ID
消息头的默认值,如下例所示:
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
...
this.correlationStrategy = correlationStrategy == null ?
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
...
}
至于消息组的实际处理,默认实现是 DefaultAggregatingMessageGroupProcessor
。它创建一个单个 Message
,其有效负载是针对给定组接收到的有效负载的 List
。这非常适用于带有分隔器、发布-订阅通道或收件人列表路由器的简单散集实现。
在以下类型的场景中使用发布-订阅通道或收件人列表路由器时,请务必启用 |
在实现应用程序的特定聚合器策略时,您可以扩展 AbstractAggregatingMessageGroupProcessor
并实现 aggregatePayloads
方法。但是,有更好的解决方案(与 API 耦合性较低),用来实现聚合逻辑,可以通过 XML 或注释进行配置。
一般而言,任何 POJO 都可以实现聚合算法,如果它提供一个将单个 java.util.List
作为参数(同样支持参数化列表)的方法。该方法以以下方式调用以聚合消息:
-
如果参数是一个
java.util.Collection<T>
且参数类型 T 可以赋值为Message
,那么发送累积的所有消息列表到聚合器。 -
如果参数是非参数化的
java.util.Collection
或参数类型不可赋值给Message
,那么该方法接收累积消息的负载。 -
如果返回值类型无法赋值给
Message
,它将被视为由框架自动创建的Message
的负载。
为了代码简单性和促进最佳实践(例如松耦合、可测试性等),实现聚合逻辑的首选方式是通过 POJO 并使用 XML 或注释支持在应用程序中配置它。 |
从 5.3 版开始,AbstractCorrelatingMessageHandler
在处理消息组后对消息头执行 MessageBuilder.popSequenceDetails()
修改,以实现具有多个嵌套级别的适当分隔器-聚合器场景。仅当消息组释放结果不是消息集合时才执行此操作。在这种情况下,目标 MessageGroupProcessor
负责在生成这些消息时调用 MessageBuilder.popSequenceDetails()
。
如果 MessageGroupProcessor
返回一个 Message
,则仅当 sequenceDetails
与组中的第一条消息匹配时,才会对输出消息执行 MessageBuilder.popSequenceDetails()
。(以前仅当 MessageGroupProcessor
返回普通有效负载或 AbstractIntegrationMessageBuilder
时才执行此操作。)
此功能可以通过新的 popSequence
boolean
属性进行控制,因此当相关详细信息尚未填充标准拆分器时,可以在某些方案中禁用 MessageBuilder.popSequenceDetails()
。本质上,此属性会撤消最近的上游 applySequence = true
在 AbstractMessageSplitter
中所做的操作。有关更多信息,请参见 Splitter。
SimpleMessageGroup.getMessages()
方法返回一个 unmodifiableCollection
。因此,如果一个聚合 POJO 方法有一个 Collection<Message>
参数,则传入的参数正是那个 Collection
实例,并且当您为聚合器使用 SimpleMessageStore
时,该原始 Collection<Message>
在释放群组后被清除。因此,如果它从聚合器中传递出去,则 POJO 中的 Collection<Message>
变量也会被清除。如果您希望照常释放该集合以进行进一步处理,则必须构建一个新的 Collection
(例如 new ArrayList<Message>(messages)
)。从版本 4.3 开始,框架不再将消息复制到新集合中,以避免不需要的额外对象创建。
在 4.2 版之前,无法使用 XML 配置提供 MessageGroupProcessor
。仅可以使用 POJO 方法进行聚合。现在,如果框架检测到引用的(或内部)bean 实现了 MessageProcessor
,它将用作聚合器的输出处理器。
如果您希望从自定义 MessageGroupProcessor
中释放一组对象作为消息的有效负载,您的类应扩展 AbstractAggregatingMessageGroupProcessor
并实现 aggregatePayloads()
。
此外,从 4.2 版开始,提供了一个 SimpleMessageGroupProcessor
。它返回组中的消息集合,如前所述,这会导致释放的消息被单独发送。
此操作可以让聚合器充当消息障碍,其中将保存到达的消息,直至释放策略触发并组以一系列单独消息的形式释放为止。
从版本 6.0 开始,上述分裂行为仅当群组处理器是 SimpleMessageGroupProcessor
时才有效。否则,只有返回 Collection<Message>
的任何其他 MessageGroupProcessor
实现才会发出单个应答消息,其有效负载为整个消息集合。此逻辑由聚合器的典型目的决定,即按某个键收集请求消息并生成单个分组消息。
ReleaseStrategy
ReleaseStrategy
界面定义如下:
public interface ReleaseStrategy {
boolean canRelease(MessageGroup group);
}
一般来说,如果它提供接受单个 java.util.List
作为参数(还支持参数化列表)并返回布尔值的方法,则任何 POJO 都可以实现完成判决逻辑。在收到每条新消息后,将调用此方法,以如下方式确定群组是否完成:
-
如果参数是一个
java.util.List<T>
且参数类型T
可以赋值为Message
,那么组内累积的所有消息列表将被发送到方法。 -
如果参数是一个非参数化的
java.util.List
或参数类型不可赋值给Message
,那么该方法接收累积消息的负载。 -
如果消息组已准备好聚合,则该方法必须返回
true
,否则返回 false。
以下示例演示了如何对类型为 Message
的 List
使用 @ReleaseStrategy
注解:
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<Message<?>>) {...}
}
以下示例演示了如何对类型为 String
的 List
使用 @ReleaseStrategy
注解:
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<String>) {...}
}
根据前两个示例中的签名,基于 POJO 的发布策略将传递尚未发布的消息的 Collection
(如果你需要访问整个 Message
)或有效负载对象的 Collection
(如果类型参数不是 Message
)。这满足了大多数用例。但是,如果出于某种原因,你需要访问完整的 MessageGroup
,则应提供 ReleaseStrategy
界面的实现。
在处理可能较大的群组时,你应该了解如何调用这些方法,因为在群组释放之前,可能会多次调用释放策略。对 ReleaseStrategy
来说,最有效的一种方法是实现,因为聚合器可以直接调用它。其次最有效的方法是使用 Collection<Message<?>>
参数类型的 POJO 方法。最不有效的一种是使用 Collection<Something>
类型的 POJO 方法。当每次调用释放策略时,框架都必须将群组中消息的有效负载复制到新集合(并可能尝试将有效负载转换为 Something
)。使用 Collection<?>
可以避免转换,但仍然需要创建新 Collection
。
出于这些原因,对于大群组,我们建议你实现 ReleaseStrategy
。
在群组因聚合释放时,该群组的所有未发布消息都得到处理并从该群组中删除。如果群组也已完成(即如果已收到来自某个序列的所有消息,或者如果没有定义任何序列),则该群组将标记为已完成。此群组的任何新消息都发送到废弃通道(如果定义)。将 expire-groups-upon-completion
设置为 true
(默认值为 false
)可删除整个群组,且任何新消息(与其移除的群组具有相同的关联 ID)将形成新群组。你可以使用 MessageGroupStoreReaper
和将 send-partial-result-on-expiry
设置为 true
来释放部分序列。
为了促进丢弃迟到的消息,聚合器必须在释放组后维护有关该组的状态。这最终可能导致内存不足的情况。为避免这种情况,您应考虑配置 MessageGroupStoreReaper
以删除组元数据。到一定程度后,到不再期望迟到的消息到达,则应设置到期参数以使组到期。有关配置清理器的信息,请参阅 Managing State in an Aggregator: MessageGroupStore
。
Spring Integration 为 ReleaseStrategy
提供实现:SimpleSequenceSizeReleaseStrategy
。此实现查询每个到达消息的 SEQUENCE_NUMBER
和 SEQUENCE_SIZE
头部,以在消息群组完成并准备好聚合时进行判决。如前所述,它也是默认策略。
在版本 5.0 之前,默认释放策略是 |
如果你正在聚合大群组,则不需要释放部分群组,且不需要检测/拒绝重复序列,可以考虑改用 SimpleSequenceSizeReleaseStrategy
- 它对这些用例来说更有效,且是 5.0 版 中的默认策略,并且未指定部分群组释放。
Aggregating Large Groups
4.3 版本将 SimpleMessageGroup
中消息的默认 Collection
更改为 HashSet
(以前是 BlockingQueue
)。从大组中删除单个消息时,这很昂贵(需要 O(n) 线性扫描)。虽然哈希集通常删除得更快,但对于大消息来说,哈希集可能很昂贵,因为在插入和删除时都必须计算哈希。如果你要散列昂贵的消息,请考虑使用其他一些集合类型。正如 Using MessageGroupFactory
中所讨论的,提供了一个 SimpleMessageGroupFactory
,以便你可以选择最适合你需要的 Collection
。你还可以提供你自己的工厂实现来创建其他一些 Collection<Message<?>>
。
以下示例演示了如何使用以前的实现和 SimpleSequenceSizeReleaseStrategy
配置聚合器:
<int:aggregator input-channel="aggregate"
output-channel="out" message-store="store" release-strategy="releaser" />
<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
<property name="messageGroupFactory">
<bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
<constructor-arg value="BLOCKING_QUEUE"/>
</bean>
</property>
</bean>
<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />
如果过滤器端点参与聚合器上游的流,则序列大小释放策略(固定或基于 |
Correlation Strategy
CorrelationStrategy
界面定义如下:
public interface CorrelationStrategy {
Object getCorrelationKey(Message<?> message);
}
该方法返回一个表示关联消息与消息群组所用的关联键的 Object
。该键必须满足 EQUALS()
和 HASHCODE()
实现对 MAP
中键使用的标准。
一般来说,任何 POJO 都能实现关联逻辑,并且消息映射到方法参数(或参数)的规则与 ServiceActivator
相同(包括对 @Header
注解的支持)。该方法必须返回值,并且该值不能为 NULL
。
Spring Integration 为 CorrelationStrategy
提供实现:HeaderAttributeCorrelationStrategy
。此实现返回消息头部之一的值(其名称由构造函数参数指定)作为关联键。默认情况下,关联策略是 HeaderAttributeCorrelationStrategy
,它返回 CORRELATION_ID
头部属性的值。如果你有自定义的头部名称要用于关联,则可以在 HeaderAttributeCorrelationStrategy
实例上对其进行配置,并为聚合器的关联策略提供该自定义名称。
Lock Registry
群组的变更对于线程来说是安全的。因此,当同时发送具有相同关联 ID 的消息时,只有其中一个消息在聚合器中得到处理,从而使其有效地成为 每个消息群组的单线程。LockRegistry
用于获取已解析关联 ID 的锁。默认使用 DefaultLockRegistry
(内存中)。对于正在使用共享 MessageGroupStore
的服务器之间同步更新,您必须配置一个共享的锁定注册表。
Avoiding Deadlocks
如上所述,当消息群组发生改变(添加或释放消息)时,将保持锁定。
考虑以下流程:
...->aggregator1-> ... ->aggregator2-> ...
如果有多个线程,并且聚合器共享通用锁注册表,则可能出现死锁。这将导致挂起线程,jstack <pid>
可能会显示如下结果:
Found one Java-level deadlock:
=============================
"t2":
waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t1"
"t1":
waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t2"
有几种方法可以避免此问题:
-
确保每个聚合器都有自己的锁注册表(这可能是应用实例之间共享的注册表,但是流中的两个或两个以上的聚合器必须分别具有一个不同的注册表)。
-
使用
ExecutorChannel
或QueueChannel
作为聚合器的输出通道,以便下游流在新的线程上运行。 -
从 5.1.1 版开始,将
releaseLockBeforeSend
聚合器属性设置为true
。
如果由于某种原因,单个聚合器的输出最终被重新路由回同一聚合器,也可能导致此问题。当然,上述第一个解决方案在此情况下不适用。 |
Configuring an Aggregator in Java DSL
有关如何用 Java DSL 配置聚合器,请参见 Aggregators and Resequencers。
Configuring an Aggregator with XML
Spring Integration 通过`<aggregator/>` 元素支持使用 XML 配置聚合器。以下示例显示了聚合器的示例:
<channel id="inputChannel"/>
<int:aggregator id="myAggregator" 1
auto-startup="true" 2
input-channel="inputChannel" 3
output-channel="outputChannel" 4
discard-channel="throwAwayChannel" 5
message-store="persistentMessageStore" 6
order="1" 7
send-partial-result-on-expiry="false" 8
send-timeout="1000" 9
correlation-strategy="correlationStrategyBean" 10
correlation-strategy-method="correlate" 11
correlation-strategy-expression="headers['foo']" 12
ref="aggregatorBean" 13
method="aggregate" 14
release-strategy="releaseStrategyBean" 15
release-strategy-method="release" 16
release-strategy-expression="size() == 5" 17
expire-groups-upon-completion="false" 18
empty-group-min-timeout="60000" 19
lock-registry="lockRegistry" 20
group-timeout="60000" 21
group-timeout-expression="size() ge 2 ? 100 : -1" 22
expire-groups-upon-timeout="true" 23
scheduler="taskScheduler" > 24
<expire-transactional/> 25
<expire-advice-chain/> 26
</aggregator>
<int:channel id="outputChannel"/>
<int:channel id="throwAwayChannel"/>
<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
<constructor-arg ref="dataSource"/>
</bean>
<bean id="aggregatorBean" class="sample.PojoAggregator"/>
<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>
<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
1 | 聚合器的 ID 是可选的。 |
2 | 生命周期属性,用于指示是否在应用程序上下文开始时启动聚合器。可选(默认值为“true”)。 |
3 | 聚合器接收消息的通道。必需的。 |
4 | 聚合器发送聚合结果的通道。可选的(因为传入的消息本身可以在“replyChannel”消息头中指定一个回复通道)。 |
5 | 聚合器发送超时消息的通道(如果 send-partial-result-on-expiry 是 false )。可选的。 |
6 | 对 MessageGroupStore 的引用,用于在消息完整之前根据它们的相关键存储消息组。可选的。默认情况下,它是一个易失性内存存储。有关更多信息,请参见 Message Store。 |
7 | 当有多个句柄订阅同一个 DirectChannel 时,此聚合器的顺序(用于负载平衡目的)。可选的。 |
8 | 表示过期消息应在包含的 MessageGroup 过期后聚合并发送到“output-channel”或“replyChannel”(请参见 MessageGroupStore.expireMessageGroups(long) )。使 MessageGroup 过期的一种方法是配置 MessageGroupStoreReaper 。但是,您还可以通过调用 MessageGroupStore.expireMessageGroups(timeout) 使 MessageGroup 过期。可以通过控制总线操作来实现这一点,或者,如果您有对 MessageGroupStore 实例的引用,则通过调用 expireMessageGroups(timeout) 来实现。否则,此属性本身没有任何作用。它仅用作指标,指示是否丢弃即将过期的 MessageGroup 中的所有消息或将其发送到输出或回复通道。可选的(默认值为 false )。注意:此属性可能更恰当地称为 send-partial-result-on-timeout ,因为如果 expire-groups-upon-timeout 设置为 false ,则该组可能实际上不会过期。 |
9 | 向 output-channel 或 discard-channel 发送回复 Message 时要等待的超时间隔。默认值为 30 秒。仅当输出通道具有一些“发送”限制(例如具有固定“容量”的 QueueChannel )时才应用它。在这种情况下,将抛出 MessageDeliveryException 。对于 AbstractSubscribableChannel 实现,忽略 send-timeout 。对于 group-timeout(-expression) ,MessageDeliveryException 将计划的过期任务引导到重新计划此任务。可选的。 |
10 | 对实现消息相关性(分组)算法的 bean 的引用。bean 可以是 CorrelationStrategy 接口的实现或 POJO。在后一种情况下,也必须定义 correlation-strategy-method 属性。可选的(默认情况下,聚合器使用 IntegrationMessageHeaderAccessor.CORRELATION_ID 头)。 |
11 | 在由 correlation-strategy 引用的 bean 上定义的方法。它实现了相关性决策算法。可选的,有限制(必须存在 correlation-strategy )。 |
12 | 表示关联策略的 SpEL 表达式。示例:"headers['something']" 。只允许 `correlation-strategy`或 `correlation-strategy-expression`中的一个存在。 |
13 | 对在应用程序上下文中定义的 bean 的引用。bean 必须实现聚合逻辑,如前所述。可选的(默认情况下,聚合消息列表成为输出消息的负载)。 |
14 | 在由 ref 属性引用的 bean 上定义的方法。它实现了消息聚合算法。可选的(取决于是否定义了 ref 属性)。 |
15 | 对实现释放策略的 bean 的引用。bean 可以是 ReleaseStrategy 接口的实现或 POJO。在后一种情况下,也必须定义 release-strategy-method 属性。可选的(默认情况下,聚合器使用 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 头属性)。 |
16 | 在由 release-strategy 属性引用的 bean 上定义的方法。它实现了完成决策算法。可选的,有限制(必须存在 release-strategy )。 |
17 | 表示释放策略的 SpEL 表达式。表达式的根对象是一个 MessageGroup 。示例:"size() == 5" 。只允许 `release-strategy`或 `release-strategy-expression`中的一个存在。 |
18 | 当设置为 true 时(默认值为 false ),已完成的组将从消息存储中删除,让具有相同相关性的后续消息形成一个新的组。默认行为是将具有与已完成组相同相关性的消息发送到 discard-channel 。 |
19 | 仅当为 <aggregator> 的 MessageStore 配置 MessageGroupStoreReaper 时才适用。默认情况下,当配置 MessageGroupStoreReaper 使部分组过期时,也会删除空组。空组在组正常释放后存在。空组能够检测并丢弃延迟到达的消息。如果您希望空组的过期时间表长于部分组的过期时间表,请设置此属性。然后,空组将不会从 MessageStore 中删除,除非它们至少经过这么长的毫秒未修改。请注意,空组的实际过期时间也受收割器的 timeout 属性的影响,并且可能等于此值加上超时时间。 |
20 | 对 org.springframework.integration.util.LockRegistry bean 的引用。它用于根据 MessageGroup 的 groupId 获取 Lock ,以便对 MessageGroup 执行并发操作。默认情况下,使用一个内部 DefaultLockRegistry 。使用分布式 LockRegistry ,例如 ZookeeperLockRegistry ,可确保只有一个聚合器实例可以并发地对一个组执行操作。有关更多信息,请参见 Redis Lock Registry 或 Zookeeper Lock Registry。 |
21 | 一个超时(以毫秒为单位),用来强制 MessageGroup 在 ReleaseStrategy 在当前消息到达后不释放群组时完成。当需要在新消息未在超时内(该超时自最后一则消息达到时开始计算)到达 MessageGroup 的情况下发出部分结果(或放弃群组)时,此属性为聚合器提供了一个基于内置时间的发布策略。要设置从 MessageGroup 创建时开始计算超时的内容,请参阅 group-timeout-expression 信息。当一条新消息到达聚合器时,对其 MessageGroup 的任何现有 ScheduledFuture<?> 都将被取消。如果 ReleaseStrategy 返回 false (表示不释放)和 groupTimeout > 0 ,将会计划一项新任务来终止该群组。我们建议不要将此属性设置为零(或负值)。这样做会有效禁用聚合器,因为每个消息群组会立即完成。但是,你可以通过使用表达式有条件地将其设置为零(或负值)。有关信息,请参阅 group-timeout-expression 。在完成期间执行的操作取决于 ReleaseStrategy 和 send-partial-group-on-expiry 属性。有关详细信息,请参阅 Aggregator and Group Timeout。它与“group-timeout-expression”属性互斥。 |
22 | SpEL 表达式,该表达式评估为一个 groupTimeout ,其中 MessageGroup 作为 #root 评估上下文对象。用于计划强制完成 MessageGroup 。如果表达式评估为 null ,则不计划完成。如果评估为零,则群组将在当前线程上立即完成。实际上,这提供了一个动态的 group-timeout 属性。例如,如果你希望在群组创建后的 10 秒后强制完成 MessageGroup ,则你可以考虑使用以下 SpEL 表达式:timestamp + 10000 - T(System).currentTimeMillis() ,其中 timestamp 由 MessageGroup.getTimestamp() 提供,因为 MessageGroup 此处是 #root 评估上下文对象。但是,请记住,群组创建时间可能与第一条到达消息的时间不同,具体取决于其他群组到期属性的配置。有关详细信息,请参阅 group-timeout 。与“group-timeout”属性互斥。 |
23 | 当由于超时(或 MessageGroupStoreReaper )导致群组完成时,群组将被默认为过期(完全删除)。迟到的消息会启动一个新群组。你可以将此项设置为 false 来完成群组,但保留其元数据,以便将迟到的消息丢弃。可以用 MessageGroupStoreReaper 和 empty-group-min-timeout 属性一起将空群组稍后过期。默认为“true”。 |
24 | 一个 TaskScheduler bean 引用,用于计划在没有新消息 MessageGroup 在 groupTimeout 内到达 MessageGroup 时强制其完成。如果未提供,则使用 ApplicationContext 中注册的默认调度程序(taskScheduler )(ThreadPoolTaskScheduler )。如果未指定 group-timeout 或 group-timeout-expression ,则此属性不适用。 |
25 | 从 4.1 版起。它允许为 forceComplete 操作启动事务。它由 group-timeout(-expression) 或 MessageGroupStoreReaper 启动,不应用于常规的 add 、release 和 discard 操作。只允许此子元素或 <expire-advice-chain/> 。 |
26 | 从 version 4.1 起。它允许为 forceComplete 操作配置任何 Advice 。它由 group-timeout(-expression) 或 MessageGroupStoreReaper 启动,不应用于常规的 add 、release 和 discard 操作。只允许此子元素或 <expire-transactional/> 。还可以在此处使用 Spring tx 名称空间配置事务 Advice 。 |
有两个与组到期(完全删除)相关的属性。当一个组到期时,它没有任何记录,如果一条带有相同关联性的新消息到达,则会启动一个新组。当一个组完成(没有到期)时,空组将保留下来,而迟到的消息将被丢弃。可以使用 MessageGroupStoreReaper
结合 empty-group-min-timeout
属性来稍后删除空组。
expire-groups-upon-completion`与在 `ReleaseStrategy
释放组时完成“正常”完成相关。默认值为 false
。
如果一个组没有正常完成,而是由于超时而被释放或丢弃,则该组通常会到期。从 4.1 版本开始,您可以使用 expire-groups-upon-timeout
来控制此行为。为了向后兼容,它默认为 true
。
当一个群组超时时,ReleaseStrategy
会再有一次机会释放该群组。如果它这样做并且 expire-groups-upon-timeout
为 false,则到期由 expire-groups-upon-completion
控制。如果群组在超时期间未被释放策略释放,则到期由 expire-groups-upon-timeout
控制。超时的群组会被丢弃或发生部分释放(基于 send-partial-result-on-expiry
)。
从 5.0 版本开始,空组也会在 empty-group-min-timeout
之后被安排移除。如果 expireGroupsUponCompletion == false
并且 minimumTimeoutForEmptyGroups > 0
,则在释放正常或部分序列时安排移除该组的任务。
从 5.4 版本开始,可以配置聚合器(和重新排序器)来使孤立组(可能无法通过其他方式释放的持久消息存储中的组)到期。expireTimeout
(如果大于 0
)指示存储中比此值更旧的组应被清除。在启动时调用 purgeOrphanedGroups()
方法,并且与提供的 expireDuration
一起在计划的任务中定期调用。此方法也可以在任何时候在外部调用。根据上面提到的提供的到期选项,到期逻辑完全委托给 forceComplete(MessageGroup)
功能。当需要从那些不再遵循常规消息到达逻辑释放的那些老组中清理消息存储时,这种定期清除功能非常有用。在大多数情况下,当使用持久消息组存储时,会在应用程序重新启动后发生这种情况。此功能与带有计划任务的 MessageGroupStoreReaper
类似,但当使用组超时而不是清理器时,它提供了一种方便的方法来处理特定组件中的老组。MessageGroupStore
必须专门为当前关联性端点提供。否则,一个聚合器可能会从另一个聚合器清除组。对于聚合器,使用此技术过期的组将被丢弃或作为一个部分组释放,具体取决于 expireGroupsUponCompletion
属性。
如果可以在其他 <aggregator>
定义中引用自定义聚合器处理程序实现,我们通常建议使用 ref
属性。但是,如果自定义聚合器实现仅由 <aggregator>
单个定义使用,则可以使用内部 Bean 定义(从 1.0.3 版本开始)在 <aggregator>
元素中配置聚合 POJO,如下例所示:
<aggregator input-channel="input" method="sum" output-channel="output">
<beans:bean class="org.foo.PojoAggregator"/>
</aggregator>
在同一 |
以下示例显示了聚合器 Bean 的实现:
public class PojoAggregator {
public Long add(List<Long> results) {
long total = 0l;
for (long partialResult: results) {
total += partialResult;
}
return total;
}
}
前一个示例的完成策略 Bean 的实现可能如下:
public class PojoReleaseStrategy {
...
public boolean canRelease(List<Long> numbers) {
int sum = 0;
for (long number: numbers) {
sum += number;
}
return sum >= maxValue;
}
}
在有意义的情况下,可以将释放策略方法和聚合器方法合并到一个 bean 中。 |
以上示例的相关性策略 Bean 的实现可能如下所示:
public class PojoCorrelationStrategy {
...
public Long groupNumbersByLastDigit(Long number) {
return number % 10;
}
}
前一个示例中的聚合器将根据某些标准(在本例中,是除以十后的余数)对数字进行分组,并一直保留该组,直到有效负载提供的数字之和超过某个值。
在有意义的情况下,可以将释放策略方法、关联策略方法和聚合器方法合并到一个 bean 中。(实际上,它们的所有内容或任何两个内容都可以合并。) |
Aggregators and Spring Expression Language (SpEL)
自 Spring Integration 2.0 起,您可以使用 SpEL 处理各种策略(相关性、释放和聚合),如果此类释放策略背后的逻辑相对简单,我们建议使用它。假设您有一个专为接收一系列对象而设计的传统组件。我们知道,默认释放策略将 List
中的所有聚合消息组装在一起。现在我们遇到了两个问题。首先,我们需要从列表中提取各个消息。其次,我们需要提取每个消息的有效载荷并将对象数组组装起来。以下示例解决了这两个问题:
public String[] processRelease(List<Message<String>> messages){
List<String> stringList = new ArrayList<String>();
for (Message<String> message : messages) {
stringList.add(message.getPayload());
}
return stringList.toArray(new String[]{});
}
但是,使用 SpEL,实际上可以通过一行表达式相对轻松地处理这样的需求,从而免去您编写自定义类并将其配置为 Bean 的麻烦。以下示例演示了如何执行此操作:
<int:aggregator input-channel="aggChannel"
output-channel="replyChannel"
expression="#this.![payload].toArray()"/>
在前面的配置中,我们用一个 collection projection 表达式从列表中所有消息的有效载荷中组装一个新的集合,然后将其转换为一个数组,从而实现与之前 Java 代码相同的结果。
在处理自定义释放和相关性策略时,您可以应用相同基于表达式的处理方法。
除了在 correlation-strategy
属性中定义一个用于自定义 CorrelationStrategy
的 bean,还可以按照以下示例中的所示,使用 SpEL 表达式实现简单的关联逻辑,并在 correlation-strategy-expression
中对其进行配置:
correlation-strategy-expression="payload.person.id"
在前一个示例中,我们假设负载有一个具有 id
的 person
属性,它将用于关联消息。
同样,对于 ReleaseStrategy
,您可以采用作为 SpEL 表达式的形式实现释放逻辑,并在 release-strategy-expression
属性中对其进行配置。用于评估上下文是 MessageGroup
本身。可以在表达式中使用组的 message
属性引用消息的 List
。
在 5.0 版本之前的版本中,根对象是 |
release-strategy-expression="!messages.?[payload==5].empty"
在前一个示例中,SpEL 评估上下文是 MessageGroup
本身,而且您在陈述,在此组中只要有一个消息的负载为 5
,就应该释放此组。
Aggregator and Group Timeout
从 4.0 版本开始,已经引入了两个新的互斥属性: group-timeout
和 group-timeout-expression
。请参阅 Configuring an Aggregator with XML。在某些情况下,如果当前消息到达时 ReleaseStrategy
已经释放,则可能需要在一段时间之后发出聚合器结果(或丢弃组)。为此, groupTimeout
选项允许强制调度 MessageGroup
完成,如下例所示:
<aggregator input-channel="input" output-channel="output"
send-partial-result-on-expiry="true"
group-timeout-expression="size() ge 2 ? 10000 : -1"
release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>
使用这个示例,如果聚合器按 release-strategy-expression
中定义的顺序接收最后一条消息,则可以正常释放。如果未收到该特定消息,groupTimeout
会强制该组在十秒钟后完成,前提是该组至少包含两条消息。
强制该组完成会导致 ReleaseStrategy
和 send-partial-result-on-expiry
的结果有所不同。首先,再次咨询释放策略,以了解是否要进行正常释放。虽然该组没有更改,但 ReleaseStrategy
可以决定此时释放该组。如果释放策略仍然不释放该组,则其已过期。如果 send-partial-result-on-expiry
为 true
,则 (部分) MessageGroup
中的现有消息将作为正常的聚合器回复消息发送到 output-channel
。否则,将被丢弃。
groupTimeout
行为与 MessageGroupStoreReaper
之间存在差异(请参阅 Configuring an Aggregator with XML)。清除器定期启动 MessageGroupStore
中所有 MessageGroup
的强制完成。如果在 groupTimeout
期间未收到新消息,则 groupTimeout
对每个 MessageGroup
单独进行。此外,清除器可用于删除空组(保留空组以丢弃延迟的消息,如果 expire-groups-upon-completion
为假)。
从 5.5 版本开始,可以将 groupTimeoutExpression
评估为 java.util.Date
实例。这在某些情况下非常有用,例如根据组的创建时间(MessageGroup.getTimestamp()
)确定计划的任务时刻,而不是根据当前消息的到达时刻(在将 groupTimeoutExpression
评估为 long
时进行计算):
group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"
Configuring an Aggregator with Annotations
以下示例展示了一个使用注解配置的聚合器:
public class Waiter {
...
@Aggregator 1
public Delivery aggregatingMethod(List<OrderItem> items) {
...
}
@ReleaseStrategy 2
public boolean releaseChecker(List<Message<?>> messages) {
...
}
@CorrelationStrategy 3
public String correlateBy(OrderItem item) {
...
}
}
1 | 一个注释,表示应将此方法用作聚合器。如果将此类用作聚合器,则必须指定。 |
2 | 一个注释,表示将此方法用作聚合器的发布策略。如果在任何方法上均不存在,聚合器将使用 SimpleSequenceSizeReleaseStrategy 。 |
3 | 一个注释,表示应将此方法用作聚合器的关联策略。如果没有关联策略的迹象,聚合器将基于 CORRELATION_ID 使用 HeaderAttributeCorrelationStrategy 。 |
XML 元素所提供的配置选项也可用于 @Aggregator
注解。
聚合器既可以直接从 XML 引用,也可以在类中定义了 @MessageEndpoint
时通过类路径扫描自动检测。
注解配置(包括 @Aggregator
等)仅适用于简单的用例,在这些用例中,大多数默认选项已足够。如果您使用注解配置来控制更多选项,请考虑为 AggregatingMessageHandler
使用 @Bean
定义,并用 @ServiceActivator
标记其 @Bean
方法,如下面所示:
@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
AggregatingMessageHandler aggregator =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
jdbcMessageGroupStore);
aggregator.setOutputChannel(resultsChannel());
aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
aggregator.setTaskScheduler(this.taskScheduler);
return aggregator;
}
请参阅 Programming Model 和 Annotations on @Bean
Methods 了解更多信息。
从 4.2 版本开始, |
Managing State in an Aggregator: MessageGroupStore
聚合器(以及 Spring Integration 中的一些其他模式)是一种有状态模式,它要求根据一段时间内到达的一组具有相同关联键的消息做出决策。有状态模式中的接口设计(如 ReleaseStrategy
)遵循如下原则:“组件(无论是由框架定义还是由用户定义的)都必须保持无状态”。所有的状态都由 MessageGroup
携带,并且它的管理委托给 MessageGroupStore
。MessageGroupStore
的接口定义如下:
public interface MessageGroupStore {
int getMessageCountForAllMessageGroups();
int getMarkedMessageCountForAllMessageGroups();
int getMessageGroupCount();
MessageGroup getMessageGroup(Object groupId);
MessageGroup addMessageToGroup(Object groupId, Message<?> message);
MessageGroup markMessageGroup(MessageGroup group);
MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);
MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);
void removeMessageGroup(Object groupId);
void registerMessageGroupExpiryCallback(MessageGroupCallback callback);
int expireMessageGroups(long timeout);
}
有关更多信息,请参阅 Javadoc。
MessageGroupStore
会在等待触发释放策略时累积 MessageGroups
中的状态信息,并且该事件可能永远不会发生。因此,为了防止过时消息滞留,并让易失性存储区在应用程序关闭时提供用于清理的挂钩,MessageGroupStore
让您注册回调,以便在它们过期时应用于其 MessageGroups
。该接口非常直接,如下表所示:
public interface MessageGroupCallback {
void execute(MessageGroupStore messageGroupStore, MessageGroup group);
}
回调可以对存储区和消息组有直接访问权限,以便管理持久状态(例如,从存储区中完全删除组)。
MessageGroupStore
维护一份这些回调的列表,按需将其应用于所有时间戳比作为参数提供的某个时间更早的消息(参见前面描述的 registerMessageGroupExpiryCallback(..)
和 expireMessageGroups(..)
方法)。
在打算依赖 expireMessageGroups
功能时,请务必不要在不同的聚合器组件中使用相同的 MessageGroupStore
实例。每个 AbstractCorrelatingMessageHandler
都会根据 forceComplete()
回调注册自己的 MessageGroupCallback
。这样,每个待过期的组都可能被错误的聚合器完成或丢弃。从 5.0.10 版本开始,AbstractCorrelatingMessageHandler
中的 UniqueExpiryCallback
用于 MessageGroupStore
中的注册回调。MessageGroupStore
反过来会检查是否存在该类的实例,并且如果在回调集中已经存在该类的实例,它会记录一个包含适当消息的错误。通过这种方式,框架不允许在不同的聚合器/重新排序器中使用 MessageGroupStore
实例,以避免由此对并非由特定关联处理程序创建的组进行过期的副作用。
可以用带有超时值的 expireMessageGroups
方法进行调用。任何消息的过期时间为当前时间减去此值,并且已应用回调。因此,定义消息组“expiry
”的含义的是存储的用户。
作为对用户提供方便,Spring Integration 以 MessageGroupStoreReaper
的形式为消息过期提供了一个包装器,如下面的示例所示:
<bean id="reaper" class="org...MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore"/>
<property name="timeout" value="30000"/>
</bean>
<task:scheduled-tasks scheduler="scheduler">
<task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>
收割器是一个 Runnable
。在前面的示例中,消息组存储的到期方法每隔 10 秒调用一次。超时本身为 30 秒。
重要的是要明白, |
除了收割器,还会在应用程序通过 AbstractCorrelatingMessageHandler
中的生命周期回调关闭时调用过期回调。
AbstractCorrelatingMessageHandler
会注册自己的过期回调,而这与聚合器的 XML 配置中的布尔标志 send-partial-result-on-expiry
存在关联。如果将标志设置为 true
,那么在调用到期回调时,可以将尚未发布的组中任何未标记的消息发送到输出通道。
由于 MessageGroupStoreReaper
是从计划任务中调用的,并且可能导致为下游集成流生成消息(取决于 sendPartialResultOnExpiry
选项),因此建议使用自定义 TaskScheduler
提供 MessagePublishingErrorHandler
通过 errorChannel
处理异常,因为这是常规聚合器释放功能所期望的。相同的逻辑也适用于组超时功能,该功能也依赖 TaskScheduler
。有关更多信息,请参阅 Error Handling。
在为不同的关联端点使用共享 MessageStore
时,你必须配置适当的 CorrelationStrategy
来确保组 ID 的唯一性。否则,当一个关联端点发布或使其他关联的端点失效时,可能会发生意外的行为。带有相同关联键的消息存储在同一个消息组中。
一些 MessageStore
实现允许通过对数据进行分区来使用相同物理资源。例如,JdbcMessageStore
具有 region
属性,而 MongoDbMessageStore
具有 collectionName
属性。
有关 MessageStore
接口及其实现的更多信息,请参见 Message Store。
Flux Aggregator
在版本 5.2 中,引入了 FluxAggregatorMessageHandler
组件。它基于 Project Reactor 的 Flux.groupBy()
和 Flux.window()
操作符。传入的消息被发送到此组件构造函数中 Flux.create()
引发的 FluxSink
中。如果没有提供 outputChannel
,或者它不是 ReactiveStreamsSubscribableChannel
的实例,则对主 Flux
的订阅是从 Lifecycle.start()
实现完成的。否则,它将被推迟到 ReactiveStreamsSubscribableChannel
实现完成的订阅。消息由 Flux.groupBy()
使用关联策略进行分组,以获得组键。默认情况下,会查询消息的 IntegrationMessageHeaderAccessor.CORRELATION_ID
标头。
默认情况下,每个关闭的窗口都会作为消息载荷中的 Flux
发布,以用于生产。此消息包含窗口中第一条消息中的所有标头。输出消息载荷中的此 Flux
必须在后端订阅和处理。FluxAggregatorMessageHandler
的 setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>)
配置选项可以自定义(或取代)此类逻辑。例如,如果我们希望在最终消息中获得有效负载的 List
,我们可以这样配置 Flux.collectList()
:
fluxAggregatorMessageHandler.setCombineFunction(
(messageFlux) ->
messageFlux
.map(Message::getPayload)
.collectList()
.map(GenericMessage::new));
在 FluxAggregatorMessageHandler
中有几个选项可用于选择合适的窗口策略:
-
setBoundaryTrigger(Predicate<Message<?>>)
- 传播到Flux.windowUntil()
操作员。有关详细信息,请参阅其 JavaDocs。拥有优先于所有其他窗口选项。 -
setWindowSize(int)
和setWindowSizeFunction(Function<Message<?>, Integer>)
- 传播到Flux.window(int)
或windowTimeout(int, Duration)
。默认情况下,窗口大小通过群组中的第一条消息及其IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
标头计算得出。 -
setWindowTimespan(Duration)
- 根据窗口大小配置传播到Flux.window(Duration)
或windowTimeout(int, Duration)
。 -
setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>)
- 在任何未被公开选项涵盖的自定义窗口操作中,作用是一个函数,将转换应用到分组的 flux 中。
由于此组件是 MessageHandler
实现,所以可以将它与 @ServiceActivator
消息传递注释一起简单地用作 @Bean
定义。借助 Java DSL,可以从 .handle()
EIP 方法中使用它。下方示例演示了如何在运行时注册 IntegrationFlow
,以及如何使用 FluxAggregatorMessageHandler
与上游的拆分器相关联:
IntegrationFlow fluxFlow =
(flow) -> flow
.split()
.channel(MessageChannels.flux())
.handle(new FluxAggregatorMessageHandler());
IntegrationFlowContext.IntegrationFlowRegistration registration =
this.integrationFlowContext.registration(fluxFlow)
.register();
Flux<Message<?>> window =
registration.getMessagingTemplate()
.convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);
Condition on the Message Group
从 5.5 版本开始,AbstractCorrelatingMessageHandler
(包括其 Java 和 XML DSL)公开了 BiFunction<Message<?>, String, String>
实现的 groupConditionSupplier
选项。此功能用于添加到组的每条消息中,并且一个结果条件语句存储在组中,以便在将来考虑。 ReleaseStrategy
可以在不遍历组中的所有消息的情况下咨询此条件。请参阅 GroupConditionProvider
JavaDocs 和 Message Group Condition 了解更多信息。
另请参阅 File Aggregator。