Idempotent Receiver Enterprise Integration Pattern

从 4.1 版本开始,Spring Integration 提供了 Idempotent Receiver 企业集成模式的实现。它是一种功能模式,整个幂等性逻辑应在应用程序中实现。但是,为了简化决策,提供了 IdempotentReceiverInterceptor 组件。这是一个应用于 MessageHandler.handleMessage() 方法并可以根据其配置 filter 请求消息或将其标记为 duplicate 的 AOP Advice

以前,你可以使用自定义 MessageSelector<filter/> 中实现此模式(例如,请参阅 Filter)。但是,由于此模式实际上定义的是端点的行为,而不是端点本身,因此幂等接收器实现不提供端点组件。相反,它应用于应用程序中声明的端点。

IdempotentReceiverInterceptor 的逻辑基于提供的 MessageSelector,如果消息不被该选择器接受,则使用将 duplicateMessage 头设置为 true 对其进行扩展。目标 MessageHandler(或下游流程)可以咨询此头来实现正确的幂等逻辑。如果 IdempotentReceiverInterceptor 使用 discardChannelthrowExceptionOnRejection = true 进行配置,则重复消息不会发送到目标 MessageHandler.handleMessage()。相反,它会被丢弃。如果您想丢弃重复消息(不对其执行任何操作),则应该使用 NullChannel(例如默认 nullChannel Bean)对 discardChannel 进行配置。

为了在消息之间保持状态并提供比较消息以实现幂等能力,我们提供了 MetadataStoreSelector。它接受一个 MessageProcessor 实现(它根据 Message 创建一个查找密钥)和一个可选的 ConcurrentMetadataStoreMetadata Store)。有关更多信息,请参阅 MetadataStoreSelector Javadoc。您还可以通过使用附加的 MessageProcessor 自定义 valueConcurrentMetadataStore。默认情况下,MetadataStoreSelector 使用 timestamp 消息头。

通常,如果密钥不存在现有值,则选择器选择一条消息以供接受。在某些情况下,比较密钥的当前值和新值以确定是否应接受该消息是很有用的。从 5.3 版开始,提供了 compareValues 属性,该属性引用 BiPredicate<String, String>;第一个参数是旧值;返回 true 以接受该消息,并将旧值替换为 MetadataStore 中的新值。这有助于减少键的数量;例如,在处理文件中行时,可以在键中存储文件名,并在值中存储当前行号。然后,在重新启动后,您可以跳过已处理的行。有关示例,请参阅 Idempotent Downstream Processing a Split File

为了方便起见,MetadataStoreSelector 选项可以再 <idempotent-receiver> 组件上直接进行配置。以下列表显示所有可能的特性:

<idempotent-receiver
        id=""  1
        endpoint=""  2
        selector=""  3
        discard-channel=""  4
        metadata-store=""  5
        key-strategy=""  6
        key-expression=""  7
        value-strategy=""  8
        value-expression=""  9
        compare-values="" 10
        throw-exception-on-rejection="" />  11
1 IdempotentReceiverInterceptor Bean 的 ID。可选。
2 将此拦截器应用到的客户机终端节点名称(模式)。使用逗号分隔名称(模式)(,),例如 endpoint="aaa, bbb*, ccc, *ddd, eee*fff"。然后,使用匹配这些模式的终端节点 bean 名称检索目标终端节点的 MessageHandler bean(使用其 .handler 后缀),并 IdempotentReceiverInterceptor 应用到这些 bean。必需。
3 MessageSelector Bean 引用。与 metadata-storekey-strategy (key-expression) 相互排斥。当未提供 selector 时,需要 key-strategykey-strategy-expression 之一。
4 标识在 IdempotentReceiverInterceptor 不接受消息时向其发送消息的通道。省略时,会将重复消息转发到带有 duplicateMessage 标头的处理程序。可选。
5 ConcurrentMetadataStore 引用。基础 MetadataStoreSelector 使用。与 selector 相互排斥。可选。默认 MetadataStoreSelector 使用不维护跨应用程序执行状态的内部 SimpleMetadataStore
6 MessageProcessor 引用。基础 MetadataStoreSelector 使用。从请求消息求值 idempotentKey。与 selectorkey-expression 相互排斥。当未提供 selector 时,需要 key-strategykey-strategy-expression 之一。
7 用于填充 ExpressionEvaluatingMessageProcessor 的 SpEL 表达式。基础 MetadataStoreSelector 使用。使用请求消息作为评估上下文根对象来评估 idempotentKey。与 selectorkey-strategy 相互排斥。当未提供 selector 时,需要 key-strategykey-strategy-expression 之一。
8 MessageProcessor 引用。基础 MetadataStoreSelector 使用。为 idempotentKey 从请求消息中评估 value。与 selectorvalue-expression 相互排斥。默认情况下,MetadataStoreSelector 使用消息标头“timestamp”作为元数据“value”。
9 用于填充 ExpressionEvaluatingMessageProcessor 的 SpEL 表达式。基础 MetadataStoreSelector 使用。使用请求消息作为评估上下文根对象为 idempotentKey 评估 value。与 selectorvalue-strategy 相互排斥。默认情况下,MetadataStoreSelector 使用消息标头“timestamp”作为元数据“value”。
10 BiPredicate&lt;String, String&gt; bean 的引用,可让你通过比较密钥的旧值和新值进行可选消息选择;默认值为 null
11 如果 IdempotentReceiverInterceptor 拒绝该消息,则指示是否抛出异常。默认为 false。无论是否提供了 discard-channel,都会将其应用。

对于 Java 配置,Spring Integration 提供了方法级别的 @IdempotentReceiver 注解。它用于标记具有消息传递批注(@ServiceActivator@Router 和其他批注)的 method,以指定哪些 IdempotentReceiverInterceptor 对象应用于此端点。以下示例显示如何使用 @IdempotentReceiver 注解:

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
   return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
                                                    m.getHeaders().get(INVOICE_NBR_HEADER)));
}

@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
    ....
}

当您使用 Java DSL 时,可以将拦截器添加到端点的建议链,如下面的示例所示:

@Bean
public IntegrationFlow flow() {
    ...
        .handle("someBean", "someMethod",
            e -> e.advice(idempotentReceiverInterceptor()))
    ...
}

IdempotentReceiverInterceptor 仅设计用于 MessageHandler.handleMessage(Message<?>) 方法。从 4.3.1 版本开始,它实施 HandleMessageAdvice,以 AbstractHandleMessageAdvice 作为基类,以便更好地解离。更多信息,请参阅 Handling Message Advice