Repeat

RepeatTemplate

批量处理是重复性动作,作为简单的优化或某项任务的一部分。为了设计和概括重复,并提供一个迭代器框架,Spring Batch 提供了 RepeatOperations 接口。RepeatOperations 接口具有以下定义:

Batch processing is about repetitive actions, either as a simple optimization or as part of a job. To strategize and generalize the repetition and to provide what amounts to an iterator framework, Spring Batch has the RepeatOperations interface. The RepeatOperations interface has the following definition:

public interface RepeatOperations {

    RepeatStatus iterate(RepeatCallback callback) throws RepeatException;

}

回调是接口,如下面的定义中所示,它允许您插入要重复执行的一些业务逻辑:

The callback is an interface, shown in the following definition, that lets you insert some business logic to be repeated:

public interface RepeatCallback {

    RepeatStatus doInIteration(RepeatContext context) throws Exception;

}

在实现决定迭代何时结束之前,反复执行回调。这些接口中的返回值是枚举值,可以是 RepeatStatus.CONTINUABLERepeatStatus.FINISHEDRepeatStatus 枚举向重复操作的调用者传达有关是否还有剩余工作的相关信息。一般来说,RepeatOperations 的实现应检查 RepeatStatus,并将其用作结束迭代决策的一部分。希望向调用方发出信号,表示没有任何剩余工作的任何回调都可以返回 RepeatStatus.FINISHED

The callback is executed repeatedly until the implementation determines that the iteration should end. The return value in these interfaces is an enumeration value that can be either RepeatStatus.CONTINUABLE or RepeatStatus.FINISHED. A RepeatStatus enumeration conveys information to the caller of the repeat operations about whether any work remains. Generally speaking, implementations of RepeatOperations should inspect RepeatStatus and use it as part of the decision to end the iteration. Any callback that wishes to signal to the caller that there is no work remains can return RepeatStatus.FINISHED.

RepeatOperations 最简单的通用实现是 RepeatTemplate

The simplest general purpose implementation of RepeatOperations is RepeatTemplate:

RepeatTemplate template = new RepeatTemplate();

template.setCompletionPolicy(new SimpleCompletionPolicy(2));

template.iterate(new RepeatCallback() {

    public RepeatStatus doInIteration(RepeatContext context) {
        // Do stuff in batch...
        return RepeatStatus.CONTINUABLE;
    }

});

在前面的示例中,我们返回 RepeatStatus.CONTINUABLE,表示还有更多工作要做。回调还可以返回 RepeatStatus.FINISHED,以向调用方发出信号,表示没有剩余工作。一些迭代可以通过对回调中所做的工作内在的考虑因素来终止。而另一些则有效地进入无限循环(就回调而言),并且完成决策委派给外部策略,如前面示例中所示的情况。

In the preceding example, we return RepeatStatus.CONTINUABLE, to show that there is more work to do. The callback can also return RepeatStatus.FINISHED, to signal to the caller that there is no work remains. Some iterations can be terminated by considerations intrinsic to the work being done in the callback. Others are effectively infinite loops (as far as the callback is concerned), and the completion decision is delegated to an external policy, as in the case shown in the preceding example.

RepeatContext

RepeatCallback 的方法参数是 RepeatContext。许多回调忽略上下文。但是,如果需要,您可以将它用作属性包来在迭代期间存储瞬态数据。在 iterate 方法返回后,上下文就不存在了。

The method parameter for the RepeatCallback is a RepeatContext. Many callbacks ignore the context. However, if necessary, you can use it as an attribute bag to store transient data for the duration of the iteration. After the iterate method returns, the context no longer exists.

如果正在进行嵌套迭代,则 RepeatContext 有一个父上下文。父上下文有时可用于存储需要在对 iterate 的调用之间共享的数据。例如,如果您希望计算迭代中事件的发生次数并记住它跨越后续调用,则这种情况会发生。

If there is a nested iteration in progress, a RepeatContext has a parent context. The parent context is occasionally useful for storing data that need to be shared between calls to iterate. This is the case, for instance, if you want to count the number of occurrences of an event in the iteration and remember it across subsequent calls.

RepeatStatus

RepeatStatus 是 Spring Batch 用于指示处理是否完成的枚举。它具有两个可能的 RepeatStatus 值:

RepeatStatus is an enumeration used by Spring Batch to indicate whether processing has finished. It has two possible RepeatStatus values:

Table 1. RepeatStatus Properties

Value

Description

CONTINUABLE

There is more work to do.

FINISHED

No more repetitions should take place.

您可以使用 RepeatStatus 中的 and() 方法将 RepeatStatus 值与逻辑 AND 操作相结合。这样做的效果是对可继续标志执行逻辑 AND 操作。换句话说,如果任何状态都是 FINISHED,则结果就是 FINISHED

You can combine RepeatStatus values with a logical AND operation by using the and() method in RepeatStatus. The effect of this is to do a logical AND on the continuable flag. In other words, if either status is FINISHED, the result is FINISHED.

Completion Policies

