The Domain Language of Batch

对于任何一位经验丰富的批处理架构师来说,Spring Batch 中所使用的批处理的基本概念都应该是熟悉且得心应手的。有“作业”和“步骤”,以及开发者提供的称为 ItemReaderItemWriter 的处理单元。然而,由于 Spring 模式、操作、模板、回调函数和习语,以下事项将有机会:

  • 极大地改进了对职责明确分工的遵守。

  • 清楚地划定了以接口形式提供的体系结构层和服务。

  • 简单且默认的实现,允许快速采用和开箱即用。

  • Significantly enhanced extensibility.

下图是一个简化的批处理参考架构版本,该架构已使用数十年。它概述了构成批处理领域语言的组件。此架构框架是一个蓝图,它已通过对上一代平台(大型机上的 COBOL、Unix 上的 C 和现在任何地方的 Java)的数十年实施得到验证。JCL 和 COBOL 开发者可能会像 C、C# 和 Java 开发者一样熟悉这些概念。Spring Batch 提供了对在用于解决简单到复杂批处理应用程序的创建的健壮、可维护系统中通常发现的层、组件和技术服务的物理实现,以及用于解决非常复杂的处理需求的基础设施和扩展。 .Batch Stereotypes image::spring-batch-reference-model.png[] 上图重点介绍了构成 Spring Batch 领域语言的关键概念。一个 “作业”包含一步或多步,每一步恰好有一个 ItemReader、一个 ItemProcessor 和一个 ItemWriter。一个作业需要启动(使用 JobLauncher),并且需要存储有关当前运行过程的元数据(在 JobRepository 中)。

Job

本部分描述与批处理作业概念相关的陈规定型观念。“作业”是一个实体,它封装整个批处理过程。与其他 Spring 项目一样,“作业”通过 XML 配置文件或基于 Java 的配置连接在一起。此配置可以称为“作业配置”。但正如下图所示,“作业”仅仅是一个整体层次结构的顶部:

job heirarchy
Figure 1. Job Hierarchy

在 Spring Batch 中,“作业”仅仅是 Step 实例的容器。它将逻辑上属于一个流程中的多个步骤结合在一起,并且允许为所有步骤进行全局的属性配置,例如可重新启动性。作业配置包含:

  • 作业名称。

  • 定义和排列 Step 实例。

  • 作业是否可重新启动。

Java

对于使用 Java 配置的用户,Spring Batch 以 SimpleJob 类为形式提供了一个 Job 接口的默认实现,它在 Job 之上创建了一些标准功能。在使用基于 Java 的配置时,提供一组构建器供实例化一个 Job,如下面的例子所示:

@Bean
public Job footballJob(JobRepository jobRepository) {
    return new JobBuilder("footballJob", jobRepository)
                     .start(playerLoad())
                     .next(gameLoad())
                     .next(playerSummarization())
                     .build();
}
XML

对于使用 XML 配置的用户,Spring Batch 以 SimpleJob 类为形式提供了一个 Job 接口的默认实现,它在 Job 之上创建了一些标准功能。然而,批处理命名空间免除了直接实例化它的需要。相反,你可以使用 <job> 元素,如下面的例子所示:

<job id="footballJob">
    <step id="playerload" next="gameLoad"/>
    <step id="gameLoad" next="playerSummarization"/>
    <step id="playerSummarization"/>
</job>

JobInstance

“作业实例”指的是逻辑作业运行的概念。考虑一个每天结束时应该运行一次的批处理作业,例如前面图表中的 EndOfDay 作业。有一个 EndOfDay 作业,但是作业的每次单独运行都必须单独跟踪。在作业的情况下,每天有一个逻辑的 JobInstance。例如,有一个 1 月 1 日的运行、一个 1 月 2 日的运行,依此类推。如果 1 月 1 日的运行第一次失败,并且第二天再次运行,它仍然是 1 月 1 日的运行。(通常,这也对应于它正在处理的数据,即 1 月 1 日的运行处理 1 月 1 日的数据)。因此,每个 JobInstance 可以有多个执行(JobExecution 将在本章后面进行更详细的讨论),并且给定时间只能运行一个 JobInstance(对应于一个特定的 Job 和标识 JobParameters)。

