Intercepting Step Execution

Job 一样,在 Step 执行期间会发生许多事件,用户可能需要执行一些功能。例如,要写出到需要页脚的平面文件,当 Step 完成时就需要通知 ItemWriter,以便可以写入页脚。可以使用许多 Step 作用域监听器中的一个来实现这一目的。

Just as with the Job, there are many events during the execution of a Step where a user may need to perform some functionality. For example, to write out to a flat file that requires a footer, the ItemWriter needs to be notified when the Step has been completed so that the footer can be written. This can be accomplished with one of many Step scoped listeners.

您可以将任何实现了 StepListener 的扩展的类(但不要实现该接口本身,因为它本身为空)应用于一个步骤,通过 listeners 元素。listeners 元素在步骤、tasklet 或块声明中有效。我们建议您在该功能适用的级别声明监听器,或者(如果它是多功能的,例如 StepExecutionListenerItemReadListener)在它适用的最精细级别声明它。

You can apply any class that implements one of the extensions of StepListener (but not that interface itself, since it is empty) to a step through the listeners element. The listeners element is valid inside a step, tasklet, or chunk declaration. We recommend that you declare the listeners at the level at which its function applies or, if it is multi-featured (such as StepExecutionListener and ItemReadListener), declare it at the most granular level where it applies.

Java

以下示例展示了在 Java 中以块级别应用的监听器:

The following example shows a listener applied at the chunk level in Java:

Java Configuration
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(reader())
				.writer(writer())
				.listener(chunkListener())
				.build();
}
XML

以下示例展示了在 XML 中以块级别应用的监听器:

The following example shows a listener applied at the chunk level in XML:

XML Configuration
<step id="step1">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="10"/>
        <listeners>
            <listener ref="chunkListener"/>
        </listeners>
    </tasklet>
</step>

一个`ItemReader`、ItemWriter`或`ItemProcessor`它本身实现了 `StepListener`接口,如果使用名称空间 `<step> 元素或其中之一,就自动用`Step` 注册 *StepFactoryBean factory。这仅适用于直接注入 Step`中的组件。如果监听器嵌套在另一个组件内部,你需要明确地注册它(如之前在Registering `ItemStream with a Step中所述)。

An ItemReader, ItemWriter, or ItemProcessor that itself implements one of the StepListener interfaces is registered automatically with the Step if using the namespace <step> element or one of the *StepFactoryBean factories. This only applies to components directly injected into the Step. If the listener is nested inside another component, you need to explicitly register it (as described previously under Registering ItemStream with a Step).

除了 StepListener 接口之外,还提供了注解以解决同样的问题。纯旧 Java 对象可以使用这些注解来提供方法,然后将这些方法转换为相应 StepListener 类型。注解块组件的自定义实现(例如,ItemReaderItemWriterTasklet)也很常见。注解由 XML 解析器分析,用于 <listener/> 元素,并使用生成器中的 listener 方法注册,因此您只需要使用 XML 名称空间或生成器来使用步骤注册监听器即可。

In addition to the StepListener interfaces, annotations are provided to address the same concerns. Plain old Java objects can have methods with these annotations that are then converted into the corresponding StepListener type. It is also common to annotate custom implementations of chunk components, such as ItemReader or ItemWriter or Tasklet. The annotations are analyzed by the XML parser for the <listener/> elements as well as registered with the listener methods in the builders, so all you need to do is use the XML namespace or builders to register the listeners with a step.

StepExecutionListener

StepExecutionListener 代表了 Step 执行的最普通监听器。它允许在 Step 启动之前和结束后发送通知(无论它是正常结束还是失败),如下例所示:

StepExecutionListener represents the most generic listener for Step execution. It allows for notification before a Step is started and after it ends, whether it ended normally or failed, as the following example shows:

public interface StepExecutionListener extends StepListener {

    void beforeStep(StepExecution stepExecution);

    ExitStatus afterStep(StepExecution stepExecution);

}

ExitStatus 拥有 afterStep 的返回类型,让监听器有机会修改在完成 Step 时返回的退出代码。

ExitStatus has a return type of afterStep, to give listeners the chance to modify the exit code that is returned upon completion of a Step.

与该接口对应的注解是:

The annotations corresponding to this interface are:

  • @BeforeStep

  • @AfterStep

ChunkListener

chunk`" 定义为在一个事务范围内处理的项。在每次提交间隔提交事务时,都会提交一个块。您可以使用 `ChunkListener 在块开始处理之前或在块成功完成之后执行逻辑,如下面的接口定义所示:

A “chunk” is defined as the items processed within the scope of a transaction. Committing a transaction, at each commit interval, commits a chunk. You can use a ChunkListener to perform logic before a chunk begins processing or after a chunk has completed successfully, as the following interface definition shows:

public interface ChunkListener extends StepListener {

    void beforeChunk(ChunkContext context);
    void afterChunk(ChunkContext context);
    void afterChunkError(ChunkContext context);

}

beforeChunk 方法在事务启动后但在 ItemReader 上开始读取之前调用。反之,afterChunk 在块已提交(或在回滚时根本未提交)后调用。

The beforeChunk method is called after the transaction is started but before reading begins on the ItemReader. Conversely, afterChunk is called after the chunk has been committed (or not at all if there is a rollback).

与该接口对应的注解是:

The annotations corresponding to this interface are:

  • @BeforeChunk

  • @AfterChunk

  • @AfterChunkError

当没有块声明时,您可以应用 ChunkListenerTaskletStep 负责调用 ChunkListener,因此它也适用于非面向项 tasklet(它在 tasklet 之前和之后调用)。