RepeatTemplate 内,iterate 方法中循环的终止是由 CompletionPolicy 决定的,CompletionPolicy 也是 RepeatContext 的工厂。RepeatTemplate 负责使用当前策略创建 RepeatContext,并在迭代的每个阶段将其传递给 RepeatCallback。在一个回调完成其 doInIteration 后,RepeatTemplate 必须调用 CompletionPolicy 来要求它更新其状态(该状态将存储在 RepeatContext 中)。然后它会询问策略迭代是否已完成。

Inside a RepeatTemplate, the termination of the loop in the iterate method is determined by a CompletionPolicy, which is also a factory for the RepeatContext. The RepeatTemplate has the responsibility to use the current policy to create a RepeatContext and pass that in to the RepeatCallback at every stage in the iteration. After a callback completes its doInIteration, the RepeatTemplate has to make a call to the CompletionPolicy to ask it to update its state (which will be stored in the RepeatContext). Then it asks the policy if the iteration is complete.

Spring Batch 提供了一些简单通用的 CompletionPolicy 实现。SimpleCompletionPolicy 允许最多执行固定次数(而 RepeatStatus.FINISHED 则强制随时提前完成)。

Spring Batch provides some simple general purpose implementations of CompletionPolicy. SimpleCompletionPolicy allows execution up to a fixed number of times (with RepeatStatus.FINISHED forcing early completion at any time).

用户可能需要为更复杂的决定实现自己的完成策略。例如,一个批处理窗口阻止批处理作业在线上系统使用时执行,这需要一个自定义策略。

Users might need to implement their own completion policies for more complicated decisions. For example, a batch processing window that prevents batch jobs from executing once the online systems are in use would require a custom policy.

Exception Handling

如果在 RepeatCallback 内抛出异常,则 RepeatTemplate 会咨询 ExceptionHandlerExceptionHandler 可以决定是否重新抛出异常。

If there is an exception thrown inside a RepeatCallback, the RepeatTemplate consults an ExceptionHandler, which can decide whether or not to re-throw the exception.

以下清单显示了 ExceptionHandler 接口定义:

The following listing shows the ExceptionHandler interface definition:

public interface ExceptionHandler {

    void handleException(RepeatContext context, Throwable throwable)
        throws Throwable;

}

一个常见的用例是计算给定类型的异常数量,并在达到限制时失败。为此,Spring Batch 提供了 SimpleLimitExceptionHandler 和一个更灵活的 RethrowOnThresholdExceptionHandlerSimpleLimitExceptionHandler 具有一个限制属性和一个应与当前异常进行比较的异常类型。提供的类型的子类也会被计算在内。在达到限制之前,给定类型的异常将被忽略,然后重新抛出。其他类型的异常总是被重新抛出。

A common use case is to count the number of exceptions of a given type and fail when a limit is reached. For this purpose, Spring Batch provides the SimpleLimitExceptionHandler and a slightly more flexible RethrowOnThresholdExceptionHandler. The SimpleLimitExceptionHandler has a limit property and an exception type that should be compared with the current exception. All subclasses of the provided type are also counted. Exceptions of the given type are ignored until the limit is reached, and then they are rethrown. Exceptions of other types are always rethrown.

SimpleLimitExceptionHandler 的一个重要的可选属性是名为 useParent 的布尔标志。它默认值为 false,因此只在当前 RepeatContext 中计算限制。当设置为 true 时,限制将跨嵌套迭代中的兄弟上下文(例如,步骤中的集合块)保存。

An important optional property of the SimpleLimitExceptionHandler is the boolean flag called useParent. It is false by default, so the limit is only accounted for in the current RepeatContext. When set to true, the limit is kept across sibling contexts in a nested iteration (such as a set of chunks inside a step).

Listeners

通常,能够跨多个不同迭代针对横切关注点接收其他回调非常有用。出于此目的,Spring Batch 提供了 RepeatListener 接口。RepeatTemplate 允许用户注册 RepeatListener 实现,并且会在迭代过程中使用 RepeatContextRepeatStatus 提供回调(如果可用)。

Often, it is useful to be able to receive additional callbacks for cross-cutting concerns across a number of different iterations. For this purpose, Spring Batch provides the RepeatListener interface. The RepeatTemplate lets users register RepeatListener implementations, and they are given callbacks with the RepeatContext and RepeatStatus where available during the iteration.

RepeatListener 接口定义如下:

The RepeatListener interface has the following definition:

public interface RepeatListener {
    void before(RepeatContext context);
    void after(RepeatContext context, RepeatStatus result);
    void open(RepeatContext context);
    void onError(RepeatContext context, Throwable e);
    void close(RepeatContext context);
}

openclose 回调在整个迭代之前和之后出现。beforeafteronError 适用于各个 RepeatCallback 调用。

The open and close callbacks come before and after the entire iteration. before, after, and onError apply to the individual RepeatCallback calls.

请注意,当有多个监听器时,它们会存放在一个列表中,因此存在一个顺序。在这种情况下,openbefore 会按照相同的顺序被调用,而 afteronErrorclose 则会按照相反的顺序被调用。