“作业实例”的定义绝对不会对要加载的数据产生影响。完全由 ItemReader 实现决定如何加载数据。例如,在 EndOfDay 场景中,数据上可能有一列表示数据所属的“生效日期”或“预定日期”。因此,1 月 1 日的运行将只加载 1 号的数据,而 1 月 2 日的运行将只使用 2 号的数据。因为此确定很可能是一项业务决策,所以由 ItemReader 决定。然而,使用同样的 JobInstance 决定是否使用以前执行的“状态”(即执行上下文,将在本章后面讨论)。使用一个新的 JobInstance 意味着“从头开始”,而使用现有的实例通常意味着“从中断处开始”。

JobParameters

在讨论了 JobInstance 及其与 Job 的不同之处之后,接下来自然而然就会问:“一个 JobInstance 如何与另一个 JobInstance 区别开来?”答案是:JobParametersJobParameters 对象包含用于启动一个批处理作业的一组参数。它们可以用于标识,甚至可以在运行期间作为参考数据,如下面的图片所示:

job stereotypes parameters
Figure 2. Job Parameters

在前面的例子中,有两个实例,一个用于 1 月 1 日,另一个用于 1 月 2 日,但实际上只有一个 Job,但它有两个 JobParameter 对象:一个以 2017 年 1 月 1 日的作业参数启动,另一个以 2017 年 1 月 2 日的参数启动。因此,可以将契约定义为:JobInstance = Job + 标识 JobParameters。这允许开发人员有效地控制如何定义一个 JobInstance,因为他们控制传入哪些参数。

并非所有作业参数都需要有助于识别 JobInstance。默认情况下,它们这样做。但是,该框架还允许使用不有助于 JobInstance 身份的参数提交 Job

JobExecution

“作业执行”指的是在技术上尝试运行作业一次。执行可能会以失败或成功而结束,但是对应于给定执行的 JobInstance 不被视为完整,除非执行成功完成。使用前面描述的 EndOfDay 作业作为示例,考虑一个 2017 年 1 月 1 日的 JobInstance,它在第一次运行时失败。如果它使用与第一次运行相同的标识作业参数(2017 年 1 月 1 日)再次运行,则创建一个新的 JobExecution。然而,仍然只有一个 JobInstance

“作业”定义了作业是什么以及如何执行作业,“作业实例”是一个纯组织对象,用于将执行分组在一起,主要是为了启用正确的重启语义。“作业执行”则是用于存储运行期间实际发生的事情的主要机制,并且包含许多必须控制和保留的更多属性,如下表所示:

Table 1. JobExecution Properties

Property

Definition

Status

一个 BatchStatus 对象,该对象指示执行的状态。在运行时,它为 BatchStatus#STARTED。如果它失败,它为 BatchStatus#FAILED。如果它成功完成,它为 BatchStatus#COMPLETED

startTime

一个 java.time.LocalDateTime,表示开始执行时的当前系统时间。如果作业尚未开始,此字段为空。

endTime

一个 java.time.LocalDateTime 表示执行完成时的当前系统时间,无论是否成功。如果作业尚未完成,此字段为空。

exitStatus

一个 ExitStatus,表示运行结果。它是最重要的,因为它包含返回给调用方的退出代码。有关更多详情,请参阅章节 5。如果作业尚未完成,此字段为空。

createTime

一个 java.time.LocalDateTime 表示 JobExecution 首次持久化时的当前系统时间。作业可能尚未启动(因此没有开始时间),但它始终有一个 createTime,这是框架管理作业级 ExecutionContexts 所必需的。

lastUpdated

一个 java.time.LocalDateTime 表示上次`JobExecution` 持久化的时间。如果作业尚未开始,此字段为空。

executionContext

一个 “property bag” 包含需要在执行之间持久化的任何用户数据。

failureExceptions

Job 执行期间遇到的异常列表。如果在 Job 失败期间遇到多个异常,则可以使用以下方法。

