Scaling and Parallel Processing

许多批量处理问题可以通过单线程单进程作业解决,因此在考虑更复杂的实现之前,最好先检查是否符合您的需求。测量实际作业的性能,并首先确定最简单的实现是否满足您的需求。您可以在不到一分钟的时间内读写数百兆字节的文件,即使使用标准硬件也能做到。 当您准备好开始通过并行处理实现作业时,Spring Batch 提供了一系列选项,本章中会介绍这些选项,尽管其他地方也介绍了一些功能。在高级别上,有两种并行处理模式:

  • Single-process, multi-threaded

  • Multi-process

它们也会细分为以下类别:

  • Multi-threaded Step (single-process)

  • Parallel Steps (single-process)

  • Remote Chunking of Step(多进程)

  • Partitioning a Step(单个或多进程)

首先,我们回顾一下单进程选项。然后,我们回顾一下多进程选项。

Multi-threaded Step

开始并行处理的最简单方法是将 TaskExecutor 添加到步骤配置。

Java

使用 Java 配置时,您可以向步骤添加 TaskExecutor,如下例所示:

Java Configuration
@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("sampleStep", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.taskExecutor(taskExecutor)
				.build();
}
XML

例如,您可以为 tasklet 添加一个属性,如下所示:

<step id="loading">
    <tasklet task-executor="taskExecutor">...</tasklet>
</step>

在此示例中,taskExecutor 是对实现 TaskExecutor 接口的另一个 Bean 定义的引用。 TaskExecutor 是标准 Spring 接口,因此请查阅 Spring 用户指南以了解可用的实现的详细信息。最简单的多线程 TaskExecutorSimpleAsyncTaskExecutor

前述配置的结果是,Step 通过在单独的执行线程中读取、处理和写入每一块项目(每个提交间隔)来执行。请注意,这意味着没有固定的项目处理顺序,并且一块可能包含与单线程情况相比是非连续的项目。除了任务执行程序放置的任何限制(例如它是否由线程池支持)之外,tasklet 配置还有限制节流(默认:4)。您可能需要增加此限制以确保线程池被充分利用。

Java

在使用 Java 配置时,构建器提供对节流限制的访问,如下所示:

Java Configuration
@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("sampleStep", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.taskExecutor(taskExecutor)
				.throttleLimit(20)
				.build();
}
XML

例如,您可以增加限制节流,如下所示:

<step id="loading"> <tasklet
    task-executor="taskExecutor"
    throttle-limit="20">...</tasklet>
</step>

还要注意,您的步骤中使用的任何池化资源(例如 DataSource)可能会对并发进行限制。请务必确保这些资源中的池至少与步骤中期望的并发线程数一样大。

Example 1. Throttle limit deprecation

从 v5.0 开始,节流限制已弃用,没有替代。如果您希望在默认的 TaskExecutorRepeatTemplate 中替换当前的限制机制,则需要提供一个自定义的 RepeatOperations 实现(基于具有有限任务队列的 TaskExecutor),并使用 StepBuilder#stepOperations 在步骤中设置它:

