Creating Custom ItemReaders and ItemWriters
到目前为止,本章讨论了在 Spring Batch 中读写的基本契约以及执行此任务的一些常见实现。但是,这些都是相当通用的,并且有许多潜在的场景可能不被开箱即用的实现所涵盖。本节通过一个简单的示例展示如何创建自定义 ItemReader
和 ItemWriter
实现并正确实现它们的契约。ItemReader
还实现了 ItemStream
,以说明如何使读取器或写入器可重新启动。
Custom ItemReader
Example
为了本示例的目的,我们创建了一个简单的 ItemReader
实现,从提供的列表中读取。我们首先实现最基本的 ItemReader
契约,也就是 read
方法,如下面的代码所示:
public class CustomItemReader<T> implements ItemReader<T> {
List<T> items;
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
NonTransientResourceException, ParseException {
if (!items.isEmpty()) {
return items.remove(0);
}
return null;
}
}
前面的类获取一个项目列表并一次返回它们,从列表中删除每个项目。当列表为空时,它返回 null
,因此满足了 ItemReader
的最基本要求,如下面的测试代码所示:
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());
Making the ItemReader
Restartable
最后的挑战是使 ItemReader
可重新启动。目前,如果处理中断并重新开始,则 ItemReader
必须从头开始。这在许多情况下实际上是有效的,但是有时希望批处理作业从中断的地方重新开始。关键区别通常在于读取器是有状态还是无状态。无状态读取器无需担心可重新启动性,但是有状态读取器在重新启动时必须尝试重建其最后已知的状态。因此,我们建议您尽可能将自定义读取器保持为无状态的,这样您无需担心可重新启动性。
如果您确实需要存储状态,则应使用 ItemStream
接口:
public class CustomItemReader<T> implements ItemReader<T>, ItemStream {
List<T> items;
int currentIndex = 0;
private static final String CURRENT_INDEX = "current.index";
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
ParseException, NonTransientResourceException {
if (currentIndex < items.size()) {
return items.get(currentIndex++);
}
return null;
}
public void open(ExecutionContext executionContext) throws ItemStreamException {
if (executionContext.containsKey(CURRENT_INDEX)) {
currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
}
else {
currentIndex = 0;
}
}
public void update(ExecutionContext executionContext) throws ItemStreamException {
executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
}
public void close() throws ItemStreamException {}
}
每次调用 ItemStream
update
方法时,ItemReader
的当前索引都会存储在提供的 ExecutionContext
中,其键为 “current.index”。当调用 ItemStream
open
方法时,会检查 ExecutionContext
以查看它是否包含带有该键的条目。如果找到该键,则当前索引将移动到该位置。这是一个非常简单的示例,但它仍然满足一般契约:
ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);
((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());
大多数 ItemReaders
具有更加复杂的重新启动逻辑。例如,JdbcCursorItemReader
存储光标中最后处理的行行的行 ID。
同样值得注意的是,在 ExecutionContext
中使用的键不应该是微不足道的。这是因为相同的 ExecutionContext
用于 Step
中的所有 ItemStreams
。在大多数情况下,只需用类名作为键的前缀就足以确保唯一性。但是,在极少数情况下,在同一步骤中使用了两种相同类型的 ItemStream
(如果需要两个文件用于输出的情况可能会发生这种情况),则需要一个更唯一的名称。出于此原因,许多 Spring Batch ItemReader
和 ItemWriter
实现都有一个 setName()
属性,该属性允许覆盖此键名。
Custom ItemWriter
Example
实现自定义 ItemWriter
在许多方面与上面的 ItemReader
示例类似,但在足够多的方面上有所不同,足以保证其自己的示例。但是,添加可重新启动性本质上是相同的,因此在本示例中未涵盖它。与 ItemReader
示例一样,为了使示例尽可能简单,这里使用了 List
:
public class CustomItemWriter<T> implements ItemWriter<T> {
List<T> output = TransactionAwareProxyFactory.createTransactionalList();
public void write(Chunk<? extends T> items) throws Exception {
output.addAll(items);
}
public List<T> getOutput() {
return output;
}
}
Making the ItemWriter
Restartable
为了使 ItemWriter
可重新启动,我们要遵照 ItemReader
的相同过程,添加并实现 ItemStream
接口来同步执行上下文。在示例中,我们可能必须计算已处理的项数,并将其作为页脚记录添加。如果我们需要执行此操作,我们可以在 ItemWriter
中实现 ItemStream
,以便在重新打开流的情况下从执行上下文中重构计数器。
在许多实际情况下,自定义 ItemWriters
也委托给可重新启动的另一个编写器(例如,写入文件时),否则它会写入事务性资源,因此无需重新启动,因为它无状态。当您有状态编写器时,您可能应该确保实现 ItemStream
以及 ItemWriter
。还要记住,编写器的客户端需要了解 ItemStream
,因此您可能需要在配置中将其注册为流。