Common Batch Patterns

一些批处理作业完全可以从 Spring Batch 中的现成组件中组装而成。例如,可以将 ItemReaderItemWriter 实现配置为涵盖各种场景。但是,在大多情况下,必须编写自定义代码。应用程序开发人员的主要 API 入口点是 TaskletItemReaderItemWriter 和各种侦听器接口。大多数简单的批处理作业可以使用来自 Spring Batch ItemReader 的现成输入,但通常情况下,处理和写入中存在自定义问题,需要开发人员实现 ItemWriterItemProcessor

Some batch jobs can be assembled purely from off-the-shelf components in Spring Batch. For instance, the ItemReader and ItemWriter implementations can be configured to cover a wide range of scenarios. However, for the majority of cases, custom code must be written. The main API entry points for application developers are the Tasklet, the ItemReader, the ItemWriter, and the various listener interfaces. Most simple batch jobs can use off-the-shelf input from a Spring Batch ItemReader, but it is often the case that there are custom concerns in the processing and writing that require developers to implement an ItemWriter or ItemProcessor.

在本章中,我们提供了一些自定义业务逻辑中的常见模式示例。这些示例主要以侦听器接口为特色。应当注意,如果合适,ItemReaderItemWriter 也可以实现侦听器接口。

In this chapter, we provide a few examples of common patterns in custom business logic. These examples primarily feature the listener interfaces. It should be noted that an ItemReader or ItemWriter can implement a listener interface as well, if appropriate.

Logging Item Processing and Failures

一个常见的用例是对步骤中的错误逐项进行特殊处理的需要,例如记录到特殊渠道或将记录插入数据库。一个面向块的 Step(由步骤工厂 bean 创建)允许用户通过一个简单的 ItemReadListener 实现此用例,以针对 read 中的错误,以及一个 ItemWriteListener 以针对 write 中的错误。以下代码段演示了一个同时记录读取和写入失败的侦听器:

A common use case is the need for special handling of errors in a step, item by item, perhaps logging to a special channel or inserting a record into a database. A chunk-oriented Step (created from the step factory beans) lets users implement this use case with a simple ItemReadListener for errors on read and an ItemWriteListener for errors on write. The following code snippet illustrates a listener that logs both read and write failures:

public class ItemFailureLoggerListener extends ItemListenerSupport {

    private static Log logger = LogFactory.getLog("item.error");

    public void onReadError(Exception ex) {
        logger.error("Encountered error on read", e);
    }

    public void onWriteError(Exception ex, List<? extends Object> items) {
        logger.error("Encountered error on write", ex);
    }
}

实现此侦听器后,必须将其注册到一个步骤。

Having implemented this listener, it must be registered with a step.

Java

以下示例展示了如何在 Java 中向步骤注册侦听器:

The following example shows how to register a listener with a step Java:

Java Configuration
@Bean
public Step simpleStep(JobRepository jobRepository) {
	return new StepBuilder("simpleStep", jobRepository)
				...
				.listener(new ItemFailureLoggerListener())
				.build();
}
XML

以下示例展示了如何在 XML 中向步骤注册侦听器:

The following example shows how to register a listener with a step in XML:

XML Configuration
<step id="simpleStep">
...
<listeners>
    <listener>
        <bean class="org.example...ItemFailureLoggerListener"/>
    </listener>
</listeners>
</step>

如果侦听器在 onError() 方法中执行某些操作,则此操作必须在将回滚的事务内。如果您需要在 onError() 方法内使用事务资源(例如数据库),请考虑向该方法添加一个声明式事务(有关详细信息,请参阅 Spring Core 参考指南),并为其传播属性设定一个 REQUIRES_NEW 值。

if your listener does anything in an onError() method, it must be inside a transaction that is going to be rolled back. If you need to use a transactional resource, such as a database, inside an onError() method, consider adding a declarative transaction to that method (see Spring Core Reference Guide for details), and giving its propagation attribute a value of REQUIRES_NEW.

Stopping a Job Manually for Business Reasons

