Repeat
RepeatTemplate
批量处理是重复性动作,作为简单的优化或某项任务的一部分。为了设计和概括重复,并提供一个迭代器框架,Spring Batch 提供了 RepeatOperations
接口。RepeatOperations
接口具有以下定义:
public interface RepeatOperations {
RepeatStatus iterate(RepeatCallback callback) throws RepeatException;
}
回调是接口,如下面的定义中所示,它允许您插入要重复执行的一些业务逻辑:
public interface RepeatCallback {
RepeatStatus doInIteration(RepeatContext context) throws Exception;
}
在实现决定迭代何时结束之前,反复执行回调。这些接口中的返回值是枚举值,可以是 RepeatStatus.CONTINUABLE
或 RepeatStatus.FINISHED
。RepeatStatus
枚举向重复操作的调用者传达有关是否还有剩余工作的相关信息。一般来说,RepeatOperations
的实现应检查 RepeatStatus
,并将其用作结束迭代决策的一部分。希望向调用方发出信号,表示没有任何剩余工作的任何回调都可以返回 RepeatStatus.FINISHED
。
RepeatOperations
最简单的通用实现是 RepeatTemplate
:
RepeatTemplate template = new RepeatTemplate();
template.setCompletionPolicy(new SimpleCompletionPolicy(2));
template.iterate(new RepeatCallback() {
public RepeatStatus doInIteration(RepeatContext context) {
// Do stuff in batch...
return RepeatStatus.CONTINUABLE;
}
});
在前面的示例中,我们返回 RepeatStatus.CONTINUABLE
,表示还有更多工作要做。回调还可以返回 RepeatStatus.FINISHED
,以向调用方发出信号,表示没有剩余工作。一些迭代可以通过对回调中所做的工作内在的考虑因素来终止。而另一些则有效地进入无限循环(就回调而言),并且完成决策委派给外部策略,如前面示例中所示的情况。
Completion Policies
在 RepeatTemplate
内,iterate
方法中循环的终止是由 CompletionPolicy
决定的,CompletionPolicy
也是 RepeatContext
的工厂。RepeatTemplate
负责使用当前策略创建 RepeatContext
,并在迭代的每个阶段将其传递给 RepeatCallback
。在一个回调完成其 doInIteration
后,RepeatTemplate
必须调用 CompletionPolicy
来要求它更新其状态(该状态将存储在 RepeatContext
中)。然后它会询问策略迭代是否已完成。
Spring Batch 提供了一些简单通用的 CompletionPolicy
实现。SimpleCompletionPolicy
允许最多执行固定次数(而 RepeatStatus.FINISHED
则强制随时提前完成)。
用户可能需要为更复杂的决定实现自己的完成策略。例如,一个批处理窗口阻止批处理作业在线上系统使用时执行,这需要一个自定义策略。
Exception Handling
如果在 RepeatCallback
内抛出异常,则 RepeatTemplate
会咨询 ExceptionHandler
,ExceptionHandler
可以决定是否重新抛出异常。
以下清单显示了 ExceptionHandler
接口定义:
public interface ExceptionHandler {
void handleException(RepeatContext context, Throwable throwable)
throws Throwable;
}
一个常见的用例是计算给定类型的异常数量,并在达到限制时失败。为此,Spring Batch 提供了 SimpleLimitExceptionHandler
和一个更灵活的 RethrowOnThresholdExceptionHandler
。SimpleLimitExceptionHandler
具有一个限制属性和一个应与当前异常进行比较的异常类型。提供的类型的子类也会被计算在内。在达到限制之前,给定类型的异常将被忽略,然后重新抛出。其他类型的异常总是被重新抛出。
SimpleLimitExceptionHandler
的一个重要的可选属性是名为 useParent
的布尔标志。它默认值为 false
,因此只在当前 RepeatContext
中计算限制。当设置为 true
时,限制将跨嵌套迭代中的兄弟上下文(例如,步骤中的集合块)保存。
Listeners
通常,能够跨多个不同迭代针对横切关注点接收其他回调非常有用。出于此目的,Spring Batch 提供了 RepeatListener
接口。RepeatTemplate
允许用户注册 RepeatListener
实现,并且会在迭代过程中使用 RepeatContext
和 RepeatStatus
提供回调(如果可用)。
RepeatListener
接口定义如下:
public interface RepeatListener {
void before(RepeatContext context);
void after(RepeatContext context, RepeatStatus result);
void open(RepeatContext context);
void onError(RepeatContext context, Throwable e);
void close(RepeatContext context);
}
open
和 close
回调在整个迭代之前和之后出现。before
、after
和 onError
适用于各个 RepeatCallback
调用。
请注意,当有多个监听器时,它们会存放在一个列表中,因此存在一个顺序。在这种情况下,open
和 before
会按照相同的顺序被调用,而 after
、onError
和 close
则会按照相反的顺序被调用。
Parallel Processing
RepeatOperations
实现不受限于按顺序执行回调。一些实现能够并行执行其回调非常重要。为此,Spring Batch 提供了 TaskExecutorRepeatTemplate
,它使用 Spring TaskExecutor
策略来运行 RepeatCallback
。默认使用 SynchronousTaskExecutor
,其效果是在同一线程中执行整个迭代(与普通 RepeatTemplate
相同)。
Declarative Iteration
有时,有一些业务处理你知道每次发生时都希望重复执行。一个经典的示例是消息管道的优化。如果一批消息频繁到达,那么处理这些消息比为每条消息承担单独的事务开销更有效。为此,Spring Batch 提供了一个 AOP 拦截器,它将一个方法调用包装到一个 RepeatOperations
对象中。RepeatOperationsInterceptor
执行拦截的方法,并根据所提供 RepeatTemplate
中的 CompletionPolicy
进行重复。
- Java
-
下面的示例使用 Java 配置重复对一个名为
processMessage
的方法进行服务调用(有关如何配置 AOP 拦截器的更多详细信息,请参阅<<[role="bare"][role="bare"]https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#aop,Spring 用户指南>>):
@Bean
public MyService myService() {
ProxyFactory factory = new ProxyFactory(RepeatOperations.class.getClassLoader());
factory.setInterfaces(MyService.class);
factory.setTarget(new MyService());
MyService service = (MyService) factory.getProxy();
JdkRegexpMethodPointcut pointcut = new JdkRegexpMethodPointcut();
pointcut.setPatterns(".*processMessage.*");
RepeatOperationsInterceptor interceptor = new RepeatOperationsInterceptor();
((Advised) service).addAdvisor(new DefaultPointcutAdvisor(pointcut, interceptor));
return service;
}
- XML
-
下面的示例展示了使用 Spring AOP 命名空间对一个名为
processMessage
的方法进行服务调用(有关如何配置 AOP 拦截器的更多详细信息,请参阅<<[role="bare"][role="bare"]https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#aop,Spring 用户指南>>):
<aop:config>
<aop:pointcut id="transactional"
expression="execution(* com..*Service.processMessage(..))" />
<aop:advisor pointcut-ref="transactional"
advice-ref="retryAdvice" order="-1"/>
</aop:config>
<bean id="retryAdvice" class="org.spr...RepeatOperationsInterceptor"/>
前面的示例在拦截器内部使用了一个默认的 RepeatTemplate
。为了更改策略、监听器和其他详细信息,你可以将 RepeatTemplate
实例注入拦截器中。
如果拦截的方法返回 void
,那么拦截器总会返回 RepeatStatus.CONTINUABLE
(因此如果 CompletionPolicy
没有有限的终点,则有陷入无限循环的危险)。否则,它会返回 RepeatStatus.CONTINUABLE
,直到来自拦截的方法的返回值为 null
。在该点,它返回 RepeatStatus.FINISHED
。因此,目标方法内的业务逻辑可以通过返回 null
或抛出所提供的 RepeatTemplate
中 ExceptionHandler
重新抛出的异常来表示没有更多工作要做。