Batch

  • 关联作业执行与执行任务,以便从一个追溯到另一个

  • 使用 Spring Cloud Deployer 进行远程分区,为远程批处理任务启动并配置 Spring Boot Uber-jar

  • 在 Kubernetes 平台上部署分区的应用程序时需考虑的注意事项

  • 批处理通知消息和批处理作业退出代码处理

此章节详细介绍了 Spring Cloud 任务与 Spring Batch 的集成。本文档中包含跟踪作业执行与执行作业的任务之间的关联以及通过 Spring Cloud Deployer 进行远程分区。

Associating a Job Execution to the Task in which It Was Executed

Spring Boot 提供了在 Spring Boot Uber-jar 中执行批处理作业的工具。Spring Boot 对此功能的支持允许开发者在该执行中执行多个批处理作业。Spring Cloud 任务提供了将作业执行(作业执行)与任务执行关联起来的能力,以便可以从一个追溯到另一个。

Spring Cloud 任务通过使用 TaskBatchExecutionListener 来实现此功能。默认情况下,此侦听器在任何具有已配置的 Spring Batch 作业(通过在上下文中定义 Job 类型 bean)和类路径上具有 spring-cloud-task-batch jar 的上下文中自动配置。侦听器注入到所有满足这些条件的作业中。

Overriding the TaskBatchExecutionListener

若要防止将侦听器注入到当前上下文中的任何批处理作业中,可以使用标准的 Spring Boot 机制禁用自动配置。

若仅将侦听器注入到上下文中的特定作业中,则覆盖 batchTaskExecutionListenerBeanPostProcessor 并提供作业 Bean ID 列表,如下例所示:

public static TaskBatchExecutionListenerBeanPostProcessor batchTaskExecutionListenerBeanPostProcessor() {
	TaskBatchExecutionListenerBeanPostProcessor postProcessor =
		new TaskBatchExecutionListenerBeanPostProcessor();

	postProcessor.setJobNames(Arrays.asList(new String[] {"job1", "job2"}));

	return postProcessor;
}

可在 Spring Cloud Task 项目 here 的示例模块中找到一个示例批处理应用程序。

Remote Partitioning

Spring Cloud Deployer 提供了在大多数云基础设施上启动基于 Spring Boot 的应用程序的工具。DeployerPartitionHandlerDeployerStepExecutionHandler 将 worker 步骤执行的启动委派给 Spring Cloud Deployer。

要配置 DeployerStepExecutionHandler,必须提供一个代表要执行的 Spring Boot Uber-jar 的 Resource、一个 TaskLauncherHandler 和一个 JobExplorer。可以配置任何环境属性以及要同时执行的最大 worker 数、轮询结果的时间间隔(默认为 10 秒)和超时时间(默认为 -1 或不超时)。以下示例显示了配置此 PartitionHandler 的外观:

@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
		JobExplorer jobExplorer) throws Exception {

	MavenProperties mavenProperties = new MavenProperties();
	mavenProperties.setRemoteRepositories(new HashMap<>(Collections.singletonMap("springRepo",
		new MavenProperties.RemoteRepository(repository))));

 	Resource resource =
		MavenResource.parse(String.format("%s:%s:%s",
				"io.spring.cloud",
				"partitioned-batch-job",
				"1.1.0.RELEASE"), mavenProperties);

	DeployerPartitionHandler partitionHandler =
		new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep");

	List<String> commandLineArgs = new ArrayList<>(3);
	commandLineArgs.add("--spring.profiles.active=worker");
	commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
	commandLineArgs.add("--spring.batch.initializer.enabled=false");

	partitionHandler.setCommandLineArgsProvider(
		new PassThroughCommandLineArgsProvider(commandLineArgs));
	partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
	partitionHandler.setMaxWorkers(2);
	partitionHandler.setApplicationName("PartitionedBatchJobTask");

	return partitionHandler;
}

向分区传递环境变量时,不同的分区可能位于不同的机器上,环境设置不同。所以只应传递那些必需的环境变量。

