Advanced Metadata Usage

  • JobExplorer:查询存储库以查找已执行的任务。

  • JobRegistry:跟踪应用程序上下文中可用的任务。

  • JobOperator:执行任务操作,例如停止、重新启动和总结任务。

  • JobParametersIncrementer:在每次执行时为任务生成新参数。

这些功能使应用程序能够管理复杂的任务调度和监控需求。

到目前为止,JobLauncher 和 JobRepository 接口都已被讨论过。它们共同表示对任务的简单启动和批处理域对象的 CRUD 基本操作: .Job Repository image::job-repository.png[] JobLauncher 使用 JobRepository 创建新的 JobExecution 对象并运行它们。在运行一个“任务”期间,“任务”和“步骤”实现随后使用相同的 JobRepository 进行相同执行的基本更新。基本操作足以满足简单场景。然而,在具有数百个批处理作业和复杂调度需求的大型批处理环境中,需要高级访问元数据: .Advanced Job Repository Access image::job-repository-advanced.png[] 在即将讨论的章节中讨论的 JobExplorerJobOperator 接口添加了其他功能以查询和控制元数据。

Querying the Repository

在使用任何高级功能之前,最基本的需求是能够查询存储库以查找已有的执行。此功能由 JobExplorer 接口提供:

public interface JobExplorer {

    List<JobInstance> getJobInstances(String jobName, int start, int count);

    JobExecution getJobExecution(Long executionId);

    StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId);

    JobInstance getJobInstance(Long instanceId);

    List<JobExecution> getJobExecutions(JobInstance jobInstance);

    Set<JobExecution> findRunningJobExecutions(String jobName);
}

正如从其方法签名中显而易见的那样,JobExplorerJobRepository 的只读版本,并且与 JobRepository 一样,它可以通过使用工厂 Bean 轻松配置。

Java

下面的示例显示了如何在 Java 中配置 JobExplorer

Java Configuration
...
// This would reside in your DefaultBatchConfiguration extension
@Bean
public JobExplorer jobExplorer() throws Exception {
	JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
	factoryBean.setDataSource(this.dataSource);
	return factoryBean.getObject();
}
...
XML

下面的示例显示了如何在 XML 中配置 JobExplorer

XML Configuration
<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
      p:dataSource-ref="dataSource" />

Earlier in this chapter 中,我们注意到您可以修改 JobRepository 的表前缀以允许使用不同版本或架构。由于 JobExplorer 用于相同表中,因此它还需要设置前缀的功能。

Java

下面的示例显示了如何为 Java 中的 JobExplorer 设置表前缀:

Java Configuration
...
// This would reside in your DefaultBatchConfiguration extension
@Bean
public JobExplorer jobExplorer() throws Exception {
	JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
	factoryBean.setDataSource(this.dataSource);
	factoryBean.setTablePrefix("SYSTEM.");
	return factoryBean.getObject();
}
...
XML

下面的示例显示了如何在 XML 中为 JobExplorer 设置表前缀:

XML Configuration
<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
		p:tablePrefix="SYSTEM."/>

JobRegistry

虽然 JobRegistry (及其父接口 JobLocator)并不是必需的,但如果你想要跟踪在上下文中可以使用的作业,那它可能很有用。它还用于在创建作业后(例如,在子上下文中)将其集中收集到应用程序上下文中也很有用。你还可以使用自定义 JobRegistry 实现来操作已注册作业的名称和其他属性。该框架只提供一种实现,该实现基于一个从作业名称到作业实例的简单映射。

Java

当你使用 @EnableBatchProcessing 时,会为你提供一个 JobRegistry。下面的示例显示了如何配置你自己的 JobRegistry

...
// This is already provided via the @EnableBatchProcessing but can be customized via
// overriding the bean in the DefaultBatchConfiguration
@Override
@Bean
public JobRegistry jobRegistry() throws Exception {
	return new MapJobRegistry();
}
...
XML

下面的示例显示了如何为 XML 中定义的作业包含 JobRegistry

<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />

你可以通过以下方式之一填充 JobRegistry:使用 bean 后置处理器、使用智能初始化单例或使用 registrar 生命周期组件。即将到来的部分将介绍这些机制。

JobRegistryBeanPostProcessor

这是一个 bean 后置处理器,可以在创建时注册所有作业。

Java

下面的示例显示了如何为 Java 中定义的作业包括 JobRegistryBeanPostProcessor

Java Configuration
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
    JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
    postProcessor.setJobRegistry(jobRegistry);
    return postProcessor;
}
XML

下面的示例显示了如何为 XML 中定义的作业包括 JobRegistryBeanPostProcessor

XML Configuration
<bean id="jobRegistryBeanPostProcessor" class="org.spr...JobRegistryBeanPostProcessor">
    <property name="jobRegistry" ref="jobRegistry"/>
</bean>

虽然这不是绝对必要的,但示例中的后置处理器已经给出了一个 id,以便可以包含在子上下文中(例如,作为父 bean 定义),并导致在那里创建的所有作业也能自动注册。

