Advanced Metadata Usage
-
JobExplorer
:查询存储库以查找已执行的任务。 -
JobRegistry
:跟踪应用程序上下文中可用的任务。 -
JobOperator
:执行任务操作,例如停止、重新启动和总结任务。 -
JobParametersIncrementer
:在每次执行时为任务生成新参数。
这些功能使应用程序能够管理复杂的任务调度和监控需求。
到目前为止,JobLauncher 和 JobRepository 接口都已被讨论过。它们共同表示对任务的简单启动和批处理域对象的 CRUD 基本操作:
So far, both the JobLauncher
and JobRepository
interfaces have been
discussed. Together, they represent the simple launching of a job and basic
CRUD operations of batch domain objects:
.Job Repository
image::job-repository.png[]
JobLauncher 使用 JobRepository 创建新的 JobExecution 对象并运行它们。在运行一个“任务”期间,“任务”和“步骤”实现随后使用相同的 JobRepository 进行相同执行的基本更新。基本操作足以满足简单场景。然而,在具有数百个批处理作业和复杂调度需求的大型批处理环境中,需要高级访问元数据:
A JobLauncher
uses the
JobRepository
to create new
JobExecution
objects and run them.
Job
and Step
implementations
later use the same JobRepository
for basic updates
of the same executions during the running of a Job
.
The basic operations suffice for simple scenarios. However, in a large batch
environment with hundreds of batch jobs and complex scheduling
requirements, more advanced access to the metadata is required:
.Advanced Job Repository Access
image::job-repository-advanced.png[]
在即将讨论的章节中讨论的 JobExplorer
和 JobOperator
接口添加了其他功能以查询和控制元数据。
The JobExplorer
and
JobOperator
interfaces, which are discussed
in the coming sections, add additional functionality for querying and controlling the metadata.
Querying the Repository
在使用任何高级功能之前,最基本的需求是能够查询存储库以查找已有的执行。此功能由 JobExplorer
接口提供:
The most basic need before any advanced features is the ability to
query the repository for existing executions. This functionality is
provided by the JobExplorer
interface:
public interface JobExplorer {
List<JobInstance> getJobInstances(String jobName, int start, int count);
JobExecution getJobExecution(Long executionId);
StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId);
JobInstance getJobInstance(Long instanceId);
List<JobExecution> getJobExecutions(JobInstance jobInstance);
Set<JobExecution> findRunningJobExecutions(String jobName);
}
正如从其方法签名中显而易见的那样,JobExplorer
是 JobRepository
的只读版本,并且与 JobRepository
一样,它可以通过使用工厂 Bean 轻松配置。
As is evident from its method signatures, JobExplorer
is a read-only version of
the JobRepository
, and, like the JobRepository
, it can be easily configured by using a
factory bean.
- Java
-
下面的示例显示了如何在 Java 中配置
JobExplorer
:
The following example shows how to configure a JobExplorer
in Java:
...
// This would reside in your DefaultBatchConfiguration extension
@Bean
public JobExplorer jobExplorer() throws Exception {
JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
factoryBean.setDataSource(this.dataSource);
return factoryBean.getObject();
}
...
- XML
-
下面的示例显示了如何在 XML 中配置
JobExplorer
:
The following example shows how to configure a JobExplorer
in XML:
<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
p:dataSource-ref="dataSource" />
Earlier in this chapter 中,我们注意到您可以修改 JobRepository
的表前缀以允许使用不同版本或架构。由于 JobExplorer
用于相同表中,因此它还需要设置前缀的功能。
Earlier in this chapter, we noted that you can modify the table prefix
of the JobRepository
to allow for different versions or schemas. Because
the JobExplorer
works with the same tables, it also needs the ability to set a prefix.
- Java
-
下面的示例显示了如何为 Java 中的
JobExplorer
设置表前缀:
The following example shows how to set the table prefix for a JobExplorer
in Java:
...
// This would reside in your DefaultBatchConfiguration extension
@Bean
public JobExplorer jobExplorer() throws Exception {
JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
factoryBean.setDataSource(this.dataSource);
factoryBean.setTablePrefix("SYSTEM.");
return factoryBean.getObject();
}
...
- XML
-
下面的示例显示了如何在 XML 中为
JobExplorer
设置表前缀:
The following example shows how to set the table prefix for a JobExplorer
in XML:
<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
p:tablePrefix="SYSTEM."/>
JobRegistry
虽然 JobRegistry
(及其父接口 JobLocator
)并不是必需的,但如果你想要跟踪在上下文中可以使用的作业,那它可能很有用。它还用于在创建作业后(例如,在子上下文中)将其集中收集到应用程序上下文中也很有用。你还可以使用自定义 JobRegistry
实现来操作已注册作业的名称和其他属性。该框架只提供一种实现,该实现基于一个从作业名称到作业实例的简单映射。
A JobRegistry
(and its parent interface, JobLocator
) is not mandatory, but it can be
useful if you want to keep track of which jobs are available in the context. It is also
useful for collecting jobs centrally in an application context when they have been created
elsewhere (for example, in child contexts). You can also use custom JobRegistry
implementations
to manipulate the names and other properties of the jobs that are registered.
There is only one implementation provided by the framework and this is based on a simple
map from job name to job instance.
- Java
-
当你使用
@EnableBatchProcessing
时,会为你提供一个JobRegistry
。下面的示例显示了如何配置你自己的JobRegistry
:
When using @EnableBatchProcessing
, a JobRegistry
is provided for you.
The following example shows how to configure your own JobRegistry
:
...
// This is already provided via the @EnableBatchProcessing but can be customized via
// overriding the bean in the DefaultBatchConfiguration
@Override
@Bean
public JobRegistry jobRegistry() throws Exception {
return new MapJobRegistry();
}
...
- XML
-
下面的示例显示了如何为 XML 中定义的作业包含
JobRegistry
:
The following example shows how to include a JobRegistry
for a job defined in XML:
<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />
你可以通过以下方式之一填充 JobRegistry
:使用 bean 后置处理器、使用智能初始化单例或使用 registrar 生命周期组件。即将到来的部分将介绍这些机制。
You can populate a JobRegistry
in one of the following ways: by using
a bean post processor, or by using a smart initializing singleton or by using
a registrar lifecycle component. The coming sections describe these mechanisms.
JobRegistryBeanPostProcessor
这是一个 bean 后置处理器,可以在创建时注册所有作业。
This is a bean post-processor that can register all jobs as they are created.
- Java
-
下面的示例显示了如何为 Java 中定义的作业包括
JobRegistryBeanPostProcessor
:
The following example shows how to include the JobRegistryBeanPostProcessor
for a job
defined in Java:
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
postProcessor.setJobRegistry(jobRegistry);
return postProcessor;
}
- XML
-
下面的示例显示了如何为 XML 中定义的作业包括
JobRegistryBeanPostProcessor
:
The following example shows how to include the JobRegistryBeanPostProcessor
for a job
defined in XML:
<bean id="jobRegistryBeanPostProcessor" class="org.spr...JobRegistryBeanPostProcessor">
<property name="jobRegistry" ref="jobRegistry"/>
</bean>
虽然这不是绝对必要的,但示例中的后置处理器已经给出了一个 id
,以便可以包含在子上下文中(例如,作为父 bean 定义),并导致在那里创建的所有作业也能自动注册。
Although it is not strictly necessary, the post-processor in the
example has been given an id
so that it can be included in child
contexts (for example, as a parent bean definition) and cause all jobs created
there to also be registered automatically.
从 5.1 版本开始,@EnableBatchProcessing
注释会在应用程序上下文中自动注册一个 jobRegistryBeanPostProcessor
bean。
As of version 5.1, the @EnableBatchProcessing
annotation automatically registers a jobRegistryBeanPostProcessor
bean in the application context.
JobRegistrySmartInitializingSingleton
这是一个 SmartInitializingSingleton
,用于在作业注册表中注册所有单例作业。
This is a SmartInitializingSingleton
that registers all singleton jobs within the job registry.
- Java
-
下面的示例显示了如何在 Java 中定义
JobRegistrySmartInitializingSingleton
:
The following example shows how to define a JobRegistrySmartInitializingSingleton
in Java:
@Bean
public JobRegistrySmartInitializingSingleton jobRegistrySmartInitializingSingleton(JobRegistry jobRegistry) {
return new JobRegistrySmartInitializingSingleton(jobRegistry);
}
- XML
-
下面的示例显示了如何在 XML 中定义
JobRegistrySmartInitializingSingleton
:
The following example shows how to define a JobRegistrySmartInitializingSingleton
in XML:
<bean class="org.springframework.batch.core.configuration.support.JobRegistrySmartInitializingSingleton">
<property name="jobRegistry" ref="jobRegistry" />
</bean>
AutomaticJobRegistrar
这是一个创建子上下文并在其创建时从这些上下文中注册作业的生命周期组件。此操作的一个优点是,虽然子上下文中作业的名称在注册表中仍必须是全局唯一的,但其依赖项可以有“自然”名称。因此,例如,你可以创建一组 XML 配置文件,每个配置文件只有一个 Job,但都对具有相同 bean 名称(例如 reader
)的 ItemReader
有不同的定义。如果将所有这些文件都导入同一个上下文中,则 reader 定义将产生冲突并相互覆盖,但是,通过自动 registrar,可以避免这种情况。这样可以更轻松地集成来自应用程序的不同模块的作业。
This is a lifecycle component that creates child contexts and registers jobs from those
contexts as they are created. One advantage of doing this is that, while the job names in
the child contexts still have to be globally unique in the registry, their dependencies
can have “natural” names. So, for example, you can create a set of XML configuration files
that each have only one Job but that all have different definitions of an ItemReader
with the
same bean name, such as reader
. If all those files were imported into the same context,
the reader definitions would clash and override one another, but, with the automatic
registrar, this is avoided. This makes it easier to integrate jobs that have been contributed from
separate modules of an application.
- Java
-
以下示例展示了如何在 Java 中为已定义任务包含
AutomaticJobRegistrar
:
The following example shows how to include the AutomaticJobRegistrar
for a job defined
in Java:
@Bean
public AutomaticJobRegistrar registrar() {
AutomaticJobRegistrar registrar = new AutomaticJobRegistrar();
registrar.setJobLoader(jobLoader());
registrar.setApplicationContextFactories(applicationContextFactories());
registrar.afterPropertiesSet();
return registrar;
}
- XML
-
以下示例展示了如何在 XML 中为已定义任务包含
AutomaticJobRegistrar
:
The following example shows how to include the AutomaticJobRegistrar
for a job defined
in XML:
<bean class="org.spr...AutomaticJobRegistrar">
<property name="applicationContextFactories">
<bean class="org.spr...ClasspathXmlApplicationContextsFactoryBean">
<property name="resources" value="classpath*:/config/job*.xml" />
</bean>
</property>
<property name="jobLoader">
<bean class="org.spr...DefaultJobLoader">
<property name="jobRegistry" ref="jobRegistry" />
</bean>
</property>
</bean>
注册器具有两个必备属性:ApplicationContextFactory
数组(在上一个示例中,此数组是通过便捷工厂 Bean 创建的)和 JobLoader
。JobLoader
负责管理子上下文的生命周期,并在 JobRegistry
中注册任务。
The registrar has two mandatory properties: an array of
ApplicationContextFactory
(created from a
convenient factory bean in the preceding example) and a
JobLoader
. The JobLoader
is responsible for managing the lifecycle of the child contexts and
registering jobs in the JobRegistry
.
ApplicationContextFactory
负责创建子上下文。最常见的用法是(如上一个示例所示)使用 ClassPathXmlApplicationContextFactory
。此工厂的一个特性是,它在默认情况下会将部分配置从父上下文中复制到子上下文中。因此,例如,不必在子上下文中重新定义 PropertyPlaceholderConfigurer
或 AOP 配置(只要该配置与父上下文中的配置相同即可)。
The ApplicationContextFactory
is
responsible for creating the child context. The most common usage
is (as in the preceding example) to use a
ClassPathXmlApplicationContextFactory
. One of
the features of this factory is that, by default, it copies some of the
configuration down from the parent context to the child. So, for
instance, you need not redefine the
PropertyPlaceholderConfigurer
or AOP
configuration in the child, provided it should be the same as the
parent.
可以将 AutomaticJobRegistrar
与 JobRegistryBeanPostProcessor
结合使用(只要同时使用 DefaultJobLoader
即可)。例如,当除主父上下文中定义了任务外,还可以在子位置中定义任务时,这可能需要用。
You can use AutomaticJobRegistrar
in
conjunction with a JobRegistryBeanPostProcessor
(as long as you also use DefaultJobLoader
).
For instance, this might be desirable if there are jobs
defined in the main parent context as well as in the child
locations.
JobOperator
如前所述,JobRepository
在元数据上提供 CRUD 操作,JobExplorer
在元数据上提供只读操作。但是,这些操作在与其它操作结合使用时最有用,可执行停止、重新启动或总结任务等常见监控任务,通常都是由批处理操作员来执行此类操作。Spring Batch 在 JobOperator
接口中提供这些类型的操作:
As previously discussed, the JobRepository
provides CRUD operations on the meta-data, and the
JobExplorer
provides read-only operations on the
metadata. However, those operations are most useful when used together
to perform common monitoring tasks such as stopping, restarting, or
summarizing a Job, as is commonly done by batch operators. Spring Batch
provides these types of operations in the
JobOperator
interface:
public interface JobOperator {
List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException;
List<Long> getJobInstances(String jobName, int start, int count)
throws NoSuchJobException;
Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException;
String getParameters(long executionId) throws NoSuchJobExecutionException;
Long start(String jobName, String parameters)
throws NoSuchJobException, JobInstanceAlreadyExistsException;
Long restart(long executionId)
throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,
NoSuchJobException, JobRestartException;
Long startNextInstance(String jobName)
throws NoSuchJobException, JobParametersNotFoundException, JobRestartException,
JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException;
boolean stop(long executionId)
throws NoSuchJobExecutionException, JobExecutionNotRunningException;
String getSummary(long executionId) throws NoSuchJobExecutionException;
Map<Long, String> getStepExecutionSummaries(long executionId)
throws NoSuchJobExecutionException;
Set<String> getJobNames();
}
前述操作代表来自许多不同接口的方法,例如 JobLauncher
、JobRepository
、JobExplorer
和 JobRegistry
。因此,提供的 JobOperator
(SimpleJobOperator
) 实现具有许多依赖关系。
The preceding operations represent methods from many different interfaces, such as
JobLauncher
, JobRepository
, JobExplorer
, and JobRegistry
. For this reason, the
provided implementation of JobOperator
(SimpleJobOperator
) has many dependencies.
- Java
-
以下示例展示了 Java 中
SimpleJobOperator
的典型 Bean 定义:
The following example shows a typical bean definition for SimpleJobOperator
in Java:
/**
* All injected dependencies for this bean are provided by the @EnableBatchProcessing
* infrastructure out of the box.
*/
@Bean
public SimpleJobOperator jobOperator(JobExplorer jobExplorer,
JobRepository jobRepository,
JobRegistry jobRegistry,
JobLauncher jobLauncher) {
SimpleJobOperator jobOperator = new SimpleJobOperator();
jobOperator.setJobExplorer(jobExplorer);
jobOperator.setJobRepository(jobRepository);
jobOperator.setJobRegistry(jobRegistry);
jobOperator.setJobLauncher(jobLauncher);
return jobOperator;
}
- XML
-
以下示例展示了 XML 中
SimpleJobOperator
的典型 Bean 定义:
The following example shows a typical bean definition for SimpleJobOperator
in XML:
<bean id="jobOperator" class="org.spr...SimpleJobOperator">
<property name="jobExplorer">
<bean class="org.spr...JobExplorerFactoryBean">
<property name="dataSource" ref="dataSource" />
</bean>
</property>
<property name="jobRepository" ref="jobRepository" />
<property name="jobRegistry" ref="jobRegistry" />
<property name="jobLauncher" ref="jobLauncher" />
</bean>
从 5.0 版本开始,@EnableBatchProcessing
注释会自动将任务操作程序 Bean 注册到应用程序上下文中。
As of version 5.0, the @EnableBatchProcessing
annotation automatically registers a job operator bean
in the application context.
如果在作业资料库上设置了表前缀,请记得在作业浏览器上也设置它。 |
If you set the table prefix on the job repository, do not forget to set it on the job explorer as well. |
JobParametersIncrementer
JobOperator
上的大多数方法都是不言自明的,你可以在 Javadoc of the interface 中找到更详细的说明。但是,startNextInstance
方法值得注意。此方法始终启动 Job
的新实例。如果 JobExecution
中出现严重问题且 Job
需要从头开始重新启动,这将非常有用。与 JobLauncher
(需要一个触发新 JobInstance
的新 JobParameters
对象)不同,如果参数与前一组参数不同,则 startNextInstance
方法使用 Job
绑定的 JobParametersIncrementer
来强制 Job
为新实例 :
Most of the methods on JobOperator
are
self-explanatory, and you can find more detailed explanations in the
Javadoc of the interface. However, the
startNextInstance
method is worth noting. This
method always starts a new instance of a Job
.
This can be extremely useful if there are serious issues in a
JobExecution
and the Job
needs to be started over again from the beginning. Unlike
JobLauncher
(which requires a new
JobParameters
object that triggers a new
JobInstance
), if the parameters are different from
any previous set of parameters, the
startNextInstance
method uses the
JobParametersIncrementer
tied to the
Job
to force the Job
to a
new instance:
public interface JobParametersIncrementer {
JobParameters getNext(JobParameters parameters);
}
JobParametersIncrementer
的契约是给定一个 JobParameters 对象,它将通过增加其中可能包含的任何必要值来返回 “next” JobParameters
对象。此策略很有用,因为框架无法知道对 JobParameters
的哪些更改使其成为 “next” 实例。例如,如果 JobParameters
中的唯一值是日期,并且应该创建下一个实例,那么该值应该增加一天还是一周(例如,如果该作业是每周的)。对于有助于标识 Job
的任何数值也是如此,如下例所示:
The contract of JobParametersIncrementer
is
that, given a JobParameters
object, it returns the “next” JobParameters
object by incrementing any necessary values it may contain. This
strategy is useful because the framework has no way of knowing what
changes to the JobParameters
make it the “next”
instance. For example, if the only value in
JobParameters
is a date and the next instance
should be created, should that value be incremented by one day or one
week (if the job is weekly, for instance)? The same can be said for any
numerical values that help to identify the Job
,
as the following example shows:
public class SampleIncrementer implements JobParametersIncrementer {
public JobParameters getNext(JobParameters parameters) {
if (parameters==null || parameters.isEmpty()) {
return new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
}
long id = parameters.getLong("run.id",1L) + 1;
return new JobParametersBuilder().addLong("run.id", id).toJobParameters();
}
}
在此示例中,具有键 run.id
的值用于区分 JobInstances
。如果传入的 JobParameters
为 null,则可以假定该 Job
之前从未运行过,因此,可以返回其初始状态。但是,如果没有,则获取旧值,将其增加 1,然后返回。
In this example, the value with a key of run.id
is used to
discriminate between JobInstances
. If the
JobParameters
passed in is null, it can be
assumed that the Job
has never been run before
and, thus, its initial state can be returned. However, if not, the old
value is obtained, incremented by one, and returned.
- Java
-
对于 Java 中定义的任务,可以通过构建器中提供的
incrementer
方法将增量器与Job
关联,如下所示:
For jobs defined in Java, you can associate an incrementer with a Job
through the
incrementer
method provided in the builders, as follows:
@Bean
public Job footballJob(JobRepository jobRepository) {
return new JobBuilder("footballJob", jobRepository)
.incrementer(sampleIncrementer())
...
.build();
}
- XML
-
对于 XML 中定义的任务,可以通过命名空间中的
incrementer
属性将增量器与Job
关联,如下所示:
For jobs defined in XML, you can associate an incrementer with a Job
through the
incrementer
attribute in the namespace, as follows:
<job id="footballJob" incrementer="sampleIncrementer">
...
</job>
Stopping a Job
JobOperator
最常见的用例之一是正常停止任务:
One of the most common use cases of
JobOperator
is gracefully stopping a
Job:
Set<Long> executions = jobOperator.getRunningExecutions("sampleJob");
jobOperator.stop(executions.iterator().next());
关闭不是立即执行的,因为没有办法强制立即关闭,特别是如果执行目前处于框架无法控制的开发人员代码中(例如业务服务)。但是,只要控制权返回到框架,框架就会将当前 StepExecution
的状态设置为 BatchStatus.STOPPED
,将其保存,并在完成之前对 JobExecution
也执行相同的操作。
The shutdown is not immediate, since there is no way to force
immediate shutdown, especially if the execution is currently in
developer code that the framework has no control over, such as a
business service. However, as soon as control is returned back to the
framework, it sets the status of the current
StepExecution
to
BatchStatus.STOPPED
, saves it, and does the same
for the JobExecution
before finishing.
Aborting a Job
FAILED
的任务执行可以(如果此 Job
是可重新启动的)重新启动。状态为 ABANDONED
的任务执行无法由框架重新启动。ABANDONED
状态也在步骤执行中使用,用于将其标记为可在重新启动的任务执行中跳过。如果任务正在运行且遇到在前一个失败的任务执行中已标记为 ABANDONED
的步骤,它会移动到下一步(由任务流程定义和步骤执行退出状态确定)。
A job execution that is FAILED
can be
restarted (if the Job
is restartable). A job execution whose status is
ABANDONED
cannot be restarted by the framework.
The ABANDONED
status is also used in step
executions to mark them as skippable in a restarted job execution. If a
job is running and encounters a step that has been marked
ABANDONED
in the previous failed job execution, it
moves on to the next step (as determined by the job flow definition
and the step execution exit status).
如果进程已关闭(kill -9
或服务器故障),那么该任务当然没有在运行,但是 JobRepository
无从得知,因为在进程关闭之前没有人告诉它。您必须手动告诉它您知道该执行已失败,或者应被认为已中止(将其状态更改为 FAILED
或 ABANDONED
)。这是一个业务决策,没有办法将其自动化。只有在任务可重新启动并且您知道重启数据有效时,才能将状态更改为 FAILED
。
If the process died (kill -9
or server
failure), the job is, of course, not running, but the JobRepository
has
no way of knowing because no one told it before the process died. You
have to tell it manually that you know that the execution either failed
or should be considered aborted (change its status to
FAILED
or ABANDONED
). This is
a business decision, and there is no way to automate it. Change the
status to FAILED
only if it is restartable and you know that the restart data is valid.