Spring Batch 通过 JobOperator 接口提供了 stop() 方法,但这实际上是供操作员而不是应用程序程序员使用的。有时,从业务逻辑中停止作业执行更加方便或更有意义。

Spring Batch provides a stop() method through the JobOperator interface, but this is really for use by the operator rather than the application programmer. Sometimes, it is more convenient or makes more sense to stop a job execution from within the business logic.

最简单的方法是抛出 RuntimeException(无限重试或跳过的除外)。例如,可以使用自定义异常类型,如下面的示例所示:

The simplest thing to do is to throw a RuntimeException (one that is neither retried indefinitely nor skipped). For example, a custom exception type could be used, as shown in the following example:

public class PoisonPillItemProcessor<T> implements ItemProcessor<T, T> {

    @Override
    public T process(T item) throws Exception {
        if (isPoisonPill(item)) {
            throw new PoisonPillException("Poison pill detected: " + item);
        }
        return item;
    }
}

另一种简单的停止步骤执行的方法是从 ItemReader 返回 null,如下面的示例所示:

Another simple way to stop a step from executing is to return null from the ItemReader, as shown in the following example:

public class EarlyCompletionItemReader implements ItemReader<T> {

    private ItemReader<T> delegate;

    public void setDelegate(ItemReader<T> delegate) { ... }

    public T read() throws Exception {
        T item = delegate.read();
        if (isEndItem(item)) {
            return null; // end the step here
        }
        return item;
    }

}

前面的示例实际上依赖于有一个默认的 CompletionPolicy 策略实现,当要处理的项目为 null 时,它会发出批处理完成信号。可以实现一个更高级的完成策略,并通过 SimpleStepFactoryBean 将其注入到 Step 中。

The previous example actually relies on the fact that there is a default implementation of the CompletionPolicy strategy that signals a complete batch when the item to be processed is null. A more sophisticated completion policy could be implemented and injected into the Step through the SimpleStepFactoryBean.

Java

以下示例展示了如何在 Java 中将完成策略注入到步骤中:

The following example shows how to inject a completion policy into a step in Java:

Java Configuration
@Bean
public Step simpleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("simpleStep", jobRepository)
				.<String, String>chunk(new SpecialCompletionPolicy(), transactionManager)
				.reader(reader())
				.writer(writer())
				.build();
}
XML

以下示例展示了如何在 XML 中将完成策略注入到步骤中:

The following example shows how to inject a completion policy into a step in XML:

XML Configuration
<step id="simpleStep">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="10"
               chunk-completion-policy="completionPolicy"/>
    </tasklet>
</step>

<bean id="completionPolicy" class="org.example...SpecialCompletionPolicy"/>

一种备选方法是在 StepExecution 中设置一个标记,该标记由框架中的 Step 实现检查,介于项目处理之间。为了实现这个备选方法,我们需要访问当前的 StepExecution,可以通过实现 StepListener 并将其注册为 Step,实现这一点。以下示例显示了一个设置标记的侦听器:

An alternative is to set a flag in the StepExecution, which is checked by the Step implementations in the framework in between item processing. To implement this alternative, we need access to the current StepExecution, and this can be achieved by implementing a StepListener and registering it with the Step. The following example shows a listener that sets the flag:

public class CustomItemWriter extends ItemListenerSupport implements StepListener {

    private StepExecution stepExecution;

    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }

    public void afterRead(Object item) {
        if (isPoisonPill(item)) {
            stepExecution.setTerminateOnly();
       }
    }

}

当标记设置时,步骤的默认行为是抛出 JobInterruptedException。此行为可以通过 StepInterruptionPolicy 控制。但是,唯一的选项是抛出或不抛出异常,因此这始终是作业的异常终止。

When the flag is set, the default behavior is for the step to throw a JobInterruptedException. This behavior can be controlled through the StepInterruptionPolicy. However, the only choice is to throw or not throw an exception, so this is always an abnormal ending to a job.

Adding a Footer Record

