Spring Cloud Stream Integration

任务本身可能会很有用,但将任务集成到更大的生态系统中可让它适用于更复杂的处理和编排。本部分介绍 Spring Cloud Task 和 Spring Cloud Stream 的集成选项。

A task by itself can be useful, but integration of a task into a larger ecosystem lets it be useful for more complex processing and orchestration. This section covers the integration options for Spring Cloud Task with Spring Cloud Stream.

Launching a Task from a Spring Cloud Stream

你可以从流启动任务。为此,创建一个监听包含 TaskLaunchRequest 作为负载的消息的 sink。TaskLaunchRequest 包含:

You can launch tasks from a stream. To do so, create a sink that listens for a message that contains a TaskLaunchRequest as its payload. The TaskLaunchRequest contains:

  • uri: To the task artifact that is to be executed.

  • applicationName: The name that is associated with the task. If no applicationName is set, the TaskLaunchRequest generates a task name comprised of the following: Task-<UUID>.

  • commandLineArguments: A list containing the command line arguments for the task.

  • environmentProperties: A map containing the environment variables to be used by the task.

  • deploymentProperties: A map containing the properties that are used by the deployer to deploy the task.

如果有效负载属于不同类型,则接收器抛出异常。

If the payload is of a different type, the sink throws an exception.

例如,可以创建一个流,其具有一个从 HTTP 源获取数据并创建包含 TaskLaunchRequestGenericMessage 以及将该消息发送到其输出通道的处理器。然后,任务 sink 将从其输入通道接收消息,然后启动任务。

For example, a stream can be created that has a processor that takes in data from an HTTP source and creates a GenericMessage that contains the TaskLaunchRequest and sends the message to its output channel. The task sink would then receive the message from its input channel and then launch the task.

若要创建 taskSink,只需要创建一个包含 EnableTaskLauncher 注释的 Spring Boot 应用程序,如下例所示:

To create a taskSink, you need only create a Spring Boot application that includes the EnableTaskLauncher annotation, as shown in the following example:

@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
	public static void main(String[] args) {
		SpringApplication.run(TaskSinkApplication.class, args);
	}
}

Spring Cloud Task 项目的 samples module 部分包含一个示例接收器和处理器。要将这些示例安装到您的本地 maven 数据中心,请使用已将 skipInstall 属性设置为 falsespring-cloud-task-samples 目录运行 maven 构建,例如以下示例所示:

The samples module of the Spring Cloud Task project contains a sample Sink and Processor. To install these samples into your local maven repository, run a maven build from the spring-cloud-task-samples directory with the skipInstall property set to false, as shown in the following example:

mvn clean install

maven.remoteRepositories.springRepo.url 属性必须设置为从中获取 Spring Boot Uber-jar 的远程存储库的位置。如果没有设置,则没有远程存储库,因此它仅依赖于本地存储库。

The maven.remoteRepositories.springRepo.url property must be set to the location of the remote repository in which the Spring Boot Uber-jar is located. If not set, there is no remote repository, so it relies upon the local repository only.

Spring Cloud Data Flow

若要在 Spring Cloud Data Flow 中创建流,必须先注册我们创建的 Task SinkApplication。在以下示例中,我们使用 Spring Cloud Data Flow shell 注册 Processor 和 Sink 示例应用程序:

To create a stream in Spring Cloud Data Flow, you must first register the Task Sink Application we created. In the following example, we are registering the Processor and Sink sample applications by using the Spring Cloud Data Flow shell:

app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>

以下示例显示了如何从 Spring Cloud Data Flow shell 创建流:

The following example shows how to create a stream from the Spring Cloud Data Flow shell:

stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy

Spring Cloud Task Events

Spring Cloud Task 提供通过 Spring Cloud Stream 通道运行任务时在 Spring Cloud Stream 通道中发布事件的能力。任务监听器用于在名为 task-events 的消息通道中发布 TaskExecution。对于在类路径上具有 spring-cloud-streamspring-cloud-stream-<binder> 和已定义任务的任何任务,都会自动将该特性注入其中。

Spring Cloud Task provides the ability to emit events through a Spring Cloud Stream channel when the task is run through a Spring Cloud Stream channel. A task listener is used to publish the TaskExecution on a message channel named task-events. This feature is autowired into any task that has spring-cloud-stream, spring-cloud-stream-<binder>, and a defined task on its classpath.

要禁用事件发出侦听器,请将 spring.cloud.task.events.enabled 属性设置为 false

To disable the event emitting listener, set the spring.cloud.task.events.enabled property to false.

在定义了适当的类路径后,以下任务在 task-events 通道上将 TaskExecution 发送为事件(在任务的开始和结束时):

With the appropriate classpath defined, the following task emits the TaskExecution as an event on the task-events channel (at both the start and the end of the task):

@SpringBootApplication
public class TaskEventsApplication {

