Sub-elements

  • 异步处理器:将项目处理外包到新线程,提高性能。

  • 远程区块:将块分发到外部系统以进行处理,从而卸载复杂的处理。

  • 远程分区:将整个步骤发送到远程工作程序执行,以处理 I/O 瓶颈。

  • 事件驱动侦听器:允许基于特定事件触发动作。

通过将 Spring Batch 与 Spring Integration 集成,您可以充分利用事件处理、异步处理和分布式处理的优势,从而创建强大且可扩展的批处理解决方案。

当此 Gateway 从`PollableChannel` 接收消息时,您必须指定一个全局默认 Poller 或为`Job Launching Gateway` 提供一个 Poller 子元素。

Java

以下示例展示了如何用 Java 提供 poller:

Java Configuration
@Bean
@ServiceActivator(inputChannel = "queueChannel", poller = @Poller(fixedRate="1000"))
public JobLaunchingGateway sampleJobLaunchingGateway() {
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());
    jobLaunchingGateway.setOutputChannel(replyChannel());
    return jobLaunchingGateway;
}
XML

以下示例展示了如何用 XML 提供 poller:

XML Configuration
<batch-int:job-launching-gateway request-channel="queueChannel"
    reply-channel="replyChannel" job-launcher="jobLauncher">
  <int:poller fixed-rate="1000">
</batch-int:job-launching-gateway>

Providing Feedback with Informational Messages

Spring Batch 作业可以长时间运行,通常提供进度信息至关重要。例如,如果某个批处理作业的部分或全部失败,利益相关者可能希望收到通知。Spring Batch 通过以下方式提供收集此信息的机制:

  • Active polling

  • Event-driven listeners

异步启动 Spring Batch 作业时(例如,使用 Job LaunchingGateway),将返回 JobExecution 实例。因此,您可以使用 JobExecution.getJobId() 通过从 JobRepository 检索 JobExecution 的更新实例来持续轮询状态更新。但是,这被认为不是最佳方案,建议采用事件驱动方法。

因此,Spring Batch 提供侦听器,包括三个最常用的侦听器:

  • StepListener

  • ChunkListener

  • JobExecutionListener

在以下图像中所示的示例中,Spring Batch 作业已配置了一个 StepExecutionListener。因此,Spring Integration 会接收和处理任何步骤,事件发生前或事件发生后。例如,您可以使用 Router 检查接收到的 StepExecution。基于该检查的结果,可能会发生各种情况(例如,将消息路由到邮件出站通道适配器),以便可以根据某些条件发送电子邮件通知。

handling informational messages
Figure 1. Handling Informational Messages

以下两部分示例展示了如何为 StepExecution 事件配置侦听器,以便向 Gateway 发送消息并将其输出日志记录到 logging-channel-adapter

首先,创建通知集成 bean。

Java

以下示例展示了如何在 Java 中创建通知集成 bean:

Java Configuration
@Bean
@ServiceActivator(inputChannel = "stepExecutionsChannel")
public LoggingHandler loggingHandler() {
    LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.WARN);
    adapter.setLoggerName("TEST_LOGGER");
    adapter.setLogExpressionString("headers.id + ': ' + payload");
    return adapter;
}

@MessagingGateway(name = "notificationExecutionsListener", defaultRequestChannel = "stepExecutionsChannel")
public interface NotificationExecutionListener extends StepExecutionListener {}

您需要向配置中添加 @IntegrationComponentScan 注释。

XML

以下示例展示了如何在 XML 中创建通知集成 bean:

XML Configuration
<int:channel id="stepExecutionsChannel"/>

<int:gateway id="notificationExecutionsListener"
    service-interface="org.springframework.batch.core.StepExecutionListener"
    default-request-channel="stepExecutionsChannel"/>

<int:logging-channel-adapter channel="stepExecutionsChannel"/>

第二,修改作业以添加步骤级侦听器。

Java

以下示例展示了如何在 Java 中添加步骤级侦听器:

Java Configuration
public Job importPaymentsJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new JobBuilder("importPayments", jobRepository)
        .start(new StepBuilder("step1", jobRepository)
                .chunk(200, transactionManager)
                .listener(notificationExecutionsListener())
                // ...
                .build();
              )
        .build();
}
XML

