Item processing
ItemReader and ItemWriter interfaces对于它们特定的任务都非常有用,但是如果你想在写入之前插入业务逻辑怎么办?读写的一个选项是使用复合模式:创建一个 ItemWriter`包含另一个 `ItemWriter`或一个 `ItemReader`包含另一个 `ItemReader
。以下代码展示了一个示例:
public class CompositeItemWriter<T> implements ItemWriter<T> {
ItemWriter<T> itemWriter;
public CompositeItemWriter(ItemWriter<T> itemWriter) {
this.itemWriter = itemWriter;
}
public void write(Chunk<? extends T> items) throws Exception {
//Add business logic here
itemWriter.write(items);
}
public void setDelegate(ItemWriter<T> itemWriter){
this.itemWriter = itemWriter;
}
}
前面的类包含另一个 ItemWriter
,它在提供了一些业务逻辑之后对其进行委托。此模式也可以轻松地用于 ItemReader
,可能基于主 ItemReader
提供的输入获取更多引用数据。如果您需要自己控制对 write
的调用,它也很有用。但是,如果您只是希望在实际写入之前“转换”传入以进行写入的项目,则不必自己 write
。您只需修改该项即可。对于此场景,Spring Batch 提供了 ItemProcessor
接口,如下面的接口定义所示:
public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}
ItemProcessor
很简单。给定一个对象,对其进行转换并返回另一个对象。提供对象可以是相同类型,也可以不是相同类型。关键在于可以在进程中应用业务逻辑,并且创建该逻辑完全由开发人员决定。ItemProcessor
可以直接连接到步骤。例如,假设 ItemReader
提供 Foo
类型的类,并且在写出之前需要将其转换为 Bar
类型。以下示例显示了执行转换的 ItemProcessor
:
public class Foo {}
public class Bar {
public Bar(Foo foo) {}
}
public class FooProcessor implements ItemProcessor<Foo, Bar> {
public Bar process(Foo foo) throws Exception {
//Perform simple transformation, convert a Foo to a Bar
return new Bar(foo);
}
}
public class BarWriter implements ItemWriter<Bar> {
public void write(Chunk<? extends Bar> bars) throws Exception {
//write bars
}
}
在前面的示例中,有一个名为 Foo
的类,一个名为 Bar
的类,以及一个名为 FooProcessor
的类,该类遵守 ItemProcessor
接口。转换很简单,但是可以在此处进行任何类型的转换。BarWriter
编写 Bar
对象,如果提供了任何其他类型,则抛出异常。同样,如果提供了除 Foo
之外的任何内容,FooProcessor
也会抛出异常。然后可以将 FooProcessor
注入到 Step
中,如下面的示例所示:
-
Java
-
XML
@Bean
public Job ioSampleJob(JobRepository jobRepository, Step step1) {
return new JobBuilder("ioSampleJob", jobRepository)
.start(step1)
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<Foo, Bar>chunk(2, transactionManager)
.reader(fooReader())
.processor(fooProcessor())
.writer(barWriter())
.build();
}
<job id="ioSampleJob">
<step name="step1">
<tasklet>
<chunk reader="fooReader" processor="fooProcessor" writer="barWriter"
commit-interval="2"/>
</tasklet>
</step>
</job>
ItemProcessor
与 ItemReader
或 ItemWriter
之间的一个区别在于,ItemProcessor
对于 Step
是可选的。
Chaining ItemProcessors
在许多场景中,执行单个转换很有用,但是如果您想将多个 ItemProcessor
实现“链接”在一起,该怎么办?您可以使用前面提到的复合模式来做到这一点。为了更新前面的单个转换示例,Foo
转换为 Bar
,而 Bar
转换为 Foobar
并写出,如下面的示例所示:
public class Foo {}
public class Bar {
public Bar(Foo foo) {}
}
public class Foobar {
public Foobar(Bar bar) {}
}
public class FooProcessor implements ItemProcessor<Foo, Bar> {
public Bar process(Foo foo) throws Exception {
//Perform simple transformation, convert a Foo to a Bar
return new Bar(foo);
}
}
public class BarProcessor implements ItemProcessor<Bar, Foobar> {
public Foobar process(Bar bar) throws Exception {
return new Foobar(bar);
}
}
public class FoobarWriter implements ItemWriter<Foobar>{
public void write(Chunk<? extends Foobar> items) throws Exception {
//write items
}
}
可以将 FooProcessor
和 BarProcessor
'链接' 在一起以给出 Foobar
结果,如以下示例所示:
CompositeItemProcessor<Foo,Foobar> compositeProcessor =
new CompositeItemProcessor<Foo,Foobar>();
List itemProcessors = new ArrayList();
itemProcessors.add(new FooProcessor());
itemProcessors.add(new BarProcessor());
compositeProcessor.setDelegates(itemProcessors);
与前面的示例一样,您可以将复合处理器配置到 Step
中:
-
Java
-
XML
@Bean
public Job ioSampleJob(JobRepository jobRepository, Step step1) {
return new JobBuilder("ioSampleJob", jobRepository)
.start(step1)
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<Foo, Foobar>chunk(2, transactionManager)
.reader(fooReader())
.processor(compositeProcessor())
.writer(foobarWriter())
.build();
}
@Bean
public CompositeItemProcessor compositeProcessor() {
List<ItemProcessor> delegates = new ArrayList<>(2);
delegates.add(new FooProcessor());
delegates.add(new BarProcessor());
CompositeItemProcessor processor = new CompositeItemProcessor();
processor.setDelegates(delegates);
return processor;
}
<job id="ioSampleJob">
<step name="step1">
<tasklet>
<chunk reader="fooReader" processor="compositeItemProcessor" writer="foobarWriter"
commit-interval="2"/>
</tasklet>
</step>
</job>
<bean id="compositeItemProcessor"
class="org.springframework.batch.item.support.CompositeItemProcessor">
<property name="delegates">
<list>
<bean class="..FooProcessor" />
<bean class="..BarProcessor" />
</list>
</property>
</bean>
Filtering Records
项目处理器的典型用途之一是在将记录传递给 ItemWriter
之前将其过滤掉。过滤是一个不同于跳过的操作。跳过表示记录无效,而过滤表示不应写入记录。
例如,考虑一个读取包含三种不同类型的记录的文件的批处理作业:要插入的记录、要更新的记录和要删除的记录。如果系统不支持记录删除,则我们不希望将任何可删除记录发送到 ItemWriter
。但是,由于这些记录实际上不是错误记录,因此我们希望对其进行筛选而不是跳过它们。结果,ItemWriter
将仅接收可插入和可更新记录。
要筛选记录,您可以从 ItemProcessor
返回 null
。框架检测到结果为 null
,并避免将该项目添加到传递给 ItemWriter
的记录列表中。从 ItemProcessor
引发的异常导致跳过。
Validating Input
ItemReaders and ItemWriters章节讨论了多种解析输入的方法。每个主要的实现如果 “well formed.” 都会抛出异常。FixedLengthTokenizer
在缺少数据范围时会抛出异常。类似地,尝试访问 RowMapper`或 `FieldSetMapper`中不存在的索引或与预期不同的索引会导致抛出异常。所有这些类型的异常都在 `read
返回之前抛出。但是,它们并没有解决已返回项目是否有效的问题。例如,如果某个字段是年龄,则不能为负。它可以正确解析,因为它存在并且是一个数字,但它不会导致异常。由于已经有很多验证框架,Spring Batch 不会尝试再提供一个。相反,它提供了一个简单的接口,称为 Validator
,你可以通过任意数量的框架来实现它,如下面的接口定义所示:
public interface Validator<T> {
void validate(T value) throws ValidationException;
}
契约是,如果对象无效,validate
方法会抛出异常;如果对象有效,则正常返回。Spring Batch 提供了一个 ValidatingItemProcessor
,如下面的 bean 定义所示:
-
Java
-
XML
@Bean
public ValidatingItemProcessor itemProcessor() {
ValidatingItemProcessor processor = new ValidatingItemProcessor();
processor.setValidator(validator());
return processor;
}
@Bean
public SpringValidator validator() {
SpringValidator validator = new SpringValidator();
validator.setValidator(new TradeValidator());
return validator;
}
<bean class="org.springframework.batch.item.validator.ValidatingItemProcessor">
<property name="validator" ref="validator" />
</bean>
<bean id="validator" class="org.springframework.batch.item.validator.SpringValidator">
<property name="validator">
<bean class="org.springframework.batch.samples.domain.trade.internal.validator.TradeValidator"/>
</property>
</bean>
您还可以使用 BeanValidatingItemProcessor
来验证用 Bean Validation API (JSR-303) 注释注释的项目。例如,考虑以下类型 Person
:
class Person {
@NotEmpty
private String name;
public Person(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
您可以通过在应用程序上下文中声明 BeanValidatingItemProcessor
bean 并将其注册为分块导向步骤中的处理器来验证项目:
@Bean
public BeanValidatingItemProcessor<Person> beanValidatingItemProcessor() throws Exception {
BeanValidatingItemProcessor<Person> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
beanValidatingItemProcessor.setFilter(true);
return beanValidatingItemProcessor;
}