Idempotent Receiver Enterprise Integration Pattern
从 4.1 版本开始,Spring Integration 提供了 Idempotent Receiver 企业集成模式的实现。它是一种功能模式,整个幂等性逻辑应在应用程序中实现。但是,为了简化决策,提供了 IdempotentReceiverInterceptor
组件。这是一个应用于 MessageHandler.handleMessage()
方法并可以根据其配置 filter
请求消息或将其标记为 duplicate
的 AOP Advice
。
Starting with version 4.1, Spring Integration provides an implementation of the Idempotent Receiver Enterprise Integration Pattern.
It is a functional pattern and the whole idempotency logic should be implemented in the application.
However, to simplify the decision-making, the IdempotentReceiverInterceptor
component is provided.
This is an AOP Advice
that is applied to the MessageHandler.handleMessage()
method and that can filter
a request message or mark it as a duplicate
, according to its configuration.
以前,你可以使用自定义 MessageSelector
在 <filter/>
中实现此模式(例如,请参阅 Filter)。但是,由于此模式实际上定义的是端点的行为,而不是端点本身,因此幂等接收器实现不提供端点组件。相反,它应用于应用程序中声明的端点。
Previously, you could have implemented this pattern by using a custom MessageSelector
in a <filter/>
(see Filter), for example.
However, since this pattern really defines the behavior of an endpoint rather than being an endpoint itself, the idempotent receiver implementation does not provide an endpoint component.
Rather, it is applied to endpoints declared in the application.
IdempotentReceiverInterceptor
的逻辑基于提供的 MessageSelector
,如果消息不被该选择器接受,则使用将 duplicateMessage
头设置为 true
对其进行扩展。目标 MessageHandler
(或下游流程)可以咨询此头来实现正确的幂等逻辑。如果 IdempotentReceiverInterceptor
使用 discardChannel
或 throwExceptionOnRejection = true
进行配置,则重复消息不会发送到目标 MessageHandler.handleMessage()
。相反,它会被丢弃。如果您想丢弃重复消息(不对其执行任何操作),则应该使用 NullChannel
(例如默认 nullChannel
Bean)对 discardChannel
进行配置。
The logic of the IdempotentReceiverInterceptor
is based on the provided MessageSelector
and, if the message is not accepted by that selector, it is enriched with the duplicateMessage
header set to true
.
The target MessageHandler
(or downstream flow) can consult this header to implement the correct idempotency logic.
If the IdempotentReceiverInterceptor
is configured with a discardChannel
or throwExceptionOnRejection = true
, the duplicate message is not sent to the target MessageHandler.handleMessage()
.
Rather, it is discarded.
If you want to discard (do nothing with) the duplicate message, the discardChannel
should be configured with a NullChannel
, such as the default nullChannel
bean.
为了在消息之间保持状态并提供比较消息以实现幂等能力,我们提供了 MetadataStoreSelector
。它接受一个 MessageProcessor
实现(它根据 Message
创建一个查找密钥)和一个可选的 ConcurrentMetadataStore
(Metadata Store)。有关更多信息,请参阅 MetadataStoreSelector
Javadoc。您还可以通过使用附加的 MessageProcessor
自定义 value
的 ConcurrentMetadataStore
。默认情况下,MetadataStoreSelector
使用 timestamp
消息头。
To maintain state between messages and provide the ability to compare messages for the idempotency, we provide the MetadataStoreSelector
.
It accepts a MessageProcessor
implementation (which creates a lookup key based on the Message
) and an optional ConcurrentMetadataStore
(Metadata Store).
See the MetadataStoreSelector
Javadoc for more information.
You can also customize the value
for ConcurrentMetadataStore
by using an additional MessageProcessor
.
By default, MetadataStoreSelector
uses the timestamp
message header.
通常,如果密钥不存在现有值,则选择器选择一条消息以供接受。在某些情况下,比较密钥的当前值和新值以确定是否应接受该消息是很有用的。从 5.3 版开始,提供了 compareValues
属性,该属性引用 BiPredicate<String, String>
;第一个参数是旧值;返回 true
以接受该消息,并将旧值替换为 MetadataStore
中的新值。这有助于减少键的数量;例如,在处理文件中行时,可以在键中存储文件名,并在值中存储当前行号。然后,在重新启动后,您可以跳过已处理的行。有关示例,请参阅 Idempotent Downstream Processing a Split File。
Normally, the selector selects a message for acceptance if there is no existing value for the key.
In some cases, it is useful to compare the current and new values for a key, to determine whether the message should be accepted.
Starting with version 5.3, the compareValues
property is provided which references a BiPredicate<String, String>
; the first parameter is the old value; return true
to accept the message and replace the old value with the new value in the MetadataStore
.
This can be useful to reduce the number of keys; for example, when processing lines in a file, you can store the file name in the key and the current line number in the value.
Then, after a restart, you can skip lines that have already been processed.
See Idempotent Downstream Processing a Split File for an example.
为了方便起见,MetadataStoreSelector
选项可以再 <idempotent-receiver>
组件上直接进行配置。以下列表显示所有可能的特性:
For convenience, the MetadataStoreSelector
options are configurable directly on the <idempotent-receiver>
component.
The following listing shows all the possible attributes:
<idempotent-receiver
id="" 1
endpoint="" 2
selector="" 3
discard-channel="" 4
metadata-store="" 5
key-strategy="" 6
key-expression="" 7
value-strategy="" 8
value-expression="" 9
compare-values="" 10
throw-exception-on-rejection="" /> 11
1 | The ID of the IdempotentReceiverInterceptor bean.
Optional. |
2 | Consumer endpoint name(s) or pattern(s) to which this interceptor is applied.
Separate names (patterns) with commas (, ), such as endpoint="aaa, bbb*, ccc, *ddd, eee*fff" .
Endpoint bean names matching these patterns are then used to retrieve the target endpoint’s MessageHandler bean (using its .handler suffix), and the IdempotentReceiverInterceptor is applied to those beans.
Required. |
3 | A MessageSelector bean reference.
Mutually exclusive with metadata-store and key-strategy (key-expression) .
When selector is not provided, one of key-strategy or key-strategy-expression is required. |
4 | Identifies the channel to which to send a message when the IdempotentReceiverInterceptor does not accept it.
When omitted, duplicate messages are forwarded to the handler with a duplicateMessage header.
Optional. |
5 | A ConcurrentMetadataStore reference.
Used by the underlying MetadataStoreSelector .
Mutually exclusive with selector .
Optional.
The default MetadataStoreSelector uses an internal SimpleMetadataStore that does not maintain state across application executions. |
6 | A MessageProcessor reference.
Used by the underlying MetadataStoreSelector .
Evaluates an idempotentKey from the request message.
Mutually exclusive with selector and key-expression .
When a selector is not provided, one of key-strategy or key-strategy-expression is required. |
7 | A SpEL expression to populate an ExpressionEvaluatingMessageProcessor .
Used by the underlying MetadataStoreSelector .
Evaluates an idempotentKey by using the request message as the evaluation context root object.
Mutually exclusive with selector and key-strategy .
When a selector is not provided, one of key-strategy or key-strategy-expression is required. |
8 | A MessageProcessor reference.
Used by the underlying MetadataStoreSelector .
Evaluates a value for the idempotentKey from the request message.
Mutually exclusive with selector and value-expression .
By default, the 'MetadataStoreSelector' uses the 'timestamp' message header as the Metadata 'value'. |
9 | A SpEL expression to populate an ExpressionEvaluatingMessageProcessor .
Used by the underlying MetadataStoreSelector .
Evaluates a value for the idempotentKey by using the request message as the evaluation context root object.
Mutually exclusive with selector and value-strategy .
By default, the 'MetadataStoreSelector' uses the 'timestamp' message header as the metadata 'value'. |
10 | A reference to a BiPredicate<String, String> bean which allows you to optionally select a message by comparing the old and new values for the key; null by default. |
11 | Whether to throw an exception if the IdempotentReceiverInterceptor rejects the message.
Defaults to false .
It is applied regardless of whether or not a discard-channel is provided. |
对于 Java 配置,Spring Integration 提供了方法级别的 @IdempotentReceiver
注解。它用于标记具有消息传递批注(@ServiceActivator
、@Router
和其他批注)的 method
,以指定哪些 IdempotentReceiverInterceptor
对象应用于此端点。以下示例显示如何使用 @IdempotentReceiver
注解:
For Java configuration, Spring Integration provides the method-level @IdempotentReceiver
annotation.
It is used to mark a method
that has a messaging annotation (@ServiceActivator
, @Router, and others) to specify which `IdempotentReceiverInterceptor
objects are applied to this endpoint.
The following example shows how to use the @IdempotentReceiver
annotation:
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
m.getHeaders().get(INVOICE_NBR_HEADER)));
}
@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
....
}
当您使用 Java DSL 时,可以将拦截器添加到端点的建议链,如下面的示例所示:
When you use the Java DSL, you can add the interceptor to the endpoint’s advice chain, as the following example shows:
@Bean
public IntegrationFlow flow() {
...
.handle("someBean", "someMethod",
e -> e.advice(idempotentReceiverInterceptor()))
...
}
|
The |