Java Configuration
@Bean
public Step sampleStep(RepeatOperations customRepeatOperations, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("sampleStep", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.stepOperations(customRepeatOperations)
				.build();
}

对于一些常见的批处理用例,使用多线程 Step 实现有一些实际限制。Step 中的很多参与者(例如读取器和写入器)是有状态的。如果状态未按线程隔离,则这些组件在多线程 Step 中不可用。特别是,Spring Batch 中的大多数读取器和写入器都并非设计用于多线程使用。但是,可以与无状态或线程安全的读取器和写入器一起使用,并且 Spring Batch Samples 中有一个示例(称为 parallelJob),它演示了如何使用进程指示器(请参见 Preventing State Persistence)来跟踪数据库输入表中已处理的项。

Spring Batch 提供了一些 ItemWriterItemReader 的实现。通常,它们会在 Javadoc 中说明它们是否是线程安全的或在并发环境中避免问题需要执行什么操作。如果 Javadoc 中没有信息,您可以检查实现以查看是否有任何状态。如果读取器不是线程安全的,您可以用提供的 SynchronizedItemStreamReader 修饰它或在您自己的同步委托程序中使用它。您可以同步对 read() 的调用,并且只要处理和写入是块中最昂贵的部分,您的步骤仍然可能比在单线程配置中完成得快得多。

Parallel Steps

只要需要并行化的应用程序逻辑可以拆分为不同的职责并分配给各个步骤,它就可以在一个进程中并行化。并行步骤执行很容易配置和使用。

Java

在使用 Java 配置时,使用 step3 并行执行步骤 `(step1, step2)`很简单,如下:

Java Configuration
@Bean
public Job job(JobRepository jobRepository) {
    return new JobBuilder("job", jobRepository)
        .start(splitFlow())
        .next(step4())
        .build()        //builds FlowJobBuilder instance
        .build();       //builds Job instance
}

@Bean
public Flow splitFlow() {
    return new FlowBuilder<SimpleFlow>("splitFlow")
        .split(taskExecutor())
        .add(flow1(), flow2())
        .build();
}

@Bean
public Flow flow1() {
    return new FlowBuilder<SimpleFlow>("flow1")
        .start(step1())
        .next(step2())
        .build();
}

@Bean
public Flow flow2() {
    return new FlowBuilder<SimpleFlow>("flow2")
        .start(step3())
        .build();
}

@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}
XML

例如,使用 step3 并行执行步骤 `(step1, step2)`很简单,如下:

<job id="job1">
    <split id="split1" task-executor="taskExecutor" next="step4">
        <flow>
            <step id="step1" parent="s1" next="step2"/>
            <step id="step2" parent="s2"/>
        </flow>
        <flow>
            <step id="step3" parent="s3"/>
        </flow>
    </split>
    <step id="step4" parent="s4"/>
</job>

<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>

可配置的任务执行程序用于指定应由哪个 TaskExecutor 实现执行各个流程。默认值为 SyncTaskExecutor,但需要异步的 TaskExecutor 来并行运行步骤。请注意,作业确保在聚合退出状态并转换之前每个流程中的流程都完成。

有关更多详细信息,请参阅有关 Split Flows 的部分。

Remote Chunking

在远程分块中,Step 处理在多个进程之间拆分,通过某些中间件彼此通信。下图显示了模式:

remote chunking
Figure 1. Remote Chunking

管理器组件是一个单一进程,工人是多个远程进程。如果管理器不是瓶颈,则此模式最有效,因此处理必须比读取项目更昂贵(在实践中通常如此)。

管理器是 Spring Batch Step 的实现,其 ItemWriter 被一个泛型版本替换,该版本知道如何将块项目作为消息发送到中间件。工人是为所使用的任何中间件的标准侦听器(例如,对于 JMS,它们将是 MesssageListener 实现),并且它们的角色是通过 ChunkProcessor 接口使用标准 ItemWriterItemProcessor 加上 ItemWriter 处理块项目。使用此模式的优点之一是读取器、处理器和写入器组件是现成的(与用于步骤本地执行的组件相同)。项目被动态划分,并且工作通过中间件共享,这样,如果所有侦听器都是急切的使用者,则负载平衡将是自动的。

中间件必须是持久的,并且必须为每条消息保证交付且只有单个使用者。JMS 是显而易见的候选者,但在网格计算和共享内存产品空间中还存在其他选项(例如 JavaSpaces)。

有关更多详细信息,请参阅有关 Spring Batch Integration - Remote Chunking 的部分。

Partitioning

Spring Batch 还为分区分块的 Step 执行并远程执行它提供了 SPI。在这种情况下,远程参与者是 Step 实例,可以像本地处理一样轻松地对其进行配置和使用。下图显示了模式:

partitioning overview
Figure 2. Partitioning

“作业”在左侧以“步骤”实例的序列运行,并且其中一个“步骤”实例被标记为管理器。此图中的工作器都是“步骤”的同等实例,实际上可以取代管理器,从而导致“作业”的相同结果。工作器通常是远程服务,但也可能是执行的本地线程。此模式中管理器发送给工作器 的消息不必持久,也不必有保证的交付。JobRepository 中的 Spring Batch 元数据确保每个工作器对每个“作业”执行只执行一次。

Spring Batch 中的 SPI 包括 Step 的特殊实现(称为 PartitionStep)和两个需要针对特定环境实现的策略接口。策略接口是 PartitionHandlerStepExecutionSplitter,以下序列图显示了它们的作用:

partitioning spi
Figure 3. Partitioning SPI

在这种情况下,右侧的“步骤”是 “”远程“工作器,因此可能有多个对象或进程扮演此角色,并且显示 PartitionStep 驱动执行。

Java

以下示例显示了在使用 Java 配置时 PartitionStep 配置:

Java Configuration
@Bean
public Step step1Manager() {
    return stepBuilderFactory.get("step1.manager")
        .<String, String>partitioner("step1", partitioner())
        .step(step1())
        .gridSize(10)
        .taskExecutor(taskExecutor())
        .build();
}

与多线程步骤的 throttleLimit 方法类似,gridSize 方法防止任务执行器被单个步骤的请求饱和。

XML

以下示例显示了使用 XML 配置时`PartitionStep` 配置:

<step id="step1.manager">
    <partition step="step1" partitioner="partitioner">
        <handler grid-size="10" task-executor="taskExecutor"/>
    </partition>
</step>

与多线程步骤的 throttle-limit 属性类似,grid-size 属性可防止任务执行器被单个步骤的请求饱和。

Spring Batch Samples 的单元测试套件(请参见 partition*Job.xml 配置)有一个简单的示例,你可以复制并扩展该示例。

Spring Batch 为称为 step1:partition0 的分区创建步骤执行,依此类推。许多人出于一致性考虑,更喜欢将管理器步骤称为 step1:manager。您可以使用步骤的别名(通过指定 name 属性而不是 id 属性)。

PartitionHandler

PartitionHandler 是了解远程处理结构或网格环境的组件。它能够以某种特定于结构的格式(如 DTO)将 StepExecution 请求发送到远程 Step 实例。它不必知道如何分割输入数据或如何聚合多个 Step 执行的结果。一般来说,它可能也不需要了解恢复能力或故障转移,因为在许多情况下这些都是结构的特性。在任何情况下,Spring Batch 始终提供独立于结构的可重启性。失败的“ 作业”总是可以重新启动,并且在这种情况下,只有失败的“步骤”会被重新执行。

PartitionHandler 接口可以针对各种类型的结构具有专门的实现,包括简单的 RMI 远程处理、EJB 远程处理、自定义 Web 服务、JMS、JavaSpaces、共享内存网格(如 Terracotta 或 Coherence)和网格执行结构(如 GridGain)。Spring Batch 不包含任何专有网格或远程处理结构的实现。

然而,Spring Batch 提供了 PartitionHandler 的一个有用实现,它使用 Spring 的 TaskExecutor 策略在执行的单独线程中本地执行 Step 实例。该实现称为 TaskExecutorPartitionHandler

Java

您可以显式配置 TaskExecutorPartitionHandler,如下例所示:

Java Configuration
@Bean
public Step step1Manager(JobRepository jobRepository) {
    return new StepBuilder("step1.manager", jobRepository)
        .partitioner("step1", partitioner())
        .partitionHandler(partitionHandler())
        .build();
}

@Bean
public PartitionHandler partitionHandler() {
    TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
    retVal.setTaskExecutor(taskExecutor());
    retVal.setStep(step1());
    retVal.setGridSize(10);
    return retVal;
}
XML