这些属性很重要,因为它们是持久的并且可以用来完全确定执行的状态。例如,如果 2017 年 1 月 1 日的 EndOfDay 作业在晚上 9:00 执行,并在 9:30 失败,则在批处理元数据表中将创建以下条目:

Table 2. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

Table 3. BATCH_JOB_EXECUTION_PARAMS

JOB_EXECUTION_ID

TYPE_CD

KEY_NAME

DATE_VAL

IDENTIFYING

1

DATE

schedule.Date

2017-01-01

TRUE

Table 4. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

START_TIME

END_TIME

STATUS

1

1

2017-01-01 21:00

2017-01-01 21:30

FAILED

列名称可能已缩写或删除,以提高清晰度和格式。

现在作业已失败,假设问题确定花费了整晚的时间,因此“批处理窗口”现在已关闭。进一步假设窗口从晚上 9:00 开始,作业再次为 2017 年 1 月 1 日启动,从中断处开始,并在 9:30 成功完成。由于现在是第二天,因此 2017 年 1 月 2 日的作业也必须运行,并且它在 9:31 之后立即启动,并在正常的一小时时间内于 10:30 完成。除非两个作业有可能尝试访问相同的数据,从而导致数据库级别锁定问题,否则不需要在一个作业实例启动后立即启动另一个作业实例。由调度程序完全决定何时应该运行一个作业。由于它们是独立的 JobInstance,因此 Spring Batch 不会尝试阻止它们同时运行。(尝试在另一个作业正在运行时运行同一个 JobInstance 将导致抛出 JobExecutionAlreadyRunningException)。现在 JobInstanceJobParameters 表中应有两个额外的条目,并且 JobExecution 表中应有两个额外的条目,如下表所示:

Table 5. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

2

EndOfDayJob

Table 6. BATCH_JOB_EXECUTION_PARAMS

JOB_EXECUTION_ID

TYPE_CD

KEY_NAME

DATE_VAL

IDENTIFYING

1

DATE

schedule.Date

2017-01-01 00:00:00

TRUE

2

DATE

schedule.Date

2017-01-01 00:00:00

TRUE

3

DATE

schedule.Date

2017-01-02 00:00:00

TRUE

Table 7. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

START_TIME

END_TIME

STATUS

1

1

2017-01-01 21:00

2017-01-01 21:30

FAILED

2

1

2017-01-02 21:00

2017-01-02 21:30

COMPLETED

3

2

2017-01-02 21:31

2017-01-02 22:29

COMPLETED

列名称可能已缩写或删除,以提高清晰度和格式。

Step

“步骤”是一个领域对象,它封装了一个批处理作业的一个独立的、顺序的阶段。因此,每个“作业”完全由一个或多个步骤组成。“步骤”包含所有必要的信息来定义和控制实际的批处理。这是一个必然含糊的描述,因为任何给定“步骤”的内容由编写“作业”的开发者决定。“步骤”可以像开发者希望的那样简单或复杂。一个简单的“步骤”可能将数据从文件加载到数据库,不需要很少的代码(取决于所使用的实现)。一个更复杂的“步骤”可能应用复杂的业务规则作为处理的一部分。与“作业”一样,“步骤”有一个单独的 StepExecution,它与一个唯一的 JobExecution 相关联,如下面的图片所示:

jobHeirarchyWithSteps
Figure 3. Job Hierarchy With Steps

StepExecution

StepExecution 表示执行一个“步骤”的一次尝试。每次运行一个“步骤”时都会创建一个新的 StepExecution,类似于 JobExecution。但是,如果一个步骤由于前面的步骤失败而未能执行,则不会持久化执行。只有当步骤实际启动时才会创建一个 StepExecution

“步骤”执行由 StepExecution 类的对象表示。每个执行都包含对其对应的步骤和 JobExecution 以及事务相关数据的引用,例如提交和回滚计数以及开始和结束时间。此外,每个步骤执行都包含一个 ExecutionContext,其中包含开发者需要在批处理运行中持久化的任何数据,例如重启所需的统计信息或状态信息。下表列出了 StepExecution 的属性:

Table 8. StepExecution Properties

Property

Definition

Status

