Launching Batch Jobs through Messages
When starting batch jobs by using the core Spring Batch API, you basically have two options:
- From the command line, with the CommandLineJobRunner
- Programmatically, with either JobOperator.start() or JobLauncher.run()
For example, you may want to use the CommandLineJobRunner when invoking batch jobs from
a shell script. Alternatively, you can use the JobOperator directly (for example, when
using Spring Batch as part of a web application). However, what about more complex use
cases? Maybe you need to poll a remote (S)FTP server to retrieve the data for the batch
job, or your application has to support multiple different data sources simultaneously.
For example, you may receive data files not only from the web but also from FTP and
other sources. Maybe additional transformation of the input files is needed before
invoking Spring Batch.
Therefore, it would be much more powerful to execute the batch job by using Spring
Integration and its numerous adapters. For example, you can use a File Inbound Channel
Adapter to monitor a directory in the file system and start the batch job as soon as
the input file arrives. Additionally, you can create Spring Integration flows that use
multiple different adapters to ingest data for your batch jobs from multiple sources
simultaneously by using only configuration. Implementing all these scenarios with
Spring Integration is easy, as it allows for decoupled, event-driven execution of the
JobLauncher.
Spring Batch Integration provides the JobLaunchingMessageHandler class that you can use
to launch batch jobs. The input for the JobLaunchingMessageHandler is provided by a
Spring Integration message, which has a payload of type JobLaunchRequest. This class is
a wrapper around the Job to be launched and around the JobParameters that are necessary
to launch the batch job.
The following image shows the typical Spring Integration message flow that is needed to start a batch job. The EIP (Enterprise Integration Patterns) website provides a full overview of messaging icons and their descriptions.

Figure: Launch Batch Job (launch-batch-job.png)
Transforming a File into a JobLaunchRequest
The following example transforms a file into a JobLaunchRequest:
package io.spring.sbi;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;

import java.io.File;

public class FileMessageToJobRequest {

    private Job job;
    private String fileParameterName;

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();

        jobParametersBuilder.addString(fileParameterName,
                message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}
The JobExecution Response
When a batch job is being executed, a JobExecution instance is returned. You can use
this instance to determine the status of an execution. If a JobExecution can be created
successfully, it is always returned, regardless of whether or not the actual execution
is successful.
The exact behavior of how the JobExecution instance is returned depends on the provided
TaskExecutor. If a synchronous (single-threaded) TaskExecutor implementation is used,
the JobExecution response is returned only after the job completes. When using an
asynchronous TaskExecutor, the JobExecution instance is returned immediately. You can
then take the id of the JobExecution instance (with JobExecution.getId(); note that
JobExecution.getJobId() returns the id of the enclosing job instance instead) and query
the JobRepository for the job's updated status by using the JobExplorer. For more
information, see Querying the Repository.
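The difference between the two executor styles can be illustrated with a small model built only on the JDK. This is a sketch, not Spring Batch code: `LaunchModel`, `Execution`, and `launch` are hypothetical stand-ins for the launcher, the JobExecution, and JobLauncher.run(), and a plain `java.util.concurrent.Executor` takes the place of Spring's TaskExecutor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class LaunchModel {

    /** Hypothetical stand-in for JobExecution: it only carries a status. */
    static final class Execution {
        final AtomicReference<String> status = new AtomicReference<>("STARTING");
    }

    /** Hands the job to the executor, then returns the execution handle. */
    static Execution launch(Executor executor, Runnable job) {
        Execution execution = new Execution();
        executor.execute(() -> {
            execution.status.set("STARTED");
            job.run();
            execution.status.set("COMPLETED");
        });
        return execution;
    }

    /** Blocks until the latch is released, simulating a long-running job. */
    static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Synchronous executor: the job runs on the calling thread,
        // so launch() returns only after the job has finished.
        Execution sync = launch(Runnable::run, () -> { });
        System.out.println("sync: " + sync.status.get()); // prints "sync: COMPLETED"

        // Asynchronous executor: launch() returns while the job is still
        // blocked, so the caller has to check the status again later.
        CountDownLatch gate = new CountDownLatch(1);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Execution async = launch(pool, () -> await(gate));
        System.out.println("async, right after launch: " + async.status.get());

        gate.countDown(); // let the simulated job finish
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("async, after completion: " + async.status.get());
    }
}
```

In the asynchronous case the handle returned to the caller reflects an unfinished run, which is why a later status lookup by execution id is needed.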
Spring Batch Integration Configuration
Consider a case where someone needs to create a file inbound-channel-adapter to listen
for CSV files in the provided directory, hand them off to a transformer
(FileMessageToJobRequest), launch the job through the job launching gateway, and log
the output of the JobExecution with the logging-channel-adapter.
- Java
The following example shows how that common case can be configured in Java:
// personJob() and jobRepository are assumed to be defined elsewhere in the configuration class.
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
    fileMessageToJobRequest.setJob(personJob());
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    // Synchronous executor: the gateway replies only after the job completes.
    jobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);

    return jobLaunchingGateway;
}

@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
    // Poll /tmp/myfiles once per second for *.csv files, one file per poll.
    return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles"))
                    .filter(new SimplePatternFileListFilter("*.csv")),
                c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)))
            .transform(fileMessageToJobRequest())
            .handle(jobLaunchingGateway)
            .log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
            .get();
}
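As a variation on the gateway above, the following sketch swaps in an asynchronous executor so that the gateway replies with the JobExecution as soon as the job has been handed off. It assumes Spring Batch 5 (`TaskExecutorJobLauncher`) and Spring's `SimpleAsyncTaskExecutor`; the class name `AsyncLaunchConfiguration` and the bean name are hypothetical, and the `JobRepository` is assumed to be provided by the surrounding application context.

```java
import org.springframework.batch.core.launch.support.TaskExecutorJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.integration.launch.JobLaunchingGateway;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

// Hypothetical configuration class, shown only to illustrate the asynchronous variant.
@Configuration
public class AsyncLaunchConfiguration {

    @Bean
    public JobLaunchingGateway asyncJobLaunchingGateway(JobRepository jobRepository) throws Exception {
        TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        // Each job runs on its own thread, so the launcher returns immediately.
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        // The launcher is not a managed bean here, so validate its setup explicitly.
        jobLauncher.afterPropertiesSet();
        return new JobLaunchingGateway(jobLauncher);
    }
}
```

With this variant, the payload logged at the end of the flow describes a job that may still be running, so the final status has to be looked up later.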
- XML
The following example shows how that common case can be configured in XML:
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>

<int-file:inbound-channel-adapter id="filePoller"
    channel="inboundFileChannel"
    directory="file:/tmp/myfiles/"
    filename-pattern="*.csv">
  <int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>

<int:transformer input-channel="inboundFileChannel"
    output-channel="outboundJobRequestChannel">
  <bean class="io.spring.sbi.FileMessageToJobRequest">
    <property name="job" ref="personJob"/>
    <property name="fileParameterName" value="input.file.name"/>
  </bean>
</int:transformer>

<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
    reply-channel="jobLaunchReplyChannel"/>

<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
Example ItemReader Configuration
Now that we are polling for files and launching jobs, we need to configure our Spring
Batch ItemReader (for example) to use the files found at the location defined by the job
parameter called "input.file.name", as the following bean configuration shows:
- Java
The following Java example shows the necessary bean configuration:
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters['input.file.name']}") String resource) {
    ...
    FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
    flatFileItemReader.setResource(new FileSystemResource(resource));
    ...
    return flatFileItemReader;
}
- XML
The following XML example shows the necessary bean configuration:
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
    scope="step">
  <property name="resource" value="file://#{jobParameters['input.file.name']}"/>
  ...
</bean>
The main points of interest in the preceding example are injecting the value of
#{jobParameters['input.file.name']} as the Resource property value and setting the
ItemReader bean to have step scope. Setting the bean to have step scope takes advantage
of the late binding support, which allows access to the jobParameters variable.