You can apply a ChunkListener when there is no chunk declaration. The TaskletStep is responsible for calling the ChunkListener, so it applies to a non-item-oriented tasklet as well (it is called before and after the tasklet).

ItemReadListener

在前面讨论跳过逻辑时,曾提到可能记录跳过的记录是有益的,以便以后可以对其进行处理。对于读取错误,这可以使用 ItemReaderListener 完成,如下接口定义所示:

When discussing skip logic previously, it was mentioned that it may be beneficial to log the skipped records so that they can be dealt with later. In the case of read errors, this can be done with an ItemReaderListener, as the following interface definition shows:

public interface ItemReadListener<T> extends StepListener {

    void beforeRead();
    void afterRead(T item);
    void onReadError(Exception ex);

}

beforeRead 方法在每次对 ItemReader 调用的读取之前调用。afterRead 方法在每次对读取的成功调用后调用,并传入已读取的项目。如果在读取时发生错误,则调用 onReadError 方法。提供了遇到的异常,以便可以对它进行记录。

The beforeRead method is called before each call to read on the ItemReader. The afterRead method is called after each successful call to read and is passed the item that was read. If there was an error while reading, the onReadError method is called. The exception encountered is provided so that it can be logged.

与该接口对应的注解是:

The annotations corresponding to this interface are:

  • @BeforeRead

  • @AfterRead

  • @OnReadError

ItemProcessListener

ItemReadListener 一样,可以“监听”对项目的处理,如下接口定义所示:

As with the ItemReadListener, the processing of an item can be “listened” to, as the following interface definition shows:

public interface ItemProcessListener<T, S> extends StepListener {

    void beforeProcess(T item);
    void afterProcess(T item, S result);
    void onProcessError(T item, Exception e);

}

beforeProcess 方法在 ItemProcessor 上的 process 之前调用,并处理要处理的项目。afterProcess 方法在项目成功处理后调用。如果在处理时发生错误,则调用 onProcessError 方法。提供了遇到的异常和尝试处理的项目,以便可以对它们进行记录。

The beforeProcess method is called before process on the ItemProcessor and is handed the item that is to be processed. The afterProcess method is called after the item has been successfully processed. If there was an error while processing, the onProcessError method is called. The exception encountered and the item that was attempted to be processed are provided, so that they can be logged.

与该接口对应的注解是:

The annotations corresponding to this interface are:

  • @BeforeProcess

  • @AfterProcess

  • @OnProcessError

ItemWriteListener

您可以使用 ItemWriteListener 监听对项目的写入,如下接口定义所示:

You can “listen” to the writing of an item with the ItemWriteListener, as the following interface definition shows:

public interface ItemWriteListener<S> extends StepListener {

    void beforeWrite(List<? extends S> items);
    void afterWrite(List<? extends S> items);
    void onWriteError(Exception exception, List<? extends S> items);

}

beforeWrite 方法在 ItemWriter 上的 write 之前调用,并处理写入的项目列表。afterWrite 方法在项目成功写入后调用。如果在写入时发生错误,则调用 onWriteError 方法。提供了遇到的异常和尝试写入的项目,以便可以对它们进行记录。

The beforeWrite method is called before write on the ItemWriter and is handed the list of items that is written. The afterWrite method is called after the item has been successfully written. If there was an error while writing, the onWriteError method is called. The exception encountered and the item that was attempted to be written are provided, so that they can be logged.

与该接口对应的注解是:

The annotations corresponding to this interface are:

  • @BeforeWrite

  • @AfterWrite

  • @OnWriteError

SkipListener

ItemReadListenerItemProcessListenerItemWriteListener 都提供了用于获知错误的机制,但没有一个通知您记录实际上已被跳过。例如,即使重试并成功了一项目,也会调用 onWriteError。因此,有一个单独的用于跟踪跳过项目的接口,如下接口定义所示:

ItemReadListener, ItemProcessListener, and ItemWriteListener all provide mechanisms for being notified of errors, but none informs you that a record has actually been skipped. onWriteError, for example, is called even if an item is retried and successful. For this reason, there is a separate interface for tracking skipped items, as the following interface definition shows:

public interface SkipListener<T,S> extends StepListener {

    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);

}

每当在读取过程中跳过一个项目时,就会调用 onSkipInRead。应该注意的是,回滚可能导致同一个项目被多次注册为已跳过。在写文件时跳过一个项目时调用 onSkipInWrite。由于该项目已成功读取(没有被跳过),因此它还提供项目本身作为参数。

onSkipInRead is called whenever an item is skipped while reading. It should be noted that rollbacks may cause the same item to be registered as skipped more than once. onSkipInWrite is called when an item is skipped while writing. Because the item has been read successfully (and not skipped), it is also provided the item itself as an argument.

与该接口对应的注解是:

The annotations corresponding to this interface are:

  • @OnSkipInRead

  • @OnSkipInWrite

  • @OnSkipInProcess

SkipListeners and Transactions

SkipListener 最常见的用例之一是记录跳过的项目,以便可以使用另一个批处理甚至人工处理来评估并解决导致跳过的错误。由于存在许多可能回滚原始交易的情况,因此 Spring Batch 做出两个保证:

One of the most common use cases for a SkipListener is to log out a skipped item, so that another batch process or even human process can be used to evaluate and fix the issue that leads to the skip. Because there are many cases in which the original transaction may be rolled back, Spring Batch makes two guarantees:

  • The appropriate skip method (depending on when the error happened) is called only once per item.

  • The SkipListener is always called just before the transaction is committed. This is to ensure that any transactional resources call by the listener are not rolled back by a failure within the ItemWriter.