Creating Custom ItemReaders and ItemWriters
So far, this chapter has discussed the basic contracts of reading and writing in Spring Batch and some common implementations for doing so. However, these are all fairly generic, and there are many potential scenarios that may not be covered by out-of-the-box implementations. This section shows, by using a simple example, how to create custom ItemReader and ItemWriter implementations and implement their contracts correctly. The ItemReader also implements ItemStream, in order to illustrate how to make a reader or writer restartable.
Custom ItemReader Example
For the purpose of this example, we create a simple ItemReader implementation that reads from a provided list. We start by implementing the most basic contract of ItemReader, the read method, as shown in the following code:
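import java.util.List;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

public class CustomItemReader<T> implements ItemReader<T> {

    List<T> items;

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    @Override
    public T read() throws Exception, UnexpectedInputException,
            NonTransientResourceException, ParseException {
        // Hand out items one at a time, removing each from the list.
        if (!items.isEmpty()) {
            return items.remove(0);
        }
        // Returning null signals that there is no more input.
        return null;
    }
}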
The preceding class takes a list of items and returns them one at a time, removing each from the list, so the list passed in must be modifiable. When the list is empty, it returns null, thus satisfying the most basic requirements of an ItemReader, as illustrated in the following test code:
// assertEquals and assertNull are assumed to be statically imported from JUnit.
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
ItemReader<String> itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read()); // null signals the end of the input
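The null return value is how a caller learns that the input is exhausted. The following simplified sketch shows a consumer that calls read() until it gets null. The ReaderDrainer class and its drainAll method are hypothetical and are not part of Spring Batch. A real chunk-oriented step reads in chunks inside transactions rather than in a single loop.

```java
import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.item.ItemReader;

// Hypothetical helper for illustration only; not a Spring Batch class.
class ReaderDrainer {

    // Calls read() repeatedly until the reader signals end of input by returning null.
    static <T> List<T> drainAll(ItemReader<T> reader) throws Exception {
        List<T> result = new ArrayList<>();
        T item;
        while ((item = reader.read()) != null) {
            result.add(item);
        }
        return result;
    }
}
```

Any reader that honors the contract can be passed to drainAll, including the CustomItemReader above. ItemReader has a single abstract method, so a lambda such as () -> iterator.hasNext() ? iterator.next() : null also works in quick tests. A reader that never returns null would make this loop run forever.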
Making the ItemReader Restartable
The final challenge is to make the ItemReader restartable. Currently, if processing is interrupted and begins again, the ItemReader must start at the beginning. This is actually valid in many scenarios, but it is sometimes preferable that a batch job restarts where it left off. The key discriminant is often whether the reader is stateful or stateless. A stateless reader does not need to worry about restartability, but a stateful one has to try to reconstitute its last known state on restart. For this reason, we recommend that you keep custom readers stateless if possible, so you need not worry about restartability.
If you do need to store state, then the ItemStream interface should be used:
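import java.util.List;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

public class CustomItemReader<T> implements ItemReader<T>, ItemStream {

    List<T> items;
    int currentIndex = 0;
    private static final String CURRENT_INDEX = "current.index";

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    @Override
    public T read() throws Exception, UnexpectedInputException,
            ParseException, NonTransientResourceException {
        // Items are not removed; the index records how far we have read.
        if (currentIndex < items.size()) {
            return items.get(currentIndex++);
        }
        return null;
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        // On restart, resume from the saved index; otherwise start at the beginning.
        if (executionContext.containsKey(CURRENT_INDEX)) {
            currentIndex = (int) executionContext.getLong(CURRENT_INDEX);
        }
        else {
            currentIndex = 0;
        }
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        // Save the current index; the step calls this before each chunk commit.
        executionContext.putLong(CURRENT_INDEX, currentIndex);
    }

    @Override
    public void close() throws ItemStreamException {
        // No resources to release in this example.
    }
}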
On each call to the ItemStream update method, the current index of the ItemReader is stored in the provided ExecutionContext with a key of 'current.index'. When the ItemStream open method is called, the ExecutionContext is checked to see if it contains an entry with that key. If the key is found, then the current index is moved to that location. This is a fairly trivial example, but it still meets the general contract:
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
ItemReader<String> itemReader = new CustomItemReader<>(items);

// First run: read one item, then save the reader's state.
ExecutionContext executionContext = new ExecutionContext();
((ItemStream) itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream) itemReader).update(executionContext);

// Restart: a new reader opened with the saved context resumes after "1".
itemReader = new CustomItemReader<>(items);
((ItemStream) itemReader).open(executionContext);
assertEquals("2", itemReader.read());
Most ItemReaders have much more sophisticated restart logic. The JdbcCursorItemReader, for example, saves the number of rows it has already read and, on restart, moves the cursor forward past those rows before continuing.
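The following configuration sketch shows how a JdbcCursorItemReader is typically built with its builder. It assumes a DataSource, a hypothetical Customer row type, and a placeholder SQL query. The name given to the builder is used to prefix the reader's execution-context keys, and the builders generally expect a name when state saving is enabled. saveState(true) is the default and lets the reader store its restart state.

```java
import javax.sql.DataSource;

import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.jdbc.core.RowMapper;

class CustomerReaderConfig {

    // Hypothetical row type for illustration.
    record Customer(long id, String name) {}

    static JdbcCursorItemReader<Customer> customerReader(DataSource dataSource) {
        RowMapper<Customer> rowMapper =
                (rs, rowNum) -> new Customer(rs.getLong("id"), rs.getString("name"));
        return new JdbcCursorItemReaderBuilder<Customer>()
                .name("customerReader")   // prefix for this reader's execution-context keys
                .dataSource(dataSource)
                // Placeholder query; a stable ORDER BY keeps row positions consistent across restarts.
                .sql("SELECT id, name FROM customer ORDER BY id")
                .rowMapper(rowMapper)
                .saveState(true)          // the default: save restart state in the ExecutionContext
                .build();
    }
}
```

Cursor-based restart assumes the query returns rows in the same order on every run, which is why the sketch orders by a key column.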
It is also worth noting that the key used within the ExecutionContext should be specific enough to avoid collisions. That is because the same ExecutionContext is used for all ItemStreams within a Step. In most cases, simply prepending the key with the class name should be enough to guarantee uniqueness. However, in the rare cases where two ItemStreams of the same type are used in the same step (which can happen if two files are needed for output), a more specific name is needed. For this reason, many of the Spring Batch ItemReader and ItemWriter implementations have a name property, set through setName(), that lets this key name be overridden.
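Here is a minimal sketch of the prefixing idea. It uses a hypothetical NamedIndexStream class that keeps only the ItemStream part of a reader. Each instance builds its keys from a configurable name, so two instances in the same step keep separate entries in the shared ExecutionContext.

```java
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStream;

// Hypothetical class: only the state-saving part of a reader, with name-prefixed keys.
class NamedIndexStream implements ItemStream {

    // Defaults to the class name; override it when two instances share a step.
    private String name = NamedIndexStream.class.getSimpleName();
    int currentIndex = 0;

    public void setName(String name) {
        this.name = name;
    }

    // Builds keys such as "input1.current.index".
    private String key(String suffix) {
        return name + "." + suffix;
    }

    @Override
    public void open(ExecutionContext executionContext) {
        currentIndex = executionContext.containsKey(key("current.index"))
                ? (int) executionContext.getLong(key("current.index"))
                : 0;
    }

    @Override
    public void update(ExecutionContext executionContext) {
        executionContext.putLong(key("current.index"), currentIndex);
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }
}
```

For example, instances named "input1" and "input2" would store "input1.current.index" and "input2.current.index" side by side. Spring Batch's ItemStreamSupport base class offers a comparable setName()-based prefix, which is usually preferable to hand-rolling one.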
Custom ItemWriter Example
Implementing a custom ItemWriter is similar in many ways to the ItemReader example above but differs in enough ways to warrant its own example. However, adding restartability is essentially the same, so it is not covered in this example. As with the ItemReader example, a List is used in order to keep the example as simple as possible:
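import java.util.List;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.support.transaction.TransactionAwareProxyFactory;

public class CustomItemWriter<T> implements ItemWriter<T> {
    // Transactional list: changes made in a transaction take effect only on commit.
    List<T> output = TransactionAwareProxyFactory.createTransactionalList();

    @Override
    public void write(Chunk<? extends T> items) throws Exception {
        output.addAll(items.getItems()); // Chunk is Iterable, not a Collection
    }

    public List<T> getOutput() {
        return output;
    }
}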
Making the ItemWriter Restartable
To make the ItemWriter restartable, we would follow the same process as for the ItemReader, adding and implementing the ItemStream interface to synchronize the execution context. In the example, we might have to count the number of items processed and add that as a footer record. If we needed to do that, we could implement ItemStream in our ItemWriter so that the counter was reconstituted from the execution context if the stream was re-opened.
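The counter scenario can be sketched with a hypothetical CountingItemWriter. It collects items in memory and tracks how many it has written. The key name CountingItemWriter.item.count is an illustrative choice. The class implements Spring Batch's ItemStreamWriter, which combines ItemWriter and ItemStream.

```java
import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamWriter;

// Hypothetical writer: collects items and keeps a running count that could feed a footer.
class CountingItemWriter<T> implements ItemStreamWriter<T> {

    // Illustrative key, prefixed with the class name to avoid collisions.
    private static final String COUNT_KEY = "CountingItemWriter.item.count";

    final List<T> output = new ArrayList<>();
    long itemCount = 0;

    @Override
    public void write(Chunk<? extends T> chunk) {
        output.addAll(chunk.getItems());
        itemCount += chunk.size();
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        // On restart, continue counting from the last saved value.
        itemCount = executionContext.containsKey(COUNT_KEY)
                ? executionContext.getLong(COUNT_KEY)
                : 0;
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putLong(COUNT_KEY, itemCount);
    }

    @Override
    public void close() throws ItemStreamException {
        // Nothing to release here; a file-based writer might write its footer at this point.
    }
}
```

After a restart, open() restores the count, so a footer produced at the end of the restarted run would include items written in the earlier run. The step calls update() before each chunk commit, so the saved count reflects only committed chunks.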
In many realistic cases, custom ItemWriters either delegate to another writer that is itself restartable (for example, when writing to a file) or write to a transactional resource, in which case they are stateless and do not need to be restartable. When you have a stateful writer, you should make sure to implement ItemStream as well as ItemWriter. Remember also that the client of the writer needs to be aware of the ItemStream, so you may need to register it as a stream in the configuration.