Sub-elements
-
异步处理器:将项目处理外包到新线程,提高性能。
-
远程区块:将块分发到外部系统以进行处理,从而卸载复杂的处理。
-
远程分区:将整个步骤发送到远程工作程序执行,以处理 I/O 瓶颈。
-
事件驱动侦听器:允许基于特定事件触发动作。
通过将 Spring Batch 与 Spring Integration 集成,您可以充分利用事件处理、异步处理和分布式处理的优势,从而创建强大且可扩展的批处理解决方案。
当此 Gateway
从`PollableChannel` 接收消息时,您必须指定一个全局默认 Poller
或为`Job Launching Gateway` 提供一个 Poller
子元素。
When this Gateway
is receiving messages from a
PollableChannel
, you must either provide
a global default Poller
or provide a Poller
sub-element to the
Job Launching Gateway
.
- Java
-
以下示例展示了如何用 Java 提供 poller:
The following example shows how to provide a poller in Java:
@Bean
@ServiceActivator(inputChannel = "queueChannel", poller = @Poller(fixedRate="1000"))
public JobLaunchingGateway sampleJobLaunchingGateway() {
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());
jobLaunchingGateway.setOutputChannel(replyChannel());
return jobLaunchingGateway;
}
- XML
-
以下示例展示了如何用 XML 提供 poller:
The following example shows how to provide a poller in XML:
<batch-int:job-launching-gateway request-channel="queueChannel"
reply-channel="replyChannel" job-launcher="jobLauncher">
<int:poller fixed-rate="1000">
</batch-int:job-launching-gateway>
Providing Feedback with Informational Messages
Spring Batch 作业可以长时间运行,通常提供进度信息至关重要。例如,如果某个批处理作业的部分或全部失败,利益相关者可能希望收到通知。Spring Batch 通过以下方式提供收集此信息的机制:
As Spring Batch jobs can run for long times, providing progress information is often critical. For example, stakeholders may want to be notified if some or all parts of a batch job have failed. Spring Batch provides support for this information being gathered through:
-
Active polling
-
Event-driven listeners
异步启动 Spring Batch 作业时(例如,使用 Job LaunchingGateway),将返回 JobExecution
实例。因此,您可以使用 JobExecution.getJobId()
通过从 JobRepository
检索 JobExecution
的更新实例来持续轮询状态更新。但是,这被认为不是最佳方案,建议采用事件驱动方法。
When starting a Spring Batch job asynchronously (for example, by using the Job Launching
Gateway), a JobExecution
instance is returned. Thus, you can use JobExecution.getJobId()
to continuously poll for status updates by retrieving updated instances of the
JobExecution
from the JobRepository
by using the JobExplorer
. However, this is
considered sub-optimal, and an event-driven approach is preferred.
因此,Spring Batch 提供侦听器,包括三个最常用的侦听器:
Therefore, Spring Batch provides listeners, including the three most commonly used listeners:
-
StepListener
-
ChunkListener
-
JobExecutionListener
在以下图像中所示的示例中,Spring Batch 作业已配置了一个 StepExecutionListener
。因此,Spring Integration 会接收和处理任何步骤,事件发生前或事件发生后。例如,您可以使用 Router
检查接收到的 StepExecution
。基于该检查的结果,可能会发生各种情况(例如,将消息路由到邮件出站通道适配器),以便可以根据某些条件发送电子邮件通知。
In the example shown in the following image, a Spring Batch job has been configured with a
StepExecutionListener
. Thus, Spring Integration receives and processes any step before
or after events. For example, you can inspect the received StepExecution
by using a
Router
. Based on the results of that inspection, various things can occur (such as
routing a message to a mail outbound channel adapter), so that an email notification can
be sent out based on some condition.
以下两部分示例展示了如何为 StepExecution
事件配置侦听器,以便向 Gateway
发送消息并将其输出日志记录到 logging-channel-adapter
。
The following two-part example shows how a listener is configured to send a
message to a Gateway
for a StepExecution
events and log its output to a
logging-channel-adapter
.
首先,创建通知集成 bean。
First, create the notification integration beans.
- Java
-
以下示例展示了如何在 Java 中创建通知集成 bean:
The following example shows the how to create the notification integration beans in Java:
@Bean
@ServiceActivator(inputChannel = "stepExecutionsChannel")
public LoggingHandler loggingHandler() {
LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.WARN);
adapter.setLoggerName("TEST_LOGGER");
adapter.setLogExpressionString("headers.id + ': ' + payload");
return adapter;
}
@MessagingGateway(name = "notificationExecutionsListener", defaultRequestChannel = "stepExecutionsChannel")
public interface NotificationExecutionListener extends StepExecutionListener {}
您需要向配置中添加 @IntegrationComponentScan
注释。
You need to add the @IntegrationComponentScan
annotation to your configuration.
- XML
-
以下示例展示了如何在 XML 中创建通知集成 bean:
The following example shows the how to create the notification integration beans in XML:
<int:channel id="stepExecutionsChannel"/>
<int:gateway id="notificationExecutionsListener"
service-interface="org.springframework.batch.core.StepExecutionListener"
default-request-channel="stepExecutionsChannel"/>
<int:logging-channel-adapter channel="stepExecutionsChannel"/>
第二,修改作业以添加步骤级侦听器。
Second, modify your job to add a step-level listener.
- Java
-
以下示例展示了如何在 Java 中添加步骤级侦听器:
The following example shows the how to add a step-level listener in Java:
public Job importPaymentsJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder("importPayments", jobRepository)
.start(new StepBuilder("step1", jobRepository)
.chunk(200, transactionManager)
.listener(notificationExecutionsListener())
// ...
.build();
)
.build();
}
- XML
-
以下示例展示了如何在 XML 中添加步骤级侦听器:
The following example shows the how to add a step-level listener in XML:
<job id="importPayments">
<step id="step1">
<tasklet ../>
<chunk ../>
<listeners>
<listener ref="notificationExecutionsListener"/>
</listeners>
</tasklet>
...
</step>
</job>
Asynchronous Processors
异步处理器帮助您扩展项目的处理。在异步处理器用例中,AsyncItemProcessor
用作调度器,在一个新线程上为项目执行 ItemProcessor
的逻辑。项目完成时,将 Future
传递给 AsynchItemWriter
以进行写入。
Asynchronous Processors help you scale the processing of items. In the asynchronous
processor use case, an AsyncItemProcessor
serves as a dispatcher, executing the logic of
the ItemProcessor
for an item on a new thread. Once the item completes, the Future
is
passed to the AsynchItemWriter
to be written.
因此,你可以通过使用异步项目处理来提高性能,基本上允许你实现 fork-join 场景。 AsyncItemWriter
收集结果,并在所有结果均可用之后立即回写区块。
Therefore, you can increase performance by using asynchronous item processing, basically
letting you implement fork-join scenarios. The AsyncItemWriter
gathers the results and
writes back the chunk as soon as all the results become available.
- Java
-
以下示例展示了如何在 Java 中配置
AsyncItemProcessor
:
The following example shows how to configuration the AsyncItemProcessor
in Java:
@Bean
public AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) {
AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor();
asyncItemProcessor.setTaskExecutor(taskExecutor);
asyncItemProcessor.setDelegate(itemProcessor);
return asyncItemProcessor;
}
- XML
-
以下示例展示了如何在 XML 中配置
AsyncItemProcessor
:
The following example shows how to configuration the AsyncItemProcessor
in XML:
<bean id="processor"
class="org.springframework.batch.integration.async.AsyncItemProcessor">
<property name="delegate">
<bean class="your.ItemProcessor"/>
</property>
<property name="taskExecutor">
<bean class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
</property>
</bean>
delegate
属性指代你的 ItemProcessor
bean,taskExecutor
属性指代你选择的 TaskExecutor
。
The delegate
property refers to your ItemProcessor
bean, and the taskExecutor
property refers to the TaskExecutor
of your choice.
- Java
-
以下示例展示了如何在 Java 中配置
AsyncItemWriter
:
The following example shows how to configure the AsyncItemWriter
in Java:
@Bean
public AsyncItemWriter writer(ItemWriter itemWriter) {
AsyncItemWriter asyncItemWriter = new AsyncItemWriter();
asyncItemWriter.setDelegate(itemWriter);
return asyncItemWriter;
}
- XML
-
以下示例展示了如何在 XML 中配置
AsyncItemWriter
:
The following example shows how to configure the AsyncItemWriter
in XML:
<bean id="itemWriter"
class="org.springframework.batch.integration.async.AsyncItemWriter">
<property name="delegate">
<bean id="itemWriter" class="your.ItemWriter"/>
</property>
</bean>
同样,delegate
属性实际上指的是你的 ItemWriter
bean。
Again, the delegate
property is
actually a reference to your ItemWriter
bean.
Externalizing Batch Process Execution
到目前为止,讨论的集成方法表明了一些用例,其中 Spring Integration 像外壳一样包装着 Spring Batch。但是,Spring Batch 也可以在内部使用 Spring Integration。通过使用这种方法,Spring Batch 用户可以将项目甚至区块的处理委派给外部流程。这允许你卸载复杂的处理。Spring Batch Integration 为以下内容提供专门支持:
The integration approaches discussed so far suggest use cases where Spring Integration wraps Spring Batch like an outer shell. However, Spring Batch can also use Spring Integration internally. By using this approach, Spring Batch users can delegate the processing of items or even chunks to outside processes. This lets you offload complex processing. Spring Batch Integration provides dedicated support for:
-
Remote Chunking
-
Remote Partitioning
Remote Chunking
下图展示了当你将 Spring Batch 与 Spring Integration 搭配使用时,远程区块处理的一种方式:
The following image shows one way that remote chunking works when you use Spring Batch together with Spring Integration:
更进一步,你还可以使用 ChunkMessageChannelItemWriter
(由 Spring Batch Integration 提供)外化区块处理,该 ChunkMessageChannelItemWriter
将项目发送出去并收集结果。一旦发送,Spring Batch 就会继续读取和分组项目的流程,而无需等待结果。相反,由 ChunkMessageChannelItemWriter
负责收集结果,并将其集成回 Spring Batch 流程中。
Taking things one step further, you can also externalize the
chunk processing by using the
ChunkMessageChannelItemWriter
(provided by Spring Batch Integration), which sends items out
and collects the result. Once sent, Spring Batch continues the
process of reading and grouping items, without waiting for the results.
Rather, it is the responsibility of the ChunkMessageChannelItemWriter
to gather the results and integrate them back into the Spring Batch process.
借助 Spring Integration,你可以完全控制你的流程的并发性(例如,通过使用 QueueChannel
而不是 DirectChannel
)。此外,通过依赖 Spring Integration 丰富的通道适配器集合(例如,JMS 和 AMQP),你可以将批处理作业的区块分发到外部系统以进行处理。
With Spring Integration, you have full
control over the concurrency of your processes (for instance, by
using a QueueChannel
instead of a
DirectChannel
). Furthermore, by relying on
Spring Integration’s rich collection of channel adapters (such as
JMS and AMQP), you can distribute chunks of a batch job to
external systems for processing.
- Java
-
一个包含一个要远程切分的步骤的作业可能在 Java 中具有类似于以下内容的配置:
A job with a step to be remotely chunked might have a configuration similar to the following in Java:
public Job chunkJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder("personJob", jobRepository)
.start(new StepBuilder("step1", jobRepository)
.<Person, Person>chunk(200, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.build())
.build();
}
- XML
-
一个包含一个要远程切分的步骤的作业可能在 XML 中具有类似于以下内容的配置:
A job with a step to be remotely chunked might have a configuration similar to the following in XML:
<job id="personJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="200"/>
</tasklet>
...
</step>
</job>
ItemReader
引用指向要用于在管理器上读取数据的 bean。ItemWriter
引用指向特殊的 ItemWriter
(称为 ChunkMessageChannelItemWriter
),如前所述。处理器(如果存在)则不属于管理器配置,因为它是配置在 worker 上的。在实现用例时,您应检查任何其他组件属性,例如节流限制等。
The ItemReader
reference points to the bean you want to use for reading data on the
manager. The ItemWriter
reference points to a special ItemWriter
(called
ChunkMessageChannelItemWriter
), as described earlier. The processor (if any) is left off
the manager configuration, as it is configured on the worker. You should check any
additional component properties, such as throttle limits and so on, when implementing
your use case.
- Java
-
下列 Java 配置提供基本管理器设置:
The following Java configuration provides a basic manager setup:
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://localhost:61616");
return factory;
}
/*
* Configure outbound flow (requests going to workers)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(requests())
.handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
.get();
}
/*
* Configure inbound flow (replies coming from workers)
*/
@Bean
public QueueChannel replies() {
return new QueueChannel();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
.channel(replies())
.get();
}
/*
* Configure the ChunkMessageChannelItemWriter
*/
@Bean
public ItemWriter<Integer> itemWriter() {
MessagingTemplate messagingTemplate = new MessagingTemplate();
messagingTemplate.setDefaultChannel(requests());
messagingTemplate.setReceiveTimeout(2000);
ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter
= new ChunkMessageChannelItemWriter<>();
chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
chunkMessageChannelItemWriter.setReplyChannel(replies());
return chunkMessageChannelItemWriter;
}
- XML
-
下列 XML 配置提供基本管理器设置:
The following XML configuration provides a basic manager setup:
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<int-jms:outbound-channel-adapter id="jmsRequests" destination-name="requests"/>
<bean id="messagingTemplate"
class="org.springframework.integration.core.MessagingTemplate">
<property name="defaultChannel" ref="requests"/>
<property name="receiveTimeout" value="2000"/>
</bean>
<bean id="itemWriter"
class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
scope="step">
<property name="messagingOperations" ref="messagingTemplate"/>
<property name="replyChannel" ref="replies"/>
</bean>
<int:channel id="replies">
<int:queue/>
</int:channel>
<int-jms:message-driven-channel-adapter id="jmsReplies"
destination-name="replies"
channel="replies"/>
上述配置为我们提供了许多 bean。我们使用 ActiveMQ 和 Spring Integration 提供的入站和出站 JMS 适配器配置我们的消息中间件。如所示,我们的 itemWriter
bean(我们的作业步骤引用它)使用 ChunkMessageChannelItemWriter
通过配置的中间件写入块。
The preceding configuration provides us with a number of beans. We
configure our messaging middleware by using ActiveMQ and the
inbound and outbound JMS adapters provided by Spring Integration. As
shown, our itemWriter
bean, which is
referenced by our job step, uses the
ChunkMessageChannelItemWriter
to write chunks over the
configured middleware.
现在我们可以继续进行 worker 配置,如下例所示:
Now we can move on to the worker configuration, as the following example shows:
- Java
-
以下示例显示 Java 中的 worker 配置:
The following example shows the worker configuration in Java:
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://localhost:61616");
return factory;
}
/*
* Configure inbound flow (requests coming from the manager)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
.channel(requests())
.get();
}
/*
* Configure outbound flow (replies going to the manager)
*/
@Bean
public DirectChannel replies() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(replies())
.handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
.get();
}
/*
* Configure the ChunkProcessorChunkHandler
*/
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
ChunkProcessor<Integer> chunkProcessor
= new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler
= new ChunkProcessorChunkHandler<>();
chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
return chunkProcessorChunkHandler;
}
- XML
-
以下示例显示 XML 中的 worker 配置:
The following example shows the worker configuration in XML:
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<int:channel id="requests"/>
<int:channel id="replies"/>
<int-jms:message-driven-channel-adapter id="incomingRequests"
destination-name="requests"
channel="requests"/>
<int-jms:outbound-channel-adapter id="outgoingReplies"
destination-name="replies"
channel="replies">
</int-jms:outbound-channel-adapter>
<int:service-activator id="serviceActivator"
input-channel="requests"
output-channel="replies"
ref="chunkProcessorChunkHandler"
method="handleChunk"/>
<bean id="chunkProcessorChunkHandler"
class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
<property name="chunkProcessor">
<bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
<property name="itemWriter">
<bean class="io.spring.sbi.PersonItemWriter"/>
</property>
<property name="itemProcessor">
<bean class="io.spring.sbi.PersonItemProcessor"/>
</property>
</bean>
</property>
</bean>
这些配置项大部分应从管理器配置中看起来很熟悉。工作程序不需要访问 Spring Batch JobRepository
或实际作业配置文件。感兴趣的主要 Bean 是 chunkProcessorChunkHandler
。ChunkProcessorChunkHandler
的 chunkProcessor
属性采用一个已配置的 SimpleChunkProcessor
,您可以通过该配置提供对 ItemWriter
(以及可以选择地提供 ItemProcessor
)的引用,该 ItemWriter
(和 ItemProcessor
)将在工作程序从管理器接收到块时在工作程序上运行。
Most of these configuration items should look familiar from the
manager configuration. Workers do not need access to
the Spring Batch JobRepository
nor
to the actual job configuration file. The main bean of interest
is the chunkProcessorChunkHandler
. The
chunkProcessor
property of ChunkProcessorChunkHandler
takes a
configured SimpleChunkProcessor
, which is where you would provide a reference to your
ItemWriter
(and, optionally, your
ItemProcessor
) that will run on the worker
when it receives chunks from the manager.
有关详细信息,请参阅“可扩展性
”章节中的 远程分块 部分:https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#remoteChunking。
For more information, see the section of the “Scalability” chapter on Remote Chunking.
从 4.1 版开始,Spring Batch 集成引入了 @EnableBatchIntegration
注释,可用于简化远程分块设置。该注释提供两个 Bean,您可以在应用程序上下文中自动装配它们:
Starting from version 4.1, Spring Batch Integration introduces the @EnableBatchIntegration
annotation that can be used to simplify a remote chunking setup. This annotation provides
two beans that you can autowire in your application context:
-
RemoteChunkingManagerStepBuilderFactory
: Configures the manager step -
RemoteChunkingWorkerBuilder
: Configures the remote worker integration flow
这些 API 会负责配置许多组件,如下图所示:
These APIs take care of configuring a number of components, as the following diagram shows:
在管理器端,RemoteChunkingManagerStepBuilderFactory
允许您通过声明来配置管理器步骤:
On the manager side, the RemoteChunkingManagerStepBuilderFactory
lets you
configure a manager step by declaring:
-
The item reader to read items and send them to workers
-
The output channel ("Outgoing requests") to send requests to workers
-
The input channel ("Incoming replies") to receive replies from workers
您不需要显式配置 ChunkMessageChannelItemWriter
和 MessagingTemplate
。(您在找到这样做的原因时仍然可以显式配置它们。)
You need not explicitly configure ChunkMessageChannelItemWriter
and the MessagingTemplate
.
(You can still explicitly configure them if find a reason to do so).
在工作程序端,RemoteChunkingWorkerBuilder
允许您将工作程序配置为:
On the worker side, the RemoteChunkingWorkerBuilder
lets you configure a worker to:
-
Listen to requests sent by the manager on the input channel (“Incoming requests”)
-
Call the
handleChunk
method ofChunkProcessorChunkHandler
for each request with the configuredItemProcessor
andItemWriter
-
Send replies on the output channel (“Outgoing replies”) to the manager
您不需要显式配置 SimpleChunkProcessor
和 ChunkProcessorChunkHandler
。(您在找到这样做的原因时仍然可以显式配置它们。)
You need not explicitly configure the SimpleChunkProcessor
and the ChunkProcessorChunkHandler
. (You can still explicitly configure them if you find
a reason to do so).
以下示例演示了如何使用这些 API:
The following example shows how to use these APIs:
@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {
@Configuration
public static class ManagerConfiguration {
@Autowired
private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;
@Bean
public TaskletStep managerStep() {
return this.managerStepBuilderFactory.get("managerStep")
.chunk(100)
.reader(itemReader())
.outputChannel(requests()) // requests sent to workers
.inputChannel(replies()) // replies received from workers
.build();
}
// Middleware beans setup omitted
}
@Configuration
public static class WorkerConfiguration {
@Autowired
private RemoteChunkingWorkerBuilder workerBuilder;
@Bean
public IntegrationFlow workerFlow() {
return this.workerBuilder
.itemProcessor(itemProcessor())
.itemWriter(itemWriter())
.inputChannel(requests()) // requests received from the manager
.outputChannel(replies()) // replies sent to the manager
.build();
}
// Middleware beans setup omitted
}
}
您可以在此找到远程分块作业的完整示例:https://github.com/spring-projects/spring-batch/tree/main/spring-batch-samples#remote-chunking-sample。
You can find a complete example of a remote chunking job here.
Remote Partitioning
下图显示了典型的远程分区情况:
The following image shows a typical remote partitioning situation:
另一方面,当瓶颈不是项目的处理而是关联的 I/O 时,远程分区非常有用。使用远程分区,您可以将工作发送到执行完整 Spring Batch 步骤的工作程序。因此,每个工作程序都有其自己的 ItemReader
、ItemProcessor
和 ItemWriter
。出于此目的,Spring Batch 集成提供了 MessageChannelPartitionHandler
。
Remote Partitioning, on the other hand, is useful when it
is not the processing of items but rather the associated I/O that
causes the bottleneck. With remote partitioning, you can send work
to workers that execute complete Spring Batch
steps. Thus, each worker has its own ItemReader
, ItemProcessor
, and
ItemWriter
. For this purpose, Spring Batch
Integration provides the MessageChannelPartitionHandler
.
PartitionHandler
接口的此实现使用 MessageChannel
实例向远程工作程序发送指令并接收其响应。这提供了一种对传输(如 JMS 和 AMQP)的出色抽象,这些传输用于与远程工作程序进行通信。
This implementation of the PartitionHandler
interface uses MessageChannel
instances to
send instructions to remote workers and receive their responses.
This provides a nice abstraction from the transports (such as JMS
and AMQP) being used to communicate with the remote workers.
"`Scalability`"章节中用于解决remote partitioning的部分概述了配置远程分区所需的理念和组件,并展示了使用默认`TaskExecutorPartitionHandler`在单独的本地执行线程中进行分区的示例。对于多个 JVM 的远程分区,需要两个其他组件:
The section of the “Scalability” chapter that addresses
remote partitioning provides an overview of the concepts and
components needed to configure remote partitioning and shows an
example of using the default
TaskExecutorPartitionHandler
to partition
in separate local threads of execution. For remote partitioning
to multiple JVMs, two additional components are required:
-
A remoting fabric or grid environment
-
A
PartitionHandler
implementation that supports the desired remoting fabric or grid environment
类似于远程分块,您可以将 JMS 用作“远程结构
”。在这种情况下,请使用 MessageChannelPartitionHandler
实例作为 PartitionHandler
实现,如前所述。
Similar to remote chunking, you can use JMS as the “remoting fabric”. In that case, use
a MessageChannelPartitionHandler
instance as the PartitionHandler
implementation,
as described earlier.
- Java
-
以下示例假定存在分区分隔的作业,重点是 Java 中的
MessageChannelPartitionHandler
和 JMS 配置:
The following example assumes an existing partitioned job and focuses on the
MessageChannelPartitionHandler
and JMS configuration in Java:
/*
* Configuration of the manager side
*/
@Bean
public PartitionHandler partitionHandler() {
MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
partitionHandler.setStepName("step1");
partitionHandler.setGridSize(3);
partitionHandler.setReplyChannel(outboundReplies());
MessagingTemplate template = new MessagingTemplate();
template.setDefaultChannel(outboundRequests());
template.setReceiveTimeout(100000);
partitionHandler.setMessagingOperations(template);
return partitionHandler;
}
@Bean
public QueueChannel outboundReplies() {
return new QueueChannel();
}
@Bean
public DirectChannel outboundRequests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundJmsRequests() {
return IntegrationFlow.from("outboundRequests")
.handle(Jms.outboundGateway(connectionFactory())
.requestDestination("requestsQueue"))
.get();
}
@Bean
@ServiceActivator(inputChannel = "inboundStaging")
public AggregatorFactoryBean partitioningMessageHandler() throws Exception {
AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
aggregatorFactoryBean.setProcessorBean(partitionHandler());
aggregatorFactoryBean.setOutputChannel(outboundReplies());
// configure other propeties of the aggregatorFactoryBean
return aggregatorFactoryBean;
}
@Bean
public DirectChannel inboundStaging() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundJmsStaging() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
.configureListenerContainer(c -> c.subscriptionDurable(false))
.destination("stagingQueue"))
.channel(inboundStaging())
.get();
}
/*
* Configuration of the worker side
*/
@Bean
public StepExecutionRequestHandler stepExecutionRequestHandler() {
StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
stepExecutionRequestHandler.setJobExplorer(jobExplorer);
stepExecutionRequestHandler.setStepLocator(stepLocator());
return stepExecutionRequestHandler;
}
@Bean
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler serviceActivator() throws Exception {
return stepExecutionRequestHandler();
}
@Bean
public DirectChannel inboundRequests() {
return new DirectChannel();
}
public IntegrationFlow inboundJmsRequests() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
.configureListenerContainer(c -> c.subscriptionDurable(false))
.destination("requestsQueue"))
.channel(inboundRequests())
.get();
}
@Bean
public DirectChannel outboundStaging() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundJmsStaging() {
return IntegrationFlow.from("outboundStaging")
.handle(Jms.outboundGateway(connectionFactory())
.requestDestination("stagingQueue"))
.get();
}
- XML
-
以下示例假定存在分区分隔的作业,重点是 XML 中的
MessageChannelPartitionHandler
和 JMS 配置:
The following example assumes an existing partitioned job and focuses on the
MessageChannelPartitionHandler
and JMS configuration in XML:
<bean id="partitionHandler"
class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
<property name="stepName" value="step1"/>
<property name="gridSize" value="3"/>
<property name="replyChannel" ref="outbound-replies"/>
<property name="messagingOperations">
<bean class="org.springframework.integration.core.MessagingTemplate">
<property name="defaultChannel" ref="outbound-requests"/>
<property name="receiveTimeout" value="100000"/>
</bean>
</property>
</bean>
<int:channel id="outbound-requests"/>
<int-jms:outbound-channel-adapter destination="requestsQueue"
channel="outbound-requests"/>
<int:channel id="inbound-requests"/>
<int-jms:message-driven-channel-adapter destination="requestsQueue"
channel="inbound-requests"/>
<bean id="stepExecutionRequestHandler"
class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
<property name="jobExplorer" ref="jobExplorer"/>
<property name="stepLocator" ref="stepLocator"/>
</bean>
<int:service-activator ref="stepExecutionRequestHandler" input-channel="inbound-requests"
output-channel="outbound-staging"/>
<int:channel id="outbound-staging"/>
<int-jms:outbound-channel-adapter destination="stagingQueue"
channel="outbound-staging"/>
<int:channel id="inbound-staging"/>
<int-jms:message-driven-channel-adapter destination="stagingQueue"
channel="inbound-staging"/>
<int:aggregator ref="partitionHandler" input-channel="inbound-staging"
output-channel="outbound-replies"/>
<int:channel id="outbound-replies">
<int:queue/>
</int:channel>
<bean id="stepLocator"
class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />
您还必须确保分区 handler
属性映射到 partitionHandler
Bean。
You must also ensure that the partition handler
attribute maps to the partitionHandler
bean.
- Java
-
以下示例将分区
handler
属性映射到中的partitionHandler
The following example maps the partition handler
attribute to the partitionHandler
in
- Java
-
Java Configuration
public Job personJob(JobRepository jobRepository) { return new JobBuilder("personJob", jobRepository) .start(new StepBuilder("step1.manager", jobRepository) .partitioner("step1.worker", partitioner()) .partitionHandler(partitionHandler()) .build()) .build(); }
- XML
-
以下示例将分区
handler
属性映射到 XML 中的partitionHandler
:
The following example maps the partition handler
attribute to the partitionHandler
in
XML:
<job id="personJob">
<step id="step1.manager">
<partition partitioner="partitioner" handler="partitionHandler"/>
...
</step>
</job>
你可以找到远程分区作业 here的完整示例。
You can find a complete example of a remote partitioning job here.
你可以使用 @EnableBatchIntegration
注解来简化远程分区设定。该注解提供了两个对远程分区有用的 Bean:
You can use the @EnableBatchIntegration
annotation to simplify a remote
partitioning setup. This annotation provides two beans that are useful for remote partitioning:
-
RemotePartitioningManagerStepBuilderFactory
: Configures the manager step -
RemotePartitioningWorkerStepBuilderFactory
: Configures the worker step
这些 API 负责配置多种组件,如下图所示:
These APIs take care of configuring a number of components, as the following diagrams show:
在管理器端,RemotePartitioningManagerStepBuilderFactory
可以通过以下申明来配置管理器步骤:
On the manager side, the RemotePartitioningManagerStepBuilderFactory
lets you
configure a manager step by declaring:
-
The
Partitioner
used to partition data -
The output channel (“Outgoing requests”) on which to send requests to workers
-
The input channel (“Incoming replies”) on which to receive replies from workers (when configuring replies aggregation)
-
The poll interval and timeout parameters (when configuring job repository polling)
你无需显式配置 MessageChannelPartitionHandler
和 MessagingTemplate
(如果你找到理由,仍然可以显式地配置)。
You need not explicitly configure The MessageChannelPartitionHandler
and the MessagingTemplate
.
(You can still explicitly configured them if you find a reason to do so).
在工作端,RemotePartitioningWorkerStepBuilderFactory
可以配置工作来:
On the worker side, the RemotePartitioningWorkerStepBuilderFactory
lets you configure a worker to:
-
Listen to requests sent by the manager on the input channel (“Incoming requests”)
-
Call the
handle
method ofStepExecutionRequestHandler
for each request -
Send replies on the output channel (“Outgoing replies”) to the manager
你无需显式配置 StepExecutionRequestHandler
(如果你找到理由,可以显式地配置)。
You need not explicitly configure the StepExecutionRequestHandler
.
(You can explicitly configure it if you find a reason to do so).
以下示例演示了如何使用这些 API:
The following example shows how to use these APIs:
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningJobConfiguration {
@Configuration
public static class ManagerConfiguration {
@Autowired
private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;
@Bean
public Step managerStep() {
return this.managerStepBuilderFactory
.get("managerStep")
.partitioner("workerStep", partitioner())
.gridSize(10)
.outputChannel(outgoingRequestsToWorkers())
.inputChannel(incomingRepliesFromWorkers())
.build();
}
// Middleware beans setup omitted
}
@Configuration
public static class WorkerConfiguration {
@Autowired
private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;
@Bean
public Step workerStep() {
return this.workerStepBuilderFactory
.get("workerStep")
.inputChannel(incomingRequestsFromManager())
.outputChannel(outgoingRepliesToManager())
.chunk(100)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
// Middleware beans setup omitted
}
}