从 5.1 版本开始,@EnableBatchProcessing 注释会在应用程序上下文中自动注册一个 jobRegistryBeanPostProcessor bean。

JobRegistrySmartInitializingSingleton

这是一个 SmartInitializingSingleton,用于在作业注册表中注册所有单例作业。

Java

下面的示例显示了如何在 Java 中定义 JobRegistrySmartInitializingSingleton

Java Configuration
@Bean
public JobRegistrySmartInitializingSingleton jobRegistrySmartInitializingSingleton(JobRegistry jobRegistry) {
    return new JobRegistrySmartInitializingSingleton(jobRegistry);
}
XML

下面的示例显示了如何在 XML 中定义 JobRegistrySmartInitializingSingleton

XML Configuration
<bean class="org.springframework.batch.core.configuration.support.JobRegistrySmartInitializingSingleton">
    <property name="jobRegistry" ref="jobRegistry" />
</bean>

AutomaticJobRegistrar

这是一个创建子上下文并在其创建时从这些上下文中注册作业的生命周期组件。此操作的一个优点是,虽然子上下文中作业的名称在注册表中仍必须是全局唯一的,但其依赖项可以有“自然”名称。因此,例如,你可以创建一组 XML 配置文件,每个配置文件只有一个 Job,但都对具有相同 bean 名称(例如 reader)的 ItemReader 有不同的定义。如果将所有这些文件都导入同一个上下文中,则 reader 定义将产生冲突并相互覆盖,但是,通过自动 registrar,可以避免这种情况。这样可以更轻松地集成来自应用程序的不同模块的作业。

Java

以下示例展示了如何在 Java 中为已定义任务包含 AutomaticJobRegistrar

Java Configuration
@Bean
public AutomaticJobRegistrar registrar() {

    AutomaticJobRegistrar registrar = new AutomaticJobRegistrar();
    registrar.setJobLoader(jobLoader());
    registrar.setApplicationContextFactories(applicationContextFactories());
    registrar.afterPropertiesSet();
    return registrar;

}
XML

以下示例展示了如何在 XML 中为已定义任务包含 AutomaticJobRegistrar

XML Configuration
<bean class="org.spr...AutomaticJobRegistrar">
   <property name="applicationContextFactories">
      <bean class="org.spr...ClasspathXmlApplicationContextsFactoryBean">
         <property name="resources" value="classpath*:/config/job*.xml" />
      </bean>
   </property>
   <property name="jobLoader">
      <bean class="org.spr...DefaultJobLoader">
         <property name="jobRegistry" ref="jobRegistry" />
      </bean>
   </property>
</bean>

注册器具有两个必备属性:ApplicationContextFactory 数组(在上一个示例中,此数组是通过便捷工厂 Bean 创建的)和 JobLoaderJobLoader 负责管理子上下文的生命周期,并在 JobRegistry 中注册任务。

ApplicationContextFactory 负责创建子上下文。最常见的用法是(如上一个示例所示)使用 ClassPathXmlApplicationContextFactory。此工厂的一个特性是,它在默认情况下会将部分配置从父上下文中复制到子上下文中。因此,例如,不必在子上下文中重新定义 PropertyPlaceholderConfigurer 或 AOP 配置(只要该配置与父上下文中的配置相同即可)。

可以将 AutomaticJobRegistrarJobRegistryBeanPostProcessor 结合使用(只要同时使用 DefaultJobLoader 即可)。例如,当除主父上下文中定义了任务外,还可以在子位置中定义任务时,这可能需要用。

JobOperator

如前所述,JobRepository 在元数据上提供 CRUD 操作,JobExplorer 在元数据上提供只读操作。但是,这些操作在与其它操作结合使用时最有用,可执行停止、重新启动或总结任务等常见监控任务,通常都是由批处理操作员来执行此类操作。Spring Batch 在 JobOperator 接口中提供这些类型的操作:

public interface JobOperator {

    List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException;

    List<Long> getJobInstances(String jobName, int start, int count)
          throws NoSuchJobException;

    Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException;

    String getParameters(long executionId) throws NoSuchJobExecutionException;

    Long start(String jobName, String parameters)
          throws NoSuchJobException, JobInstanceAlreadyExistsException;

    Long restart(long executionId)
          throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,
                  NoSuchJobException, JobRestartException;

    Long startNextInstance(String jobName)
          throws NoSuchJobException, JobParametersNotFoundException, JobRestartException,
                 JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException;

    boolean stop(long executionId)
          throws NoSuchJobExecutionException, JobExecutionNotRunningException;

    String getSummary(long executionId) throws NoSuchJobExecutionException;

    Map<Long, String> getStepExecutionSummaries(long executionId)
          throws NoSuchJobExecutionException;

    Set<String> getJobNames();

}

前述操作代表来自许多不同接口的方法,例如 JobLauncherJobRepositoryJobExplorerJobRegistry。因此,提供的 JobOperator (SimpleJobOperator) 实现具有许多依赖关系。

Java

