Idempotent Receiver Enterprise Integration Pattern

Starting with version 4.1, Spring Integration provides an implementation of the Idempotent Receiver Enterprise Integration Pattern. It is a functional pattern and the whole idempotency logic should be implemented in the application. However, to simplify the decision-making, the IdempotentReceiverInterceptor component is provided. This is an AOP Advice that is applied to the MessageHandler.handleMessage() method and that can filter a request message or mark it as a duplicate, according to its configuration.

Previously, you could have implemented this pattern by using, for example, a custom MessageSelector in a <filter/> (see Filter). However, since this pattern really defines the behavior of an endpoint rather than being an endpoint itself, the idempotent receiver implementation does not provide an endpoint component. Rather, it is applied to endpoints declared in the application.

The logic of the IdempotentReceiverInterceptor is based on the provided MessageSelector and, if the message is not accepted by that selector, it is enriched with the duplicateMessage header set to true. The target MessageHandler (or downstream flow) can consult this header to implement the correct idempotency logic. If the IdempotentReceiverInterceptor is configured with a discardChannel or throwExceptionOnRejection = true, the duplicate message is not sent to the target MessageHandler.handleMessage(). Rather, it is discarded. If you want to discard (do nothing with) the duplicate message, the discardChannel should be configured with a NullChannel, such as the default nullChannel bean.
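
The decision described above can be sketched without the framework. The following is a simplified model, not the Spring Integration API: the Msg class, the list standing in for the discard channel, and the IllegalStateException are all hypothetical stand-ins chosen for illustration, and the ordering (discard first, then throw) is an assumption based on the attribute descriptions in this section.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

/** Framework-free model of the duplicate-handling decision (illustrative only). */
class IdempotentDecisionSketch {

    /** Hypothetical stand-in for a message: a payload plus mutable headers. */
    static final class Msg {
        final String payload;
        final Map<String, Object> headers = new HashMap<>();

        Msg(String payload) {
            this.payload = payload;
        }
    }

    private final Predicate<Msg> selector;        // plays the role of the MessageSelector
    private final List<Msg> discardChannel;       // null models "no discardChannel configured"
    private final boolean throwExceptionOnRejection;
    final List<Msg> handled = new ArrayList<>();  // messages that reach the target handler

    IdempotentDecisionSketch(Predicate<Msg> selector, List<Msg> discardChannel,
            boolean throwExceptionOnRejection) {
        this.selector = selector;
        this.discardChannel = discardChannel;
        this.throwExceptionOnRejection = throwExceptionOnRejection;
    }

    void receive(Msg message) {
        if (selector.test(message)) {
            handled.add(message); // accepted: handled normally
            return;
        }
        boolean discarded = false;
        if (discardChannel != null) {
            discardChannel.add(message); // duplicate goes to the discard channel
            discarded = true;
        }
        if (throwExceptionOnRejection) {
            // Assumed stand-in for the framework's rejection exception.
            throw new IllegalStateException("Duplicate message rejected: " + message.payload);
        }
        if (!discarded) {
            // Neither option configured: mark the duplicate and let the handler decide.
            message.headers.put("duplicateMessage", true);
            handled.add(message);
        }
    }

    public static void main(String[] args) {
        Set<String> seen = new HashSet<>();
        IdempotentDecisionSketch sketch =
                new IdempotentDecisionSketch(m -> seen.add(m.payload), null, false);
        sketch.receive(new Msg("invoice-1"));
        sketch.receive(new Msg("invoice-1"));
        for (Msg m : sketch.handled) {
            System.out.println(m.payload + " " + m.headers);
        }
    }
}
```

With no discard channel and no exception configured, both deliveries reach the handler, and only the second carries the duplicateMessage header.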

To maintain state between messages and provide the ability to compare messages for idempotency, we provide the MetadataStoreSelector. It accepts a MessageProcessor implementation (which creates a lookup key based on the Message) and an optional ConcurrentMetadataStore (see Metadata Store). See the MetadataStoreSelector Javadoc for more information. You can also customize the value stored in the ConcurrentMetadataStore by using an additional MessageProcessor. By default, MetadataStoreSelector uses the timestamp message header as the value.
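
As a rough illustration of the key/value idea, the sketch below models a selector that accepts a message only when no value is stored yet for its key. It is a framework-free approximation and rests on assumptions: a ConcurrentHashMap stands in for the metadata store, plain Function objects stand in for the key and value MessageProcessor instances, and the invoiceNbr and timestamp map entries are hypothetical header names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/** Framework-free model of a key/value based idempotency check (illustrative only). */
class KeyValueSelectorSketch<M> {

    // Stand-in for a metadata store; like SimpleMetadataStore, it is in-memory only.
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();
    private final Function<M, String> keyStrategy;   // derives the lookup key from a message
    private final Function<M, String> valueStrategy; // derives the value stored under that key

    KeyValueSelectorSketch(Function<M, String> keyStrategy, Function<M, String> valueStrategy) {
        this.keyStrategy = keyStrategy;
        this.valueStrategy = valueStrategy;
    }

    /** Returns true only for the first message seen with a given key. */
    boolean accept(M message) {
        String key = keyStrategy.apply(message);
        String value = valueStrategy.apply(message);
        // putIfAbsent is atomic, so concurrent duplicates cannot both be accepted.
        return store.putIfAbsent(key, value) == null;
    }

    String storedValue(String key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        KeyValueSelectorSketch<Map<String, String>> selector = new KeyValueSelectorSketch<>(
                headers -> headers.get("invoiceNbr"),
                headers -> headers.get("timestamp"));
        System.out.println(selector.accept(Map.of("invoiceNbr", "INV-1", "timestamp", "100")));
        System.out.println(selector.accept(Map.of("invoiceNbr", "INV-1", "timestamp", "200")));
    }
}
```

Because the stored value is never overwritten in this model, the value recorded for a key is the one from the first accepted message.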

Normally, the selector selects a message for acceptance if there is no existing value for the key. In some cases, it is useful to compare the current and new values for a key to determine whether the message should be accepted. Starting with version 5.3, the compareValues property is provided. It references a BiPredicate<String, String> whose first parameter is the old value and whose second parameter is the new value. Return true to accept the message and replace the old value with the new value in the MetadataStore. This can be useful to reduce the number of keys. For example, when processing lines in a file, you can store the file name in the key and the current line number in the value. Then, after a restart, you can skip lines that have already been processed. See Idempotent Downstream Processing a Split File for an example.
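
The file and line-number scenario can be sketched as follows. This is a minimal model under stated assumptions rather than the framework's code: a ConcurrentHashMap stands in for the MetadataStore, the class and method names are hypothetical, and the predicate shown (accept when the new line number is greater than the stored one) is just one possible compareValues choice.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiPredicate;

/** Framework-free model of value comparison for an existing key (illustrative only). */
class CompareValuesSketch {

    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();
    private final BiPredicate<String, String> compareValues; // (oldValue, newValue) -> accept?

    CompareValuesSketch(BiPredicate<String, String> compareValues) {
        this.compareValues = compareValues;
    }

    /** One possible predicate: accept only line numbers beyond the last one stored. */
    static CompareValuesSketch forLineNumbers() {
        return new CompareValuesSketch(
                (oldValue, newValue) -> Integer.parseInt(oldValue) < Integer.parseInt(newValue));
    }

    // Synchronized so the read-compare-replace sequence is not interleaved.
    synchronized boolean accept(String key, String newValue) {
        String oldValue = store.get(key);
        if (oldValue == null) {
            // No value yet: behaves like the plain "first time seen" check.
            return store.putIfAbsent(key, newValue) == null;
        }
        if (compareValues.test(oldValue, newValue)) {
            // Accepted: the new value replaces the old one under the same key.
            return store.replace(key, oldValue, newValue);
        }
        return false;
    }

    public static void main(String[] args) {
        CompareValuesSketch selector = CompareValuesSketch.forLineNumbers();
        String file = "orders.csv"; // hypothetical file name used as the key
        for (String line : new String[] {"1", "2", "2", "3"}) {
            System.out.println("line " + line + " accepted: " + selector.accept(file, line));
        }
    }
}
```

Only one key per file is kept, however many lines the file has, which is the key-count reduction mentioned above.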

For convenience, the MetadataStoreSelector options are configurable directly on the <idempotent-receiver> component. The following listing shows all the possible attributes:

<idempotent-receiver
        id=""  1
        endpoint=""  2
        selector=""  3
        discard-channel=""  4
        metadata-store=""  5
        key-strategy=""  6
        key-expression=""  7
        value-strategy=""  8
        value-expression=""  9
        compare-values="" 10
        throw-exception-on-rejection="" />  11
1 The ID of the IdempotentReceiverInterceptor bean. Optional.
2 Consumer endpoint name(s) or pattern(s) to which this interceptor is applied. Separate names (patterns) with commas (,), such as endpoint="aaa, bbb*, ccc, *ddd, eee*fff". Endpoint bean names matching these patterns are then used to retrieve the target endpoint’s MessageHandler bean (using its .handler suffix), and the IdempotentReceiverInterceptor is applied to those beans. Required.
3 A MessageSelector bean reference. Mutually exclusive with metadata-store and key-strategy (key-expression). When selector is not provided, one of key-strategy or key-expression is required.
4 Identifies the channel to which to send a message when the IdempotentReceiverInterceptor does not accept it. When omitted, duplicate messages are forwarded to the handler with a duplicateMessage header. Optional.
5 A ConcurrentMetadataStore reference. Used by the underlying MetadataStoreSelector. Mutually exclusive with selector. Optional. The default MetadataStoreSelector uses an internal SimpleMetadataStore that does not maintain state across application executions.
6 A MessageProcessor reference. Used by the underlying MetadataStoreSelector. Evaluates an idempotentKey from the request message. Mutually exclusive with selector and key-expression. When a selector is not provided, one of key-strategy or key-expression is required.
7 A SpEL expression to populate an ExpressionEvaluatingMessageProcessor. Used by the underlying MetadataStoreSelector. Evaluates an idempotentKey by using the request message as the evaluation context root object. Mutually exclusive with selector and key-strategy. When a selector is not provided, one of key-strategy or key-expression is required.
8 A MessageProcessor reference. Used by the underlying MetadataStoreSelector. Evaluates a value for the idempotentKey from the request message. Mutually exclusive with selector and value-expression. By default, the MetadataStoreSelector uses the timestamp message header as the metadata value.
9 A SpEL expression to populate an ExpressionEvaluatingMessageProcessor. Used by the underlying MetadataStoreSelector. Evaluates a value for the idempotentKey by using the request message as the evaluation context root object. Mutually exclusive with selector and value-strategy. By default, the MetadataStoreSelector uses the timestamp message header as the metadata value.
10 A reference to a BiPredicate<String, String> bean which allows you to optionally select a message by comparing the old and new values for the key; null by default.
11 Whether to throw an exception if the IdempotentReceiverInterceptor rejects the message. Defaults to false. It is applied regardless of whether or not a discard-channel is provided.

For Java configuration, Spring Integration provides the method-level @IdempotentReceiver annotation. It is used to mark a method that has a messaging annotation (@ServiceActivator, @Router, and others) to specify which IdempotentReceiverInterceptor objects are applied to this endpoint. The following example shows how to use the @IdempotentReceiver annotation:

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    // The key strategy must return a String, so read the header with an explicit type.
    return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
            m.getHeaders().get(INVOICE_NBR_HEADER, String.class)));
}

@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
    ....
}

When you use the Java DSL, you can add the interceptor to the endpoint’s advice chain, as the following example shows:

@Bean
public IntegrationFlow flow() {
    ...
        .handle("someBean", "someMethod",
            e -> e.advice(idempotentReceiverInterceptor()))
    ...
}

The IdempotentReceiverInterceptor is designed only for the MessageHandler.handleMessage(Message<?>) method. Starting with version 4.3.1, it implements HandleMessageAdvice, with the AbstractHandleMessageAdvice as a base class, for better dissociation. See Handling Message Advice for more information.