Sub-elements

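  • Asynchronous processors: offload item processing to new threads to improve performance.

  • Remote chunking: distribute chunks to external systems for processing, offloading complex processing.

  • Remote partitioning: send entire steps to remote workers for execution, to relieve I/O bottlenecks.

  • Event-driven listeners: trigger actions in response to specific events.

By integrating Spring Batch with Spring Integration, you can take full advantage of event handling, asynchronous processing, and distributed processing to build robust, scalable batch solutions.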

When this Gateway is receiving messages from a PollableChannel, you must either provide a global default Poller or provide a Poller sub-element to the Job Launching Gateway.
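Without a poller, nothing takes messages off a PollableChannel, so the gateway would never see them. The following plain-JDK sketch models what a fixed-rate poller does. It is not Spring Integration code. The BlockingQueue stands in for queueChannel, the Consumer stands in for the Job Launching Gateway, and the class and method names are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Plain-JDK model of a fixed-rate poller on a PollableChannel (not Spring Integration code).
 * The queue stands in for the QueueChannel; the handler stands in for the gateway.
 */
final class FixedRatePollerSketch {

    private FixedRatePollerSketch() {
    }

    /** Every periodMillis, drains whatever is queued and hands each message to the handler. */
    static ScheduledFuture<?> start(BlockingQueue<String> queueChannel, Consumer<String> handler,
                                    long periodMillis, ScheduledExecutorService scheduler) {
        return scheduler.scheduleAtFixedRate(() -> {
            String message;
            while ((message = queueChannel.poll()) != null) { // non-blocking receive
                handler.accept(message);
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
    }
}
```

This sketch drains everything that is queued on each tick. A real poller's per-poll limit is configurable, for example through the maxMessagesPerPoll attribute of @Poller.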

Java

The following example shows how to provide a poller in Java:

Java Configuration
@Bean
@ServiceActivator(inputChannel = "queueChannel", poller = @Poller(fixedRate="1000"))
public JobLaunchingGateway sampleJobLaunchingGateway() {
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());
    jobLaunchingGateway.setOutputChannel(replyChannel());
    return jobLaunchingGateway;
}
XML

The following example shows how to provide a poller in XML:

XML Configuration
<batch-int:job-launching-gateway request-channel="queueChannel"
    reply-channel="replyChannel" job-launcher="jobLauncher">
  <int:poller fixed-rate="1000"/>
</batch-int:job-launching-gateway>

Providing Feedback with Informational Messages

As Spring Batch jobs can run for a long time, providing progress information is often critical. For example, stakeholders may want to be notified if some or all parts of a batch job have failed. Spring Batch supports gathering this information through:

  • Active polling

  • Event-driven listeners

When you start a Spring Batch job asynchronously (for example, by using the Job Launching Gateway), a JobExecution instance is returned. You can then use its ID (JobExecution.getId()) to poll continuously for status updates, retrieving updated instances of the JobExecution from the JobRepository by using the JobExplorer. However, this is considered sub-optimal: every poll is a round trip to the repository, and a status change is noticed only at the next poll. An event-driven approach is preferred.
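The following is a minimal sketch of the active-polling approach, written against plain JDK types so that it stays self-contained. The status lookup is passed in as a LongFunction<String>. In an application it would be something like jobExplorer.getJobExecution(executionId).getStatus().name(), which is an assumption here. The class name and the terminal-status set (named after BatchStatus values) are also hypothetical.

```java
import java.util.Set;
import java.util.function.LongFunction;

final class JobStatusPoller {

    // Statuses after which polling stops; names mirror Spring Batch's BatchStatus values
    static final Set<String> TERMINAL = Set.of("COMPLETED", "FAILED", "STOPPED", "ABANDONED");

    private JobStatusPoller() {
    }

    /**
     * Calls statusLookup every intervalMillis until a terminal status is returned or
     * maxAttempts lookups have been made; returns the last status seen.
     */
    static String pollUntilDone(long executionId, LongFunction<String> statusLookup,
                                long intervalMillis, int maxAttempts) {
        String status = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            status = statusLookup.apply(executionId); // one repository round trip per attempt
            if ((status != null && TERMINAL.contains(status)) || attempt == maxAttempts) {
                return status;
            }
            try {
                Thread.sleep(intervalMillis); // a status change is only seen after this delay
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return status;
            }
        }
        return status; // only reached when maxAttempts < 1
    }
}
```

Every iteration costs a lookup, and the interval bounds how quickly a change is noticed. The event-driven listeners described next avoid both costs.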

Therefore, Spring Batch provides listeners, including the three most commonly used listeners:

  • StepListener

  • ChunkListener

  • JobExecutionListener

In the example shown in the following image, a Spring Batch job has been configured with a StepExecutionListener. As a result, Spring Integration receives and processes any before-step and after-step events. For example, you can inspect the received StepExecution by using a Router. Based on the results of that inspection, various things can occur (such as routing a message to a mail outbound channel adapter), so that an email notification can be sent out based on some condition.
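The routing decision in the figure can be sketched in plain Java as follows. StepOutcome, the two Consumer "channels", and the rule "FAILED goes to mail, everything else is logged" are hypothetical stand-ins for a StepExecution, Spring Integration channels, and a Router configuration.

```java
import java.util.function.Consumer;

final class StepEventRouter {

    /** Hypothetical summary of a finished step (a real StepExecution carries much more). */
    record StepOutcome(String stepName, String exitCode, long writeCount) {}

    private final Consumer<String> mailChannel; // stands in for a mail outbound channel adapter
    private final Consumer<String> logChannel;  // stands in for a logging channel adapter

    StepEventRouter(Consumer<String> mailChannel, Consumer<String> logChannel) {
        this.mailChannel = mailChannel;
        this.logChannel = logChannel;
    }

    /** Routes an after-step event: failures go to mail, everything else is logged. */
    void afterStep(StepOutcome outcome) {
        String message = outcome.stepName() + " finished with " + outcome.exitCode()
                + " (" + outcome.writeCount() + " items written)";
        if ("FAILED".equals(outcome.exitCode())) {
            mailChannel.accept(message);
        } else {
            logChannel.accept(message);
        }
    }
}
```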

Figure 1. Handling Informational Messages

The following two-part example shows how to configure a listener that sends a message to a Gateway for StepExecution events and logs its output to a logging-channel-adapter.

First, create the notification integration beans.

Java

The following example shows how to create the notification integration beans in Java:

Java Configuration
@Bean
@ServiceActivator(inputChannel = "stepExecutionsChannel")
public LoggingHandler loggingHandler() {
    LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.WARN);
    adapter.setLoggerName("TEST_LOGGER");
    adapter.setLogExpressionString("headers.id + ': ' + payload");
    return adapter;
}

@MessagingGateway(name = "notificationExecutionsListener", defaultRequestChannel = "stepExecutionsChannel")
public interface NotificationExecutionListener extends StepExecutionListener {}

You need to add the @IntegrationComponentScan annotation to your configuration.

XML

The following example shows how to create the notification integration beans in XML:

XML Configuration
<int:channel id="stepExecutionsChannel"/>

<int:gateway id="notificationExecutionsListener"
    service-interface="org.springframework.batch.core.StepExecutionListener"
    default-request-channel="stepExecutionsChannel"/>

<int:logging-channel-adapter channel="stepExecutionsChannel"/>

Second, modify your job to add a step-level listener.

Java

The following example shows how to add a step-level listener in Java:

Java Configuration
@Bean
public Job importPaymentsJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new JobBuilder("importPayments", jobRepository)
        .start(new StepBuilder("step1", jobRepository)
                .chunk(200, transactionManager)
                .listener(notificationExecutionsListener())
                // ...
                .build())
        .build();
}
XML

The following example shows how to add a step-level listener in XML:

XML Configuration
<job id="importPayments">
    <step id="step1">
        <tasklet ...>
            <chunk .../>
            <listeners>
                <listener ref="notificationExecutionsListener"/>
            </listeners>
        </tasklet>
        ...
    </step>
</job>
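The notificationExecutionsListener gateway has no hand-written implementation. Spring Integration generates a proxy for the interface that turns each listener callback into a message on stepExecutionsChannel. The sketch below illustrates only that proxy idea, using java.lang.reflect.Proxy. The StepCallbacks interface and the string message format are hypothetical. The real gateway builds Message objects and also handles return values, headers, and more.

```java
import java.lang.reflect.Proxy;
import java.util.function.Consumer;

final class GatewaySketch {

    /** Hypothetical listener-style interface; the real example uses StepExecutionListener. */
    interface StepCallbacks {
        void beforeStep(String stepName);

        void afterStep(String stepName);
    }

    private GatewaySketch() {
    }

    /** Returns a proxy that forwards every interface call to the channel as a message. */
    static StepCallbacks gateway(Consumer<String> channel) {
        return (StepCallbacks) Proxy.newProxyInstance(
                StepCallbacks.class.getClassLoader(),
                new Class<?>[] {StepCallbacks.class},
                (proxy, method, args) -> {
                    if (method.getDeclaringClass() == Object.class) {
                        // equals/hashCode/toString are answered locally, not sent as messages
                        if (method.getName().equals("equals")) {
                            return proxy == args[0];
                        }
                        if (method.getName().equals("hashCode")) {
                            return System.identityHashCode(proxy);
                        }
                        return "StepCallbacks gateway";
                    }
                    // The method name plays the role of a header; the argument is the payload
                    channel.accept(method.getName() + ": " + args[0]);
                    return null; // the interface methods are void
                });
    }
}
```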

Asynchronous Processors

Asynchronous processors help you scale the processing of items. In the asynchronous processor use case, an AsyncItemProcessor serves as a dispatcher, executing the logic of the ItemProcessor for an item on a separate thread supplied by the configured TaskExecutor. Instead of the processed item, the AsyncItemProcessor returns a Future. That Future is passed to the AsyncItemWriter, which writes the item once processing completes.

Therefore, you can increase performance by using asynchronous item processing, basically letting you implement fork-join scenarios. The AsyncItemWriter gathers the results and writes back the chunk as soon as all the results become available.
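The fork-join behavior described above can be modeled with plain java.util.concurrent types. This is not the Spring Batch implementation. The process method plays the AsyncItemProcessor role: it submits each item and returns a Future at once. The write method plays the AsyncItemWriter role: it waits for every Future, then delegates the whole chunk. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;

final class AsyncChunkSketch {

    private AsyncChunkSketch() {
    }

    /** "Fork": submit each item to the executor and return its Future immediately. */
    static <I, O> List<Future<O>> process(List<I> chunk, Function<I, O> delegate, ExecutorService executor) {
        List<Future<O>> futures = new ArrayList<>();
        for (I item : chunk) {
            futures.add(executor.submit(() -> delegate.apply(item)));
        }
        return futures;
    }

    /** "Join": wait for every Future of the chunk, then write all results in one call. */
    static <O> void write(List<Future<O>> futures, Consumer<List<O>> delegateWriter) {
        List<O> results = new ArrayList<>(futures.size());
        try {
            for (Future<O> future : futures) {
                results.add(future.get()); // blocks until this item's processing has finished
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for processed items", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Item processing failed", e.getCause());
        }
        delegateWriter.accept(results); // results keep the original item order
    }
}
```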

Java

The following example shows how to configure the AsyncItemProcessor in Java:

Java Configuration
@Bean
public AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) {
    AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor();
    asyncItemProcessor.setTaskExecutor(taskExecutor);
    asyncItemProcessor.setDelegate(itemProcessor);
    return asyncItemProcessor;
}
XML

The following example shows how to configure the AsyncItemProcessor in XML:

XML Configuration
<bean id="processor"
    class="org.springframework.batch.integration.async.AsyncItemProcessor">
  <property name="delegate">
    <bean class="your.ItemProcessor"/>
  </property>
  <property name="taskExecutor">
    <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
  </property>
</bean>

The delegate property refers to your ItemProcessor bean, and the taskExecutor property refers to the TaskExecutor of your choice.

Java

The following example shows how to configure the AsyncItemWriter in Java:

Java Configuration
@Bean
public AsyncItemWriter writer(ItemWriter itemWriter) {
    AsyncItemWriter asyncItemWriter = new AsyncItemWriter();
    asyncItemWriter.setDelegate(itemWriter);
    return asyncItemWriter;
}
XML

The following example shows how to configure the AsyncItemWriter in XML:

XML Configuration
<bean id="itemWriter"
    class="org.springframework.batch.integration.async.AsyncItemWriter">
  <property name="delegate">
    <bean class="your.ItemWriter"/>
  </property>
</bean>

Again, the delegate property is actually a reference to your ItemWriter bean.

Externalizing Batch Process Execution

The integration approaches discussed so far suggest use cases where Spring Integration wraps Spring Batch like an outer shell. However, Spring Batch can also use Spring Integration internally. By using this approach, Spring Batch users can delegate the processing of items or even chunks to outside processes. This lets you offload complex processing. Spring Batch Integration provides dedicated support for:

  • Remote Chunking

  • Remote Partitioning

Remote Chunking

The following image shows one way that remote chunking works when you use Spring Batch together with Spring Integration:

Figure 2. Remote Chunking

Taking things one step further, you can also externalize the chunk processing by using the ChunkMessageChannelItemWriter (provided by Spring Batch Integration), which sends items out and collects the result. Once sent, Spring Batch continues the process of reading and grouping items, without waiting for the results. Rather, it is the responsibility of the ChunkMessageChannelItemWriter to gather the results and integrate them back into the Spring Batch process.

With Spring Integration, you have full control over the concurrency of your processes (for instance, by using a QueueChannel instead of a DirectChannel). Furthermore, by relying on Spring Integration’s rich collection of channel adapters (such as JMS and AMQP), you can distribute chunks of a batch job to external systems for processing.
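The following in-memory sketch models that split of responsibilities. Two BlockingQueues stand in for the request and reply destinations, and a worker thread stands in for the remote side (ChunkProcessorChunkHandler). It is a conceptual model, not Spring Batch Integration's API. The record types, the negative-sequence stop signal, the unbounded-queue assumption, and the method names all belong to the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

final class RemoteChunkingSketch {

    record ChunkRequest(int sequence, List<String> items) {}

    record ChunkReply(int sequence, int itemsWritten) {}

    private RemoteChunkingSketch() {
    }

    /** Worker: take chunks from "requests", process and write them locally, reply on "replies". */
    static Thread startWorker(BlockingQueue<ChunkRequest> requests, BlockingQueue<ChunkReply> replies,
                              Function<String, String> processor, List<String> sink) {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    ChunkRequest request = requests.take();
                    if (request.sequence() < 0) {
                        return; // stop signal used only by this sketch
                    }
                    List<String> processed = new ArrayList<>();
                    for (String item : request.items()) {
                        processed.add(processor.apply(item)); // the ItemProcessor runs on the worker
                    }
                    synchronized (sink) {
                        sink.addAll(processed); // the ItemWriter runs on the worker too
                    }
                    replies.offer(new ChunkReply(request.sequence(), processed.size()));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        return worker;
    }

    /** Manager: send every chunk without waiting, then gather one reply per chunk. */
    static int sendAndCollect(List<List<String>> chunks, BlockingQueue<ChunkRequest> requests,
                              BlockingQueue<ChunkReply> replies, long receiveTimeoutMillis) {
        for (int i = 0; i < chunks.size(); i++) {
            requests.offer(new ChunkRequest(i, chunks.get(i))); // assumes an unbounded queue
        }
        int written = 0;
        try {
            for (int i = 0; i < chunks.size(); i++) {
                ChunkReply reply = replies.poll(receiveTimeoutMillis, TimeUnit.MILLISECONDS);
                if (reply == null) {
                    throw new IllegalStateException("Timed out waiting for a chunk reply");
                }
                written += reply.itemsWritten();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for replies", e);
        }
        return written;
    }

    /** Sends the sketch's stop signal and waits briefly for the worker to exit. */
    static void stopWorker(BlockingQueue<ChunkRequest> requests, Thread worker) {
        requests.offer(new ChunkRequest(-1, List.of()));
        try {
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The manager only blocks in the reply-collection loop, so reading and sending can run ahead of the worker. This matches the behavior described for ChunkMessageChannelItemWriter, which gathers results after the chunks have been sent.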

Java

A job with a step to be remotely chunked might have a configuration similar to the following in Java:

Java Configuration
public Job chunkJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new JobBuilder("personJob", jobRepository)
            .start(new StepBuilder("step1", jobRepository)
                    .<Person, Person>chunk(200, transactionManager)
                    .reader(itemReader())
                    .writer(itemWriter())
                    .build())
            .build();
}
XML

A job with a step to be remotely chunked might have a configuration similar to the following in XML:

XML Configuration
<job id="personJob">
  <step id="step1">
    <tasklet>
      <chunk reader="itemReader" writer="itemWriter" commit-interval="200"/>
    </tasklet>
    ...
  </step>
</job>

The ItemReader reference points to the bean you want to use for reading data on the manager. The ItemWriter reference points to a special ItemWriter (called ChunkMessageChannelItemWriter), as described earlier. The processor (if any) is left off the manager configuration, as it is configured on the worker. You should check any additional component properties, such as throttle limits and so on, when implementing your use case.

Java

The following Java configuration provides a basic manager setup:

Java Configuration
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("tcp://localhost:61616");
    return factory;
}

/*
 * Configure outbound flow (requests going to workers)
 */
@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(requests())
            .handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
            .get();
}

/*
 * Configure inbound flow (replies coming from workers)
 */
@Bean
public QueueChannel replies() {
    return new QueueChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
            .channel(replies())
            .get();
}

/*
 * Configure the ChunkMessageChannelItemWriter
 */
@Bean
public ItemWriter<Integer> itemWriter() {
    MessagingTemplate messagingTemplate = new MessagingTemplate();
    messagingTemplate.setDefaultChannel(requests());
    messagingTemplate.setReceiveTimeout(2000);
    ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter
            = new ChunkMessageChannelItemWriter<>();
    chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
    chunkMessageChannelItemWriter.setReplyChannel(replies());
    return chunkMessageChannelItemWriter;
}
XML

The following XML configuration provides a basic manager setup:

XML Configuration
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<int:channel id="requests"/>
<int-jms:outbound-channel-adapter id="jmsRequests" destination-name="requests"
    channel="requests"/>

<bean id="messagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate">
  <property name="defaultChannel" ref="requests"/>
  <property name="receiveTimeout" value="2000"/>
</bean>

<bean id="itemWriter"
    class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
    scope="step">
  <property name="messagingOperations" ref="messagingTemplate"/>
  <property name="replyChannel" ref="replies"/>
</bean>

<int:channel id="replies">
  <int:queue/>
</int:channel>

<int-jms:message-driven-channel-adapter id="jmsReplies"
    destination-name="replies"
    channel="replies"/>

The preceding configuration provides us with a number of beans. We configure our messaging middleware by using ActiveMQ and the inbound and outbound JMS adapters provided by Spring Integration. As shown, our itemWriter bean, which is referenced by our job step, uses the ChunkMessageChannelItemWriter to write chunks over the configured middleware.

Now we can move on to the worker configuration, as the following example shows:

Java

The following example shows the worker configuration in Java:

Java Configuration
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("tcp://localhost:61616");
    return factory;
}

/*
 * Configure inbound flow (requests coming from the manager)
 */
@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
            .channel(requests())
            .get();
}

/*
 * Configure outbound flow (replies going to the manager)
 */
@Bean
public DirectChannel replies() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(replies())
            .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
            .get();
}

/*
 * Configure the ChunkProcessorChunkHandler
 */
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
    ChunkProcessor<Integer> chunkProcessor
            = new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
    ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler
            = new ChunkProcessorChunkHandler<>();
    chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
    return chunkProcessorChunkHandler;
}
XML

The following example shows the worker configuration in XML:

XML Configuration
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<int:channel id="requests"/>
<int:channel id="replies"/>

<int-jms:message-driven-channel-adapter id="incomingRequests"
    destination-name="requests"
    channel="requests"/>

<int-jms:outbound-channel-adapter id="outgoingReplies"
    destination-name="replies"
    channel="replies">
</int-jms:outbound-channel-adapter>

<int:service-activator id="serviceActivator"
    input-channel="requests"
    output-channel="replies"
    ref="chunkProcessorChunkHandler"
    method="handleChunk"/>

<bean id="chunkProcessorChunkHandler"
    class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
  <property name="chunkProcessor">
    <bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
      <property name="itemWriter">
        <bean class="io.spring.sbi.PersonItemWriter"/>
      </property>
      <property name="itemProcessor">
        <bean class="io.spring.sbi.PersonItemProcessor"/>
      </property>
    </bean>
  </property>
</bean>

Most of these configuration items should look familiar from the manager configuration. Workers need access to neither the Spring Batch JobRepository nor the actual job configuration file. The main bean of interest is the chunkProcessorChunkHandler. The chunkProcessor property of ChunkProcessorChunkHandler takes a configured SimpleChunkProcessor. This is where you provide a reference to your ItemWriter (and, optionally, your ItemProcessor), which runs on the worker when it receives chunks from the manager.

For more information, see the Remote Chunking section of the “Scalability” chapter (https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#remoteChunking).

Starting from version 4.1, Spring Batch Integration introduces the @EnableBatchIntegration annotation that can be used to simplify a remote chunking setup. This annotation provides two beans that you can autowire in your application context:

  • RemoteChunkingManagerStepBuilderFactory: Configures the manager step

  • RemoteChunkingWorkerBuilder: Configures the remote worker integration flow

These APIs take care of configuring a number of components, as the following diagram shows:

Figure 3. Remote Chunking Configuration

On the manager side, the RemoteChunkingManagerStepBuilderFactory lets you configure a manager step by declaring:

  • The item reader to read items and send them to workers

  • The output channel ("Outgoing requests") to send requests to workers

  • The input channel ("Incoming replies") to receive replies from workers

You need not explicitly configure the ChunkMessageChannelItemWriter and the MessagingTemplate. (You can still explicitly configure them if you find a reason to do so.)

On the worker side, the RemoteChunkingWorkerBuilder lets you configure a worker to:

  • Listen to requests sent by the manager on the input channel (“Incoming requests”)

  • Call the handleChunk method of ChunkProcessorChunkHandler for each request with the configured ItemProcessor and ItemWriter

  • Send replies on the output channel (“Outgoing replies”) to the manager

You need not explicitly configure the SimpleChunkProcessor and the ChunkProcessorChunkHandler. (You can still explicitly configure them if you find a reason to do so.)

The following example shows how to use these APIs:

@Configuration
@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {

    @Configuration
    public static class ManagerConfiguration {

        @Autowired
        private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;

        @Bean
        public TaskletStep managerStep() {
            return this.managerStepBuilderFactory.get("managerStep")
                       .chunk(100)
                       .reader(itemReader())
                       .outputChannel(requests()) // requests sent to workers
                       .inputChannel(replies())   // replies received from workers
                       .build();
        }

        // Middleware beans setup omitted

    }

    @Configuration
    public static class WorkerConfiguration {

        @Autowired
        private RemoteChunkingWorkerBuilder workerBuilder;

        @Bean
        public IntegrationFlow workerFlow() {
            return this.workerBuilder
                       .itemProcessor(itemProcessor())
                       .itemWriter(itemWriter())
                       .inputChannel(requests()) // requests received from the manager
                       .outputChannel(replies()) // replies sent to the manager
                       .build();
        }

        // Middleware beans setup omitted

    }

}

You can find a complete example of a remote chunking job at https://github.com/spring-projects/spring-batch/tree/main/spring-batch-samples#remote-chunking-sample.

Remote Partitioning

The following image shows a typical remote partitioning situation:

Figure 4. Remote Partitioning

Remote Partitioning, on the other hand, is useful when it is not the processing of items but rather the associated I/O that causes the bottleneck. With remote partitioning, you can send work to workers that execute complete Spring Batch steps. Thus, each worker has its own ItemReader, ItemProcessor, and ItemWriter. For this purpose, Spring Batch Integration provides the MessageChannelPartitionHandler.
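Each worker reads only its own slice of the data, and a Partitioner on the manager defines those slices. The following is a minimal sketch that assumes the input has a numeric id column that can be split into contiguous ranges. Plain maps stand in for Spring Batch's ExecutionContext, and the minId/maxId keys and the class name are hypothetical. A worker-side reader would then select only the rows between its own minId and maxId.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RangePartitionerSketch {

    private RangePartitionerSketch() {
    }

    /** Splits [minId, maxId] into at most gridSize contiguous ranges, one per partition. */
    static Map<String, Map<String, Long>> partition(long minId, long maxId, int gridSize) {
        if (gridSize < 1 || maxId < minId) {
            throw new IllegalArgumentException("gridSize must be >= 1 and maxId >= minId");
        }
        long total = maxId - minId + 1;
        long size = (total + gridSize - 1) / gridSize; // ceiling division
        Map<String, Map<String, Long>> partitions = new LinkedHashMap<>();
        long start = minId;
        for (int i = 0; ; i++) {
            long end = Math.min(start + size - 1, maxId);
            Map<String, Long> context = new LinkedHashMap<>(); // stands in for an ExecutionContext
            context.put("minId", start);
            context.put("maxId", end);
            partitions.put("partition" + i, context);
            if (end == maxId) {
                break; // last range reached
            }
            start = end + 1;
        }
        return partitions;
    }
}
```

When the range holds fewer ids than gridSize, this sketch produces fewer partitions than requested.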

This implementation of the PartitionHandler interface uses MessageChannel instances to send instructions to remote workers and receive their responses. It abstracts away the transports (such as JMS and AMQP) used to communicate with the remote workers.

The section of the “Scalability” chapter that addresses remote partitioning provides an overview of the concepts and components needed to configure remote partitioning and shows an example of using the default TaskExecutorPartitionHandler to partition in separate local threads of execution. For remote partitioning to multiple JVMs, two additional components are required:

  • A remoting fabric or grid environment

  • A PartitionHandler implementation that supports the desired remoting fabric or grid environment

Similar to remote chunking, you can use JMS as the “remoting fabric”. In that case, use a MessageChannelPartitionHandler instance as the PartitionHandler implementation, as described earlier.

Java

The following example assumes an existing partitioned job and focuses on the MessageChannelPartitionHandler and JMS configuration in Java:

Java Configuration
/*
 * Configuration of the manager side
 */
@Bean
public PartitionHandler partitionHandler() {
    MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
    partitionHandler.setStepName("step1");
    partitionHandler.setGridSize(3);
    partitionHandler.setReplyChannel(outboundReplies());
    MessagingTemplate template = new MessagingTemplate();
    template.setDefaultChannel(outboundRequests());
    template.setReceiveTimeout(100000);
    partitionHandler.setMessagingOperations(template);
    return partitionHandler;
}

@Bean
public QueueChannel outboundReplies() {
    return new QueueChannel();
}

@Bean
public DirectChannel outboundRequests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundJmsRequests() {
    return IntegrationFlow.from("outboundRequests")
            .handle(Jms.outboundAdapter(connectionFactory())
                    .destination("requestsQueue"))
            .get();
}

@Bean
@ServiceActivator(inputChannel = "inboundStaging")
public AggregatorFactoryBean partitioningMessageHandler() throws Exception {
    AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
    aggregatorFactoryBean.setProcessorBean(partitionHandler());
    aggregatorFactoryBean.setOutputChannel(outboundReplies());
    // configure other properties of the aggregatorFactoryBean
    return aggregatorFactoryBean;
}

@Bean
public DirectChannel inboundStaging() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundJmsStaging() {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(connectionFactory())
                    .configureListenerContainer(c -> c.subscriptionDurable(false))
                    .destination("stagingQueue"))
            .channel(inboundStaging())
            .get();
}

/*
 * Configuration of the worker side
 */
@Bean
public StepExecutionRequestHandler stepExecutionRequestHandler() {
    StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
    stepExecutionRequestHandler.setJobExplorer(jobExplorer);
    stepExecutionRequestHandler.setStepLocator(stepLocator());
    return stepExecutionRequestHandler;
}

@Bean
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler serviceActivator() throws Exception {
    return stepExecutionRequestHandler();
}

@Bean
public DirectChannel inboundRequests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundJmsRequests() {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(connectionFactory())
                    .configureListenerContainer(c -> c.subscriptionDurable(false))
                    .destination("requestsQueue"))
            .channel(inboundRequests())
            .get();
}

@Bean
public DirectChannel outboundStaging() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundJmsStaging() {
    return IntegrationFlow.from("outboundStaging")
            .handle(Jms.outboundAdapter(connectionFactory())
                    .destination("stagingQueue"))
            .get();
}
XML

The following example assumes an existing partitioned job and focuses on the MessageChannelPartitionHandler and JMS configuration in XML:

XML Configuration
<bean id="partitionHandler"
   class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
  <property name="stepName" value="step1"/>
  <property name="gridSize" value="3"/>
  <property name="replyChannel" ref="outbound-replies"/>
  <property name="messagingOperations">
    <bean class="org.springframework.integration.core.MessagingTemplate">
      <property name="defaultChannel" ref="outbound-requests"/>
      <property name="receiveTimeout" value="100000"/>
    </bean>
  </property>
</bean>

<int:channel id="outbound-requests"/>
<int-jms:outbound-channel-adapter destination="requestsQueue"
    channel="outbound-requests"/>

<int:channel id="inbound-requests"/>
<int-jms:message-driven-channel-adapter destination="requestsQueue"
    channel="inbound-requests"/>

<bean id="stepExecutionRequestHandler"
    class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
  <property name="jobExplorer" ref="jobExplorer"/>
  <property name="stepLocator" ref="stepLocator"/>
</bean>

<int:service-activator ref="stepExecutionRequestHandler" input-channel="inbound-requests"
    output-channel="outbound-staging"/>

<int:channel id="outbound-staging"/>
<int-jms:outbound-channel-adapter destination="stagingQueue"
    channel="outbound-staging"/>

<int:channel id="inbound-staging"/>
<int-jms:message-driven-channel-adapter destination="stagingQueue"
    channel="inbound-staging"/>

<int:aggregator ref="partitionHandler" input-channel="inbound-staging"
    output-channel="outbound-replies"/>

<int:channel id="outbound-replies">
  <int:queue/>
</int:channel>

<bean id="stepLocator"
    class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />

You must also ensure that the partition handler attribute maps to the partitionHandler bean.

Java

The following example maps the partition handler to the partitionHandler bean in Java:

Java Configuration
public Job personJob(JobRepository jobRepository) {
    return new JobBuilder("personJob", jobRepository)
            .start(new StepBuilder("step1.manager", jobRepository)
                    .partitioner("step1.worker", partitioner())
                    .partitionHandler(partitionHandler())
                    .build())
            .build();
}
XML

The following example maps the partition handler attribute to the partitionHandler in XML:

XML Configuration
<job id="personJob">
  <step id="step1.manager">
    <partition partitioner="partitioner" handler="partitionHandler"/>
    ...
  </step>
</job>

You can find a complete example of a remote partitioning job here.

You can use the @EnableBatchIntegration annotation to simplify a remote partitioning setup. This annotation provides two beans that are useful for remote partitioning:

  • RemotePartitioningManagerStepBuilderFactory: Configures the manager step

  • RemotePartitioningWorkerStepBuilderFactory: Configures the worker step

These APIs take care of configuring a number of components, as the following diagrams show:

Figure 5. Remote Partitioning Configuration (with job repository polling)
Figure 6. Remote Partitioning Configuration (with replies aggregation)

On the manager side, the RemotePartitioningManagerStepBuilderFactory lets you configure a manager step by declaring:

  • The Partitioner used to partition data

  • The output channel (“Outgoing requests”) on which to send requests to workers

  • The input channel (“Incoming replies”) on which to receive replies from workers (when configuring replies aggregation)

  • The poll interval and timeout parameters (when configuring job repository polling)
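In the replies-aggregation mode, the manager step cannot finish until every partition has reported back. The following plain-JDK sketch shows that waiting logic. It collects replies until gridSize distinct partitions have answered or a timeout expires, then treats the overall result as FAILED if any partition failed. The timeout plays a role similar to the receiveTimeout set on the MessagingTemplate in the earlier configuration. PartitionReply and the status strings are hypothetical stand-ins for the StepExecution replies that MessageChannelPartitionHandler aggregates.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class PartitionReplyAggregator {

    /** Hypothetical reply: which partition finished and with which status. */
    record PartitionReply(String partitionName, String status) {}

    private PartitionReplyAggregator() {
    }

    /**
     * Waits until gridSize distinct partitions have replied or timeoutMillis elapses.
     * Returns "FAILED" if any partition failed, otherwise "COMPLETED".
     */
    static String aggregate(BlockingQueue<PartitionReply> replies, int gridSize, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        Set<String> replied = new HashSet<>();
        boolean anyFailed = false;
        try {
            while (replied.size() < gridSize) {
                long remaining = deadline - System.nanoTime();
                PartitionReply reply = replies.poll(remaining, TimeUnit.NANOSECONDS);
                if (reply == null) {
                    throw new IllegalStateException("Timed out after " + replied.size()
                            + " of " + gridSize + " partitions replied");
                }
                replied.add(reply.partitionName()); // a duplicate reply does not count twice
                anyFailed |= "FAILED".equals(reply.status());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for partition replies", e);
        }
        return anyFailed ? "FAILED" : "COMPLETED";
    }
}
```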

You need not explicitly configure the MessageChannelPartitionHandler and the MessagingTemplate. (You can still explicitly configure them if you find a reason to do so.)

On the worker side, the RemotePartitioningWorkerStepBuilderFactory lets you configure a worker to:

  • Listen to requests sent by the manager on the input channel (“Incoming requests”)

  • Call the handle method of StepExecutionRequestHandler for each request

  • Send replies on the output channel (“Outgoing replies”) to the manager

You need not explicitly configure the StepExecutionRequestHandler. (You can explicitly configure it if you find a reason to do so.)

The following example shows how to use these APIs:

@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningJobConfiguration {

    @Configuration
    public static class ManagerConfiguration {

        @Autowired
        private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;

        @Bean
        public Step managerStep() {
            return this.managerStepBuilderFactory
                    .get("managerStep")
                    .partitioner("workerStep", partitioner())
                    .gridSize(10)
                    .outputChannel(outgoingRequestsToWorkers())
                    .inputChannel(incomingRepliesFromWorkers())
                    .build();
        }

        // Middleware beans setup omitted

    }

    @Configuration
    public static class WorkerConfiguration {

        @Autowired
        private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;

        @Bean
        public Step workerStep() {
            return this.workerStepBuilderFactory
                    .get("workerStep")
                    .inputChannel(incomingRequestsFromManager())
                    .outputChannel(outgoingRepliesToManager())
                    .chunk(100)
                    .reader(itemReader())
                    .processor(itemProcessor())
                    .writer(itemWriter())
                    .build();
        }

        // Middleware beans setup omitted

    }

}