通常,当写入平面文件时,在所有处理完成之后,必须附加一个“页脚”记录到该文件的末尾。这可以通过 Spring Batch 提供的 FlatFileFooterCallback 接口实现。 FlatFileFooterCallback(及其对应部分 FlatFileHeaderCallback)是 FlatFileItemWriter 的可选属性,可以添加到项目编写器中。

Often, when writing to flat files, a “footer” record must be appended to the end of the file, after all processing has be completed. This can be achieved using the FlatFileFooterCallback interface provided by Spring Batch. The FlatFileFooterCallback (and its counterpart, the FlatFileHeaderCallback) are optional properties of the FlatFileItemWriter and can be added to an item writer.

Java

以下示例显示了如何在 Java 中使用 FlatFileHeaderCallbackFlatFileFooterCallback

The following example shows how to use the FlatFileHeaderCallback and the FlatFileFooterCallback in Java:

Java Configuration
@Bean
public FlatFileItemWriter<String> itemWriter(Resource outputResource) {
	return new FlatFileItemWriterBuilder<String>()
			.name("itemWriter")
			.resource(outputResource)
			.lineAggregator(lineAggregator())
			.headerCallback(headerCallback())
			.footerCallback(footerCallback())
			.build();
}
XML

以下示例显示了如何在 XML 中使用 FlatFileHeaderCallbackFlatFileFooterCallback

The following example shows how to use the FlatFileHeaderCallback and the FlatFileFooterCallback in XML:

XML Configuration
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="lineAggregator" ref="lineAggregator"/>
    <property name="headerCallback" ref="headerCallback" />
    <property name="footerCallback" ref="footerCallback" />
</bean>

页脚回调接口只有一个方法,该方法在必须写入页脚时调用,如下面的接口定义所示:

The footer callback interface has just one method that is called when the footer must be written, as shown in the following interface definition:

public interface FlatFileFooterCallback {

    void writeFooter(Writer writer) throws IOException;

}

Writing a Summary Footer

涉及页脚记录的一个常见要求是在输出过程中聚合信息,并将此信息附加到该文件末尾。此页脚通常充当文件的摘要或提供校验和。

A common requirement involving footer records is to aggregate information during the output process and to append this information to the end of the file. This footer often serves as a summarization of the file or provides a checksum.

例如,如果批处理作业正在将 Trade 记录写入平面文件,并且要求页脚中放置所有 Trades 的总金额,那么可以使用以下 ItemWriter 实现:

For example, if a batch job is writing Trade records to a flat file, and there is a requirement that the total amount from all the Trades is placed in a footer, then the following ItemWriter implementation can be used:

public class TradeItemWriter implements ItemWriter<Trade>,
                                        FlatFileFooterCallback {

    private ItemWriter<Trade> delegate;

    private BigDecimal totalAmount = BigDecimal.ZERO;

    public void write(Chunk<? extends Trade> items) throws Exception {
        BigDecimal chunkTotal = BigDecimal.ZERO;
        for (Trade trade : items) {
            chunkTotal = chunkTotal.add(trade.getAmount());
        }

        delegate.write(items);

        // After successfully writing all items
        totalAmount = totalAmount.add(chunkTotal);
    }

    public void writeFooter(Writer writer) throws IOException {
        writer.write("Total Amount Processed: " + totalAmount);
    }

    public void setDelegate(ItemWriter delegate) {...}
}

TradeItemWriter 存储一个 totalAmount 值,该值会从写入的每个 Trade 项目的 amount 中增加。在处理完最后一个 Trade 后,框架会调用 writeFooter,该方法将 totalAmount 放入文件中。请注意,write 方法使用了一个临时变量 chunkTotal,该变量存储区块中 Trade 金额的总和。这样做是为了确保,如果在 write 方法中发生跳过,totalAmount 不会改变。只有在 write 方法的末尾,一旦我们确信没有抛出任何异常,我们才会更新 totalAmount

