Repeat

RepeatTemplate

批量处理是重复性动作,作为简单的优化或某项任务的一部分。为了设计和概括重复,并提供一个迭代器框架,Spring Batch 提供了 RepeatOperations 接口。RepeatOperations 接口具有以下定义:

public interface RepeatOperations {

    RepeatStatus iterate(RepeatCallback callback) throws RepeatException;

}

回调是接口,如下面的定义中所示,它允许您插入要重复执行的一些业务逻辑:

public interface RepeatCallback {

    RepeatStatus doInIteration(RepeatContext context) throws Exception;

}

在实现决定迭代何时结束之前,反复执行回调。这些接口中的返回值是枚举值,可以是 RepeatStatus.CONTINUABLERepeatStatus.FINISHEDRepeatStatus 枚举向重复操作的调用者传达有关是否还有剩余工作的相关信息。一般来说,RepeatOperations 的实现应检查 RepeatStatus,并将其用作结束迭代决策的一部分。希望向调用方发出信号,表示没有任何剩余工作的任何回调都可以返回 RepeatStatus.FINISHED

RepeatOperations 最简单的通用实现是 RepeatTemplate

RepeatTemplate template = new RepeatTemplate();

template.setCompletionPolicy(new SimpleCompletionPolicy(2));

template.iterate(new RepeatCallback() {

    public RepeatStatus doInIteration(RepeatContext context) {
        // Do stuff in batch...
        return RepeatStatus.CONTINUABLE;
    }

});

在前面的示例中,我们返回 RepeatStatus.CONTINUABLE,表示还有更多工作要做。回调还可以返回 RepeatStatus.FINISHED,以向调用方发出信号,表示没有剩余工作。一些迭代可以通过对回调中所做的工作内在的考虑因素来终止。而另一些则有效地进入无限循环(就回调而言),并且完成决策委派给外部策略,如前面示例中所示的情况。

RepeatContext

RepeatCallback 的方法参数是 RepeatContext。许多回调忽略上下文。但是,如果需要,您可以将它用作属性包来在迭代期间存储瞬态数据。在 iterate 方法返回后,上下文就不存在了。

如果正在进行嵌套迭代,则 RepeatContext 有一个父上下文。父上下文有时可用于存储需要在对 iterate 的调用之间共享的数据。例如,如果您希望计算迭代中事件的发生次数并记住它跨越后续调用,则这种情况会发生。

RepeatStatus

RepeatStatus 是 Spring Batch 用于指示处理是否完成的枚举。它具有两个可能的 RepeatStatus 值:

Table 1. RepeatStatus Properties

Value

Description

CONTINUABLE

还有更多工作要做。

FINISHED

不应该再有重复。

您可以使用 RepeatStatus 中的 and() 方法将 RepeatStatus 值与逻辑 AND 操作相结合。这样做的效果是对可继续标志执行逻辑 AND 操作。换句话说,如果任何状态都是 FINISHED,则结果就是 FINISHED

Completion Policies

RepeatTemplate 内,iterate 方法中循环的终止是由 CompletionPolicy 决定的,CompletionPolicy 也是 RepeatContext 的工厂。RepeatTemplate 负责使用当前策略创建 RepeatContext,并在迭代的每个阶段将其传递给 RepeatCallback。在一个回调完成其 doInIteration 后,RepeatTemplate 必须调用 CompletionPolicy 来要求它更新其状态(该状态将存储在 RepeatContext 中)。然后它会询问策略迭代是否已完成。

Spring Batch 提供了一些简单通用的 CompletionPolicy 实现。SimpleCompletionPolicy 允许最多执行固定次数(而 RepeatStatus.FINISHED 则强制随时提前完成)。

用户可能需要为更复杂的决定实现自己的完成策略。例如,一个批处理窗口阻止批处理作业在线上系统使用时执行,这需要一个自定义策略。

Exception Handling

如果在 RepeatCallback 内抛出异常,则 RepeatTemplate 会咨询 ExceptionHandlerExceptionHandler 可以决定是否重新抛出异常。

以下清单显示了 ExceptionHandler 接口定义:

public interface ExceptionHandler {

    void handleException(RepeatContext context, Throwable throwable)
        throws Throwable;

}

一个常见的用例是计算给定类型的异常数量,并在达到限制时失败。为此,Spring Batch 提供了 SimpleLimitExceptionHandler 和一个更灵活的 RethrowOnThresholdExceptionHandlerSimpleLimitExceptionHandler 具有一个限制属性和一个应与当前异常进行比较的异常类型。提供的类型的子类也会被计算在内。在达到限制之前,给定类型的异常将被忽略,然后重新抛出。其他类型的异常总是被重新抛出。

