Provided Advice Classes

除了提供应用 AOP 建议类的通用机制外,Spring Integration 还在开箱即用的情况下提供这些建议实现:

Retry Advice

重试建议 (o.s.i.handler.advice.RequestHandlerRetryAdvice) 充分利用了 Spring Retry项目提供的丰富的重试机制。spring-retry`的核心组件是 `RetryTemplate,它允许配置复杂重试场景,包括 `RetryPolicy`和 `BackoffPolicy`策略(以及多种实现),以及 `RecoveryCallback`策略以确定在重试耗尽时采取的操作。

Stateless Retry

Stateless retry is the case where the retry activity is handled entirely within the advice. The thread pauses (if configured to do so) and retries the action.

Stateful Retry

Stateful retry is the case where the retry state is managed within the advice but where an exception is thrown and the caller resubmits the request. An example for stateful retry is when we want the message originator (for example,JMS) to be responsible for resubmitting, rather than performing it on the current thread. Stateful retry needs some mechanism to detect a retried submission.

有关 `spring-retry`的更多信息,请参见 the project’s JavadocSpring Batch的参考文档,其中 `spring-retry`就是源自该文档。

默认的退避行为是不退避。立即尝试重试。使用导致线程在尝试之间暂停的退避策略可能会导致性能问题,包括过度使用内存和线程饥饿。在高容量环境中,应谨慎使用退避策略。

Configuring the Retry Advice

本节中的示例使用始终抛出异常的以下 <service-activator>

public class FailingService {

    public void service(String message) {
        throw new RuntimeException("error");
    }
}
Simple Stateless Retry

The default RetryTemplate has a SimpleRetryPolicy which tries three times. There is no BackOffPolicy, so the three attempts are made back-to-back-to-back with no delay between attempts. There is no RecoveryCallback, so the result is to throw the exception to the caller after the final failed retry occurs. In a Spring Integration environment, this final exception might be handled by using an error-channel on the inbound endpoint. The following example uses RetryTemplate and shows its DEBUG output:

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"/>
    </int:request-handler-advice-chain>
</int:service-activator>

DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3
Simple Stateless Retry with Recovery

The following example adds a RecoveryCallback to the preceding example and uses an ErrorMessageSendingRecoverer to send an ErrorMessage to a channel:

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3
DEBUG [task-scheduler-2]Sending ErrorMessage :failedMessage:[Payload=...]
Stateless Retry with Customized Policies, and Recovery

For more sophistication, we can provide the advice with a customized RetryTemplate. This example continues to use the SimpleRetryPolicy but increases the attempts to four. It also adds an ExponentialBackoffPolicy where the first retry waits one second, the second waits five seconds and the third waits 25 (for four attempts in all). The following listing shows the example and its DEBUG output:

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
            <property name="retryTemplate" ref="retryTemplate" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="retryPolicy">
        <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
            <property name="maxAttempts" value="4" />
        </bean>
    </property>
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="1000" />
            <property name="multiplier" value="5.0" />
            <property name="maxInterval" value="60000" />
        </bean>
    </property>
</bean>

27.058 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
27.071 DEBUG [task-scheduler-1]Retry: count=0
27.080 DEBUG [task-scheduler-1]Sleeping for 1000
28.081 DEBUG [task-scheduler-1]Checking for rethrow: count=1
28.081 DEBUG [task-scheduler-1]Retry: count=1
28.081 DEBUG [task-scheduler-1]Sleeping for 5000
33.082 DEBUG [task-scheduler-1]Checking for rethrow: count=2
33.082 DEBUG [task-scheduler-1]Retry: count=2
33.083 DEBUG [task-scheduler-1]Sleeping for 25000
58.083 DEBUG [task-scheduler-1]Checking for rethrow: count=3
58.083 DEBUG [task-scheduler-1]Retry: count=3
58.084 DEBUG [task-scheduler-1]Checking for rethrow: count=4
58.084 DEBUG [task-scheduler-1]Retry failed last attempt: count=4
58.086 DEBUG [task-scheduler-1]Sending ErrorMessage :failedMessage:[Payload=...]
Namespace Support for Stateless Retry

Starting with version 4.0, the preceding configuration can be greatly simplified, thanks to the namespace support for the retry advice, as the following example shows:

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <ref bean="retrier" />
    </int:request-handler-advice-chain>
</int:service-activator>

<int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
    <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
</int:handler-retry-advice>

在前面的示例中,建议被定义为顶级 Bean,以便在多个 request-handler-advice-chain 实例中使用。你也可以直接在链中定义建议,如下面的示例所示:

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
            <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
        </int:retry-advice>
    </int:request-handler-advice-chain>
</int:service-activator>