请注意,在上面的示例中,我们已将最大 worker 数设置为 2。设置最大 worker 数会确定一次应运行的最大分区数。

要执行的 Resource 期望是一个 Spring Boot Uber-jar,其中 DeployerStepExecutionHandler 配置为当前上下文中的 CommandLineRunner。前面示例中列举的存储库应该是其中 Spring Boot Uber-jar 所在的远程存储库。预期管理器和 worker 都具有对用作作业存储库和任务存储库的同一数据存储的可见性。一旦底层基础设施引导了 Spring Boot jar,Spring Boot 就会启动 DeployerStepExecutionHandler,该步骤处理程序会执行请求的 Step。以下示例展示了如何配置 DeployerStepExecutionHandler

@Bean
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
	DeployerStepExecutionHandler handler =
		new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);

	return handler;
}

可在 Spring Cloud Task 项目 here 的示例模块中找到一个示例远程分区应用程序。

Asynchronously launch remote batch partitions

默认情况下,批处理分区按顺序启动。但是,在某些情况下,这可能会影响性能,因为每次启动都会阻塞,直到资源(例如:在 Kubernetes 中配置容器)配置好。在这些情况下,可以为 DeployerPartitionHandler 提供一个 ThreadPoolTaskExecutor。这将基于 ThreadPoolTaskExecutor 的配置来启动远程批处理分区。例如:

	@Bean
	public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.setCorePoolSize(4);
		executor.setThreadNamePrefix("default_task_executor_thread");
		executor.setWaitForTasksToCompleteOnShutdown(true);
		executor.initialize();
		return executor;
	}

	@Bean
	public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
		TaskRepository taskRepository, ThreadPoolTaskExecutor executor) throws Exception {
		Resource resource = this.resourceLoader
			.getResource("maven://io.spring.cloud:partitioned-batch-job:2.2.0.BUILD-SNAPSHOT");

		DeployerPartitionHandler partitionHandler =
			new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
				"workerStep", taskRepository, executor);
	...
	}

我们需要关闭上下文,因为 ThreadPoolTaskExecutor 的使用会留有一个活动线程,因此应用程序不会终止。要适当地关闭应用程序,需要将 spring.cloud.task.closecontextEnabled 属性设置为 true

Notes on Developing a Batch-partitioned application for the Kubernetes Platform

  • 在 Kubernetes 平台上部署分区应用程序时,您必须为 Spring Cloud Kubernetes Deployer 使用以下依赖项:[source, xml]

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-deployer-kubernetes</artifactId>
</dependency>
  • 任务应用程序及其分区的应用程序名称需要遵循以下正则表达式模式:[a-z0-9]([-a-z0-9]*[a-z0-9])。否则,将引发异常。

Batch Informational Messages

Spring Cloud Task 为批量作业提供了发送信息消息的能力。“Spring Batch Events” 部分详细介绍了此功能。

Batch Job Exit Codes

earlier 所述,Spring Cloud Task 应用程序支持记录任务执行的退出代码。但是,如果您在任务中运行 Spring Batch Job,则无论 Batch JobExecution 如何完成,使用默认 Batch/Boot 行为时,任务的结果始终为零。请记住,任务是一个引导应用程序,并且从任务返回的退出代码与引导应用程序的退出代码相同。要覆盖此行为并允许任务在批量作业返回 BatchStatusFAILED 时返回非零退出代码,请将 spring.cloud.task.batch.fail-on-job-failure 设置为 true。然后退出代码可以为 1(默认值),也可以基于 specified ExitCodeGenerator)。

此功能使用一个新的 ApplicationRunner,该功能替换了 Spring Boot 提供的功能。默认情况下,它使用相同顺序进行配置。但是,如果你想要自定义 ApplicationRunner 的运行顺序,则可以通过设置 spring.cloud.task.batch.applicationRunnerOrder 属性来设置其顺序。要使任务根据批处理作业执行的结果返回退出码,你需编写自己的 CommandLineRunner