SimpleLimitExceptionHandler 的一个重要的可选属性是名为 useParent 的布尔标志。它默认值为 false,因此只在当前 RepeatContext 中计算限制。当设置为 true 时,限制将跨嵌套迭代中的兄弟上下文(例如,步骤中的集合块)保存。

Listeners

通常,能够跨多个不同迭代针对横切关注点接收其他回调非常有用。出于此目的,Spring Batch 提供了 RepeatListener 接口。RepeatTemplate 允许用户注册 RepeatListener 实现,并且会在迭代过程中使用 RepeatContextRepeatStatus 提供回调(如果可用)。

RepeatListener 接口定义如下:

public interface RepeatListener {
    void before(RepeatContext context);
    void after(RepeatContext context, RepeatStatus result);
    void open(RepeatContext context);
    void onError(RepeatContext context, Throwable e);
    void close(RepeatContext context);
}

openclose 回调在整个迭代之前和之后出现。beforeafteronError 适用于各个 RepeatCallback 调用。

请注意,当有多个监听器时,它们会存放在一个列表中,因此存在一个顺序。在这种情况下,openbefore 会按照相同的顺序被调用,而 afteronErrorclose 则会按照相反的顺序被调用。

Parallel Processing

RepeatOperations 实现不受限于按顺序执行回调。一些实现能够并行执行其回调非常重要。为此,Spring Batch 提供了 TaskExecutorRepeatTemplate,它使用 Spring TaskExecutor 策略来运行 RepeatCallback。默认使用 SynchronousTaskExecutor,其效果是在同一线程中执行整个迭代(与普通 RepeatTemplate 相同)。

Declarative Iteration

有时,有一些业务处理你知道每次发生时都希望重复执行。一个经典的示例是消息管道的优化。如果一批消息频繁到达,那么处理这些消息比为每条消息承担单独的事务开销更有效。为此,Spring Batch 提供了一个 AOP 拦截器,它将一个方法调用包装到一个 RepeatOperations 对象中。RepeatOperationsInterceptor 执行拦截的方法,并根据所提供 RepeatTemplate 中的 CompletionPolicy 进行重复。

Java

下面的示例使用 Java 配置重复对一个名为 processMessage 的方法进行服务调用(有关如何配置 AOP 拦截器的更多详细信息,请参阅<<[role="bare"][role="bare"]https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#aop,Spring 用户指南>>):

@Bean
public MyService myService() {
	ProxyFactory factory = new ProxyFactory(RepeatOperations.class.getClassLoader());
	factory.setInterfaces(MyService.class);
	factory.setTarget(new MyService());

	MyService service = (MyService) factory.getProxy();
	JdkRegexpMethodPointcut pointcut = new JdkRegexpMethodPointcut();
	pointcut.setPatterns(".*processMessage.*");

	RepeatOperationsInterceptor interceptor = new RepeatOperationsInterceptor();

	((Advised) service).addAdvisor(new DefaultPointcutAdvisor(pointcut, interceptor));

	return service;
}
XML

下面的示例展示了使用 Spring AOP 命名空间对一个名为 processMessage 的方法进行服务调用(有关如何配置 AOP 拦截器的更多详细信息,请参阅<<[role="bare"][role="bare"]https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#aop,Spring 用户指南>>):

<aop:config>
    <aop:pointcut id="transactional"
        expression="execution(* com..*Service.processMessage(..))" />
    <aop:advisor pointcut-ref="transactional"
        advice-ref="retryAdvice" order="-1"/>
</aop:config>

<bean id="retryAdvice" class="org.spr...RepeatOperationsInterceptor"/>

前面的示例在拦截器内部使用了一个默认的 RepeatTemplate。为了更改策略、监听器和其他详细信息,你可以将 RepeatTemplate 实例注入拦截器中。

如果拦截的方法返回 void,那么拦截器总会返回 RepeatStatus.CONTINUABLE(因此如果 CompletionPolicy 没有有限的终点,则有陷入无限循环的危险)。否则,它会返回 RepeatStatus.CONTINUABLE,直到来自拦截的方法的返回值为 null。在该点,它返回 RepeatStatus.FINISHED。因此,目标方法内的业务逻辑可以通过返回 null 或抛出所提供的 RepeatTemplateExceptionHandler 重新抛出的异常来表示没有更多工作要做。