一个 BatchStatus 对象,表示执行的状态。在运行时,状态为 BatchStatus.STARTED。如果失败,状态为 BatchStatus.FAILED。如果成功完成,状态为 BatchStatus.COMPLETED

startTime

A java.time.LocalDateTime,表示启动进程时的当前系统时间。如果步骤尚未启动,该字段为空。

endTime

java.time.LocalDateTime 表示执行结束时的当前系统时间,而不管是否成功。如果步骤尚未结束,此字段为空。

exitStatus

ExitStatus 表示执行结果。此字段非常重要,因为它包含返回给调用者的退出代码。有关更多详细信息,请参见第 5 章。如果作业尚未结束,则此字段为空。

executionContext

一个 “property bag” 包含需要在执行之间持久化的任何用户数据。

readCount

已成功读取的项数。

writeCount

已成功写入的项数。

commitCount

此执行已提交的事务数。

rollbackCount

Step 控制的事务已回滚的次数。

readSkipCount

read 失败的次数导致跳过项。

processSkipCount

process 失败的次数导致跳过项。

filterCount

“filtered” 的项数。

writeSkipCount

ItemProcessor 失败的次数导致跳过项。

ExecutionContext

ExecutionContext 表示由框架持久化和控制的一组键值对,为开发者提供了一个存储持久化状态的地方,该状态限定在 StepExecution 对象或 JobExecution 对象中。(对于熟悉 Quartz 的人来说,它非常类似于 JobDataMap。)最好的使用方法示例是方便重启。以平面文件输入为例,在处理单个行时,框架会定期在提交点持久化 ExecutionContext

executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());

以“作业”类型详解部分的“EndOfDay”示例为例,假定有一个加载文件到数据库的步骤“loadData”。在第一次运行失败后,元数据表将如下示例所示:

Table 9. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

Table 10. BATCH_JOB_EXECUTION_PARAMS

JOB_INST_ID

TYPE_CD

KEY_NAME

DATE_VAL

1

DATE

schedule.Date

2017-01-01

Table 11. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

START_TIME

END_TIME

STATUS

1

1

2017-01-01 21:00

2017-01-01 21:30

FAILED

Table 12. BATCH_STEP_EXECUTION

STEP_EXEC_ID

JOB_EXEC_ID

STEP_NAME

START_TIME

END_TIME

STATUS

1

1

loadData

2017-01-01 21:00

2017-01-01 21:30

FAILED

Table 13. BATCH_STEP_EXECUTION_CONTEXT

STEP_EXEC_ID

SHORT_CONTEXT

1

{piece.count=40321}

在前一个案例中,“Step”运行了 30 分钟,处理了 40,321 个“pieces”,这是此场景中文件中的行数。此值在每次提交前由框架更新,并且可以包含与“ExecutionContext”中条目相对应的多行。在提交前收到通知需要各种“StepListener”实现之一(或“ItemStream”),本指南的后面部分将详细讨论这些内容。与上一个示例一样,假定第二天重新启动“Job”。重新启动后,数据库中将重建“ExecutionContext”的上一次运行的值。当打开“ItemReader”时,它会检查上下文中是否有任何已存储状态,并由此处进行初始化,如以下示例所示:

if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
    log.debug("Initializing for restart. Restart data is: " + executionContext);

    long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));

    LineReader reader = getReader();

    Object record = "";
    while (reader.getPosition() < lineCount && record != null) {
        record = readLine();
    }
}

在这种情况下,在上述代码运行后,当前行是 40,322,从而让“Step”可以从中断处重新开始。您还可以使用“ExecutionContext”获取需要关于运行本身持久化的统计信息。例如,如果平面文件中包含跨多行存在的待处理订单,那么存储已处理订单数(与读取的行数完全不同)可能是必要的,以便可以在“Step”结束时通过邮件发送邮件,邮件正文显示已处理订单总数。框架会处理此项操作的存储,以正确的范围将其与单个“JobInstance”关联。以下应该注意的是,有必要很难弄清楚是否应该使用现有的“ExecutionContext”。例如,使用上面的“EndOfDay”示例时,当 01-01 运行第二次重新启动时,框架会识别出这是同一个“JobInstance”,并且在单个“Step”基础上,从数据库中提取“ExecutionContext”,并将它(作为“StepExecution”的一部分)传递给“Step”本身。另一方面,对于 01-02 运行,框架会识别出这是另一个实例,因此必须将空上下文传递给“Step”。框架会为开发人员做出许多此类确定,以确保在正确时间将状态提供给开发人员。还需要注意的是,在任何给定时间,每个“StepExecution”都存在一个唯一的“ExecutionContext”。“ExecutionContext”客户端应该小心,因为这会创建一个共享键空间。因此,在放入值时应该谨慎,以确保不会覆盖任何数据。但是,“Step”完全不存储上下文中任何数据,因此没有办法对框架产生不利影响。