以下示例展示了 Java 中 SimpleJobOperator 的典型 Bean 定义:

 /**
  * All injected dependencies for this bean are provided by the @EnableBatchProcessing
  * infrastructure out of the box.
  */
 @Bean
 public SimpleJobOperator jobOperator(JobExplorer jobExplorer,
                                JobRepository jobRepository,
                                JobRegistry jobRegistry,
                                JobLauncher jobLauncher) {

	SimpleJobOperator jobOperator = new SimpleJobOperator();
	jobOperator.setJobExplorer(jobExplorer);
	jobOperator.setJobRepository(jobRepository);
	jobOperator.setJobRegistry(jobRegistry);
	jobOperator.setJobLauncher(jobLauncher);

	return jobOperator;
 }
XML

以下示例展示了 XML 中 SimpleJobOperator 的典型 Bean 定义:

<bean id="jobOperator" class="org.spr...SimpleJobOperator">
    <property name="jobExplorer">
        <bean class="org.spr...JobExplorerFactoryBean">
            <property name="dataSource" ref="dataSource" />
        </bean>
    </property>
    <property name="jobRepository" ref="jobRepository" />
    <property name="jobRegistry" ref="jobRegistry" />
    <property name="jobLauncher" ref="jobLauncher" />
</bean>

从 5.0 版本开始,@EnableBatchProcessing 注释会自动将任务操作程序 Bean 注册到应用程序上下文中。

如果在作业资料库上设置了表前缀,请记得在作业浏览器上也设置它。

JobParametersIncrementer

JobOperator 上的大多数方法都是不言自明的,你可以在 Javadoc of the interface 中找到更详细的说明。但是,startNextInstance 方法值得注意。此方法始终启动 Job 的新实例。如果 JobExecution 中出现严重问题且 Job 需要从头开始重新启动,这将非常有用。与 JobLauncher (需要一个触发新 JobInstance 的新 JobParameters 对象)不同,如果参数与前一组参数不同,则 startNextInstance 方法使用 Job 绑定的 JobParametersIncrementer 来强制 Job 为新实例 :

public interface JobParametersIncrementer {

    JobParameters getNext(JobParameters parameters);

}

JobParametersIncrementer 的契约是给定一个 JobParameters 对象,它将通过增加其中可能包含的任何必要值来返回 “next” JobParameters 对象。此策略很有用,因为框架无法知道对 JobParameters 的哪些更改使其成为 “next” 实例。例如,如果 JobParameters 中的唯一值是日期,并且应该创建下一个实例,那么该值应该增加一天还是一周(例如,如果该作业是每周的)。对于有助于标识 Job 的任何数值也是如此,如下例所示:

public class SampleIncrementer implements JobParametersIncrementer {

    public JobParameters getNext(JobParameters parameters) {
        if (parameters==null || parameters.isEmpty()) {
            return new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
        }
        long id = parameters.getLong("run.id",1L) + 1;
        return new JobParametersBuilder().addLong("run.id", id).toJobParameters();
    }
}

在此示例中,具有键 run.id 的值用于区分 JobInstances。如果传入的 JobParameters 为 null,则可以假定该 Job 之前从未运行过,因此,可以返回其初始状态。但是,如果没有,则获取旧值,将其增加 1,然后返回。

Java

对于 Java 中定义的任务,可以通过构建器中提供的 incrementer 方法将增量器与 Job 关联,如下所示:

@Bean
public Job footballJob(JobRepository jobRepository) {
    return new JobBuilder("footballJob", jobRepository)
    				 .incrementer(sampleIncrementer())
    				 ...
                     .build();
}
XML

对于 XML 中定义的任务,可以通过命名空间中的 incrementer 属性将增量器与 Job 关联,如下所示:

<job id="footballJob" incrementer="sampleIncrementer">
    ...
</job>

Stopping a Job

JobOperator 最常见的用例之一是正常停止任务:

Set<Long> executions = jobOperator.getRunningExecutions("sampleJob");
jobOperator.stop(executions.iterator().next());

关闭不是立即执行的,因为没有办法强制立即关闭,特别是如果执行目前处于框架无法控制的开发人员代码中(例如业务服务)。但是,只要控制权返回到框架,框架就会将当前 StepExecution 的状态设置为 BatchStatus.STOPPED,将其保存,并在完成之前对 JobExecution 也执行相同的操作。

Aborting a Job

FAILED 的任务执行可以(如果此 Job 是可重新启动的)重新启动。状态为 ABANDONED 的任务执行无法由框架重新启动。ABANDONED 状态也在步骤执行中使用,用于将其标记为可在重新启动的任务执行中跳过。如果任务正在运行且遇到在前一个失败的任务执行中已标记为 ABANDONED 的步骤,它会移动到下一步(由任务流程定义和步骤执行退出状态确定)。

如果进程已关闭(kill -9 或服务器故障),那么该任务当然没有在运行,但是 JobRepository 无从得知,因为在进程关闭之前没有人告诉它。您必须手动告诉它您知道该执行已失败,或者应被认为已中止(将其状态更改为 FAILEDABANDONED)。这是一个业务决策,没有办法将其自动化。只有在任务可重新启动并且您知道重启数据有效时,才能将状态更改为 FAILED