Spring Cloud Stream Integration

任务本身可能会很有用,但将任务集成到更大的生态系统中可让它适用于更复杂的处理和编排。本部分介绍 Spring Cloud Task 和 Spring Cloud Stream 的集成选项。

Launching a Task from a Spring Cloud Stream

你可以从流启动任务。为此,创建一个监听包含 TaskLaunchRequest 作为负载的消息的 sink。TaskLaunchRequest 包含:

  • uri:已执行的任务工件。

  • applicationName: 与任务关联的名称。如果没有设置应用程序名,则`TaskLaunchRequest`生成任务名称,其中包含以下内容:Task-<UUID>

  • commandLineArguments:包含任务的命令行参数的列表。

  • environmentProperties:包含任务使用的环境变量的地图。

  • deploymentProperties:包含部署程序用于部署任务的属性的地图。

如果有效负载属于不同类型,则接收器抛出异常。

例如,可以创建一个流,其具有一个从 HTTP 源获取数据并创建包含 TaskLaunchRequestGenericMessage 以及将该消息发送到其输出通道的处理器。然后,任务 sink 将从其输入通道接收消息,然后启动任务。

若要创建 taskSink,只需要创建一个包含 EnableTaskLauncher 注释的 Spring Boot 应用程序,如下例所示:

@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
	public static void main(String[] args) {
		SpringApplication.run(TaskSinkApplication.class, args);
	}
}

Spring Cloud Task 项目的 samples module 部分包含一个示例接收器和处理器。要将这些示例安装到您的本地 maven 数据中心,请使用已将 skipInstall 属性设置为 falsespring-cloud-task-samples 目录运行 maven 构建,例如以下示例所示:

mvn clean install

maven.remoteRepositories.springRepo.url 属性必须设置为从中获取 Spring Boot Uber-jar 的远程存储库的位置。如果没有设置,则没有远程存储库,因此它仅依赖于本地存储库。

Spring Cloud Data Flow

若要在 Spring Cloud Data Flow 中创建流,必须先注册我们创建的 Task SinkApplication。在以下示例中,我们使用 Spring Cloud Data Flow shell 注册 Processor 和 Sink 示例应用程序:

app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>

以下示例显示了如何从 Spring Cloud Data Flow shell 创建流:

stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy

Spring Cloud Task Events

Spring Cloud Task 提供通过 Spring Cloud Stream 通道运行任务时在 Spring Cloud Stream 通道中发布事件的能力。任务监听器用于在名为 task-events 的消息通道中发布 TaskExecution。对于在类路径上具有 spring-cloud-streamspring-cloud-stream-<binder> 和已定义任务的任何任务,都会自动将该特性注入其中。

要禁用事件发出侦听器,请将 spring.cloud.task.events.enabled 属性设置为 false

在定义了适当的类路径后,以下任务在 task-events 通道上将 TaskExecution 发送为事件(在任务的开始和结束时):

@SpringBootApplication
public class TaskEventsApplication {

	public static void main(String[] args) {
		SpringApplication.run(TaskEventsApplication.class, args);
	}

	@Configuration
	public static class TaskConfiguration {

		@Bean
		public ApplicationRunner applicationRunner() {
			return new ApplicationRunner() {
				@Override
				public void run(ApplicationArguments args) {
					System.out.println("The ApplicationRunner was executed");
				}
			};
		}
	}
}

还要求绑定实现位于类路径上。

可在 Spring Cloud Task 项目 here 的示例模块中找到一个示例任务事件应用程序。

Disabling Specific Task Events

若要禁用任务事件,可以将 spring.cloud.task.events.enabled 属性设置为 false

Spring Batch Events

在通过任务执行 Spring Batch 作业时,Spring Cloud Task 可以配置为根据 Spring Batch 中可用的 Spring Batch 监听器发送告知性消息。具体而言,以下 Spring Batch 监听器自动配置到每个批作业中,并在通过 Spring Cloud Task 运行时在关联的 Spring Cloud Stream 通道上发送消息:

  • JobExecutionListener listens for job-execution-events

  • StepExecutionListener listens for step-execution-events

  • ChunkListener listens for chunk-events

  • ItemReadListener listens for item-read-events

  • ItemProcessListener listens for item-process-events

  • ItemWriteListener listens for item-write-events

  • SkipListener listens for skip-events

当上下文中存在适当的 bean(JobTaskLifecycleListener)时,会将这些监听器自动配置到任何 AbstractJob 中。用于监听这些事件的配置的处理方式和绑定到任何其他 SpringCloud Stream 通道的处理方式相同。我们的任务(运行批作业的任务)充当 Source,侦听应用程序充当 ProcessorSink

一个示例可能是让一个应用程序侦听一个作业开始和停止的 job-execution-events 通道。若要配置侦听应用程序,请将输入配置为 job-execution-events,如下所示:

spring.cloud.stream.bindings.input.destination=作业执行事件

还要求绑定实现位于类路径上。

可在 Spring Cloud Task 项目 here 的示例模块中找到一个示例批处理事件应用程序。

Sending Batch Events to Different Channels

Spring Cloud 任务为批处理事件提供的一种选项是更改通道,特定侦听器可将其消息发布到该通道。要执行此操作,请使用以下配置:spring.cloud.stream.bindings.<通道>.destination=<新目的地>。例如,如果 StepExecutionListener 需要将其消息发布到名为 my-step-execution-events 的另一个通道,而不是默认的 step-execution-events,则可以添加以下配置:

spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events

Disabling Batch Events

要禁用所有批处理事件的侦听器功能,请使用以下配置:

spring.cloud.task.batch.events.enabled=false

要禁用特定批处理事件,请使用以下配置:

spring.cloud.task.batch.events.<批处理事件侦听器>.enabled=false

以下列表显示了可以禁用的各个侦听器:

spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false

Emit Order for Batch Events

默认情况下,批处理事件具有 Ordered.LOWEST_PRECEDENCE。要更改此值(例如,为 5),请使用以下配置:

spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5