请注意,“JobExecution”中至少有一个“ExecutionContext”,每个“StepExecution”也有一个。例如,考虑以下代码段:

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob

如注释中所述,“ecStep”不等于“ecJob”。它们是两个不同的“ExecutionContext”。限定到“Step”的“ExecutionContext”在“Step”的每个提交点保存,限定到作业的“ExecutionContext”在每个“Step”执行之间保存。

ExecutionContext,所有非瞬态条目都必须 Serializable。执行上下文的正确序列化支撑着步骤和作业的重启能力。如果您使用不是原生可序列化的键或值,那么您必须采用量身定制的序列化方法。如果无法序列化执行上下文,可能会危及状态持久化进程,使无法正确恢复失败的作业。

JobRepository

“JobRepository”是之前提到的所有类型的持久性机制。它为“JobLauncher”、“Job”和“Step”实现提供 CRUD 操作。首次启动“Job”时,从存储库获取“JobExecution”。此外,在执行过程中,“StepExecution”和“JobExecution”实现通过传递给存储库而持久化。

Java

使用 Java 配置时,“@EnableBatchProcessing”注解提供“JobRepository”,作为自动配置的组件之一。

XML

Spring Batch XML 命名空间支持通过“<job-repository>”标记配置“JobRepository”实例,如下示例所示:

<job-repository id="jobRepository"/>

JobLauncher

“JobLauncher”表示一个简单界面,用于使用给定的“JobParameters”集启动“Job”,如下示例所示:

public interface JobLauncher {

public JobExecution run(Job job, JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException,
                   JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}

需要实现从“JobRepository”获取有效的“JobExecution”并执行“Job”。

ItemReader

`ItemReader`是一种抽象,它代表一次一个条目地检索 `Step`的输入。当 `ItemReader`用尽了它可以提供的所有条目,它通过返回 `null`来指示这一点。你可以在 Readers And Writers中找到有关 `ItemReader`界面及其各种实现的更多详细信息。

ItemWriter

`ItemWriter`是一种抽象,它代表一次一个批次或块输出给 `Step`的输出。通常,`ItemWriter`并不知道它接下来应该接收的输入,只知道在其当前调用中传入的条目。你可以在 Readers And Writers中找到有关 `ItemWriter`界面及其各种实现的更多详细信息。

ItemProcessor

`ItemProcessor`是一种抽象,它代表条目的业务处理。在 `ItemReader`读取一个条目,`ItemWriter`写入一个条目的同时,`ItemProcessor`提供一个访问点来转换或应用其他业务处理。如果在处理条目时,确定该条目无效,返回 `null`表示不应写出该条目。你可以在 Readers And Writers中找到有关 `ItemProcessor`界面的更多详细信息。

Batch Namespace

许多先前列出的域概念需要在 Spring “ApplicationContext” 中配置。虽然可以将上述界面的实现用于标准 Bean 定义,但已提供了命名空间以便于配置,如下示例所示:

<beans:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
   http://www.springframework.org/schema/beans
   https://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/batch
   https://www.springframework.org/schema/batch/spring-batch.xsd">

<job id="ioSampleJob">
    <step id="step1">
        <tasklet>
            <chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
        </tasklet>
    </step>
</job>

</beans:beans>

只要声明了批次命名空间,就可以使用它其中的任何元素。你可以在 Configuring and Running a Job中找到有关配置作业的更多信息。你可以在 Configuring a Step中找到有关配置 `Step`的更多信息。