以下示例展示了如何在 XML 中添加步骤级侦听器:

XML Configuration
<job id="importPayments">
    <step id="step1">
        <tasklet ../>
            <chunk ../>
            <listeners>
                <listener ref="notificationExecutionsListener"/>
            </listeners>
        </tasklet>
        ...
    </step>
</job>

Asynchronous Processors

异步处理器帮助您扩展项目的处理。在异步处理器用例中,AsyncItemProcessor 用作调度器,在一个新线程上为项目执行 ItemProcessor 的逻辑。项目完成时,将 Future 传递给 AsynchItemWriter 以进行写入。

因此,你可以通过使用异步项目处理来提高性能,基本上允许你实现 fork-join 场景。 AsyncItemWriter 收集结果,并在所有结果均可用之后立即回写区块。

Java

以下示例展示了如何在 Java 中配置 AsyncItemProcessor

Java Configuration
@Bean
public AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) {
    AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor();
    asyncItemProcessor.setTaskExecutor(taskExecutor);
    asyncItemProcessor.setDelegate(itemProcessor);
    return asyncItemProcessor;
}
XML

以下示例展示了如何在 XML 中配置 AsyncItemProcessor

XML Configuration
<bean id="processor"
    class="org.springframework.batch.integration.async.AsyncItemProcessor">
  <property name="delegate">
    <bean class="your.ItemProcessor"/>
  </property>
  <property name="taskExecutor">
    <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
  </property>
</bean>

delegate 属性指代你的 ItemProcessor bean,taskExecutor 属性指代你选择的 TaskExecutor

Java

以下示例展示了如何在 Java 中配置 AsyncItemWriter

Java Configuration
@Bean
public AsyncItemWriter writer(ItemWriter itemWriter) {
    AsyncItemWriter asyncItemWriter = new AsyncItemWriter();
    asyncItemWriter.setDelegate(itemWriter);
    return asyncItemWriter;
}
XML

以下示例展示了如何在 XML 中配置 AsyncItemWriter

XML Configuration
<bean id="itemWriter"
    class="org.springframework.batch.integration.async.AsyncItemWriter">
  <property name="delegate">
    <bean id="itemWriter" class="your.ItemWriter"/>
  </property>
</bean>

同样,delegate 属性实际上指的是你的 ItemWriter bean。

Externalizing Batch Process Execution

到目前为止,讨论的集成方法表明了一些用例,其中 Spring Integration 像外壳一样包装着 Spring Batch。但是,Spring Batch 也可以在内部使用 Spring Integration。通过使用这种方法,Spring Batch 用户可以将项目甚至区块的处理委派给外部流程。这允许你卸载复杂的处理。Spring Batch Integration 为以下内容提供专门支持:

  • Remote Chunking

  • Remote Partitioning

Remote Chunking

下图展示了当你将 Spring Batch 与 Spring Integration 搭配使用时,远程区块处理的一种方式:

remote chunking sbi
Figure 2. Remote Chunking

更进一步,你还可以使用 ChunkMessageChannelItemWriter(由 Spring Batch Integration 提供)外化区块处理,该 ChunkMessageChannelItemWriter 将项目发送出去并收集结果。一旦发送,Spring Batch 就会继续读取和分组项目的流程,而无需等待结果。相反,由 ChunkMessageChannelItemWriter 负责收集结果,并将其集成回 Spring Batch 流程中。

借助 Spring Integration,你可以完全控制你的流程的并发性(例如,通过使用 QueueChannel 而不是 DirectChannel)。此外,通过依赖 Spring Integration 丰富的通道适配器集合(例如,JMS 和 AMQP),你可以将批处理作业的区块分发到外部系统以进行处理。

Java

一个包含一个要远程切分的步骤的作业可能在 Java 中具有类似于以下内容的配置:

Java Configuration
public Job chunkJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
     return new JobBuilder("personJob", jobRepository)
             .start(new StepBuilder("step1", jobRepository)
                     .<Person, Person>chunk(200, transactionManager)
                     .reader(itemReader())
                     .writer(itemWriter())
                     .build())
             .build();
 }