Note that, when there is more than one listener, they are in a list, so there is an order. In this case, open and before are called in the same order while after, onError, and close are called in reverse order.

Parallel Processing

RepeatOperations 实现不受限于按顺序执行回调。一些实现能够并行执行其回调非常重要。为此,Spring Batch 提供了 TaskExecutorRepeatTemplate,它使用 Spring TaskExecutor 策略来运行 RepeatCallback。默认使用 SynchronousTaskExecutor,其效果是在同一线程中执行整个迭代(与普通 RepeatTemplate 相同)。

Implementations of RepeatOperations are not restricted to executing the callback sequentially. It is quite important that some implementations are able to execute their callbacks in parallel. To this end, Spring Batch provides the TaskExecutorRepeatTemplate, which uses the Spring TaskExecutor strategy to run the RepeatCallback. The default is to use a SynchronousTaskExecutor, which has the effect of executing the whole iteration in the same thread (the same as a normal RepeatTemplate).

Declarative Iteration

有时,有一些业务处理你知道每次发生时都希望重复执行。一个经典的示例是消息管道的优化。如果一批消息频繁到达,那么处理这些消息比为每条消息承担单独的事务开销更有效。为此,Spring Batch 提供了一个 AOP 拦截器,它将一个方法调用包装到一个 RepeatOperations 对象中。RepeatOperationsInterceptor 执行拦截的方法,并根据所提供 RepeatTemplate 中的 CompletionPolicy 进行重复。

Sometimes, there is some business processing that you know you want to repeat every time it happens. The classic example of this is the optimization of a message pipeline. If a batch of messages arrives frequently, it is more efficient to process them than to bear the cost of a separate transaction for every message. Spring Batch provides an AOP interceptor that wraps a method call in a RepeatOperations object for this purpose. The RepeatOperationsInterceptor executes the intercepted method and repeats according to the CompletionPolicy in the provided RepeatTemplate.

Java

下面的示例使用 Java 配置重复对一个名为 processMessage 的方法进行服务调用(有关如何配置 AOP 拦截器的更多详细信息,请参阅<<[role="bare"][role="bare"]https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#aop,Spring 用户指南>>):

The following example uses Java configuration to repeat a service call to a method called processMessage (for more detail on how to configure AOP interceptors, see the <<[role="bare"]https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#aop,Spring User Guide>>):

@Bean
public MyService myService() {
	ProxyFactory factory = new ProxyFactory(RepeatOperations.class.getClassLoader());
	factory.setInterfaces(MyService.class);
	factory.setTarget(new MyService());

	MyService service = (MyService) factory.getProxy();
	JdkRegexpMethodPointcut pointcut = new JdkRegexpMethodPointcut();
	pointcut.setPatterns(".*processMessage.*");

	RepeatOperationsInterceptor interceptor = new RepeatOperationsInterceptor();

	((Advised) service).addAdvisor(new DefaultPointcutAdvisor(pointcut, interceptor));

	return service;
}
XML

下面的示例展示了使用 Spring AOP 命名空间对一个名为 processMessage 的方法进行服务调用(有关如何配置 AOP 拦截器的更多详细信息,请参阅<<[role="bare"][role="bare"]https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#aop,Spring 用户指南>>):

The following example shows declarative iteration that uses the Spring AOP namespace to repeat a service call to a method called processMessage (for more detail on how to configure AOP interceptors, see the <<[role="bare"]https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#aop,Spring User Guide>>):

<aop:config>
    <aop:pointcut id="transactional"
        expression="execution(* com..*Service.processMessage(..))" />
    <aop:advisor pointcut-ref="transactional"
        advice-ref="retryAdvice" order="-1"/>
</aop:config>

<bean id="retryAdvice" class="org.spr...RepeatOperationsInterceptor"/>

前面的示例在拦截器内部使用了一个默认的 RepeatTemplate。为了更改策略、监听器和其他详细信息,你可以将 RepeatTemplate 实例注入拦截器中。

The preceding example uses a default RepeatTemplate inside the interceptor. To change the policies, listeners, and other details, you can inject an instance of RepeatTemplate into the interceptor.

如果拦截的方法返回 void,那么拦截器总会返回 RepeatStatus.CONTINUABLE(因此如果 CompletionPolicy 没有有限的终点,则有陷入无限循环的危险)。否则,它会返回 RepeatStatus.CONTINUABLE,直到来自拦截的方法的返回值为 null。在该点,它返回 RepeatStatus.FINISHED。因此,目标方法内的业务逻辑可以通过返回 null 或抛出所提供的 RepeatTemplateExceptionHandler 重新抛出的异常来表示没有更多工作要做。

If the intercepted method returns void, the interceptor always returns RepeatStatus.CONTINUABLE (so there is a danger of an infinite loop if the CompletionPolicy does not have a finite end point). Otherwise, it returns RepeatStatus.CONTINUABLE until the return value from the intercepted method is null. At that point, it returns RepeatStatus.FINISHED. Consequently, the business logic inside the target method can signal that there is no more work to do by returning null or by throwing an exception that is rethrown by the ExceptionHandler in the provided RepeatTemplate.