Sub-elements
-
异步处理器:将项目处理外包到新线程,提高性能。
-
远程区块:将块分发到外部系统以进行处理,从而卸载复杂的处理。
-
远程分区:将整个步骤发送到远程工作程序执行,以处理 I/O 瓶颈。
-
事件驱动侦听器:允许基于特定事件触发动作。
通过将 Spring Batch 与 Spring Integration 集成,您可以充分利用事件处理、异步处理和分布式处理的优势,从而创建强大且可扩展的批处理解决方案。
当此 Gateway
从`PollableChannel` 接收消息时,您必须指定一个全局默认 Poller
或为`Job Launching Gateway` 提供一个 Poller
子元素。
- Java
-
以下示例展示了如何用 Java 提供 poller:
@Bean
@ServiceActivator(inputChannel = "queueChannel", poller = @Poller(fixedRate="1000"))
public JobLaunchingGateway sampleJobLaunchingGateway() {
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());
jobLaunchingGateway.setOutputChannel(replyChannel());
return jobLaunchingGateway;
}
- XML
-
以下示例展示了如何用 XML 提供 poller:
<batch-int:job-launching-gateway request-channel="queueChannel"
reply-channel="replyChannel" job-launcher="jobLauncher">
<int:poller fixed-rate="1000">
</batch-int:job-launching-gateway>
Providing Feedback with Informational Messages
Spring Batch 作业可以长时间运行,通常提供进度信息至关重要。例如,如果某个批处理作业的部分或全部失败,利益相关者可能希望收到通知。Spring Batch 通过以下方式提供收集此信息的机制:
-
Active polling
-
Event-driven listeners
异步启动 Spring Batch 作业时(例如,使用 Job LaunchingGateway),将返回 JobExecution
实例。因此,您可以使用 JobExecution.getJobId()
通过从 JobRepository
检索 JobExecution
的更新实例来持续轮询状态更新。但是,这被认为不是最佳方案,建议采用事件驱动方法。
因此,Spring Batch 提供侦听器,包括三个最常用的侦听器:
-
StepListener
-
ChunkListener
-
JobExecutionListener
在以下图像中所示的示例中,Spring Batch 作业已配置了一个 StepExecutionListener
。因此,Spring Integration 会接收和处理任何步骤,事件发生前或事件发生后。例如,您可以使用 Router
检查接收到的 StepExecution
。基于该检查的结果,可能会发生各种情况(例如,将消息路由到邮件出站通道适配器),以便可以根据某些条件发送电子邮件通知。
以下两部分示例展示了如何为 StepExecution
事件配置侦听器,以便向 Gateway
发送消息并将其输出日志记录到 logging-channel-adapter
。
首先,创建通知集成 bean。
- Java
-
以下示例展示了如何在 Java 中创建通知集成 bean:
@Bean
@ServiceActivator(inputChannel = "stepExecutionsChannel")
public LoggingHandler loggingHandler() {
LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.WARN);
adapter.setLoggerName("TEST_LOGGER");
adapter.setLogExpressionString("headers.id + ': ' + payload");
return adapter;
}
@MessagingGateway(name = "notificationExecutionsListener", defaultRequestChannel = "stepExecutionsChannel")
public interface NotificationExecutionListener extends StepExecutionListener {}
您需要向配置中添加 @IntegrationComponentScan
注释。
- XML
-
以下示例展示了如何在 XML 中创建通知集成 bean:
<int:channel id="stepExecutionsChannel"/>
<int:gateway id="notificationExecutionsListener"
service-interface="org.springframework.batch.core.StepExecutionListener"
default-request-channel="stepExecutionsChannel"/>
<int:logging-channel-adapter channel="stepExecutionsChannel"/>
第二,修改作业以添加步骤级侦听器。
- Java
-
以下示例展示了如何在 Java 中添加步骤级侦听器:
public Job importPaymentsJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder("importPayments", jobRepository)
.start(new StepBuilder("step1", jobRepository)
.chunk(200, transactionManager)
.listener(notificationExecutionsListener())
// ...
.build();
)
.build();
}
- XML
-
以下示例展示了如何在 XML 中添加步骤级侦听器:
<job id="importPayments">
<step id="step1">
<tasklet ../>
<chunk ../>
<listeners>
<listener ref="notificationExecutionsListener"/>
</listeners>
</tasklet>
...
</step>
</job>
Asynchronous Processors
异步处理器帮助您扩展项目的处理。在异步处理器用例中,AsyncItemProcessor
用作调度器,在一个新线程上为项目执行 ItemProcessor
的逻辑。项目完成时,将 Future
传递给 AsynchItemWriter
以进行写入。
因此,你可以通过使用异步项目处理来提高性能,基本上允许你实现 fork-join 场景。 AsyncItemWriter
收集结果,并在所有结果均可用之后立即回写区块。
- Java
-
以下示例展示了如何在 Java 中配置
AsyncItemProcessor
:
@Bean
public AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) {
AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor();
asyncItemProcessor.setTaskExecutor(taskExecutor);
asyncItemProcessor.setDelegate(itemProcessor);
return asyncItemProcessor;
}
- XML
-
以下示例展示了如何在 XML 中配置
AsyncItemProcessor
:
<bean id="processor"
class="org.springframework.batch.integration.async.AsyncItemProcessor">
<property name="delegate">
<bean class="your.ItemProcessor"/>
</property>
<property name="taskExecutor">
<bean class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
</property>
</bean>
delegate
属性指代你的 ItemProcessor
bean,taskExecutor
属性指代你选择的 TaskExecutor
。
- Java
-
以下示例展示了如何在 Java 中配置
AsyncItemWriter
:
@Bean
public AsyncItemWriter writer(ItemWriter itemWriter) {
AsyncItemWriter asyncItemWriter = new AsyncItemWriter();
asyncItemWriter.setDelegate(itemWriter);
return asyncItemWriter;
}
- XML
-
以下示例展示了如何在 XML 中配置
AsyncItemWriter
:
<bean id="itemWriter"
class="org.springframework.batch.integration.async.AsyncItemWriter">
<property name="delegate">
<bean id="itemWriter" class="your.ItemWriter"/>
</property>
</bean>
同样,delegate
属性实际上指的是你的 ItemWriter
bean。
Externalizing Batch Process Execution
到目前为止,讨论的集成方法表明了一些用例,其中 Spring Integration 像外壳一样包装着 Spring Batch。但是,Spring Batch 也可以在内部使用 Spring Integration。通过使用这种方法,Spring Batch 用户可以将项目甚至区块的处理委派给外部流程。这允许你卸载复杂的处理。Spring Batch Integration 为以下内容提供专门支持:
-
Remote Chunking
-
Remote Partitioning
Remote Chunking
下图展示了当你将 Spring Batch 与 Spring Integration 搭配使用时,远程区块处理的一种方式:
更进一步,你还可以使用 ChunkMessageChannelItemWriter
(由 Spring Batch Integration 提供)外化区块处理,该 ChunkMessageChannelItemWriter
将项目发送出去并收集结果。一旦发送,Spring Batch 就会继续读取和分组项目的流程,而无需等待结果。相反,由 ChunkMessageChannelItemWriter
负责收集结果,并将其集成回 Spring Batch 流程中。
借助 Spring Integration,你可以完全控制你的流程的并发性(例如,通过使用 QueueChannel
而不是 DirectChannel
)。此外,通过依赖 Spring Integration 丰富的通道适配器集合(例如,JMS 和 AMQP),你可以将批处理作业的区块分发到外部系统以进行处理。
- Java
-
一个包含一个要远程切分的步骤的作业可能在 Java 中具有类似于以下内容的配置:
public Job chunkJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder("personJob", jobRepository)
.start(new StepBuilder("step1", jobRepository)
.<Person, Person>chunk(200, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.build())
.build();
}
- XML
-
一个包含一个要远程切分的步骤的作业可能在 XML 中具有类似于以下内容的配置:
<job id="personJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="200"/>
</tasklet>
...
</step>
</job>
ItemReader
引用指向要用于在管理器上读取数据的 bean。ItemWriter
引用指向特殊的 ItemWriter
(称为 ChunkMessageChannelItemWriter
),如前所述。处理器(如果存在)则不属于管理器配置,因为它是配置在 worker 上的。在实现用例时,您应检查任何其他组件属性,例如节流限制等。
- Java
-
下列 Java 配置提供基本管理器设置:
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://localhost:61616");
return factory;
}
/*
* Configure outbound flow (requests going to workers)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(requests())
.handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
.get();
}
/*
* Configure inbound flow (replies coming from workers)
*/
@Bean
public QueueChannel replies() {
return new QueueChannel();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
.channel(replies())
.get();
}
/*
* Configure the ChunkMessageChannelItemWriter
*/
@Bean
public ItemWriter<Integer> itemWriter() {
MessagingTemplate messagingTemplate = new MessagingTemplate();
messagingTemplate.setDefaultChannel(requests());
messagingTemplate.setReceiveTimeout(2000);
ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter
= new ChunkMessageChannelItemWriter<>();
chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
chunkMessageChannelItemWriter.setReplyChannel(replies());
return chunkMessageChannelItemWriter;
}
- XML
-
下列 XML 配置提供基本管理器设置:
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<int-jms:outbound-channel-adapter id="jmsRequests" destination-name="requests"/>
<bean id="messagingTemplate"
class="org.springframework.integration.core.MessagingTemplate">
<property name="defaultChannel" ref="requests"/>
<property name="receiveTimeout" value="2000"/>
</bean>
<bean id="itemWriter"
class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
scope="step">
<property name="messagingOperations" ref="messagingTemplate"/>
<property name="replyChannel" ref="replies"/>
</bean>
<int:channel id="replies">
<int:queue/>
</int:channel>
<int-jms:message-driven-channel-adapter id="jmsReplies"
destination-name="replies"
channel="replies"/>
上述配置为我们提供了许多 bean。我们使用 ActiveMQ 和 Spring Integration 提供的入站和出站 JMS 适配器配置我们的消息中间件。如所示,我们的 itemWriter
bean(我们的作业步骤引用它)使用 ChunkMessageChannelItemWriter
通过配置的中间件写入块。
现在我们可以继续进行 worker 配置,如下例所示:
- Java
-
以下示例显示 Java 中的 worker 配置:
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://localhost:61616");
return factory;
}
/*
* Configure inbound flow (requests coming from the manager)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
.channel(requests())
.get();
}
/*
* Configure outbound flow (replies going to the manager)
*/
@Bean
public DirectChannel replies() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(replies())
.handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
.get();
}
/*
* Configure the ChunkProcessorChunkHandler
*/
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
ChunkProcessor<Integer> chunkProcessor
= new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler
= new ChunkProcessorChunkHandler<>();
chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
return chunkProcessorChunkHandler;
}
- XML
-
以下示例显示 XML 中的 worker 配置:
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<int:channel id="requests"/>
<int:channel id="replies"/>
<int-jms:message-driven-channel-adapter id="incomingRequests"
destination-name="requests"
channel="requests"/>
<int-jms:outbound-channel-adapter id="outgoingReplies"
destination-name="replies"
channel="replies">
</int-jms:outbound-channel-adapter>
<int:service-activator id="serviceActivator"
input-channel="requests"
output-channel="replies"
ref="chunkProcessorChunkHandler"
method="handleChunk"/>
<bean id="chunkProcessorChunkHandler"
class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
<property name="chunkProcessor">
<bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
<property name="itemWriter">
<bean class="io.spring.sbi.PersonItemWriter"/>
</property>
<property name="itemProcessor">
<bean class="io.spring.sbi.PersonItemProcessor"/>
</property>
</bean>
</property>
</bean>
这些配置项大部分应从管理器配置中看起来很熟悉。工作程序不需要访问 Spring Batch JobRepository
或实际作业配置文件。感兴趣的主要 Bean 是 chunkProcessorChunkHandler
。ChunkProcessorChunkHandler
的 chunkProcessor
属性采用一个已配置的 SimpleChunkProcessor
,您可以通过该配置提供对 ItemWriter
(以及可以选择地提供 ItemProcessor
)的引用,该 ItemWriter
(和 ItemProcessor
)将在工作程序从管理器接收到块时在工作程序上运行。
有关详细信息,请参阅“可扩展性
”章节中的 远程分块 部分:https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#remoteChunking。
从 4.1 版开始,Spring Batch 集成引入了 @EnableBatchIntegration
注释,可用于简化远程分块设置。该注释提供两个 Bean,您可以在应用程序上下文中自动装配它们:
-
RemoteChunkingManagerStepBuilderFactory
:配置管理步骤 -
RemoteChunkingWorkerBuilder
:配置远程工作程序集成流程
这些 API 会负责配置许多组件,如下图所示:
在管理器端,RemoteChunkingManagerStepBuilderFactory
允许您通过声明来配置管理器步骤:
-
用于读取项并将它们发送给工作程序的项目读取器
-
将请求发送给工作程序的输出通道(“传出请求”)
-
接收工作人员回复的输入通道(“传入回复”)
您不需要显式配置 ChunkMessageChannelItemWriter
和 MessagingTemplate
。(您在找到这样做的原因时仍然可以显式配置它们。)
在工作程序端,RemoteChunkingWorkerBuilder
允许您将工作程序配置为:
-
在输入通道上收听经理发送的请求(“Incoming requests”)
-
对于每个请求,使用配置的
ItemProcessor
和ItemWriter
调用ChunkProcessorChunkHandler
的handleChunk
方法 -
在输出通道(“Outgoing replies”)上向经理发送回复
您不需要显式配置 SimpleChunkProcessor
和 ChunkProcessorChunkHandler
。(您在找到这样做的原因时仍然可以显式配置它们。)
以下示例演示了如何使用这些 API:
@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {
@Configuration
public static class ManagerConfiguration {
@Autowired
private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;
@Bean
public TaskletStep managerStep() {
return this.managerStepBuilderFactory.get("managerStep")
.chunk(100)
.reader(itemReader())
.outputChannel(requests()) // requests sent to workers
.inputChannel(replies()) // replies received from workers
.build();
}
// Middleware beans setup omitted
}
@Configuration
public static class WorkerConfiguration {
@Autowired
private RemoteChunkingWorkerBuilder workerBuilder;
@Bean
public IntegrationFlow workerFlow() {
return this.workerBuilder
.itemProcessor(itemProcessor())
.itemWriter(itemWriter())
.inputChannel(requests()) // requests received from the manager
.outputChannel(replies()) // replies sent to the manager
.build();
}
// Middleware beans setup omitted
}
}
您可以在此找到远程分块作业的完整示例:https://github.com/spring-projects/spring-batch/tree/main/spring-batch-samples#remote-chunking-sample。
Remote Partitioning
下图显示了典型的远程分区情况:
另一方面,当瓶颈不是项目的处理而是关联的 I/O 时,远程分区非常有用。使用远程分区,您可以将工作发送到执行完整 Spring Batch 步骤的工作程序。因此,每个工作程序都有其自己的 ItemReader
、ItemProcessor
和 ItemWriter
。出于此目的,Spring Batch 集成提供了 MessageChannelPartitionHandler
。
PartitionHandler
接口的此实现使用 MessageChannel
实例向远程工作程序发送指令并接收其响应。这提供了一种对传输(如 JMS 和 AMQP)的出色抽象,这些传输用于与远程工作程序进行通信。
"`Scalability`"章节中用于解决remote partitioning的部分概述了配置远程分区所需的理念和组件,并展示了使用默认`TaskExecutorPartitionHandler`在单独的本地执行线程中进行分区的示例。对于多个 JVM 的远程分区,需要两个其他组件:
-
一个远程制造或网格环境
-
一个支持所需远程制造或网格环境的
PartitionHandler
实现
类似于远程分块,您可以将 JMS 用作“远程结构
”。在这种情况下,请使用 MessageChannelPartitionHandler
实例作为 PartitionHandler
实现,如前所述。
- Java
-
以下示例假定存在分区分隔的作业,重点是 Java 中的
MessageChannelPartitionHandler
和 JMS 配置:
/*
* Configuration of the manager side
*/
@Bean
public PartitionHandler partitionHandler() {
MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
partitionHandler.setStepName("step1");
partitionHandler.setGridSize(3);
partitionHandler.setReplyChannel(outboundReplies());
MessagingTemplate template = new MessagingTemplate();
template.setDefaultChannel(outboundRequests());
template.setReceiveTimeout(100000);
partitionHandler.setMessagingOperations(template);
return partitionHandler;
}
@Bean
public QueueChannel outboundReplies() {
return new QueueChannel();
}
@Bean
public DirectChannel outboundRequests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundJmsRequests() {
return IntegrationFlow.from("outboundRequests")
.handle(Jms.outboundGateway(connectionFactory())
.requestDestination("requestsQueue"))
.get();
}
@Bean
@ServiceActivator(inputChannel = "inboundStaging")
public AggregatorFactoryBean partitioningMessageHandler() throws Exception {
AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
aggregatorFactoryBean.setProcessorBean(partitionHandler());
aggregatorFactoryBean.setOutputChannel(outboundReplies());
// configure other propeties of the aggregatorFactoryBean
return aggregatorFactoryBean;
}
@Bean
public DirectChannel inboundStaging() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundJmsStaging() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
.configureListenerContainer(c -> c.subscriptionDurable(false))
.destination("stagingQueue"))
.channel(inboundStaging())
.get();
}
/*
* Configuration of the worker side
*/
@Bean
public StepExecutionRequestHandler stepExecutionRequestHandler() {
StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
stepExecutionRequestHandler.setJobExplorer(jobExplorer);
stepExecutionRequestHandler.setStepLocator(stepLocator());
return stepExecutionRequestHandler;
}
@Bean
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler serviceActivator() throws Exception {
return stepExecutionRequestHandler();
}
@Bean
public DirectChannel inboundRequests() {
return new DirectChannel();
}
public IntegrationFlow inboundJmsRequests() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
.configureListenerContainer(c -> c.subscriptionDurable(false))
.destination("requestsQueue"))
.channel(inboundRequests())
.get();
}
@Bean
public DirectChannel outboundStaging() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundJmsStaging() {
return IntegrationFlow.from("outboundStaging")
.handle(Jms.outboundGateway(connectionFactory())
.requestDestination("stagingQueue"))
.get();
}
- XML
-
以下示例假定存在分区分隔的作业,重点是 XML 中的
MessageChannelPartitionHandler
和 JMS 配置:
<bean id="partitionHandler"
class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
<property name="stepName" value="step1"/>
<property name="gridSize" value="3"/>
<property name="replyChannel" ref="outbound-replies"/>
<property name="messagingOperations">
<bean class="org.springframework.integration.core.MessagingTemplate">
<property name="defaultChannel" ref="outbound-requests"/>
<property name="receiveTimeout" value="100000"/>
</bean>
</property>
</bean>
<int:channel id="outbound-requests"/>
<int-jms:outbound-channel-adapter destination="requestsQueue"
channel="outbound-requests"/>
<int:channel id="inbound-requests"/>
<int-jms:message-driven-channel-adapter destination="requestsQueue"
channel="inbound-requests"/>
<bean id="stepExecutionRequestHandler"
class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
<property name="jobExplorer" ref="jobExplorer"/>
<property name="stepLocator" ref="stepLocator"/>
</bean>
<int:service-activator ref="stepExecutionRequestHandler" input-channel="inbound-requests"
output-channel="outbound-staging"/>
<int:channel id="outbound-staging"/>
<int-jms:outbound-channel-adapter destination="stagingQueue"
channel="outbound-staging"/>
<int:channel id="inbound-staging"/>
<int-jms:message-driven-channel-adapter destination="stagingQueue"
channel="inbound-staging"/>
<int:aggregator ref="partitionHandler" input-channel="inbound-staging"
output-channel="outbound-replies"/>
<int:channel id="outbound-replies">
<int:queue/>
</int:channel>
<bean id="stepLocator"
class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />
您还必须确保分区 handler
属性映射到 partitionHandler
Bean。
- Java
-
以下示例将分区
handler
属性映射到中的partitionHandler
- Java
-
Java Configuration
public Job personJob(JobRepository jobRepository) { return new JobBuilder("personJob", jobRepository) .start(new StepBuilder("step1.manager", jobRepository) .partitioner("step1.worker", partitioner()) .partitionHandler(partitionHandler()) .build()) .build(); }
- XML
-
以下示例将分区
handler
属性映射到 XML 中的partitionHandler
:
<job id="personJob">
<step id="step1.manager">
<partition partitioner="partitioner" handler="partitionHandler"/>
...
</step>
</job>
你可以找到远程分区作业 here的完整示例。
你可以使用 @EnableBatchIntegration
注解来简化远程分区设定。该注解提供了两个对远程分区有用的 Bean:
-
RemotePartitioningManagerStepBuilderFactory
:配置经理步骤 -
RemotePartitioningWorkerStepBuilderFactory
:配置工作人员步骤
这些 API 负责配置多种组件,如下图所示:
在管理器端,RemotePartitioningManagerStepBuilderFactory
可以通过以下申明来配置管理器步骤:
-
用来分区数据的
Partitioner
-
向工作人员发送请求的输出通道(“Outgoing requests”)
-
接收工作人员回复的输入通道(“Incoming replies”)(配置回复聚合时)
-
投票间隔和超时参数(配置作业库投票时)
你无需显式配置 MessageChannelPartitionHandler
和 MessagingTemplate
(如果你找到理由,仍然可以显式地配置)。
在工作端,RemotePartitioningWorkerStepBuilderFactory
可以配置工作来:
-
在输入通道上收听经理发送的请求(“Incoming requests”)
-
对于每个请求,调用
StepExecutionRequestHandler
的handle
方法 -
在输出通道(“Outgoing replies”)上向经理发送回复
你无需显式配置 StepExecutionRequestHandler
(如果你找到理由,可以显式地配置)。
以下示例演示了如何使用这些 API:
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningJobConfiguration {
@Configuration
public static class ManagerConfiguration {
@Autowired
private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;
@Bean
public Step managerStep() {
return this.managerStepBuilderFactory
.get("managerStep")
.partitioner("workerStep", partitioner())
.gridSize(10)
.outputChannel(outgoingRequestsToWorkers())
.inputChannel(incomingRepliesFromWorkers())
.build();
}
// Middleware beans setup omitted
}
@Configuration
public static class WorkerConfiguration {
@Autowired
private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;
@Bean
public Step workerStep() {
return this.workerStepBuilderFactory
.get("workerStep")
.inputChannel(incomingRequestsFromManager())
.outputChannel(outgoingRepliesToManager())
.chunk(100)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
// Middleware beans setup omitted
}
}