XML

一个包含一个要远程切分的步骤的作业可能在 XML 中具有类似于以下内容的配置:

XML Configuration
<job id="personJob">
  <step id="step1">
    <tasklet>
      <chunk reader="itemReader" writer="itemWriter" commit-interval="200"/>
    </tasklet>
    ...
  </step>
</job>

ItemReader 引用指向要用于在管理器上读取数据的 bean。ItemWriter 引用指向特殊的 ItemWriter(称为 ChunkMessageChannelItemWriter),如前所述。处理器(如果存在)则不属于管理器配置,因为它是配置在 worker 上的。在实现用例时,您应检查任何其他组件属性,例如节流限制等。

Java

下列 Java 配置提供基本管理器设置:

Java Configuration
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("tcp://localhost:61616");
    return factory;
}

/*
 * Configure outbound flow (requests going to workers)
 */
@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(requests())
            .handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
            .get();
}

/*
 * Configure inbound flow (replies coming from workers)
 */
@Bean
public QueueChannel replies() {
    return new QueueChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
            .channel(replies())
            .get();
}

/*
 * Configure the ChunkMessageChannelItemWriter
 */
@Bean
public ItemWriter<Integer> itemWriter() {
    MessagingTemplate messagingTemplate = new MessagingTemplate();
    messagingTemplate.setDefaultChannel(requests());
    messagingTemplate.setReceiveTimeout(2000);
    ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter
            = new ChunkMessageChannelItemWriter<>();
    chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
    chunkMessageChannelItemWriter.setReplyChannel(replies());
    return chunkMessageChannelItemWriter;
}
XML

下列 XML 配置提供基本管理器设置:

XML Configuration
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<int-jms:outbound-channel-adapter id="jmsRequests" destination-name="requests"/>

<bean id="messagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate">
  <property name="defaultChannel" ref="requests"/>
  <property name="receiveTimeout" value="2000"/>
</bean>

<bean id="itemWriter"
    class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
    scope="step">
  <property name="messagingOperations" ref="messagingTemplate"/>
  <property name="replyChannel" ref="replies"/>
</bean>

<int:channel id="replies">
  <int:queue/>
</int:channel>

<int-jms:message-driven-channel-adapter id="jmsReplies"
    destination-name="replies"
    channel="replies"/>

上述配置为我们提供了许多 bean。我们使用 ActiveMQ 和 Spring Integration 提供的入站和出站 JMS 适配器配置我们的消息中间件。如所示,我们的 itemWriter bean(我们的作业步骤引用它)使用 ChunkMessageChannelItemWriter 通过配置的中间件写入块。

现在我们可以继续进行 worker 配置,如下例所示:

Java

以下示例显示 Java 中的 worker 配置:

Java Configuration
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("tcp://localhost:61616");
    return factory;
}

/*
 * Configure inbound flow (requests coming from the manager)
 */
@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
            .channel(requests())
            .get();
}

/*
 * Configure outbound flow (replies going to the manager)
 */
@Bean
public DirectChannel replies() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(replies())
            .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
            .get();
}

/*
 * Configure the ChunkProcessorChunkHandler
 */
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
    ChunkProcessor<Integer> chunkProcessor
            = new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
    ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler
            = new ChunkProcessorChunkHandler<>();
    chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
    return chunkProcessorChunkHandler;
}
XML

以下示例显示 XML 中的 worker 配置:

XML Configuration
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<int:channel id="requests"/>
<int:channel id="replies"/>

<int-jms:message-driven-channel-adapter id="incomingRequests"
    destination-name="requests"
    channel="requests"/>

<int-jms:outbound-channel-adapter id="outgoingReplies"
    destination-name="replies"
    channel="replies">
</int-jms:outbound-channel-adapter>

<int:service-activator id="serviceActivator"
    input-channel="requests"
    output-channel="replies"
    ref="chunkProcessorChunkHandler"
    method="handleChunk"/>

