Resequencer
The resequencer is related to the aggregator but serves a different purpose. While the aggregator combines messages, the resequencer passes messages through without changing them.
Functionality
The resequencer works in a similar way to the aggregator, in the sense that it uses the CORRELATION_ID to store messages in groups.
The difference is that the resequencer does not process the messages in any way.
Instead, it releases them in the order of their SEQUENCE_NUMBER header values.
You can choose to release all messages at once (after the whole sequence has arrived, according to the SEQUENCE_SIZE header, among other possibilities) or as soon as a valid sequence is available.
(We cover what we mean by "a valid sequence" later in this chapter.)
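The two release modes described above can be sketched as a pair of configurations. This is a minimal illustration rather than a recommended setup: the channel ids (wholeSequenceInput, partialSequenceInput, orderedOutput) are hypothetical, and the only attribute that differs between the two elements is release-partial-sequences.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           https://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           https://www.springframework.org/schema/integration/spring-integration.xsd">

    <!-- Hypothetical channel ids, chosen only for this sketch. -->
    <int:channel id="wholeSequenceInput"/>
    <int:channel id="partialSequenceInput"/>
    <int:channel id="orderedOutput"/>

    <!-- Default mode: the group is held until the whole sequence has arrived. -->
    <int:resequencer input-channel="wholeSequenceInput"
                     output-channel="orderedOutput"/>

    <!-- Partial mode: ordered messages are released as soon as a valid sequence is available. -->
    <int:resequencer input-channel="partialSequenceInput"
                     output-channel="orderedOutput"
                     release-partial-sequences="true"/>

</beans>
```

Both elements rely on the default correlation by the CORRELATION_ID header, so upstream components are assumed to populate the CORRELATION_ID, SEQUENCE_NUMBER, and SEQUENCE_SIZE headers.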
The resequencer is intended to resequence relatively short sequences of messages with small gaps. If you have a large number of disjoint sequences with many gaps, you may experience performance issues.
Configuring a Resequencer
See Aggregators and Resequencers for information about configuring a resequencer in the Java DSL.
Configuring a resequencer requires only including the appropriate element in XML.
The following example shows a resequencer configuration:
<int:channel id="inputChannel"/>
<int:channel id="outputChannel"/>
<int:resequencer id="completelyDefinedResequencer" 1
input-channel="inputChannel" 2
output-channel="outputChannel" 3
discard-channel="discardChannel" 4
release-partial-sequences="true" 5
message-store="messageStore" 6
send-partial-result-on-expiry="true" 7
send-timeout="86420000" 8
correlation-strategy="correlationStrategyBean" 9
correlation-strategy-method="correlate" 10
correlation-strategy-expression="headers['something']" 11
release-strategy="releaseStrategyBean" 12
release-strategy-method="release" 13
release-strategy-expression="size() == 10" 14
empty-group-min-timeout="60000" 15
lock-registry="lockRegistry" 16
group-timeout="60000" 17
group-timeout-expression="size() ge 2 ? 100 : -1" 18
scheduler="taskScheduler" 19
expire-group-upon-timeout="false" /> 20
1 | The id of the resequencer is optional. |
2 | The input channel of the resequencer. Required. |
3 | The channel to which the resequencer sends the reordered messages. Optional. |
4 | The channel to which the resequencer sends the messages that timed out (if send-partial-result-on-expiry is set to false).
Optional. |
5 | Whether to send out ordered sequences as soon as they are available or only after the whole message group arrives.
Optional.
(The default is false.) |
6 | A reference to a MessageGroupStore that can be used to store groups of messages under their correlation key until they are complete.
Optional.
(The default is a volatile in-memory store.) |
7 | Whether, upon the expiration of the group, the ordered group should be sent out (even if some messages are missing).
Optional.
(The default is false.)
See Managing State in an Aggregator: MessageGroupStore. |
8 | The timeout interval to wait when sending a reply Message to the output-channel or discard-channel.
It is applied only if the output channel has some 'sending' limitations, such as a QueueChannel with a fixed 'capacity'.
In this case, a MessageDeliveryException is thrown.
The send-timeout is ignored for AbstractSubscribableChannel implementations.
For group-timeout(-expression), a MessageDeliveryException from the scheduled expiry task causes that task to be rescheduled.
Optional. |
9 | A reference to a bean that implements the message correlation (grouping) algorithm.
The bean can be an implementation of the CorrelationStrategy interface or a POJO.
In the latter case, the correlation-strategy-method attribute must also be defined.
Optional.
(By default, the resequencer uses the IntegrationMessageHeaderAccessor.CORRELATION_ID header.) |
10 | A method that is defined on the bean referenced by correlation-strategy and that implements the correlation decision algorithm.
Optional, with restrictions (requires correlation-strategy to be present). |
11 | A SpEL expression representing the correlation strategy.
Example: "headers['something']".
Only one of correlation-strategy or correlation-strategy-expression is allowed. |
12 | A reference to a bean that implements the release strategy.
The bean can be an implementation of the ReleaseStrategy interface or a POJO.
In the latter case, the release-strategy-method attribute must also be defined.
Optional (by default, the resequencer uses the IntegrationMessageHeaderAccessor.SEQUENCE_SIZE header attribute). |
13 | A method that is defined on the bean referenced by release-strategy and that implements the completion decision algorithm.
Optional, with restrictions (requires release-strategy to be present). |
14 | A SpEL expression representing the release strategy.
The root object for the expression is a MessageGroup.
Example: "size() == 5".
Only one of release-strategy or release-strategy-expression is allowed. |
15 | Only applies if a MessageGroupStoreReaper is configured for the <resequencer> MessageStore.
By default, when a MessageGroupStoreReaper is configured to expire partial groups, empty groups are also removed.
Empty groups exist after a group is released normally.
This is to enable the detection and discarding of late-arriving messages.
If you wish to expire empty groups on a longer schedule than expiring partial groups, set this property.
Empty groups are then not removed from the MessageStore until they have not been modified for at least this number of milliseconds.
Note that the actual time to expire an empty group is also affected by the reaper’s timeout property, and it could be as much as this value plus the timeout. |
16 | See Configuring an Aggregator with XML. |
17 | See Configuring an Aggregator with XML. |
18 | See Configuring an Aggregator with XML. |
19 | See Configuring an Aggregator with XML. |
20 | By default, when a group is completed due to a timeout (or by a MessageGroupStoreReaper), the empty group’s metadata is retained.
Late arriving messages are immediately discarded.
Set this to true to remove the group completely.
Then, late arriving messages start a new group and are not discarded until the group again times out.
The new group is never released normally because of the “hole” in the sequence range that caused the timeout.
Empty groups can be expired (completely removed) later by using a MessageGroupStoreReaper together with the empty-group-min-timeout attribute.
Starting with version 5.0, empty groups are also scheduled for removal after the empty-group-min-timeout elapses.
The default is 'false'. |
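As an illustration of how a few of the attributes above combine, the following sketch correlates messages by a header through a SpEL expression and sends whatever has arrived when a group times out. The channel ids, the orderId header name, and the timeout value are hypothetical and chosen only for this example.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           https://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           https://www.springframework.org/schema/integration/spring-integration.xsd">

    <int:channel id="inputChannel"/>
    <int:channel id="outputChannel"/>

    <!-- A queue channel, so that discarded messages can be polled and inspected. -->
    <int:channel id="discardChannel">
        <int:queue/>
    </int:channel>

    <int:resequencer id="expressionBasedResequencer"
            input-channel="inputChannel"
            output-channel="outputChannel"
            discard-channel="discardChannel"
            correlation-strategy-expression="headers['orderId']"
            group-timeout="60000"
            send-partial-result-on-expiry="true"/>
    <!-- correlation-strategy-expression: groups messages by the hypothetical 'orderId' header
         instead of the default CORRELATION_ID header. -->
    <!-- group-timeout: a group that is still unreleased 60 seconds after its most recent
         message arrived is expired. -->
    <!-- send-partial-result-on-expiry="true": on expiry, the ordered messages that did arrive
         go to outputChannel rather than discardChannel. -->

</beans>
```

Because only correlation-strategy-expression is set (and not correlation-strategy), the configuration respects the rule that only one of the two may be present.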
Also see Aggregator Expiring Groups for more information.
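The interaction between a MessageGroupStoreReaper, empty-group-min-timeout, and expire-group-upon-timeout can be sketched as follows. This is an assumption-laden example, not a prescribed configuration: the bean ids, the scheduler, and all interval values are hypothetical, and SimpleMessageStore stands in for whatever MessageGroupStore an application actually uses.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           https://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           https://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/task
           https://www.springframework.org/schema/task/spring-task.xsd">

    <int:channel id="inputChannel"/>
    <int:channel id="outputChannel"/>
    <int:channel id="discardChannel">
        <int:queue/>
    </int:channel>

    <!-- In-memory store shared by the resequencer and the reaper. -->
    <bean id="messageStore"
          class="org.springframework.integration.store.SimpleMessageStore"/>

    <!-- Expires partial groups older than 30 seconds (hypothetical value). -->
    <bean id="reaper"
          class="org.springframework.integration.store.MessageGroupStoreReaper">
        <property name="messageGroupStore" ref="messageStore"/>
        <property name="timeout" value="30000"/>
    </bean>

    <!-- Runs the reaper every 10 seconds (hypothetical value). -->
    <task:scheduler id="reaperScheduler"/>
    <task:scheduled-tasks scheduler="reaperScheduler">
        <task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
    </task:scheduled-tasks>

    <!-- Empty groups are retained for at least two minutes so that late arrivals
         can still be detected and discarded. -->
    <int:resequencer id="reapedResequencer"
            input-channel="inputChannel"
            output-channel="outputChannel"
            discard-channel="discardChannel"
            message-store="messageStore"
            expire-group-upon-timeout="false"
            empty-group-min-timeout="120000"/>

</beans>
```

With these values, an empty group could remain in the store for up to the empty-group-min-timeout plus the reaper's timeout, as noted in the attribute description.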
Since there is no custom behavior to be implemented in Java classes for resequencers, there is no annotation support for it.