This TradeItemWriter stores a totalAmount value that is increased with the amount from each Trade item written. After the last Trade is processed, the framework calls writeFooter, which puts the totalAmount into the file. Note that the write method makes use of a temporary variable, chunkTotal, that stores the total of the Trade amounts in the chunk. This is done to ensure that, if a skip occurs in the write method, the totalAmount is left unchanged. It is only at the end of the write method, once we are guaranteed that no exceptions are thrown, that we update the totalAmount.

为了调用 writeFooter 方法,必须将 TradeItemWriter(实现 FlatFileFooterCallback)作为 footerCallback 接入 FlatFileItemWriter 中。

In order for the writeFooter method to be called, the TradeItemWriter (which implements FlatFileFooterCallback) must be wired into the FlatFileItemWriter as the footerCallback.

Java

以下示例显示了如何在 Java 中接入 TradeItemWriter

The following example shows how to wire the TradeItemWriter in Java:

Java Configuration
@Bean
public TradeItemWriter tradeItemWriter() {
	TradeItemWriter itemWriter = new TradeItemWriter();

	itemWriter.setDelegate(flatFileItemWriter(null));

	return itemWriter;
}

@Bean
public FlatFileItemWriter<String> flatFileItemWriter(Resource outputResource) {
	return new FlatFileItemWriterBuilder<String>()
			.name("itemWriter")
			.resource(outputResource)
			.lineAggregator(lineAggregator())
			.footerCallback(tradeItemWriter())
			.build();
}
XML

以下示例显示了如何在 XML 中接入 TradeItemWriter

The following example shows how to wire the TradeItemWriter in XML:

XML Configuration
<bean id="tradeItemWriter" class="..TradeItemWriter">
    <property name="delegate" ref="flatFileItemWriter" />
</bean>

<bean id="flatFileItemWriter" class="org.spr...FlatFileItemWriter">
   <property name="resource" ref="outputResource" />
   <property name="lineAggregator" ref="lineAggregator"/>
   <property name="footerCallback" ref="tradeItemWriter" />
</bean>

迄今为止撰写的 TradeItemWriter 仅在 Step 不可重新启动时正常运行。这是因为该类是有状态的(因为它存储了 totalAmount),但是 totalAmount 不会持久保存到数据库中。因此,在重新启动时无法检索它。为了使该类可重新启动,应实现 ItemStream 接口以及方法 openupdate,如下面的示例所示:

The way that the TradeItemWriter has been written so far functions correctly only if the Step is not restartable. This is because the class is stateful (since it stores the totalAmount), but the totalAmount is not persisted to the database. Therefore, it cannot be retrieved in the event of a restart. In order to make this class restartable, the ItemStream interface should be implemented along with the methods open and update, as shown in the following example:

public void open(ExecutionContext executionContext) {
    if (executionContext.containsKey("total.amount") {
        totalAmount = (BigDecimal) executionContext.get("total.amount");
    }
}

public void update(ExecutionContext executionContext) {
    executionContext.put("total.amount", totalAmount);
}

update 方法在该对象持久保存到数据库之前将 totalAmount 的最新版本存储到 ExecutionContext。open 方法从 ExecutionContext 检索任何现有的 totalAmount 并将其用作处理的起点,从而允许 TradeItemWriter 在上次运行 Step 的位置重新启动。

The update method stores the most current version of totalAmount to the ExecutionContext just before that object is persisted to the database. The open method retrieves any existing totalAmount from the ExecutionContext and uses it as the starting point for processing, allowing the TradeItemWriter to pick up on restart where it left off the previous time the Step was run.

Driving Query Based ItemReaders

chapter on readers and writers中,讨论了使用分页的数据库输入。许多数据库供应商(例如 DB2)具有极度悲观的锁定策略,如果正在读取的表还需要在线应用程序的其他部分使用,则这些策略会导致问题。此外,在非常大的数据集上打开游标会导致某些供应商的数据库出现问题。因此,许多项目更喜欢使用“驱动查询”方法来读取数据。这种方法通过迭代键(而不是需要返回的整个对象)来工作,如下所示:

In the chapter on readers and writers, database input using paging was discussed. Many database vendors, such as DB2, have extremely pessimistic locking strategies that can cause issues if the table being read also needs to be used by other portions of the online application. Furthermore, opening cursors over extremely large datasets can cause issues on databases from certain vendors. Therefore, many projects prefer to use a 'Driving Query' approach to reading in data. This approach works by iterating over keys, rather than the entire object that needs to be returned, as the following image illustrates:

drivingQueryExample
Figure 1. Driving Query Job

如你所见,前图所示的示例使用与基于游标的示例中使用的相同“FOO”表。但不是选择整行,而只在 SQL 语句中选择了 ID。因此,不是从 read 中返回 FOO 对象,而是返回 Integer。此数字随后可用于查询“详情”,即一个完整的 Foo 对象,如下图所示:

As you can see, the example shown in the preceding image uses the same 'FOO' table as was used in the cursor-based example. However, rather than selecting the entire row, only the IDs were selected in the SQL statement. So, rather than a FOO object being returned from read, an Integer is returned. This number can then be used to query for the 'details', which is a complete Foo object, as shown in the following image:

drivingQueryJob
Figure 2. Driving Query Example

应使用 ItemProcessor 将从驱动查询中获取的键转换为一个完整的 Foo 对象。可以基于键使用现有的 DAO 来查询完整对象。

An ItemProcessor should be used to transform the key obtained from the driving query into a full Foo object. An existing DAO can be used to query for the full object based on the key.

Multi-Line Records

在平面文件中通常是每条记录都仅限于一行,但通常一个文件可能有多行,多种格式的记录。文件中的以下摘录显示了此类安排的一个示例:

While it is usually the case with flat files that each record is confined to a single line, it is common that a file might have records spanning multiple lines with multiple formats. The following excerpt from a file shows an example of such an arrangement:

HEA;0013100345;2007-02-15
NCU;Smith;Peter;;T;20014539;F
BAD;;Oak Street 31/A;;Small Town;00235;IL;US
FOT;2;2;267.34

以“HEA”开头的行和以“FOT”开头的行之间的所有内容都被认为是一条记录。为了正确处理此情况,必须考虑一些事项:

Everything between the line starting with 'HEA' and the line starting with 'FOT' is considered one record. There are a few considerations that must be made in order to handle this situation correctly:

  • Instead of reading one record at a time, the ItemReader must read every line of the multi-line record as a group, so that it can be passed to the ItemWriter intact.

  • Each line type may need to be tokenized differently.

由于一条记录跨越多行,并且我们可能不知道有多少行,因此 ItemReader 必须小心地始终读取整个记录。为了执行此操作,应将自定义 ItemReader 实现为 FlatFileItemReader 的包装器。

Because a single record spans multiple lines and because we may not know how many lines there are, the ItemReader must be careful to always read an entire record. In order to do this, a custom ItemReader should be implemented as a wrapper for the FlatFileItemReader.

Java

以下示例展示了在 Java 中实现自定义 ItemReader 的方法:

The following example shows how to implement a custom ItemReader in Java:

Java Configuration
@Bean
public MultiLineTradeItemReader itemReader() {
	MultiLineTradeItemReader itemReader = new MultiLineTradeItemReader();

	itemReader.setDelegate(flatFileItemReader());

	return itemReader;
}

@Bean
public FlatFileItemReader flatFileItemReader() {
	FlatFileItemReader<Trade> reader = new FlatFileItemReaderBuilder<>()
			.name("flatFileItemReader")
			.resource(new ClassPathResource("data/iosample/input/multiLine.txt"))
			.lineTokenizer(orderFileTokenizer())
			.fieldSetMapper(orderFieldSetMapper())
			.build();
	return reader;
}
XML

以下示例展示了在 XML 中实现自定义 ItemReader 的方法:

The following example shows how to implement a custom ItemReader in XML:

XML Configuration
<bean id="itemReader" class="org.spr...MultiLineTradeItemReader">
    <property name="delegate">
        <bean class="org.springframework.batch.item.file.FlatFileItemReader">
            <property name="resource" value="data/iosample/input/multiLine.txt" />
            <property name="lineMapper">
                <bean class="org.spr...DefaultLineMapper">
                    <property name="lineTokenizer" ref="orderFileTokenizer"/>
                    <property name="fieldSetMapper" ref="orderFieldSetMapper"/>
                </bean>
            </property>
        </bean>
    </property>
</bean>

为确保正确标记每一行(对固定长度的输入尤其重要),可以对委托 FlatFileItemReader 使用 PatternMatchingCompositeLineTokenizer。请参阅读者和作者章节中的 link:readersAndWriters.html#flatFileItemReader[FlatFileItemReader 了解更多详情。然后,委托读取器使用 PassThroughFieldSetMapper 将每行的 FieldSet 传送回包装 ItemReader

To ensure that each line is tokenized properly, which is especially important for fixed-length input, the PatternMatchingCompositeLineTokenizer can be used on the delegate FlatFileItemReader. See FlatFileItemReader in the Readers and Writers chapter for more details. The delegate reader then uses a PassThroughFieldSetMapper to deliver a FieldSet for each line back to the wrapping ItemReader.

Java

以下示例展示了如何在 Java 中确保每行都已正确标记化:

The following example shows how to ensure that each line is properly tokenized in Java:

Java Content
@Bean
public PatternMatchingCompositeLineTokenizer orderFileTokenizer() {
	PatternMatchingCompositeLineTokenizer tokenizer =
			new PatternMatchingCompositeLineTokenizer();

	Map<String, LineTokenizer> tokenizers = new HashMap<>(4);

	tokenizers.put("HEA*", headerRecordTokenizer());
	tokenizers.put("FOT*", footerRecordTokenizer());
	tokenizers.put("NCU*", customerLineTokenizer());
	tokenizers.put("BAD*", billingAddressLineTokenizer());

	tokenizer.setTokenizers(tokenizers);

	return tokenizer;
}
XML

以下示例展示了如何在 XML 中确保每行都已正确标记化:

The following example shows how to ensure that each line is properly tokenized in XML:

XML Content
<bean id="orderFileTokenizer" class="org.spr...PatternMatchingCompositeLineTokenizer">
    <property name="tokenizers">
        <map>
            <entry key="HEA*" value-ref="headerRecordTokenizer" />
            <entry key="FOT*" value-ref="footerRecordTokenizer" />
            <entry key="NCU*" value-ref="customerLineTokenizer" />
            <entry key="BAD*" value-ref="billingAddressLineTokenizer" />
        </map>
    </property>
</bean>

此包装器必须能够识别记录的结尾,以便它可以对其委托持续调用 read(),直至达到结尾处。对于读取的每一行,包装器应构建要返回的项。一旦达到页脚,就可以将项返回以传递到 ItemProcessorItemWriter,如下例所示:

This wrapper has to be able to recognize the end of a record so that it can continually call read() on its delegate until the end is reached. For each line that is read, the wrapper should build up the item to be returned. Once the footer is reached, the item can be returned for delivery to the ItemProcessor and ItemWriter, as shown in the following example:

private FlatFileItemReader<FieldSet> delegate;

public Trade read() throws Exception {
    Trade t = null;

    for (FieldSet line = null; (line = this.delegate.read()) != null;) {
        String prefix = line.readString(0);
        if (prefix.equals("HEA")) {
            t = new Trade(); // Record must start with header
        }
        else if (prefix.equals("NCU")) {
            Assert.notNull(t, "No header was found.");
            t.setLast(line.readString(1));
            t.setFirst(line.readString(2));
            ...
        }
        else if (prefix.equals("BAD")) {
            Assert.notNull(t, "No header was found.");
            t.setCity(line.readString(4));
            t.setState(line.readString(6));
          ...
        }
        else if (prefix.equals("FOT")) {
            return t; // Record must end with footer
        }
    }
    Assert.isNull(t, "No 'END' was found.");
    return null;
}

Executing System Commands

许多批处理作业要求在批处理作业中调用外部命令。调度程序可以单独启动这样一个进程,但运行的通用元数据优势将消失。此外,多步骤作业也需要分成多个作业。

Many batch jobs require that an external command be called from within the batch job. Such a process could be kicked off separately by the scheduler, but the advantage of common metadata about the run would be lost. Furthermore, a multi-step job would also need to be split up into multiple jobs as well.

由于这种需求很普遍,因此 Spring Batch 提供了一个供调用系统命令的 Tasklet 实现。

Because the need is so common, Spring Batch provides a Tasklet implementation for calling system commands.

Java

以下示例展示了如何在 Java 中调用外部命令:

The following example shows how to call an external command in Java:

Java Configuration
@Bean
public SystemCommandTasklet tasklet() {
	SystemCommandTasklet tasklet = new SystemCommandTasklet();

	tasklet.setCommand("echo hello");
	tasklet.setTimeout(5000);

	return tasklet;
}
XML

以下示例展示了如何在 XML 中调用外部命令方法:

The following example shows how to call an external command in XML:

XML Configuration
<bean class="org.springframework.batch.core.step.tasklet.SystemCommandTasklet">
    <property name="command" value="echo hello" />
    <!-- 5 second timeout for the command to complete -->
    <property name="timeout" value="5000" />
</bean>

Handling Step Completion When No Input is Found

在许多批处理场景中,在数据库或要处理的文件中找不到行并不是特例。Step 只被视为未找到任何工作并以读取 0 项完成。Spring Batch 中默认提供的 ItemReader 实现都采用此方法。如果什么都没有写出,即使存在输入(通常在文件被错误命名或出现一些类似问题时发生),也会造成一些混乱。出于此原因,应检查元数据本身以确定框架发现处理了多少工作。但是,如果找不到输入被视为异常情况怎么办?在这种情况下,以编程方式检查元数据以查找无已处理项并导致失败是最佳解决方案。由于这是一个常见的用例,Spring Batch 提供了一个具有该功能的侦听器,如 NoWorkFoundStepExecutionListener 类的定义所示:

In many batch scenarios, finding no rows in a database or file to process is not exceptional. The Step is simply considered to have found no work and completes with 0 items read. All of the ItemReader implementations provided out of the box in Spring Batch default to this approach. This can lead to some confusion if nothing is written out even when input is present (which usually happens if a file was misnamed or some similar issue arises). For this reason, the metadata itself should be inspected to determine how much work the framework found to be processed. However, what if finding no input is considered exceptional? In this case, programmatically checking the metadata for no items processed and causing failure is the best solution. Because this is a common use case, Spring Batch provides a listener with exactly this functionality, as shown in the class definition for NoWorkFoundStepExecutionListener:

public class NoWorkFoundStepExecutionListener extends StepExecutionListenerSupport {

    public ExitStatus afterStep(StepExecution stepExecution) {
        if (stepExecution.getReadCount() == 0) {
            return ExitStatus.FAILED;
        }
        return null;
    }

}

前述 StepExecutionListener 在“afterStep”阶段检查 StepExecutionreadCount 特性,以确定是否未读取任何项。如果是这种情况,则返回退出代码 FAILED,表示 Step 应当失败。否则,返回 null,这不会影响 Step 的状态。

The preceding StepExecutionListener inspects the readCount property of the StepExecution during the 'afterStep' phase to determine if no items were read. If that is the case, an exit code FAILED is returned, indicating that the Step should fail. Otherwise, null is returned, which does not affect the status of the Step.

Passing Data to Future Steps

经常需要将信息从一个步骤传递到另一个步骤。这可以通过 ExecutionContext 来实现。问题在于存在两个 ExecutionContexts:一个在 Step 级别,另一个在 Job 级别。Step ExecutionContext 仅保留一步之久,而 Job ExecutionContext 贯穿整个 Job。另一方面,Step ExecutionContext 在每次 Step 提交一个块时都会更新,而 Job ExecutionContext 仅在每个 Step 的末尾更新。

It is often useful to pass information from one step to another. This can be done through the ExecutionContext. The catch is that there are two ExecutionContexts: one at the Step level and one at the Job level. The Step ExecutionContext remains only as long as the step, while the Job ExecutionContext remains through the whole Job. On the other hand, the Step ExecutionContext is updated every time the Step commits a chunk, while the Job ExecutionContext is updated only at the end of each Step.

此分离导致的后果是所有数据都必须在 Step 执行时放在 Step ExecutionContext 中。这样做可确保在 Step 运行时数据得到正确存储。如果将数据存储到 Job ExecutionContext,则在 Step 执行期间该数据不会持续存在。如果 Step 失败,则该数据会丢失。

The consequence of this separation is that all data must be placed in the Step ExecutionContext while the Step is executing. Doing so ensures that the data is stored properly while the Step runs. If data is stored to the Job ExecutionContext, then it is not persisted during Step execution. If the Step fails, that data is lost.

public class SavingItemWriter implements ItemWriter<Object> {
    private StepExecution stepExecution;

    public void write(Chunk<? extends Object> items) throws Exception {
        // ...

        ExecutionContext stepContext = this.stepExecution.getExecutionContext();
        stepContext.put("someKey", someObject);
    }

    @BeforeStep
    public void saveStepExecution(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }
}

为了使数据可供将来的 Steps 使用,必须在步骤完成后将其“提升”到 Job ExecutionContext。Spring Batch 为此提供了 ExecutionContextPromotionListener。侦听器必须配置为与其 ExecutionContext 中数据相关的键,必须提升这些键。它还可以选择配置一组退出代码模式,从而使提升得以发生(COMPLETED 是默认值)。与所有侦听器一样,它必须在 Step 上注册。

To make the data available to future Steps, it must be “promoted” to the Job ExecutionContext after the step has finished. Spring Batch provides the ExecutionContextPromotionListener for this purpose. The listener must be configured with the keys related to the data in the ExecutionContext that must be promoted. It can also, optionally, be configured with a list of exit code patterns for which the promotion should occur (COMPLETED is the default). As with all listeners, it must be registered on the Step.

Java

以下示例展示了如何在 Java 中将步骤提升到 Job ExecutionContext

The following example shows how to promote a step to the Job ExecutionContext in Java:

Java Configuration
@Bean
public Job job1(JobRepository jobRepository, Step step1, Step step2) {
	return new JobBuilder("job1", jobRepository)
				.start(step1)
				.next(step2)
				.build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(reader())
				.writer(savingWriter())
				.listener(promotionListener())
				.build();
}

@Bean
public ExecutionContextPromotionListener promotionListener() {
	ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();

	listener.setKeys(new String[] {"someKey"});

	return listener;
}
XML

以下示例展示了如何在 XML 中将步骤提升到 Job ExecutionContext

The following example shows how to promote a step to the Job ExecutionContext in XML:

XML Configuration
<job id="job1">
    <step id="step1">
        <tasklet>
            <chunk reader="reader" writer="savingWriter" commit-interval="10"/>
        </tasklet>
        <listeners>
            <listener ref="promotionListener"/>
        </listeners>
    </step>

    <step id="step2">
       ...
    </step>
</job>

<beans:bean id="promotionListener" class="org.spr....ExecutionContextPromotionListener">
    <beans:property name="keys">
        <list>
            <value>someKey</value>
        </list>
    </beans:property>
</beans:bean>

最后,必须从 Job ExecutionContext 中检索已保存的值,如下例所示:

Finally, the saved values must be retrieved from the Job ExecutionContext, as shown in the following example:

public class RetrievingItemWriter implements ItemWriter<Object> {
    private Object someObject;

    public void write(Chunk<? extends Object> items) throws Exception {
        // ...
    }

    @BeforeStep
    public void retrieveInterstepData(StepExecution stepExecution) {
        JobExecution jobExecution = stepExecution.getJobExecution();
        ExecutionContext jobContext = jobExecution.getExecutionContext();
        this.someObject = jobContext.get("someKey");
    }
}