Resequencer

重排序器与聚合器有关,但用途不同。聚合器合并消息,而重排序器将消息传递而不改变它们。

Functionality

重排序器的运作方式类似于聚合器,从某种意义上说,它使用 CORRELATION_ID 将消息存储在组中。不同的是,重排序器不会以任何方式处理消息。相反,它会按照 SEQUENCE_NUMBER 标头值的顺序释放它们。

有鉴于此,你可以选择一次释放所有消息(在整个序列之后,根据 SEQUENCE_SIZE,以及其他可能性),或者在可用有效序列后立即释放。(我们将在本章后面介绍“有效序列”的含义。)

重新排序器旨在对短序列消息序列进行重新排序,这些消息序列的间隙很小。如果您有许多带有很多间隙的不相干序列,则可能会遇到性能问题。

Configuring a Resequencer

有关在 Java DSL 中配置重新排序器的信息,请参阅 Aggregators and Resequencers

配置重排序器只需要在 XML 中包含适当的元素。

以下示例显示了重排序器配置:

<int:channel id="inputChannel"/>

<int:channel id="outputChannel"/>

<int:resequencer id="completelyDefinedResequencer"  1
  input-channel="inputChannel"  2
  output-channel="outputChannel"  3
  discard-channel="discardChannel"  4
  release-partial-sequences="true"  5
  message-store="messageStore"  6
  send-partial-result-on-expiry="true"  7
  send-timeout="86420000"  8
  correlation-strategy="correlationStrategyBean"  9
  correlation-strategy-method="correlate"  10
  correlation-strategy-expression="headers['something']"  11
  release-strategy="releaseStrategyBean"  12
  release-strategy-method="release"  13
  release-strategy-expression="size() == 10"  14
  empty-group-min-timeout="60000"  15

  lock-registry="lockRegistry"  16

  group-timeout="60000"  17
  group-timeout-expression="size() ge 2 ? 100 : -1"  18
  scheduler="taskScheduler" />  19
  expire-group-upon-timeout="false" />  20
1 重新排序器的 ID 是可选的。
2 重新排序器的输入通道。必须的。
3 重新排序器向其发送重新排序的消息的通道。可选的。
4 重新排序器向其发送超时消息的通道(如果 send-partial-result-on-timeout`设置为 `false)。可选的。
5 是否在已用序列可用后立即将其发送出去,或者仅在整个消息组到达后才能将其发送出去。可选的。(默认值为 false)。
6 `MessageGroupStore`的引用,可用于将消息组存储在其关联键下,直到它们完成。可选的。(默认值是易失性内存中存储)。
7 分组到期后,是否应该发送有序分组(即使某些消息尚缺失)。可选的。(默认值为 false)。参见 Managing State in an Aggregator: MessageGroupStore
8 在向 Message`或 `discard-channel`发送答复 `output-channel`时等待的超时间隔。它仅适用于输出通道具有一些“发送”限制的情况,例如“容量”固定的 `QueueChannel。在这种情况下,会抛出 MessageDeliveryException`异常。`send-timeout`对于 `AbstractSubscribableChannel`实现将被忽略。对于 `group-timeout(-expression),来自计划的已过期任务的 `MessageDeliveryException`将导致对该任务进行重新计划。可选的。
9 实现消息关联(分组)算法的 Bean 的引用。Bean 可以是 `CorrelationStrategy`接口或 POJO 的实现。后一种情况下,还必须定义 `correlation-strategy-method`属性。可选的。(默认情况下,聚合器使用 `IntegrationMessageHeaderAccessor.CORRELATION_ID`头)。
10 correlation-strategy`引用的 Bean 上定义的方法,该方法实现关联决策算法。可选择,有限制(需要存在 `correlation-strategy)。
11 表示关联策略的 SpEL 表达式。示例:"headers['something']"。只允许 `correlation-strategy`或 `correlation-strategy-expression`中的一个存在。
12 实现释放策略的 Bean 的引用。Bean 可以是 `ReleaseStrategy`接口或 POJO 的实现。后一种情况下,还必须定义 `release-strategy-method`属性。可选的(默认情况下,聚合器将使用 `IntegrationMessageHeaderAccessor.SEQUENCE_SIZE`头属性)。
13 release-strategy`引用的 Bean 上定义的方法,该方法实现完成决策算法。可选择,有限制(需要存在 `release-strategy)。
14 表示释放策略的 SpEL 表达式。表达式的根对象是一个 MessageGroup。示例:"size() == 5"。只允许 `release-strategy`或 `release-strategy-expression`中的一个存在。
15 仅适用于为 `&lt;resequencer&gt;``MessageStore`配置 `MessageGroupStoreReaper`的情况。默认情况下,当配置 `MessageGroupStoreReaper`使部分组失效时,也会删除空组。空组在正常释放组后存在。这是为了能够检测和丢弃迟到的消息。如果你希望以比已过期部分组更长的计划来使空组失效,请设置此属性。然后,空组直到至少经过此毫秒数的未修改状态后才从 `MessageStore`中删除。请注意,使空组失效的实际时间也受限于清道夫的超时属性,并且可能相当于这个值加上超时时间。
16 参见 Configuring an Aggregator with XML
17 参见 Configuring an Aggregator with XML
18 参见 Configuring an Aggregator with XML
19 参见 Configuring an Aggregator with XML
20 默认情况下,当一组因超时(或 MessageGroupStoreReaper)而完成时,保留空组的元数据。立即丢弃迟到的消息。将其设置为 `true`以完全删除该组。然后,迟到的消息启动一个新组,并且在该组再次超时之前不会被丢弃。由于造成超时的序列范围中的 “hole”,新组永远不会正常释放。稍后可以使用 `MessageGroupStoreReaper`连同 `empty-group-min-timeout`属性来使空组失效(完全删除)。从 5.0 版开始,空组在 `empty-group-min-timeout`经过后也会被安排删除。默认值为“false”。

有关详细信息,另请参见 Aggregator Expiring Groups

由于没有要在重新排序器的 Java 类中实现的自定义行为,因此不支持对此进行注释。