<bean id="chunkProcessorChunkHandler"
    class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
  <property name="chunkProcessor">
    <bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
      <property name="itemWriter">
        <bean class="io.spring.sbi.PersonItemWriter"/>
      </property>
      <property name="itemProcessor">
        <bean class="io.spring.sbi.PersonItemProcessor"/>
      </property>
    </bean>
  </property>
</bean>

这些配置项大部分应从管理器配置中看起来很熟悉。工作程序不需要访问 Spring Batch JobRepository 或实际作业配置文件。感兴趣的主要 Bean 是 chunkProcessorChunkHandlerChunkProcessorChunkHandlerchunkProcessor 属性采用一个已配置的 SimpleChunkProcessor,您可以通过该配置提供对 ItemWriter(以及可以选择地提供 ItemProcessor)的引用,该 ItemWriter(和 ItemProcessor)将在工作程序从管理器接收到块时在工作程序上运行。

有关详细信息,请参阅“可扩展性”章节中的 远程分块 部分:https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#remoteChunking。

从 4.1 版开始,Spring Batch 集成引入了 @EnableBatchIntegration 注释,可用于简化远程分块设置。该注释提供两个 Bean,您可以在应用程序上下文中自动装配它们:

  • RemoteChunkingManagerStepBuilderFactory:配置管理步骤

  • RemoteChunkingWorkerBuilder:配置远程工作程序集成流程

这些 API 会负责配置许多组件,如下图所示:

remote chunking config
Figure 3. Remote Chunking Configuration

在管理器端,RemoteChunkingManagerStepBuilderFactory 允许您通过声明来配置管理器步骤:

  • 用于读取项并将它们发送给工作程序的项目读取器

  • 将请求发送给工作程序的输出通道(“传出请求”)

  • 接收工作人员回复的输入通道(“传入回复”)

您不需要显式配置 ChunkMessageChannelItemWriterMessagingTemplate。(您在找到这样做的原因时仍然可以显式配置它们。)

在工作程序端,RemoteChunkingWorkerBuilder 允许您将工作程序配置为:

  • 在输入通道上收听经理发送的请求(“Incoming requests”)

  • 对于每个请求,使用配置的 ItemProcessorItemWriter 调用 ChunkProcessorChunkHandlerhandleChunk 方法

  • 在输出通道(“Outgoing replies”)上向经理发送回复

您不需要显式配置 SimpleChunkProcessorChunkProcessorChunkHandler。(您在找到这样做的原因时仍然可以显式配置它们。)

以下示例演示了如何使用这些 API:

@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {

    @Configuration
    public static class ManagerConfiguration {

        @Autowired
        private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;

        @Bean
        public TaskletStep managerStep() {
            return this.managerStepBuilderFactory.get("managerStep")
                       .chunk(100)
                       .reader(itemReader())
                       .outputChannel(requests()) // requests sent to workers
                       .inputChannel(replies())   // replies received from workers
                       .build();
        }

        // Middleware beans setup omitted

    }

    @Configuration
    public static class WorkerConfiguration {

        @Autowired
        private RemoteChunkingWorkerBuilder workerBuilder;

        @Bean
        public IntegrationFlow workerFlow() {
            return this.workerBuilder
                       .itemProcessor(itemProcessor())
                       .itemWriter(itemWriter())
                       .inputChannel(requests()) // requests received from the manager
                       .outputChannel(replies()) // replies sent to the manager
                       .build();
        }

        // Middleware beans setup omitted

    }

}

您可以在此找到远程分块作业的完整示例:https://github.com/spring-projects/spring-batch/tree/main/spring-batch-samples#remote-chunking-sample。

Remote Partitioning

下图显示了典型的远程分区情况:

remote partitioning
Figure 4. Remote Partitioning

另一方面,当瓶颈不是项目的处理而是关联的 I/O 时,远程分区非常有用。使用远程分区,您可以将工作发送到执行完整 Spring Batch 步骤的工作程序。因此,每个工作程序都有其自己的 ItemReaderItemProcessorItemWriter。出于此目的,Spring Batch 集成提供了 MessageChannelPartitionHandler

PartitionHandler 接口的此实现使用 MessageChannel 实例向远程工作程序发送指令并接收其响应。这提供了一种对传输(如 JMS 和 AMQP)的出色抽象,这些传输用于与远程工作程序进行通信。

