The Domain Language of Batch
对于任何一位经验丰富的批处理架构师来说,Spring Batch 中所使用的批处理的基本概念都应该是熟悉且得心应手的。有“作业”和“步骤”,以及开发者提供的称为 ItemReader
和 ItemWriter
的处理单元。然而,由于 Spring 模式、操作、模板、回调函数和习语,以下事项将有机会:
-
极大地改进了对职责明确分工的遵守。
-
清楚地划定了以接口形式提供的体系结构层和服务。
-
简单且默认的实现,允许快速采用和开箱即用。
-
Significantly enhanced extensibility.
下图是一个简化的批处理参考架构版本,该架构已使用数十年。它概述了构成批处理领域语言的组件。此架构框架是一个蓝图,它已通过对上一代平台(大型机上的 COBOL、Unix 上的 C 和现在任何地方的 Java)的数十年实施得到验证。JCL 和 COBOL 开发者可能会像 C、C# 和 Java 开发者一样熟悉这些概念。Spring Batch 提供了对在用于解决简单到复杂批处理应用程序的创建的健壮、可维护系统中通常发现的层、组件和技术服务的物理实现,以及用于解决非常复杂的处理需求的基础设施和扩展。
.Batch Stereotypes
image::spring-batch-reference-model.png[]
上图重点介绍了构成 Spring Batch 领域语言的关键概念。一个 “作业”包含一步或多步,每一步恰好有一个 ItemReader
、一个 ItemProcessor
和一个 ItemWriter
。一个作业需要启动(使用 JobLauncher
),并且需要存储有关当前运行过程的元数据(在 JobRepository
中)。
Job
本部分描述与批处理作业概念相关的陈规定型观念。“作业”是一个实体,它封装整个批处理过程。与其他 Spring 项目一样,“作业”通过 XML 配置文件或基于 Java 的配置连接在一起。此配置可以称为“作业配置”。但正如下图所示,“作业”仅仅是一个整体层次结构的顶部:
在 Spring Batch 中,“作业”仅仅是 Step
实例的容器。它将逻辑上属于一个流程中的多个步骤结合在一起,并且允许为所有步骤进行全局的属性配置,例如可重新启动性。作业配置包含:
-
作业名称。
-
定义和排列
Step
实例。 -
作业是否可重新启动。
- Java
-
对于使用 Java 配置的用户,Spring Batch 以
SimpleJob
类为形式提供了一个Job
接口的默认实现,它在Job
之上创建了一些标准功能。在使用基于 Java 的配置时,提供一组构建器供实例化一个Job
,如下面的例子所示:
@Bean
public Job footballJob(JobRepository jobRepository) {
return new JobBuilder("footballJob", jobRepository)
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.build();
}
- XML
-
对于使用 XML 配置的用户,Spring Batch 以
SimpleJob
类为形式提供了一个Job
接口的默认实现,它在Job
之上创建了一些标准功能。然而,批处理命名空间免除了直接实例化它的需要。相反,你可以使用<job>
元素,如下面的例子所示:
<job id="footballJob">
<step id="playerload" next="gameLoad"/>
<step id="gameLoad" next="playerSummarization"/>
<step id="playerSummarization"/>
</job>
JobInstance
“作业实例”指的是逻辑作业运行的概念。考虑一个每天结束时应该运行一次的批处理作业,例如前面图表中的 EndOfDay
作业。有一个 EndOfDay
作业,但是作业的每次单独运行都必须单独跟踪。在作业的情况下,每天有一个逻辑的 JobInstance
。例如,有一个 1 月 1 日的运行、一个 1 月 2 日的运行,依此类推。如果 1 月 1 日的运行第一次失败,并且第二天再次运行,它仍然是 1 月 1 日的运行。(通常,这也对应于它正在处理的数据,即 1 月 1 日的运行处理 1 月 1 日的数据)。因此,每个 JobInstance
可以有多个执行(JobExecution
将在本章后面进行更详细的讨论),并且给定时间只能运行一个 JobInstance
(对应于一个特定的 Job
和标识 JobParameters
)。
“作业实例”的定义绝对不会对要加载的数据产生影响。完全由 ItemReader
实现决定如何加载数据。例如,在 EndOfDay
场景中,数据上可能有一列表示数据所属的“生效日期”或“预定日期”。因此,1 月 1 日的运行将只加载 1 号的数据,而 1 月 2 日的运行将只使用 2 号的数据。因为此确定很可能是一项业务决策,所以由 ItemReader
决定。然而,使用同样的 JobInstance
决定是否使用以前执行的“状态”(即执行上下文,将在本章后面讨论)。使用一个新的 JobInstance
意味着“从头开始”,而使用现有的实例通常意味着“从中断处开始”。
JobParameters
在讨论了 JobInstance
及其与 Job
的不同之处之后,接下来自然而然就会问:“一个 JobInstance
如何与另一个 JobInstance
区别开来?”答案是:JobParameters
。JobParameters
对象包含用于启动一个批处理作业的一组参数。它们可以用于标识,甚至可以在运行期间作为参考数据,如下面的图片所示:
在前面的例子中,有两个实例,一个用于 1 月 1 日,另一个用于 1 月 2 日,但实际上只有一个 Job
,但它有两个 JobParameter
对象:一个以 2017 年 1 月 1 日的作业参数启动,另一个以 2017 年 1 月 2 日的参数启动。因此,可以将契约定义为:JobInstance
= Job
+ 标识 JobParameters
。这允许开发人员有效地控制如何定义一个 JobInstance
,因为他们控制传入哪些参数。
并非所有作业参数都需要有助于识别 |
JobExecution
“作业执行”指的是在技术上尝试运行作业一次。执行可能会以失败或成功而结束,但是对应于给定执行的 JobInstance
不被视为完整,除非执行成功完成。使用前面描述的 EndOfDay
作业作为示例,考虑一个 2017 年 1 月 1 日的 JobInstance
,它在第一次运行时失败。如果它使用与第一次运行相同的标识作业参数(2017 年 1 月 1 日)再次运行,则创建一个新的 JobExecution
。然而,仍然只有一个 JobInstance
。
“作业”定义了作业是什么以及如何执行作业,“作业实例”是一个纯组织对象,用于将执行分组在一起,主要是为了启用正确的重启语义。“作业执行”则是用于存储运行期间实际发生的事情的主要机制,并且包含许多必须控制和保留的更多属性,如下表所示:
Property |
Definition |
|
一个 |
|
一个 |
|
一个 |
|
一个 |
|
一个 |
|
一个 |
|
一个 “property bag” 包含需要在执行之间持久化的任何用户数据。 |
|
在 |
这些属性很重要,因为它们是持久的并且可以用来完全确定执行的状态。例如,如果 2017 年 1 月 1 日的 EndOfDay
作业在晚上 9:00 执行,并在 9:30 失败,则在批处理元数据表中将创建以下条目:
JOB_INST_ID |
JOB_NAME |
1 |
EndOfDayJob |
JOB_EXECUTION_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
IDENTIFYING |
1 |
DATE |
schedule.Date |
2017-01-01 |
TRUE |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
列名称可能已缩写或删除,以提高清晰度和格式。 |
现在作业已失败,假设问题确定花费了整晚的时间,因此“批处理窗口”现在已关闭。进一步假设窗口从晚上 9:00 开始,作业再次为 2017 年 1 月 1 日启动,从中断处开始,并在 9:30 成功完成。由于现在是第二天,因此 2017 年 1 月 2 日的作业也必须运行,并且它在 9:31 之后立即启动,并在正常的一小时时间内于 10:30 完成。除非两个作业有可能尝试访问相同的数据,从而导致数据库级别锁定问题,否则不需要在一个作业实例启动后立即启动另一个作业实例。由调度程序完全决定何时应该运行一个作业。由于它们是独立的 JobInstance
,因此 Spring Batch 不会尝试阻止它们同时运行。(尝试在另一个作业正在运行时运行同一个 JobInstance
将导致抛出 JobExecutionAlreadyRunningException
)。现在 JobInstance
和 JobParameters
表中应有两个额外的条目,并且 JobExecution
表中应有两个额外的条目,如下表所示:
JOB_INST_ID |
JOB_NAME |
1 |
EndOfDayJob |
2 |
EndOfDayJob |
JOB_EXECUTION_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
IDENTIFYING |
1 |
DATE |
schedule.Date |
2017-01-01 00:00:00 |
TRUE |
2 |
DATE |
schedule.Date |
2017-01-01 00:00:00 |
TRUE |
3 |
DATE |
schedule.Date |
2017-01-02 00:00:00 |
TRUE |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
2 |
1 |
2017-01-02 21:00 |
2017-01-02 21:30 |
COMPLETED |
3 |
2 |
2017-01-02 21:31 |
2017-01-02 22:29 |
COMPLETED |
列名称可能已缩写或删除,以提高清晰度和格式。 |
Step
“步骤”是一个领域对象,它封装了一个批处理作业的一个独立的、顺序的阶段。因此,每个“作业”完全由一个或多个步骤组成。“步骤”包含所有必要的信息来定义和控制实际的批处理。这是一个必然含糊的描述,因为任何给定“步骤”的内容由编写“作业”的开发者决定。“步骤”可以像开发者希望的那样简单或复杂。一个简单的“步骤”可能将数据从文件加载到数据库,不需要很少的代码(取决于所使用的实现)。一个更复杂的“步骤”可能应用复杂的业务规则作为处理的一部分。与“作业”一样,“步骤”有一个单独的 StepExecution
,它与一个唯一的 JobExecution
相关联,如下面的图片所示:
StepExecution
StepExecution
表示执行一个“步骤”的一次尝试。每次运行一个“步骤”时都会创建一个新的 StepExecution
,类似于 JobExecution
。但是,如果一个步骤由于前面的步骤失败而未能执行,则不会持久化执行。只有当步骤实际启动时才会创建一个 StepExecution
。
“步骤”执行由 StepExecution
类的对象表示。每个执行都包含对其对应的步骤和 JobExecution
以及事务相关数据的引用,例如提交和回滚计数以及开始和结束时间。此外,每个步骤执行都包含一个 ExecutionContext
,其中包含开发者需要在批处理运行中持久化的任何数据,例如重启所需的统计信息或状态信息。下表列出了 StepExecution
的属性:
Property |
Definition |
|
一个 |
|
A |
|
|
|
|
|
一个 “property bag” 包含需要在执行之间持久化的任何用户数据。 |
|
已成功读取的项数。 |
|
已成功写入的项数。 |
|
此执行已提交的事务数。 |
|
|
|
|
|
|
|
“filtered” 的项数。 |
|
|
ExecutionContext
ExecutionContext
表示由框架持久化和控制的一组键值对,为开发者提供了一个存储持久化状态的地方,该状态限定在 StepExecution
对象或 JobExecution
对象中。(对于熟悉 Quartz 的人来说,它非常类似于 JobDataMap
。)最好的使用方法示例是方便重启。以平面文件输入为例,在处理单个行时,框架会定期在提交点持久化 ExecutionContext
executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());
以“作业”类型详解部分的“EndOfDay”示例为例,假定有一个加载文件到数据库的步骤“loadData”。在第一次运行失败后,元数据表将如下示例所示:
JOB_INST_ID |
JOB_NAME |
1 |
EndOfDayJob |
JOB_INST_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
1 |
DATE |
schedule.Date |
2017-01-01 |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
STEP_EXEC_ID |
JOB_EXEC_ID |
STEP_NAME |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
loadData |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
STEP_EXEC_ID |
SHORT_CONTEXT |
1 |
{piece.count=40321} |
在前一个案例中,“Step”运行了 30 分钟,处理了 40,321 个“pieces”,这是此场景中文件中的行数。此值在每次提交前由框架更新,并且可以包含与“ExecutionContext”中条目相对应的多行。在提交前收到通知需要各种“StepListener”实现之一(或“ItemStream”),本指南的后面部分将详细讨论这些内容。与上一个示例一样,假定第二天重新启动“Job”。重新启动后,数据库中将重建“ExecutionContext”的上一次运行的值。当打开“ItemReader”时,它会检查上下文中是否有任何已存储状态,并由此处进行初始化,如以下示例所示:
if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
log.debug("Initializing for restart. Restart data is: " + executionContext);
long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));
LineReader reader = getReader();
Object record = "";
while (reader.getPosition() < lineCount && record != null) {
record = readLine();
}
}
在这种情况下,在上述代码运行后,当前行是 40,322,从而让“Step”可以从中断处重新开始。您还可以使用“ExecutionContext”获取需要关于运行本身持久化的统计信息。例如,如果平面文件中包含跨多行存在的待处理订单,那么存储已处理订单数(与读取的行数完全不同)可能是必要的,以便可以在“Step”结束时通过邮件发送邮件,邮件正文显示已处理订单总数。框架会处理此项操作的存储,以正确的范围将其与单个“JobInstance”关联。以下应该注意的是,有必要很难弄清楚是否应该使用现有的“ExecutionContext”。例如,使用上面的“EndOfDay”示例时,当 01-01 运行第二次重新启动时,框架会识别出这是同一个“JobInstance”,并且在单个“Step”基础上,从数据库中提取“ExecutionContext”,并将它(作为“StepExecution”的一部分)传递给“Step”本身。另一方面,对于 01-02 运行,框架会识别出这是另一个实例,因此必须将空上下文传递给“Step”。框架会为开发人员做出许多此类确定,以确保在正确时间将状态提供给开发人员。还需要注意的是,在任何给定时间,每个“StepExecution”都存在一个唯一的“ExecutionContext”。“ExecutionContext”客户端应该小心,因为这会创建一个共享键空间。因此,在放入值时应该谨慎,以确保不会覆盖任何数据。但是,“Step”完全不存储上下文中任何数据,因此没有办法对框架产生不利影响。
请注意,“JobExecution”中至少有一个“ExecutionContext”,每个“StepExecution”也有一个。例如,考虑以下代码段:
ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob
如注释中所述,“ecStep”不等于“ecJob”。它们是两个不同的“ExecutionContext”。限定到“Step”的“ExecutionContext”在“Step”的每个提交点保存,限定到作业的“ExecutionContext”在每个“Step”执行之间保存。
在 |
JobRepository
“JobRepository”是之前提到的所有类型的持久性机制。它为“JobLauncher”、“Job”和“Step”实现提供 CRUD 操作。首次启动“Job”时,从存储库获取“JobExecution”。此外,在执行过程中,“StepExecution”和“JobExecution”实现通过传递给存储库而持久化。
- Java
-
使用 Java 配置时,“@EnableBatchProcessing”注解提供“JobRepository”,作为自动配置的组件之一。
- XML
-
Spring Batch XML 命名空间支持通过“<job-repository>”标记配置“JobRepository”实例,如下示例所示:
<job-repository id="jobRepository"/>
JobLauncher
“JobLauncher”表示一个简单界面,用于使用给定的“JobParameters”集启动“Job”,如下示例所示:
public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
需要实现从“JobRepository”获取有效的“JobExecution”并执行“Job”。
ItemReader
`ItemReader`是一种抽象,它代表一次一个条目地检索 `Step`的输入。当 `ItemReader`用尽了它可以提供的所有条目,它通过返回 `null`来指示这一点。你可以在 Readers And Writers中找到有关 `ItemReader`界面及其各种实现的更多详细信息。
ItemWriter
`ItemWriter`是一种抽象,它代表一次一个批次或块输出给 `Step`的输出。通常,`ItemWriter`并不知道它接下来应该接收的输入,只知道在其当前调用中传入的条目。你可以在 Readers And Writers中找到有关 `ItemWriter`界面及其各种实现的更多详细信息。
ItemProcessor
`ItemProcessor`是一种抽象,它代表条目的业务处理。在 `ItemReader`读取一个条目,`ItemWriter`写入一个条目的同时,`ItemProcessor`提供一个访问点来转换或应用其他业务处理。如果在处理条目时,确定该条目无效,返回 `null`表示不应写出该条目。你可以在 Readers And Writers中找到有关 `ItemProcessor`界面的更多详细信息。
Batch Namespace
许多先前列出的域概念需要在 Spring “ApplicationContext” 中配置。虽然可以将上述界面的实现用于标准 Bean 定义,但已提供了命名空间以便于配置,如下示例所示:
<beans:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/batch
https://www.springframework.org/schema/batch/spring-batch.xsd">
<job id="ioSampleJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
</tasklet>
</step>
</job>
</beans:beans>
只要声明了批次命名空间,就可以使用它其中的任何元素。你可以在 Configuring and Running a Job中找到有关配置作业的更多信息。你可以在 Configuring a Step中找到有关配置 `Step`的更多信息。