Advanced Metadata Usage

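  • JobExplorer: queries the repository for existing job executions.

  • JobRegistry: keeps track of the jobs that are available in the application context.

  • JobOperator: performs operations on jobs, such as stopping, restarting, and summarizing them.

  • JobParametersIncrementer: generates new parameters for a job on each execution.

These features let an application handle complex job scheduling and monitoring requirements.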

So far, both the JobLauncher and JobRepository interfaces have been discussed. Together, they represent the simple launching of a job and basic CRUD operations of batch domain objects:

Figure: Job Repository (job-repository.png)

A JobLauncher uses the JobRepository to create new JobExecution objects and run them. Job and Step implementations later use the same JobRepository for basic updates of the same executions during the running of a Job. The basic operations suffice for simple scenarios. However, in a large batch environment with hundreds of batch jobs and complex scheduling requirements, more advanced access to the metadata is required:

Figure: Advanced Job Repository Access (job-repository-advanced.png)

The JobExplorer and JobOperator interfaces, which are discussed in the coming sections, add additional functionality for querying and controlling the metadata.

Querying the Repository

The most basic need before any advanced features is the ability to query the repository for existing executions. This functionality is provided by the JobExplorer interface:

public interface JobExplorer {

    List<JobInstance> getJobInstances(String jobName, int start, int count);

    JobExecution getJobExecution(Long executionId);

    StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId);

    JobInstance getJobInstance(Long instanceId);

    List<JobExecution> getJobExecutions(JobInstance jobInstance);

    Set<JobExecution> findRunningJobExecutions(String jobName);
}

As is evident from its method signatures, JobExplorer is a read-only version of the JobRepository, and, like the JobRepository, it can be easily configured by using a factory bean.

Java

The following example shows how to configure a JobExplorer in Java:

Java Configuration
...
// This would reside in your DefaultBatchConfiguration extension
@Bean
public JobExplorer jobExplorer() throws Exception {
	JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
	factoryBean.setDataSource(this.dataSource);
	return factoryBean.getObject();
}
...
XML

The following example shows how to configure a JobExplorer in XML:

XML Configuration
<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
      p:dataSource-ref="dataSource" />

Earlier in this chapter, we noted that you can modify the table prefix of the JobRepository to allow for different versions or schemas. Because the JobExplorer works with the same tables, it also needs the ability to set a prefix.

Java

The following example shows how to set the table prefix for a JobExplorer in Java:

Java Configuration
...
// This would reside in your DefaultBatchConfiguration extension
@Bean
public JobExplorer jobExplorer() throws Exception {
	JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
	factoryBean.setDataSource(this.dataSource);
	factoryBean.setTablePrefix("SYSTEM.");
	return factoryBean.getObject();
}
...
XML

The following example shows how to set the table prefix for a JobExplorer in XML:

XML Configuration
<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
		p:tablePrefix="SYSTEM."/>

JobRegistry

A JobRegistry (and its parent interface, JobLocator) is not mandatory, but it can be useful if you want to keep track of which jobs are available in the context. It is also useful for collecting jobs centrally in an application context when they have been created elsewhere (for example, in child contexts). You can also use custom JobRegistry implementations to manipulate the names and other properties of the jobs that are registered. There is only one implementation provided by the framework and this is based on a simple map from job name to job instance.
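The idea of a registry backed by a simple map from job name to job can be pictured with a small, framework-free sketch. Everything in it is a hypothetical stand-in rather than the Spring Batch API: the class name JobRegistrySketch, the methods register, lookup, and jobNames, and the use of a Runnable in place of a real job object are assumptions made for illustration only.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not Spring Batch code: a registry backed by a simple
// map from job name to job. A Runnable stands in for a real job object.
final class JobRegistrySketch {

    private final Map<String, Runnable> jobs = new ConcurrentHashMap<>();

    // Registers a job under a name; a second job with the same name is rejected.
    public void register(String name, Runnable job) {
        if (jobs.putIfAbsent(name, job) != null) {
            throw new IllegalStateException("Duplicate job name: " + name);
        }
    }

    // Looks a job up by name and fails if nothing was registered under it.
    public Runnable lookup(String name) {
        Runnable job = jobs.get(name);
        if (job == null) {
            throw new IllegalArgumentException("No job registered as: " + name);
        }
        return job;
    }

    // Returns an unmodifiable snapshot of the registered names.
    public Set<String> jobNames() {
        return Set.copyOf(jobs.keySet());
    }

    public static void main(String[] args) {
        JobRegistrySketch registry = new JobRegistrySketch();
        registry.register("footballJob", () -> System.out.println("running footballJob"));
        registry.lookup("footballJob").run(); // prints "running footballJob"
    }
}
```

Rejecting duplicate names in register mirrors the requirement that job names be unique within a registry. A custom registry that needs to rename or decorate jobs could apply that logic at the same point.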

Java

When using @EnableBatchProcessing, a JobRegistry is provided for you. The following example shows how to configure your own JobRegistry:

...
// This is already provided via the @EnableBatchProcessing but can be customized via
// overriding the bean in the DefaultBatchConfiguration
@Override
@Bean
public JobRegistry jobRegistry() throws Exception {
	return new MapJobRegistry();
}
...
XML

The following example shows how to include a JobRegistry for a job defined in XML:

<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />

You can populate a JobRegistry in one of the following ways: by using a bean post-processor, a smart initializing singleton, or a registrar lifecycle component. The coming sections describe these mechanisms.

JobRegistryBeanPostProcessor

This is a bean post-processor that can register all jobs as they are created.

Java

The following example shows how to include the JobRegistryBeanPostProcessor for a job defined in Java:

Java Configuration
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
    JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
    postProcessor.setJobRegistry(jobRegistry);
    return postProcessor;
}
XML

The following example shows how to include the JobRegistryBeanPostProcessor for a job defined in XML:

XML Configuration
<bean id="jobRegistryBeanPostProcessor" class="org.spr...JobRegistryBeanPostProcessor">
    <property name="jobRegistry" ref="jobRegistry"/>
</bean>

Although it is not strictly necessary, the post-processor in the example has been given an id so that it can be included in child contexts (for example, as a parent bean definition) and cause all jobs created there to also be registered automatically.

As of version 5.1, the @EnableBatchProcessing annotation automatically registers a jobRegistryBeanPostProcessor bean in the application context.

JobRegistrySmartInitializingSingleton

This is a SmartInitializingSingleton that registers all singleton jobs within the job registry.

Java

The following example shows how to define a JobRegistrySmartInitializingSingleton in Java:

Java Configuration
@Bean
public JobRegistrySmartInitializingSingleton jobRegistrySmartInitializingSingleton(JobRegistry jobRegistry) {
    return new JobRegistrySmartInitializingSingleton(jobRegistry);
}
XML

The following example shows how to define a JobRegistrySmartInitializingSingleton in XML:

XML Configuration
<bean class="org.springframework.batch.core.configuration.support.JobRegistrySmartInitializingSingleton">
    <property name="jobRegistry" ref="jobRegistry" />
</bean>

AutomaticJobRegistrar

This is a lifecycle component that creates child contexts and registers jobs from those contexts as they are created. One advantage of doing this is that, while the job names in the child contexts still have to be globally unique in the registry, their dependencies can have “natural” names. So, for example, you can create a set of XML configuration files that each have only one Job but that all have different definitions of an ItemReader with the same bean name, such as reader. If all those files were imported into the same context, the reader definitions would clash and override one another, but, with the automatic registrar, this is avoided. This makes it easier to integrate jobs that have been contributed from separate modules of an application.

Java

The following example shows how to include the AutomaticJobRegistrar for a job defined in Java:

Java Configuration
@Bean
public AutomaticJobRegistrar registrar() {

    AutomaticJobRegistrar registrar = new AutomaticJobRegistrar();
    registrar.setJobLoader(jobLoader());
    registrar.setApplicationContextFactories(applicationContextFactories());
    registrar.afterPropertiesSet();
    return registrar;

}
XML

The following example shows how to include the AutomaticJobRegistrar for a job defined in XML:

XML Configuration
<bean class="org.spr...AutomaticJobRegistrar">
   <property name="applicationContextFactories">
      <bean class="org.spr...ClasspathXmlApplicationContextsFactoryBean">
         <property name="resources" value="classpath*:/config/job*.xml" />
      </bean>
   </property>
   <property name="jobLoader">
      <bean class="org.spr...DefaultJobLoader">
         <property name="jobRegistry" ref="jobRegistry" />
      </bean>
   </property>
</bean>

The registrar has two mandatory properties: an array of ApplicationContextFactory (created from a convenient factory bean in the preceding example) and a JobLoader. The JobLoader is responsible for managing the lifecycle of the child contexts and registering jobs in the JobRegistry.

The ApplicationContextFactory is responsible for creating the child context. The most common usage is (as in the preceding example) to use a ClassPathXmlApplicationContextFactory. One of the features of this factory is that, by default, it copies some of the configuration down from the parent context to the child. So, for instance, you need not redefine the PropertyPlaceholderConfigurer or AOP configuration in the child, provided it should be the same as the parent.

You can use AutomaticJobRegistrar in conjunction with a JobRegistryBeanPostProcessor (as long as you also use DefaultJobLoader). For instance, this might be desirable if there are jobs defined in the main parent context as well as in the child locations.

JobOperator

As previously discussed, the JobRepository provides CRUD operations on the metadata, and the JobExplorer provides read-only operations on the metadata. However, those operations are most useful when used together to perform common monitoring tasks such as stopping, restarting, or summarizing a Job, as is commonly done by batch operators. Spring Batch provides these types of operations in the JobOperator interface:

public interface JobOperator {

    List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException;

    List<Long> getJobInstances(String jobName, int start, int count)
          throws NoSuchJobException;

    Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException;

    String getParameters(long executionId) throws NoSuchJobExecutionException;

    Long start(String jobName, String parameters)
          throws NoSuchJobException, JobInstanceAlreadyExistsException;

    Long restart(long executionId)
          throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,
                  NoSuchJobException, JobRestartException;

    Long startNextInstance(String jobName)
          throws NoSuchJobException, JobParametersNotFoundException, JobRestartException,
                 JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException;

    boolean stop(long executionId)
          throws NoSuchJobExecutionException, JobExecutionNotRunningException;

    String getSummary(long executionId) throws NoSuchJobExecutionException;

    Map<Long, String> getStepExecutionSummaries(long executionId)
          throws NoSuchJobExecutionException;

    Set<String> getJobNames();

}

The preceding operations represent methods from many different interfaces, such as JobLauncher, JobRepository, JobExplorer, and JobRegistry. For this reason, the provided implementation of JobOperator (SimpleJobOperator) has many dependencies.

Java

The following example shows a typical bean definition for SimpleJobOperator in Java:

 /**
  * All injected dependencies for this bean are provided by the @EnableBatchProcessing
  * infrastructure out of the box.
  */
 @Bean
 public SimpleJobOperator jobOperator(JobExplorer jobExplorer,
                                JobRepository jobRepository,
                                JobRegistry jobRegistry,
                                JobLauncher jobLauncher) {

	SimpleJobOperator jobOperator = new SimpleJobOperator();
	jobOperator.setJobExplorer(jobExplorer);
	jobOperator.setJobRepository(jobRepository);
	jobOperator.setJobRegistry(jobRegistry);
	jobOperator.setJobLauncher(jobLauncher);

	return jobOperator;
 }
XML

The following example shows a typical bean definition for SimpleJobOperator in XML:

<bean id="jobOperator" class="org.spr...SimpleJobOperator">
    <property name="jobExplorer">
        <bean class="org.spr...JobExplorerFactoryBean">
            <property name="dataSource" ref="dataSource" />
        </bean>
    </property>
    <property name="jobRepository" ref="jobRepository" />
    <property name="jobRegistry" ref="jobRegistry" />
    <property name="jobLauncher" ref="jobLauncher" />
</bean>

As of version 5.0, the @EnableBatchProcessing annotation automatically registers a job operator bean in the application context.

If you set the table prefix on the job repository, do not forget to set it on the job explorer as well.

JobParametersIncrementer

Most of the methods on JobOperator are self-explanatory, and you can find more detailed explanations in the Javadoc of the interface. However, the startNextInstance method is worth noting. This method always starts a new instance of a Job. This can be extremely useful if there are serious issues in a JobExecution and the Job needs to be started over again from the beginning. Unlike JobLauncher, which starts a new JobInstance only when it is given a JobParameters object that differs from any previous set of parameters, the startNextInstance method uses the JobParametersIncrementer tied to the Job to force the Job to a new instance:

public interface JobParametersIncrementer {

    JobParameters getNext(JobParameters parameters);

}

The contract of JobParametersIncrementer is that, given a JobParameters object, it returns the “next” JobParameters object by incrementing any necessary values it may contain. This strategy is useful because the framework has no way of knowing what changes to the JobParameters make it the “next” instance. For example, if the only value in JobParameters is a date and the next instance should be created, should that value be incremented by one day or one week (if the job is weekly, for instance)? The same can be said for any numerical values that help to identify the Job, as the following example shows:

public class SampleIncrementer implements JobParametersIncrementer {

    @Override
    public JobParameters getNext(JobParameters parameters) {
        // First run: there are no previous parameters, so start the sequence at 1.
        if (parameters == null || parameters.isEmpty()) {
            return new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
        }
        // Later runs: read the previous run.id (defaulting to 1) and add one.
        long id = parameters.getLong("run.id", 1L) + 1;
        return new JobParametersBuilder().addLong("run.id", id).toJobParameters();
    }
}

In this example, the value with a key of run.id is used to discriminate between JobInstances. If the JobParameters passed in is null or empty, it can be assumed that the Job has never been run before and, thus, its initial state can be returned. Otherwise, the old value is obtained, incremented by one, and returned.
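The date question raised earlier (advance by one day or by one week?) can be sketched in the same spirit. The following is a framework-free illustration, not Spring Batch code: a plain Map stands in for JobParameters, and the class name WeeklyIncrementerSketch, the key schedule.date, and the firstRun argument are all hypothetical. It assumes a weekly job, so "next" means seven days later.

```java
import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Spring Batch code: a plain Map stands in for
// JobParameters, and the key name is an assumption made for illustration.
final class WeeklyIncrementerSketch {

    static final String KEY = "schedule.date";

    // Returns the "next" parameters. With no previous date, the supplied
    // first-run date is used; otherwise the stored date advances by one week.
    static Map<String, LocalDate> getNext(Map<String, LocalDate> parameters, LocalDate firstRun) {
        Map<String, LocalDate> next = new HashMap<>();
        if (parameters == null || !parameters.containsKey(KEY)) {
            next.put(KEY, firstRun);
        } else {
            next.putAll(parameters);
            next.put(KEY, parameters.get(KEY).plusWeeks(1));
        }
        return next;
    }

    public static void main(String[] args) {
        LocalDate start = LocalDate.of(2024, 1, 1);
        Map<String, LocalDate> first = getNext(null, start);
        Map<String, LocalDate> second = getNext(first, start);
        // prints "2024-01-01 -> 2024-01-08"
        System.out.println(first.get(KEY) + " -> " + second.get(KEY));
    }
}
```

A daily job would use plusDays(1) at the same point. This is the kind of decision the framework cannot make on its own, which is why the incrementer is supplied by the application.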

Java

For jobs defined in Java, you can associate an incrementer with a Job through the incrementer method provided in the builders, as follows:

@Bean
public Job footballJob(JobRepository jobRepository) {
    return new JobBuilder("footballJob", jobRepository)
            .incrementer(sampleIncrementer())
            ...
            .build();
}
XML

For jobs defined in XML, you can associate an incrementer with a Job through the incrementer attribute in the namespace, as follows:

<job id="footballJob" incrementer="sampleIncrementer">
    ...
</job>

Stopping a Job

One of the most common use cases of JobOperator is gracefully stopping a Job:

Set<Long> executions = jobOperator.getRunningExecutions("sampleJob");
jobOperator.stop(executions.iterator().next());

The shutdown is not immediate, since there is no way to force an immediate shutdown, especially if the execution is currently in developer code that the framework has no control over, such as a business service. However, as soon as control returns to the framework, it sets the status of the current StepExecution to BatchStatus.STOPPED, saves it, and does the same for the JobExecution before finishing.
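Why a stop only takes effect once control returns to the framework can be illustrated with a small cooperative-stop sketch. It is a simplified model and not how Spring Batch is implemented: the class GracefulStopSketch, its Status enum, and the methods requestStop and run are hypothetical names. The point it shows is that a stop request is only a recorded flag, and that the flag is checked between units of work, never in the middle of business code.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch, not Spring Batch code: a cooperative stop in which the
// stop request is a flag that the processing loop checks between items.
final class GracefulStopSketch {

    enum Status { STOPPED, COMPLETED }

    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
    private int processed = 0;

    // Records the stop request; nothing is interrupted at this point.
    public void requestStop() {
        stopRequested.set(true);
    }

    public int processedCount() {
        return processed;
    }

    // Runs the items in order. The flag is checked only before each item,
    // that is, when control has returned from the business code.
    public Status run(List<Runnable> items) {
        for (Runnable item : items) {
            if (stopRequested.get()) {
                return Status.STOPPED;
            }
            item.run(); // business code: it always runs to completion
            processed++;
        }
        return Status.COMPLETED;
    }

    public static void main(String[] args) {
        GracefulStopSketch step = new GracefulStopSketch();
        Runnable normal = () -> {};
        Runnable stopArrives = step::requestStop; // the request lands mid-item
        Status status = step.run(List.of(normal, stopArrives, normal));
        // prints "STOPPED after 2 items"
        System.out.println(status + " after " + step.processedCount() + " items");
    }
}
```

The item during which the request arrives still completes, and the loop stops before the next one starts. In this simplified model, a request that arrives during the last item has no effect and the run finishes as COMPLETED.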

Aborting a Job

A job execution that is FAILED can be restarted (if the Job is restartable). A job execution whose status is ABANDONED cannot be restarted by the framework. The ABANDONED status is also used in step executions to mark them as skippable in a restarted job execution. If a job is running and encounters a step that has been marked ABANDONED in the previous failed job execution, it moves on to the next step (as determined by the job flow definition and the step execution exit status).

If the process died (kill -9 or server failure), the job is, of course, not running, but the JobRepository has no way of knowing because no one told it before the process died. You have to tell it manually that you know that the execution either failed or should be considered aborted (change its status to FAILED or ABANDONED). This is a business decision, and there is no way to automate it. Change the status to FAILED only if it is restartable and you know that the restart data is valid.