"`Scalability`"章节中用于解决remote partitioning的部分概述了配置远程分区所需的理念和组件,并展示了使用默认`TaskExecutorPartitionHandler`在单独的本地执行线程中进行分区的示例。对于多个 JVM 的远程分区,需要两个其他组件:

  • 一个远程制造或网格环境

  • 一个支持所需远程制造或网格环境的 PartitionHandler 实现

类似于远程分块,您可以将 JMS 用作“远程结构”。在这种情况下,请使用 MessageChannelPartitionHandler 实例作为 PartitionHandler 实现,如前所述。

Java

以下示例假定存在分区分隔的作业,重点是 Java 中的 MessageChannelPartitionHandler 和 JMS 配置:

Java Configuration
/*
 * Configuration of the manager side
 */
@Bean
public PartitionHandler partitionHandler() {
    MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
    partitionHandler.setStepName("step1");
    partitionHandler.setGridSize(3);
    partitionHandler.setReplyChannel(outboundReplies());
    MessagingTemplate template = new MessagingTemplate();
    template.setDefaultChannel(outboundRequests());
    template.setReceiveTimeout(100000);
    partitionHandler.setMessagingOperations(template);
    return partitionHandler;
}

@Bean
public QueueChannel outboundReplies() {
    return new QueueChannel();
}

@Bean
public DirectChannel outboundRequests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundJmsRequests() {
    return IntegrationFlow.from("outboundRequests")
            .handle(Jms.outboundGateway(connectionFactory())
                    .requestDestination("requestsQueue"))
            .get();
}

@Bean
@ServiceActivator(inputChannel = "inboundStaging")
public AggregatorFactoryBean partitioningMessageHandler() throws Exception {
    AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
    aggregatorFactoryBean.setProcessorBean(partitionHandler());
    aggregatorFactoryBean.setOutputChannel(outboundReplies());
    // configure other propeties of the aggregatorFactoryBean
    return aggregatorFactoryBean;
}

@Bean
public DirectChannel inboundStaging() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundJmsStaging() {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(connectionFactory())
                    .configureListenerContainer(c -> c.subscriptionDurable(false))
                    .destination("stagingQueue"))
            .channel(inboundStaging())
            .get();
}

/*
 * Configuration of the worker side
 */
@Bean
public StepExecutionRequestHandler stepExecutionRequestHandler() {
    StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
    stepExecutionRequestHandler.setJobExplorer(jobExplorer);
    stepExecutionRequestHandler.setStepLocator(stepLocator());
    return stepExecutionRequestHandler;
}

@Bean
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler serviceActivator() throws Exception {
    return stepExecutionRequestHandler();
}

@Bean
public DirectChannel inboundRequests() {
    return new DirectChannel();
}

public IntegrationFlow inboundJmsRequests() {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(connectionFactory())
                    .configureListenerContainer(c -> c.subscriptionDurable(false))
                    .destination("requestsQueue"))
            .channel(inboundRequests())
            .get();
}

@Bean
public DirectChannel outboundStaging() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundJmsStaging() {
    return IntegrationFlow.from("outboundStaging")
            .handle(Jms.outboundGateway(connectionFactory())
                    .requestDestination("stagingQueue"))
            .get();
}
XML

以下示例假定存在分区分隔的作业,重点是 XML 中的 MessageChannelPartitionHandler 和 JMS 配置:

XML Configuration
<bean id="partitionHandler"
   class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
  <property name="stepName" value="step1"/>
  <property name="gridSize" value="3"/>
  <property name="replyChannel" ref="outbound-replies"/>
  <property name="messagingOperations">
    <bean class="org.springframework.integration.core.MessagingTemplate">
      <property name="defaultChannel" ref="outbound-requests"/>
      <property name="receiveTimeout" value="100000"/>
    </bean>
  </property>
</bean>

<int:channel id="outbound-requests"/>
<int-jms:outbound-channel-adapter destination="requestsQueue"
    channel="outbound-requests"/>

<int:channel id="inbound-requests"/>
<int-jms:message-driven-channel-adapter destination="requestsQueue"
    channel="inbound-requests"/>

