Creating Custom ItemReaders and ItemWriters

So far, this chapter has discussed the basic contracts of reading and writing in Spring Batch and some common implementations for doing so. However, these are all fairly generic, and there are many potential scenarios that may not be covered by out-of-the-box implementations. This section shows, by using a simple example, how to create a custom ItemReader and ItemWriter implementation and implement their contracts correctly. The ItemReader also implements ItemStream, in order to illustrate how to make a reader or writer restartable.

Custom ItemReader Example

For the purpose of this example, we create a simple ItemReader implementation that reads from a provided list. We start by implementing the most basic contract of ItemReader, the read method, as shown in the following code:

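import java.util.List;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

public class CustomItemReader<T> implements ItemReader<T> {

    // Backing list; read() consumes it, so it must be mutable (for example, an ArrayList)
    private final List<T> items;

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    @Override
    public T read() throws Exception, UnexpectedInputException,
       NonTransientResourceException, ParseException {

        // Remove and return the next item; null signals that the input is exhausted
        if (!items.isEmpty()) {
            return items.remove(0);
        }
        return null;
    }
}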

The preceding class takes a list of items and returns them one at a time, removing each from the list. When the list is empty, it returns null, thus satisfying the most basic requirements of an ItemReader, as illustrated in the following test code:

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");

ItemReader<String> itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());
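Returning null is what tells the caller that the input is exhausted. As a rough illustration, assuming a simplified model of a chunk-oriented step rather than Spring Batch's actual internals, the loop below keeps calling read() and groups items into fixed-size chunks until the reader returns null. SimpleReader and ChunkLoop.readChunks are hypothetical names, and unlike the real ItemReader the stand-in interface declares no checked exceptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ItemReader<T> so the sketch runs without Spring Batch.
interface SimpleReader<T> {
    T read(); // returns null when the input is exhausted
}

class ChunkLoop {
    // Simplified (hypothetical) model of a chunk-oriented step: collect up to
    // chunkSize items per chunk and stop as soon as read() returns null.
    static <T> List<List<T>> readChunks(SimpleReader<T> reader, int chunkSize) {
        List<List<T>> chunks = new ArrayList<>();
        List<T> current = new ArrayList<>();
        T item;
        while ((item = reader.read()) != null) {
            current.add(item);
            if (current.size() == chunkSize) {
                chunks.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            chunks.add(current); // final, partially filled chunk
        }
        return chunks;
    }
}
```

A reader that never returns null would keep this loop running forever, which is why the null return is the essential part of the contract.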

Making the ItemReader Restartable

The final challenge is to make the ItemReader restartable. Currently, if processing is interrupted and begins again, the ItemReader must start at the beginning. This is actually valid in many scenarios, but it is sometimes preferable that a batch job restarts where it left off. The key distinction is often whether the reader is stateful or stateless. A stateless reader does not need to worry about restartability, but a stateful one has to try to reconstitute its last known state on restart. For this reason, we recommend that you keep custom readers stateless if possible, so you need not worry about restartability.
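One way to stay stateless is to let the external resource, not the reader, record progress. As a minimal sketch, assuming a java.util.Queue stands in for a source that removes items as they are consumed (QueueBackedReader is a hypothetical name), a fresh reader created after a failure simply continues with whatever is left, with no saved state:

```java
import java.util.Queue;

// Hypothetical stateless reader: the queue itself records how far processing got.
class QueueBackedReader<T> {
    private final Queue<T> queue;

    QueueBackedReader(Queue<T> queue) {
        this.queue = queue;
    }

    // poll() removes and returns the head, or returns null when the queue is empty,
    // which matches the "null means end of input" contract.
    T read() {
        return queue.poll();
    }
}
```

This relies on consumption being durable. In a real job, removing an item from the source would need to take part in the chunk's transaction, otherwise items that were read but not yet written could be lost on failure; the sketch ignores that.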

If you do need to store state, implement the ItemStream interface as well:

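import java.util.List;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

public class CustomItemReader<T> implements ItemReader<T>, ItemStream {

    private static final String CURRENT_INDEX = "current.index";

    private final List<T> items;
    private int currentIndex = 0;

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    @Override
    public T read() throws Exception, UnexpectedInputException,
        ParseException, NonTransientResourceException {

        // Items are no longer removed; the index records how far the reader has got
        if (currentIndex < items.size()) {
            return items.get(currentIndex++);
        }

        return null;
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        // On restart, resume from the index saved by the last update()
        if (executionContext.containsKey(CURRENT_INDEX)) {
            currentIndex = (int) executionContext.getLong(CURRENT_INDEX);
        }
        else {
            currentIndex = 0;
        }
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        // Save the current position so a restart can pick up from here
        executionContext.putLong(CURRENT_INDEX, currentIndex);
    }

    @Override
    public void close() throws ItemStreamException {}
}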

On each call to the ItemStream update method, the current index of the ItemReader is stored in the provided ExecutionContext with a key of 'current.index'. When the ItemStream open method is called, the ExecutionContext is checked to see if it contains an entry with that key. If the key is found, then the current index is moved to that location. This is a fairly trivial example, but it still meets the general contract:

// First run: read one item, then save the reader's state
List<String> items = List.of("1", "2", "3");
CustomItemReader<String> itemReader = new CustomItemReader<>(items);
ExecutionContext executionContext = new ExecutionContext();

itemReader.open(executionContext);
assertEquals("1", itemReader.read());
itemReader.update(executionContext);

// Restart: a new reader opened with the saved context resumes after "1"
itemReader = new CustomItemReader<>(items);
itemReader.open(executionContext);
assertEquals("2", itemReader.read());
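The test above calls open and update by hand. In a running step, the framework calls open once at the start, update at each chunk commit, and close at the end, so the saved index reflects the last committed chunk. The sketch below models that sequence with a plain Map<String, Object> standing in for ExecutionContext; IndexReader and StepSimulation.runStep are hypothetical names, and the loop is an illustration of the idea, not Spring Batch's actual step implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical restartable reader mirroring CustomItemReader's index logic.
class IndexReader<T> {
    static final String CURRENT_INDEX = "current.index";
    private final List<T> items;
    private int currentIndex;

    IndexReader(List<T> items) { this.items = items; }

    void open(Map<String, Object> context) {
        Object saved = context.get(CURRENT_INDEX);
        currentIndex = saved == null ? 0 : ((Long) saved).intValue();
    }

    T read() { return currentIndex < items.size() ? items.get(currentIndex++) : null; }

    void update(Map<String, Object> context) { context.put(CURRENT_INDEX, (long) currentIndex); }
}

class StepSimulation {
    // Hypothetical, simplified step loop: open once, process chunk by chunk, and save
    // state only after a chunk is "committed". failAfterChunks < 0 means never fail.
    static <T> List<T> runStep(IndexReader<T> reader, Map<String, Object> context,
                               int chunkSize, int failAfterChunks) {
        List<T> written = new ArrayList<>();
        reader.open(context);
        int committed = 0;
        while (true) {
            List<T> chunk = new ArrayList<>();
            T item;
            while (chunk.size() < chunkSize && (item = reader.read()) != null) {
                chunk.add(item);
            }
            if (chunk.isEmpty()) {
                break; // reader returned null: end of input
            }
            if (committed == failAfterChunks) {
                // Simulate a crash before this chunk is written and committed
                throw new IllegalStateException("simulated failure");
            }
            written.addAll(chunk);  // "write" the chunk
            reader.update(context); // state saved as part of the commit
            committed++;
        }
        return written;
    }
}
```

Because update is only reached after a chunk is written, the items read in the failed chunk are read again after the restart rather than skipped.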

Most ItemReaders have much more sophisticated restart logic. The JdbcCursorItemReader, for example, saves the number of rows it has already read and, on restart, moves the cursor past those rows before resuming.
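For a forward-only source such as a database cursor, one common alternative to an in-memory index is to save how many items have been read and, on restart, skip that many. The sketch below models this with an Iterator standing in for the cursor and a Map<String, Object> standing in for ExecutionContext; CountingReader and the read.count key are illustrative assumptions, not a real Spring Batch class.

```java
import java.util.Iterator;
import java.util.Map;

// Hypothetical count-based restartable reader over a forward-only source.
class CountingReader<T> {
    static final String READ_COUNT = "read.count"; // illustrative key name
    private final Iterator<T> cursor;              // stand-in for a database cursor
    private long readCount;

    CountingReader(Iterator<T> cursor) { this.cursor = cursor; }

    void open(Map<String, Object> context) {
        Object saved = context.get(READ_COUNT);
        long toSkip = saved == null ? 0L : (Long) saved;
        // Advance past the items that were already processed in the previous run
        while (readCount < toSkip && cursor.hasNext()) {
            cursor.next();
            readCount++;
        }
    }

    T read() {
        if (!cursor.hasNext()) {
            return null;
        }
        readCount++;
        return cursor.next();
    }

    void update(Map<String, Object> context) { context.put(READ_COUNT, readCount); }
}
```

The trade-off is that skipping on a forward-only source costs time proportional to the number of rows already processed, and it assumes the source returns rows in the same order on every run.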

It is also worth noting that the key used within the ExecutionContext should not be trivial, because the same ExecutionContext is shared by all ItemStreams within a Step. In most cases, prefixing the key with the class name is enough to guarantee uniqueness. However, in the rare case where two ItemStreams of the same type are used in the same step (which can happen if two output files are needed), a more distinctive name is needed. For this reason, many of the Spring Batch ItemReader and ItemWriter implementations have a name property, set through setName(), that lets this key name be overridden.
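To see why distinct names matter, the sketch below puts two components of the same type into one shared context. Each builds its key from a configurable name, in the spirit of the name property described above; NamedIndexState and its key() method are hypothetical, and a Map<String, Object> stands in for ExecutionContext.

```java
import java.util.Map;

// Hypothetical component whose context keys are namespaced by a configurable name.
class NamedIndexState {
    private final String name;
    int currentIndex;

    NamedIndexState(String name) { this.name = name; }

    // For example, name "customers" produces the key "customers.current.index"
    String key() { return name + ".current.index"; }

    void update(Map<String, Object> context) { context.put(key(), (long) currentIndex); }

    void open(Map<String, Object> context) {
        Object saved = context.get(key());
        currentIndex = saved == null ? 0 : ((Long) saved).intValue();
    }
}
```

Had both instances used the bare key current.index, the second update would overwrite the first, and on restart both would resume from the same, wrong position.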

Custom ItemWriter Example

Implementing a custom ItemWriter is similar in many ways to the ItemReader example above but differs in enough ways to warrant its own example. However, adding restartability is essentially the same, so it is not covered in this example. As with the ItemReader example, a List is used in order to keep the example as simple as possible:

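import java.util.List;

import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.support.transaction.TransactionAwareProxyFactory;

public class CustomItemWriter<T> implements ItemWriter<T> {

    // A list proxy whose changes only take effect when the surrounding transaction commits
    private final List<T> output = TransactionAwareProxyFactory.createTransactionalList();

    @Override
    public void write(Chunk<? extends T> items) throws Exception {
        // Chunk is Iterable rather than a Collection, so add its underlying list
        output.addAll(items.getItems());
    }

    public List<T> getOutput() {
        return output;
    }
}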
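The transactional list matters because a chunk can be rolled back: when a transaction is active, the list created by TransactionAwareProxyFactory applies changes to a working copy and only updates the target when the transaction commits. The JDK-only model below illustrates that effect on a writer's output. TxBufferedList is a hypothetical name, and its explicit begin, commit, and rollback calls stand in for Spring's transaction synchronization.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of a transaction-aware list: while a "transaction" is active,
// writes go to a working copy that replaces the target only on commit.
class TxBufferedList<T> {
    private final List<T> target = new ArrayList<>();
    private List<T> working; // non-null while a transaction is active

    void begin() { working = new ArrayList<>(target); }

    void addAll(List<? extends T> items) {
        if (working != null) {
            working.addAll(items);
        } else {
            target.addAll(items); // no transaction: write straight through
        }
    }

    void commit() {
        if (working == null) {
            return; // nothing to commit
        }
        target.clear();
        target.addAll(working);
        working = null;
    }

    void rollback() { working = null; } // discard the working copy

    List<T> committed() { return Collections.unmodifiableList(target); }
}
```

With a plain ArrayList instead, the items of a failed chunk would stay in the output even though the chunk is retried, so they would appear twice.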

Making the ItemWriter Restartable

To make the ItemWriter restartable, we would follow the same process as for the ItemReader, adding and implementing the ItemStream interface to synchronize the execution context. In the example, we might have to count the number of items processed and add that as a footer record. If we needed to do that, we could implement ItemStream in our ItemWriter so that the counter was reconstituted from the execution context if the stream was re-opened.
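A minimal sketch of that counter follows. It assumes a StringBuilder stands in for an output file that survives a restart and a Map<String, Object> stands in for ExecutionContext; FooterCountingWriter and the item.count key are hypothetical. The running total is saved on every update and restored on open, so the footer written on close counts items from both runs.

```java
import java.util.List;
import java.util.Map;

// Hypothetical restartable writer that appends a footer with the total item count.
class FooterCountingWriter {
    static final String ITEM_COUNT = "item.count"; // hypothetical key
    private final StringBuilder out; // stand-in for an output file that survives restarts
    private long itemCount;

    FooterCountingWriter(StringBuilder out) { this.out = out; }

    void open(Map<String, Object> context) {
        Object saved = context.get(ITEM_COUNT);
        itemCount = saved == null ? 0L : (Long) saved; // resume the running total
    }

    void write(List<String> chunk) {
        for (String item : chunk) {
            out.append(item).append('\n');
        }
        itemCount += chunk.size();
    }

    void update(Map<String, Object> context) { context.put(ITEM_COUNT, itemCount); }

    void close() { out.append("TOTAL ").append(itemCount).append('\n'); } // footer record
}
```

Real file writers also have to handle output written after the last commit, for example by truncating the file back to the last committed position on restart; this sketch assumes a failure only ever happens right after an update.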

In many realistic cases, a custom ItemWriter either delegates to another writer that is itself restartable (for example, when writing to a file) or writes to a transactional resource, in which case it is stateless and does not need to be restartable. When you have a stateful writer, you should implement ItemStream as well as ItemWriter. Remember also that the client of the writer needs to be aware of the ItemStream, so you may need to register it as a stream in the configuration.
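When a custom writer wraps a stateful delegate, one option is for the wrapper to forward the stream callbacks so that the delegate still gets a chance to save and restore its state. The sketch below shows that forwarding with small stand-in interfaces (StreamCallbacks, ChunkWriter) instead of Spring Batch's own ItemStream and ItemWriter; UpperCasingWriter and RecordingDelegate are hypothetical, and a Map<String, Object> stands in for ExecutionContext.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-ins for ItemStream and ItemWriter.
interface StreamCallbacks {
    void open(Map<String, Object> context);
    void update(Map<String, Object> context);
    void close();
}

interface ChunkWriter<T> {
    void write(List<T> chunk);
}

// Hypothetical wrapper: transforms items, then forwards both the write and the
// stream callbacks to its stateful delegate.
class UpperCasingWriter implements ChunkWriter<String>, StreamCallbacks {
    private final ChunkWriter<String> delegate;
    private final StreamCallbacks delegateStream;

    <D extends ChunkWriter<String> & StreamCallbacks> UpperCasingWriter(D delegate) {
        this.delegate = delegate;
        this.delegateStream = delegate;
    }

    public void write(List<String> chunk) {
        delegate.write(chunk.stream().map(String::toUpperCase).collect(Collectors.toList()));
    }

    public void open(Map<String, Object> context) { delegateStream.open(context); }
    public void update(Map<String, Object> context) { delegateStream.update(context); }
    public void close() { delegateStream.close(); }
}

// Hypothetical stateful delegate: records items and persists how many it has written.
class RecordingDelegate implements ChunkWriter<String>, StreamCallbacks {
    final List<String> written = new ArrayList<>();
    long count;

    public void write(List<String> chunk) { written.addAll(chunk); count += chunk.size(); }
    public void open(Map<String, Object> context) {
        Object saved = context.get("delegate.count");
        count = saved == null ? 0L : (Long) saved;
    }
    public void update(Map<String, Object> context) { context.put("delegate.count", count); }
    public void close() { }
}
```

The alternative is to leave the wrapper stateless and register the delegate with the step as a stream; Spring Batch's step builders provide a stream(...) method for this. Use one approach or the other, not both, so the delegate is not opened twice.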