Spring Cloud Stream Integration
任务本身可能会很有用,但将任务集成到更大的生态系统中可让它适用于更复杂的处理和编排。本部分介绍 Spring Cloud Task 和 Spring Cloud Stream 的集成选项。
Launching a Task from a Spring Cloud Stream
你可以从流启动任务。为此,创建一个监听包含 TaskLaunchRequest
作为负载的消息的 sink。TaskLaunchRequest
包含:
-
uri
:已执行的任务工件。 -
applicationName
: 与任务关联的名称。如果没有设置应用程序名,则`TaskLaunchRequest`生成任务名称,其中包含以下内容:Task-<UUID>
。 -
commandLineArguments
:包含任务的命令行参数的列表。 -
environmentProperties
:包含任务使用的环境变量的地图。 -
deploymentProperties
:包含部署程序用于部署任务的属性的地图。
如果有效负载属于不同类型,则接收器抛出异常。 |
例如,可以创建一个流,其具有一个从 HTTP 源获取数据并创建包含 TaskLaunchRequest
的 GenericMessage
以及将该消息发送到其输出通道的处理器。然后,任务 sink 将从其输入通道接收消息,然后启动任务。
若要创建 taskSink,只需要创建一个包含 EnableTaskLauncher
注释的 Spring Boot 应用程序,如下例所示:
@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
public static void main(String[] args) {
SpringApplication.run(TaskSinkApplication.class, args);
}
}
Spring Cloud Task 项目的 samples
module 部分包含一个示例接收器和处理器。要将这些示例安装到您的本地 maven 数据中心,请使用已将 skipInstall
属性设置为 false
的 spring-cloud-task-samples
目录运行 maven 构建,例如以下示例所示:
mvn clean install
|
Spring Cloud Data Flow
若要在 Spring Cloud Data Flow 中创建流,必须先注册我们创建的 Task SinkApplication。在以下示例中,我们使用 Spring Cloud Data Flow shell 注册 Processor 和 Sink 示例应用程序:
app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>
以下示例显示了如何从 Spring Cloud Data Flow shell 创建流:
stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy
Spring Cloud Task Events
Spring Cloud Task 提供通过 Spring Cloud Stream 通道运行任务时在 Spring Cloud Stream 通道中发布事件的能力。任务监听器用于在名为 task-events
的消息通道中发布 TaskExecution
。对于在类路径上具有 spring-cloud-stream
、spring-cloud-stream-<binder>
和已定义任务的任何任务,都会自动将该特性注入其中。
要禁用事件发出侦听器,请将 |
在定义了适当的类路径后,以下任务在 task-events
通道上将 TaskExecution
发送为事件(在任务的开始和结束时):
@SpringBootApplication
public class TaskEventsApplication {
public static void main(String[] args) {
SpringApplication.run(TaskEventsApplication.class, args);
}
@Configuration
public static class TaskConfiguration {
@Bean
public ApplicationRunner applicationRunner() {
return new ApplicationRunner() {
@Override
public void run(ApplicationArguments args) {
System.out.println("The ApplicationRunner was executed");
}
};
}
}
}
还要求绑定实现位于类路径上。 |
可在 Spring Cloud Task 项目 here 的示例模块中找到一个示例任务事件应用程序。 |
Spring Batch Events
在通过任务执行 Spring Batch 作业时,Spring Cloud Task 可以配置为根据 Spring Batch 中可用的 Spring Batch 监听器发送告知性消息。具体而言,以下 Spring Batch 监听器自动配置到每个批作业中,并在通过 Spring Cloud Task 运行时在关联的 Spring Cloud Stream 通道上发送消息:
-
JobExecutionListener
listens forjob-execution-events
-
StepExecutionListener
listens forstep-execution-events
-
ChunkListener
listens forchunk-events
-
ItemReadListener
listens foritem-read-events
-
ItemProcessListener
listens foritem-process-events
-
ItemWriteListener
listens foritem-write-events
-
SkipListener
listens forskip-events
当上下文中存在适当的 bean(Job
和 TaskLifecycleListener
)时,会将这些监听器自动配置到任何 AbstractJob
中。用于监听这些事件的配置的处理方式和绑定到任何其他 SpringCloud Stream 通道的处理方式相同。我们的任务(运行批作业的任务)充当 Source
,侦听应用程序充当 Processor
或 Sink
。
一个示例可能是让一个应用程序侦听一个作业开始和停止的 job-execution-events
通道。若要配置侦听应用程序,请将输入配置为 job-execution-events
,如下所示:
spring.cloud.stream.bindings.input.destination=作业执行事件
还要求绑定实现位于类路径上。 |
可在 Spring Cloud Task 项目 here 的示例模块中找到一个示例批处理事件应用程序。 |
Sending Batch Events to Different Channels
Spring Cloud 任务为批处理事件提供的一种选项是更改通道,特定侦听器可将其消息发布到该通道。要执行此操作,请使用以下配置:spring.cloud.stream.bindings.<通道>.destination=<新目的地>
。例如,如果 StepExecutionListener
需要将其消息发布到名为 my-step-execution-events
的另一个通道,而不是默认的 step-execution-events
,则可以添加以下配置:
spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events
Disabling Batch Events
要禁用所有批处理事件的侦听器功能,请使用以下配置:
spring.cloud.task.batch.events.enabled=false
要禁用特定批处理事件,请使用以下配置:
spring.cloud.task.batch.events.<批处理事件侦听器>.enabled=false
:
以下列表显示了可以禁用的各个侦听器:
spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false
Emit Order for Batch Events
默认情况下,批处理事件具有 Ordered.LOWEST_PRECEDENCE
。要更改此值(例如,为 5),请使用以下配置:
spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5