Intercepting Step Execution

Job 一样,在 Step 执行期间会发生许多事件,用户可能需要执行一些功能。例如,要写出到需要页脚的平面文件,当 Step 完成时就需要通知 ItemWriter,以便可以写入页脚。可以使用许多 Step 作用域监听器中的一个来实现这一目的。 您可以将任何实现了 StepListener 的扩展的类(但不要实现该接口本身,因为它本身为空)应用于一个步骤,通过 listeners 元素。listeners 元素在步骤、tasklet 或块声明中有效。我们建议您在该功能适用的级别声明监听器,或者(如果它是多功能的,例如 StepExecutionListenerItemReadListener)在它适用的最精细级别声明它。

Java

以下示例展示了在 Java 中以块级别应用的监听器:

Java Configuration
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(reader())
				.writer(writer())
				.listener(chunkListener())
				.build();
}
XML

以下示例展示了在 XML 中以块级别应用的监听器:

XML Configuration
<step id="step1">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="10"/>
        <listeners>
            <listener ref="chunkListener"/>
        </listeners>
    </tasklet>
</step>

一个`ItemReader`、ItemWriter`或`ItemProcessor`它本身实现了 `StepListener`接口,如果使用名称空间 `<step> 元素或其中之一,就自动用`Step` 注册 *StepFactoryBean factory。这仅适用于直接注入 Step`中的组件。如果监听器嵌套在另一个组件内部,你需要明确地注册它(如之前在Registering `ItemStream with a Step中所述)。 除了 StepListener 接口之外,还提供了注解以解决同样的问题。纯旧 Java 对象可以使用这些注解来提供方法,然后将这些方法转换为相应 StepListener 类型。注解块组件的自定义实现(例如,ItemReaderItemWriterTasklet)也很常见。注解由 XML 解析器分析,用于 <listener/> 元素,并使用生成器中的 listener 方法注册,因此您只需要使用 XML 名称空间或生成器来使用步骤注册监听器即可。

StepExecutionListener

StepExecutionListener 代表了 Step 执行的最普通监听器。它允许在 Step 启动之前和结束后发送通知(无论它是正常结束还是失败),如下例所示:

public interface StepExecutionListener extends StepListener {

    void beforeStep(StepExecution stepExecution);

    ExitStatus afterStep(StepExecution stepExecution);

}

ExitStatus 拥有 afterStep 的返回类型,让监听器有机会修改在完成 Step 时返回的退出代码。

与该接口对应的注解是:

  • @BeforeStep

  • @AfterStep

ChunkListener

chunk`" 定义为在一个事务范围内处理的项。在每次提交间隔提交事务时,都会提交一个块。您可以使用 `ChunkListener 在块开始处理之前或在块成功完成之后执行逻辑,如下面的接口定义所示:

public interface ChunkListener extends StepListener {

    void beforeChunk(ChunkContext context);
    void afterChunk(ChunkContext context);
    void afterChunkError(ChunkContext context);

}

beforeChunk 方法在事务启动后但在 ItemReader 上开始读取之前调用。反之,afterChunk 在块已提交(或在回滚时根本未提交)后调用。

与该接口对应的注解是:

  • @BeforeChunk

  • @AfterChunk

  • @AfterChunkError

当没有块声明时,您可以应用 ChunkListenerTaskletStep 负责调用 ChunkListener,因此它也适用于非面向项 tasklet(它在 tasklet 之前和之后调用)。

ItemReadListener

在前面讨论跳过逻辑时,曾提到可能记录跳过的记录是有益的,以便以后可以对其进行处理。对于读取错误,这可以使用 ItemReaderListener 完成,如下接口定义所示:

public interface ItemReadListener<T> extends StepListener {

    void beforeRead();
    void afterRead(T item);
    void onReadError(Exception ex);

}

beforeRead 方法在每次对 ItemReader 调用的读取之前调用。afterRead 方法在每次对读取的成功调用后调用,并传入已读取的项目。如果在读取时发生错误,则调用 onReadError 方法。提供了遇到的异常,以便可以对它进行记录。

与该接口对应的注解是:

  • @BeforeRead

  • @AfterRead

  • @OnReadError

ItemProcessListener

ItemReadListener 一样,可以“监听”对项目的处理,如下接口定义所示:

public interface ItemProcessListener<T, S> extends StepListener {

    void beforeProcess(T item);
    void afterProcess(T item, S result);
    void onProcessError(T item, Exception e);

}

beforeProcess 方法在 ItemProcessor 上的 process 之前调用,并处理要处理的项目。afterProcess 方法在项目成功处理后调用。如果在处理时发生错误,则调用 onProcessError 方法。提供了遇到的异常和尝试处理的项目,以便可以对它们进行记录。

与该接口对应的注解是:

  • @BeforeProcess

  • @AfterProcess

  • @OnProcessError

ItemWriteListener

您可以使用 ItemWriteListener 监听对项目的写入,如下接口定义所示:

public interface ItemWriteListener<S> extends StepListener {

    void beforeWrite(List<? extends S> items);
    void afterWrite(List<? extends S> items);
    void onWriteError(Exception exception, List<? extends S> items);

}

beforeWrite 方法在 ItemWriter 上的 write 之前调用,并处理写入的项目列表。afterWrite 方法在项目成功写入后调用。如果在写入时发生错误,则调用 onWriteError 方法。提供了遇到的异常和尝试写入的项目,以便可以对它们进行记录。

与该接口对应的注解是:

  • @BeforeWrite

  • @AfterWrite

  • @OnWriteError

SkipListener

ItemReadListenerItemProcessListenerItemWriteListener 都提供了用于获知错误的机制,但没有一个通知您记录实际上已被跳过。例如,即使重试并成功了一项目,也会调用 onWriteError。因此,有一个单独的用于跟踪跳过项目的接口,如下接口定义所示:

public interface SkipListener<T,S> extends StepListener {

    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);

}

每当在读取过程中跳过一个项目时,就会调用 onSkipInRead。应该注意的是,回滚可能导致同一个项目被多次注册为已跳过。在写文件时跳过一个项目时调用 onSkipInWrite。由于该项目已成功读取(没有被跳过),因此它还提供项目本身作为参数。

与该接口对应的注解是:

  • @OnSkipInRead

  • @OnSkipInWrite

  • @OnSkipInProcess

SkipListeners and Transactions

SkipListener 最常见的用例之一是记录跳过的项目,以便可以使用另一个批处理甚至人工处理来评估并解决导致跳过的错误。由于存在许多可能回滚原始交易的情况,因此 Spring Batch 做出两个保证:

  • 合适的跳过方法(取决于错误发生的时间)仅在每个项目中调用一次。

  • SkipListener 总是getTransaction()在提交事务之前调用。这是为了确保由侦听器调用的任何事务性资源都不会因 ItemWriter 中的故障而回滚。