Resequencer
重排序器与聚合器有关,但用途不同。聚合器合并消息,而重排序器将消息传递而不改变它们。
Functionality
重排序器的运作方式类似于聚合器,从某种意义上说,它使用 CORRELATION_ID
将消息存储在组中。不同的是,重排序器不会以任何方式处理消息。相反,它会按照 SEQUENCE_NUMBER
标头值的顺序释放它们。
有鉴于此,你可以选择一次释放所有消息(在整个序列之后,根据 SEQUENCE_SIZE
,以及其他可能性),或者在可用有效序列后立即释放。(我们将在本章后面介绍“有效序列”的含义。)
重新排序器旨在对短序列消息序列进行重新排序,这些消息序列的间隙很小。如果您有许多带有很多间隙的不相干序列,则可能会遇到性能问题。
Configuring a Resequencer
有关在 Java DSL 中配置重新排序器的信息,请参阅 Aggregators and Resequencers。
配置重排序器只需要在 XML 中包含适当的元素。
以下示例显示了重排序器配置:
<int:channel id="inputChannel"/>
<int:channel id="outputChannel"/>
<int:resequencer id="completelyDefinedResequencer" 1
input-channel="inputChannel" 2
output-channel="outputChannel" 3
discard-channel="discardChannel" 4
release-partial-sequences="true" 5
message-store="messageStore" 6
send-partial-result-on-expiry="true" 7
send-timeout="86420000" 8
correlation-strategy="correlationStrategyBean" 9
correlation-strategy-method="correlate" 10
correlation-strategy-expression="headers['something']" 11
release-strategy="releaseStrategyBean" 12
release-strategy-method="release" 13
release-strategy-expression="size() == 10" 14
empty-group-min-timeout="60000" 15
lock-registry="lockRegistry" 16
group-timeout="60000" 17
group-timeout-expression="size() ge 2 ? 100 : -1" 18
scheduler="taskScheduler" /> 19
expire-group-upon-timeout="false" /> 20
1 | 重新排序器的 ID 是可选的。 |
2 | 重新排序器的输入通道。必须的。 |
3 | 重新排序器向其发送重新排序的消息的通道。可选的。 |
4 | 重新排序器向其发送超时消息的通道(如果 send-partial-result-on-timeout`设置为 `false )。可选的。 |
5 | 是否在已用序列可用后立即将其发送出去,或者仅在整个消息组到达后才能将其发送出去。可选的。(默认值为 false )。 |
6 | `MessageGroupStore`的引用,可用于将消息组存储在其关联键下,直到它们完成。可选的。(默认值是易失性内存中存储)。 |
7 | 分组到期后,是否应该发送有序分组(即使某些消息尚缺失)。可选的。(默认值为 false)。参见 Managing State in an Aggregator: MessageGroupStore 。 |
8 | 在向 Message`或 `discard-channel`发送答复 `output-channel`时等待的超时间隔。它仅适用于输出通道具有一些“发送”限制的情况,例如“容量”固定的 `QueueChannel 。在这种情况下,会抛出 MessageDeliveryException`异常。`send-timeout`对于 `AbstractSubscribableChannel`实现将被忽略。对于 `group-timeout(-expression) ,来自计划的已过期任务的 `MessageDeliveryException`将导致对该任务进行重新计划。可选的。 |
9 | 实现消息关联(分组)算法的 Bean 的引用。Bean 可以是 `CorrelationStrategy`接口或 POJO 的实现。后一种情况下,还必须定义 `correlation-strategy-method`属性。可选的。(默认情况下,聚合器使用 `IntegrationMessageHeaderAccessor.CORRELATION_ID`头)。 |
10 | 在 correlation-strategy`引用的 Bean 上定义的方法,该方法实现关联决策算法。可选择,有限制(需要存在 `correlation-strategy )。 |
11 | 表示关联策略的 SpEL 表达式。示例:"headers['something']" 。只允许 `correlation-strategy`或 `correlation-strategy-expression`中的一个存在。 |
12 | 实现释放策略的 Bean 的引用。Bean 可以是 `ReleaseStrategy`接口或 POJO 的实现。后一种情况下,还必须定义 `release-strategy-method`属性。可选的(默认情况下,聚合器将使用 `IntegrationMessageHeaderAccessor.SEQUENCE_SIZE`头属性)。 |
13 | 在 release-strategy`引用的 Bean 上定义的方法,该方法实现完成决策算法。可选择,有限制(需要存在 `release-strategy )。 |
14 | 表示释放策略的 SpEL 表达式。表达式的根对象是一个 MessageGroup 。示例:"size() == 5" 。只允许 `release-strategy`或 `release-strategy-expression`中的一个存在。 |
15 | 仅适用于为 `<resequencer>``MessageStore`配置 `MessageGroupStoreReaper`的情况。默认情况下,当配置 `MessageGroupStoreReaper`使部分组失效时,也会删除空组。空组在正常释放组后存在。这是为了能够检测和丢弃迟到的消息。如果你希望以比已过期部分组更长的计划来使空组失效,请设置此属性。然后,空组直到至少经过此毫秒数的未修改状态后才从 `MessageStore`中删除。请注意,使空组失效的实际时间也受限于清道夫的超时属性,并且可能相当于这个值加上超时时间。 |
16 | 参见 Configuring an Aggregator with XML。 |
17 | 参见 Configuring an Aggregator with XML。 |
18 | 参见 Configuring an Aggregator with XML。 |
19 | 参见 Configuring an Aggregator with XML。 |
20 | 默认情况下,当一组因超时(或 MessageGroupStoreReaper )而完成时,保留空组的元数据。立即丢弃迟到的消息。将其设置为 `true`以完全删除该组。然后,迟到的消息启动一个新组,并且在该组再次超时之前不会被丢弃。由于造成超时的序列范围中的 “hole”,新组永远不会正常释放。稍后可以使用 `MessageGroupStoreReaper`连同 `empty-group-min-timeout`属性来使空组失效(完全删除)。从 5.0 版开始,空组在 `empty-group-min-timeout`经过后也会被安排删除。默认值为“false”。 |
有关详细信息,另请参见 Aggregator Expiring Groups。
由于没有要在重新排序器的 Java 类中实现的自定义行为,因此不支持对此进行注释。 |