TaskExecutorPartitionHandler 是使用前面显示的 XML 命名空间配置的步骤的默认值。您还可以显式配置它,如下所示:

<step id="step1.manager">
    <partition step="step1" handler="handler"/>
</step>

<bean class="org.spr...TaskExecutorPartitionHandler">
    <property name="taskExecutor" ref="taskExecutor"/>
    <property name="step" ref="step1" />
    <property name="gridSize" value="10" />
</bean>

gridSize 属性确定要创建的独立步骤执行数,以便可以与 TaskExecutor 中线程池的大小匹配。或者,可以将其设置为大于可用线程数,这会减小工作块。

TaskExecutorPartitionHandler 适用于以 IO 为主导的 Step 实例,例如复制大量文件或将文件系统复制到内容管理系统。还可以通过提供一个 Step 实现(如使用 Spring Remoting)来充当远程调用的代理,从而用于远程执行。

Partitioner

Partitioner 责任更简单:只生成执行上下文作为新步骤执行的输入参数(不必担心重新启动)。它有一个方法,如下所示的接口定义中所示:

public interface Partitioner {
    Map<String, ExecutionContext> partition(int gridSize);
}

此方法的返回值将每个步骤执行的唯一名称(“String”)与作为 ExecutionContext 输入参数的形式关联起来。这些名称稍后将在 Batch 元数据中显示为分区 StepExecutions 中的步骤名称。ExecutionContext 只是一个名称-值对集合,因此它可能包含一系列主键、行号或输入文件的位置。然后,远程 Step 通常使用 #{…​} 占位符(在步骤作用域中后期绑定)绑定到上下文输入,如下一节中所示。

步骤执行的名称(Partitioner 返回的 Map 中的键)需要在“作业”的步骤执行中是唯一的,但没有其他特定要求。执行此操作的最简单方法(并为用户使名称更有意义)是使用前缀+后缀命名约定,其中前缀是要执行的步骤的名称(本身在“作业”中是唯一的),后缀只是一个计数器。该框架中有一个使用此约定的 SimplePartitioner

您可以使用一个可选的接口 PartitionNameProvider 来单独提供分区名称和分区本身。如果 Partitioner 实现此接口,则在重新启动时只查询名称。如果分区成本很高,这可能是一个有用的优化。PartitionNameProvider 提供的名称必须与 Partitioner 提供的名称匹配。

Binding Input Data to Steps

对于 PartitionHandler 执行的步骤使用相同的配置,并且在运行时从 ExecutionContext 中绑定它们的输入参数非常有效。使用 Spring Batch 的 StepScope 功能可以轻松完成此操作(有关 Late Binding 部分中更多详细信息)。例如,如果 Partitioner 使用名为 fileName 的属性键创建 ExecutionContext 实例,该属性键指向每个步骤调用不同的文件(或目录),则 Partitioner 输出可能类似于下表的正文:

Table 1. Example step execution name to execution context provided by Partitioner targeting directory processing

Step Execution Name (key)

ExecutionContext (value)

filecopy:partition0

fileName=/home/data/one

filecopy:partition1

fileName=/home/data/two

filecopy:partition2

fileName=/home/data/three

然后,可以使用延迟绑定到执行上下文将文件名绑定到某个步骤。

Java

以下示例演示如何在 Java 中定义延迟绑定:

Java Configuration
@Bean
public MultiResourceItemReader itemReader(
	@Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
	return new MultiResourceItemReaderBuilder<String>()
			.delegate(fileReader())
			.name("itemReader")
			.resources(resources)
			.build();
}
XML

以下示例演示如何在 XML 中定义延迟绑定:

XML Configuration
<bean id="itemReader" scope="step"
      class="org.spr...MultiResourceItemReader">
    <property name="resources" value="#{stepExecutionContext[fileName]}/*"/>
</bean>