<bean id="stepExecutionRequestHandler"
    class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
  <property name="jobExplorer" ref="jobExplorer"/>
  <property name="stepLocator" ref="stepLocator"/>
</bean>

<int:service-activator ref="stepExecutionRequestHandler" input-channel="inbound-requests"
    output-channel="outbound-staging"/>

<int:channel id="outbound-staging"/>
<int-jms:outbound-channel-adapter destination="stagingQueue"
    channel="outbound-staging"/>

<int:channel id="inbound-staging"/>
<int-jms:message-driven-channel-adapter destination="stagingQueue"
    channel="inbound-staging"/>

<int:aggregator ref="partitionHandler" input-channel="inbound-staging"
    output-channel="outbound-replies"/>

<int:channel id="outbound-replies">
  <int:queue/>
</int:channel>

<bean id="stepLocator"
    class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />

您还必须确保分区 handler 属性映射到 partitionHandler Bean。

Java

以下示例将分区 handler 属性映射到中的 partitionHandler

Java
Java Configuration
	public Job personJob(JobRepository jobRepository) {
		return new JobBuilder("personJob", jobRepository)
				.start(new StepBuilder("step1.manager", jobRepository)
						.partitioner("step1.worker", partitioner())
						.partitionHandler(partitionHandler())
						.build())
				.build();
	}
XML

以下示例将分区 handler 属性映射到 XML 中的 partitionHandler

XML Configuration
<job id="personJob">
  <step id="step1.manager">
    <partition partitioner="partitioner" handler="partitionHandler"/>
    ...
  </step>
</job>

你可以找到远程分区作业 here的完整示例。

你可以使用 @EnableBatchIntegration 注解来简化远程分区设定。该注解提供了两个对远程分区有用的 Bean:

  • RemotePartitioningManagerStepBuilderFactory:配置经理步骤

  • RemotePartitioningWorkerStepBuilderFactory:配置工作人员步骤

这些 API 负责配置多种组件,如下图所示:

remote partitioning polling config
Figure 5. Remote Partitioning Configuration (with job repository polling)
remote partitioning aggregation config
Figure 6. Remote Partitioning Configuration (with replies aggregation)

在管理器端,RemotePartitioningManagerStepBuilderFactory 可以通过以下申明来配置管理器步骤:

  • 用来分区数据的 Partitioner

  • 向工作人员发送请求的输出通道(“Outgoing requests”)

  • 接收工作人员回复的输入通道(“Incoming replies”)(配置回复聚合时)

  • 投票间隔和超时参数(配置作业库投票时)

你无需显式配置 MessageChannelPartitionHandlerMessagingTemplate(如果你找到理由,仍然可以显式地配置)。

在工作端,RemotePartitioningWorkerStepBuilderFactory 可以配置工作来:

  • 在输入通道上收听经理发送的请求(“Incoming requests”)

  • 对于每个请求,调用 StepExecutionRequestHandlerhandle 方法

  • 在输出通道(“Outgoing replies”)上向经理发送回复

你无需显式配置 StepExecutionRequestHandler(如果你找到理由,可以显式地配置)。

以下示例演示了如何使用这些 API:

@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningJobConfiguration {

    @Configuration
    public static class ManagerConfiguration {

        @Autowired
        private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;

        @Bean
        public Step managerStep() {
                 return this.managerStepBuilderFactory
                    .get("managerStep")
                    .partitioner("workerStep", partitioner())
                    .gridSize(10)
                    .outputChannel(outgoingRequestsToWorkers())
                    .inputChannel(incomingRepliesFromWorkers())
                    .build();
        }

        // Middleware beans setup omitted

    }

    @Configuration
    public static class WorkerConfiguration {

        @Autowired
        private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;

        @Bean
        public Step workerStep() {
                 return this.workerStepBuilderFactory
                    .get("workerStep")
                    .inputChannel(incomingRequestsFromManager())
                    .outputChannel(outgoingRepliesToManager())
                    .chunk(100)
                    .reader(itemReader())
                    .processor(itemProcessor())
                    .writer(itemWriter())
                    .build();
        }

        // Middleware beans setup omitted

    }

}