	public static void main(String[] args) {
		SpringApplication.run(TaskEventsApplication.class, args);
	}

	@Configuration
	public static class TaskConfiguration {

		@Bean
		public ApplicationRunner applicationRunner() {
			return new ApplicationRunner() {
				@Override
				public void run(ApplicationArguments args) {
					System.out.println("The ApplicationRunner was executed");
				}
			};
		}
	}
}

还要求绑定实现位于类路径上。

A binder implementation is also required to be on the classpath.

可在 Spring Cloud Task 项目 here 的示例模块中找到一个示例任务事件应用程序。

A sample task event application can be found in the samples module of the Spring Cloud Task Project, here.

Disabling Specific Task Events

若要禁用任务事件,可以将 spring.cloud.task.events.enabled 属性设置为 false

To disable task events, you can set the spring.cloud.task.events.enabled property to false.

Spring Batch Events

在通过任务执行 Spring Batch 作业时,Spring Cloud Task 可以配置为根据 Spring Batch 中可用的 Spring Batch 监听器发送告知性消息。具体而言,以下 Spring Batch 监听器自动配置到每个批作业中,并在通过 Spring Cloud Task 运行时在关联的 Spring Cloud Stream 通道上发送消息:

When executing a Spring Batch job through a task, Spring Cloud Task can be configured to emit informational messages based on the Spring Batch listeners available in Spring Batch. Specifically, the following Spring Batch listeners are autoconfigured into each batch job and emit messages on the associated Spring Cloud Stream channels when run through Spring Cloud Task:

  • JobExecutionListener listens for job-execution-events

  • StepExecutionListener listens for step-execution-events

  • ChunkListener listens for chunk-events

  • ItemReadListener listens for item-read-events

  • ItemProcessListener listens for item-process-events

  • ItemWriteListener listens for item-write-events

  • SkipListener listens for skip-events

当上下文中存在适当的 bean(JobTaskLifecycleListener)时,会将这些监听器自动配置到任何 AbstractJob 中。用于监听这些事件的配置的处理方式和绑定到任何其他 SpringCloud Stream 通道的处理方式相同。我们的任务(运行批作业的任务)充当 Source,侦听应用程序充当 ProcessorSink

These listeners are autoconfigured into any AbstractJob when the appropriate beans (a Job and a TaskLifecycleListener) exist in the context. Configuration to listen to these events is handled the same way binding to any other Spring Cloud Stream channel is done. Our task (the one running the batch job) serves as a Source, with the listening applications serving as either a Processor or a Sink.

一个示例可能是让一个应用程序侦听一个作业开始和停止的 job-execution-events 通道。若要配置侦听应用程序,请将输入配置为 job-execution-events,如下所示:

An example could be to have an application listening to the job-execution-events channel for the start and stop of a job. To configure the listening application, you would configure the input to be job-execution-events as follows:

spring.cloud.stream.bindings.input.destination=作业执行事件

spring.cloud.stream.bindings.input.destination=job-execution-events

还要求绑定实现位于类路径上。

A binder implementation is also required to be on the classpath.

可在 Spring Cloud Task 项目 here 的示例模块中找到一个示例批处理事件应用程序。

A sample batch event application can be found in the samples module of the Spring Cloud Task Project, here.

Sending Batch Events to Different Channels

Spring Cloud 任务为批处理事件提供的一种选项是更改通道,特定侦听器可将其消息发布到该通道。要执行此操作,请使用以下配置:spring.cloud.stream.bindings.<通道>.destination=<新目的地>。例如,如果 StepExecutionListener 需要将其消息发布到名为 my-step-execution-events 的另一个通道,而不是默认的 step-execution-events,则可以添加以下配置:

One of the options that Spring Cloud Task offers for batch events is the ability to alter the channel to which a specific listener can emit its messages. To do so, use the following configuration: spring.cloud.stream.bindings.<the channel>.destination=<new destination>. For example, if StepExecutionListener needs to emit its messages to another channel called my-step-execution-events instead of the default step-execution-events, you can add the following configuration:

spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events

Disabling Batch Events

要禁用所有批处理事件的侦听器功能,请使用以下配置:

To disable the listener functionality for all batch events, use the following configuration:

spring.cloud.task.batch.events.enabled=false

要禁用特定批处理事件,请使用以下配置:

To disable a specific batch event, use the following configuration:

spring.cloud.task.batch.events.<批处理事件侦听器>.enabled=false

spring.cloud.task.batch.events.<batch event listener>.enabled=false:

以下列表显示了可以禁用的各个侦听器:

The following listing shows individual listeners that you can disable:

spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false

Emit Order for Batch Events

默认情况下,批处理事件具有 Ordered.LOWEST_PRECEDENCE。要更改此值(例如,为 5),请使用以下配置:

By default, batch events have Ordered.LOWEST_PRECEDENCE. To change this value (for example, to 5 ), use the following configuration:

spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5