<handler-retry-advice> 可以有 <fixed-back-off><exponential-back-off> 子元素,或没有子元素。没有子元素的 <handler-retry-advice> 不使用后退。如果没有 recovery-channel,则重试耗尽时会抛出异常。名称空间只能与无状态重试一起使用。 对于更复杂的环境(自定义策略等),请使用普通的 <bean> 定义。

Simple Stateful Retry with Recovery

To make retry stateful, we need to provide the advice with a RetryStateGenerator implementation. This class is used to identify a message as being a resubmission so that the RetryTemplate can determine the current state of retry for this message. The framework provides a SpelExpressionRetryStateGenerator, which determines the message identifier by using a SpEL expression. This example again uses the default policies (three attempts with no back off). As with stateless retry, these policies can be customized. The following listing shows the example and its DEBUG output:

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="retryStateGenerator">
                <bean class="o.s.i.handler.advice.SpelExpressionRetryStateGenerator">
                    <constructor-arg value="headers['jms_messageId']" />
                </bean>
            </property>
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

24.351 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
24.368 DEBUG [Container#0-1]Retry: count=0
24.387 DEBUG [Container#0-1]Checking for rethrow: count=1
24.387 DEBUG [Container#0-1]Rethrow in retry for policy: count=1
24.387 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
24.391 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
25.412 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
25.412 DEBUG [Container#0-1]Retry: count=1
25.413 DEBUG [Container#0-1]Checking for rethrow: count=2
25.413 DEBUG [Container#0-1]Rethrow in retry for policy: count=2
25.413 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
25.414 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
26.418 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
26.418 DEBUG [Container#0-1]Retry: count=2
26.419 DEBUG [Container#0-1]Checking for rethrow: count=3
26.419 DEBUG [Container#0-1]Rethrow in retry for policy: count=3
26.419 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
26.420 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
27.425 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
27.426 DEBUG [Container#0-1]Retry failed last attempt: count=3
27.426 DEBUG [Container#0-1]Sending ErrorMessage :failedMessage:[Payload=...]

如果你将前面的示例与无状态示例进行比较,你可以看到,对于有状态重试,异常在每次失败时都会抛出给调用者。

Exception Classification for Retry

Spring Retry has a great deal of flexibility for determining which exceptions can invoke retry. The default configuration retries for all exceptions and the exception classifier looks at the top-level exception. If you configure it to, say, retry only on MyException and your application throws a SomeOtherException where the cause is a MyException, retry does not occur.

自 Spring Retry 1.0.3 以来,BinaryExceptionClassifier 有一个名为 traverseCauses 的属性(默认值为 false)。当为 true 时,它会遍历异常原因,直到找到匹配项或遍历完原因。 要在重试中使用此分类器,请使用采用最大尝试次数、Exception 对象的 MaptraverseCauses 布尔值构造函数创建的 SimpleRetryPolicy。然后你可以将此策略注入到 RetryTemplate 中。

这种情况下需要 traverseCauses,因为用户异常可能包装在 MessagingException 中。

Circuit Breaker Advice

断路器模式的总体思路是,如果服务当前不可用,则不要浪费时间(和资源)尝试使用它。o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice 实现了这一模式。当断路器处于闭合状态时,端点会尝试调用服务。如果连续尝试次数超过一定数量,则断路器将处于打开状态。当它处于打开状态时,新的请求“快速失败”,并且在一段时间过去之前不会尝试调用该服务。

在该时间过期时,将断路器设置为半开状态。在此状态下,如果单次尝试失败,则断路器将立即进入打开状态。如果尝试成功,则断路器进入关闭状态,在这种情况下,直到再次发生配置的连续故障次数,才进入打开状态。任何成功的尝试都会将状态重置为零故障,以确定断路器何时可能再次进入打开状态。

通常,此建议可用于可能需要一段时间才能失败(例如尝试建立网络连接的超时)的外部服务。

RequestHandlerCircuitBreakerAdvice 有两个属性:thresholdhalfOpenAfterthreshold 属性表示断路器打开之前需要发生的连续故障次数。默认为 5halfOpenAfter 属性表示上次故障后断路器在尝试另一个请求之前等待的时间。默认值为 1000 毫秒。

以下示例配置了断路器并显示了其 DEBUGERROR 输出:

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
            <property name="threshold" value="2" />
            <property name="halfOpenAfter" value="12000" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator

在前一个示例中,阈值设置为 2halfOpenAfter 设置为 12 秒。每 5 秒就会到达一个新请求。前两次尝试调用了服务。第三次和第四次尝试失败,并抛出一个异常,表明断路器已打开。第五次请求已尝试,因为该请求在最后一次失败后 15 秒。第六次尝试立即失败,因为断路器立即进入打开状态。

Expression Evaluating Advice

最后提供的建议类是 o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice。此建议比其他两条建议更通用。它提供了一种在发送到端点的原始入站消息上求值表达式的机制。可以求值单独的表达式,以便在成功或失败后求值。可以选择将包含求值结果和输入消息的消息发送到消息通道。

此建议的典型用例可能是 <ftp:outbound-channel-adapter/>,可能在传输成功时将文件移至一个目录,或在传输失败时将文件移至另一个目录:

该建议具有在成功时设置表达式的属性、失败时的表达式以及每个表达式的相应通道。对于成功的情况,发送到 successChannel 的消息是 AdviceMessage,有效负载是表达式求值的结果。一个名为 inputMessage 的附加属性包含发送到处理程序的原始消息。发送到 failureChannel(当处理程序抛出异常时)的消息是带有 MessageHandlingExpressionEvaluatingAdviceException 有效负载的 ErrorMessage。像所有 MessagingException 实例一样,此有效负载具有 failedMessagecause 属性,以及一个名为 evaluationResult 的附加属性,其中包含表达式求值的結果。

从版本 5.1.3 开始,如果配置了通道但未提供表达式,则使用默认表达式对消息 payload 进行评估。

当在建议的范围内抛出异常时,默认情况下,该异常将在任何 failureExpression 求值后抛给调用者。如果您希望禁止抛出异常,请将 trapException 属性设置为 true。以下建议显示如何使用 Java DSL 配置 advice

@SpringBootApplication
public class EerhaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
        MessageChannel in = context.getBean("advised.input", MessageChannel.class);
        in.send(new GenericMessage<>("good"));
        in.send(new GenericMessage<>("bad"));
        context.close();
    }

    @Bean
    public IntegrationFlow advised() {
        return f -> f.<String>handle((payload, headers) -> {
            if (payload.equals("good")) {
                return null;
            }
            else {
                throw new RuntimeException("some failure");
            }
        }, c -> c.advice(expressionAdvice()));
    }

    @Bean
    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("success.input");
        advice.setOnSuccessExpressionString("payload + ' was successful'");
        advice.setFailureChannelName("failure.input");
        advice.setOnFailureExpressionString(
                "payload + ' was bad, with reason: ' + #exception.cause.message");
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow success() {
        return f -> f.handle(System.out::println);
    }

    @Bean
    public IntegrationFlow failure() {
        return f -> f.handle(System.out::println);
    }

}

Rate Limiter Advice

速率限制器建议(RateLimiterRequestHandlerAdvice)确保端点不会因请求而过载。当速率限制遭到破坏时,请求将变为已阻止状态。

此建议的典型用例可能是外部服务提供程序不允许每分钟超过 n 个请求。

`RateLimiterRequestHandlerAdvice`实现完全基于 Resilience4j项目,并且需要 `RateLimiter`或 `RateLimiterConfig`注入。还可以配置默认值和/或自定义名称。

以下示例配置了速率限制器建议,每 1 秒一个请求:

@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
    return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
            .limitRefreshPeriod(Duration.ofSeconds(1))
            .limitForPeriod(1)
            .build());
}

@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
		adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
    ...
}

Caching Advice

从 5.2 版本开始,引入了 CacheRequestHandlerAdvice。它基于 Spring Framework 中的缓存抽象,并与 @Caching 注释家族提供的概念和功能保持一致。内部逻辑基于 CacheAspectSupport 扩展,其中缓存操作代理围绕带有请求 Message<?> 作为参数的 AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage 方法进行。可以使用 SpEL 表达式或 Function 来配置此建议以评估缓存键。请求 Message<?> 可用作 SpEL 评估上下文的根对象或作为 Function 输入参数。默认情况下,请求消息的 payload 用于缓存键。必须使用 cacheNames 配置 CacheRequestHandlerAdvice,当默认缓存操作为 CacheableOperation 或一组任意 CacheOperation 时。可以分别配置每个 CacheOperation,也可以具有共享选项,例如可以从 CacheRequestHandlerAdvice 配置中重用的 CacheManagerCacheResolverCacheErrorHandler。此配置功能类似于 Spring Framework 的 @CacheConfig@Caching 注释组合。如果没有提供 CacheManager,则默认情况下从 CacheAspectSupport 中的 BeanFactory 解析单个 bean。

以下示例配置了两个具有不同缓存操作集的建议:

@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    return cacheRequestHandlerAdvice;
}

@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
    ...
}

@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
    cachePutBuilder.setCacheName(TEST_PUT_CACHE);
    CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
    cacheEvictBuilder.setCacheName(TEST_CACHE);
    cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
    return cacheRequestHandlerAdvice;
}

@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
    adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
    ...
}