Spring Cloud Stream Integration
任务本身可能会很有用,但将任务集成到更大的生态系统中可让它适用于更复杂的处理和编排。本部分介绍 Spring Cloud Task 和 Spring Cloud Stream 的集成选项。
A task by itself can be useful, but integration of a task into a larger ecosystem lets it be useful for more complex processing and orchestration. This section covers the integration options for Spring Cloud Task with Spring Cloud Stream.
Launching a Task from a Spring Cloud Stream
你可以从流启动任务。为此,创建一个监听包含 TaskLaunchRequest
作为负载的消息的 sink。TaskLaunchRequest
包含:
You can launch tasks from a stream. To do so, create a sink that listens for a message
that contains a TaskLaunchRequest
as its payload. The TaskLaunchRequest
contains:
-
uri
: To the task artifact that is to be executed. -
applicationName
: The name that is associated with the task. If no applicationName is set, theTaskLaunchRequest
generates a task name comprised of the following:Task-<UUID>
. -
commandLineArguments
: A list containing the command line arguments for the task. -
environmentProperties
: A map containing the environment variables to be used by the task. -
deploymentProperties
: A map containing the properties that are used by the deployer to deploy the task.
如果有效负载属于不同类型,则接收器抛出异常。 |
If the payload is of a different type, the sink throws an exception. |
例如,可以创建一个流,其具有一个从 HTTP 源获取数据并创建包含 TaskLaunchRequest
的 GenericMessage
以及将该消息发送到其输出通道的处理器。然后,任务 sink 将从其输入通道接收消息,然后启动任务。
For example, a stream can be created that has a processor that takes in data from an
HTTP source and creates a GenericMessage
that contains the TaskLaunchRequest
and sends
the message to its output channel. The task sink would then receive the message from its
input channel and then launch the task.
若要创建 taskSink,只需要创建一个包含 EnableTaskLauncher
注释的 Spring Boot 应用程序,如下例所示:
To create a taskSink, you need only create a Spring Boot application that includes the
EnableTaskLauncher
annotation, as shown in the following example:
@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
public static void main(String[] args) {
SpringApplication.run(TaskSinkApplication.class, args);
}
}
Spring Cloud Task 项目的 samples
module 部分包含一个示例接收器和处理器。要将这些示例安装到您的本地 maven 数据中心,请使用已将 skipInstall
属性设置为 false
的 spring-cloud-task-samples
目录运行 maven 构建,例如以下示例所示:
The samples
module of the Spring Cloud Task project contains a sample Sink and Processor. To install
these samples into your local maven repository, run a maven build from the
spring-cloud-task-samples
directory with the skipInstall
property set to false
, as
shown in the following example:
mvn clean install
|
The |
Spring Cloud Data Flow
若要在 Spring Cloud Data Flow 中创建流,必须先注册我们创建的 Task SinkApplication。在以下示例中,我们使用 Spring Cloud Data Flow shell 注册 Processor 和 Sink 示例应用程序:
To create a stream in Spring Cloud Data Flow, you must first register the Task Sink Application we created. In the following example, we are registering the Processor and Sink sample applications by using the Spring Cloud Data Flow shell:
app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>
以下示例显示了如何从 Spring Cloud Data Flow shell 创建流:
The following example shows how to create a stream from the Spring Cloud Data Flow shell:
stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy
Spring Cloud Task Events
Spring Cloud Task 提供通过 Spring Cloud Stream 通道运行任务时在 Spring Cloud Stream 通道中发布事件的能力。任务监听器用于在名为 task-events
的消息通道中发布 TaskExecution
。对于在类路径上具有 spring-cloud-stream
、spring-cloud-stream-<binder>
和已定义任务的任何任务,都会自动将该特性注入其中。
Spring Cloud Task provides the ability to emit events through a Spring Cloud Stream
channel when the task is run through a Spring Cloud Stream channel. A task listener is
used to publish the TaskExecution
on a message channel named task-events
. This feature
is autowired into any task that has spring-cloud-stream
, spring-cloud-stream-<binder>
,
and a defined task on its classpath.
要禁用事件发出侦听器,请将 |
To disable the event emitting listener, set the |
在定义了适当的类路径后,以下任务在 task-events
通道上将 TaskExecution
发送为事件(在任务的开始和结束时):
With the appropriate classpath defined, the following task emits the TaskExecution
as an
event on the task-events
channel (at both the start and the end of the task):
@SpringBootApplication
public class TaskEventsApplication {
public static void main(String[] args) {
SpringApplication.run(TaskEventsApplication.class, args);
}
@Configuration
public static class TaskConfiguration {
@Bean
public ApplicationRunner applicationRunner() {
return new ApplicationRunner() {
@Override
public void run(ApplicationArguments args) {
System.out.println("The ApplicationRunner was executed");
}
};
}
}
}
还要求绑定实现位于类路径上。 |
A binder implementation is also required to be on the classpath. |
可在 Spring Cloud Task 项目 here 的示例模块中找到一个示例任务事件应用程序。 |
A sample task event application can be found in the samples module of the Spring Cloud Task Project, here. |
Spring Batch Events
在通过任务执行 Spring Batch 作业时,Spring Cloud Task 可以配置为根据 Spring Batch 中可用的 Spring Batch 监听器发送告知性消息。具体而言,以下 Spring Batch 监听器自动配置到每个批作业中,并在通过 Spring Cloud Task 运行时在关联的 Spring Cloud Stream 通道上发送消息:
When executing a Spring Batch job through a task, Spring Cloud Task can be configured to emit informational messages based on the Spring Batch listeners available in Spring Batch. Specifically, the following Spring Batch listeners are autoconfigured into each batch job and emit messages on the associated Spring Cloud Stream channels when run through Spring Cloud Task:
-
JobExecutionListener
listens forjob-execution-events
-
StepExecutionListener
listens forstep-execution-events
-
ChunkListener
listens forchunk-events
-
ItemReadListener
listens foritem-read-events
-
ItemProcessListener
listens foritem-process-events
-
ItemWriteListener
listens foritem-write-events
-
SkipListener
listens forskip-events
当上下文中存在适当的 bean(Job
和 TaskLifecycleListener
)时,会将这些监听器自动配置到任何 AbstractJob
中。用于监听这些事件的配置的处理方式和绑定到任何其他 SpringCloud Stream 通道的处理方式相同。我们的任务(运行批作业的任务)充当 Source
,侦听应用程序充当 Processor
或 Sink
。
These listeners are autoconfigured into any AbstractJob
when the appropriate
beans (a Job
and a TaskLifecycleListener
) exist in the context. Configuration to
listen to these events is handled the same way binding to any other Spring
Cloud Stream channel is done. Our task (the one running the batch job) serves as a
Source
, with the listening applications serving as either a Processor
or a Sink
.
一个示例可能是让一个应用程序侦听一个作业开始和停止的 job-execution-events
通道。若要配置侦听应用程序,请将输入配置为 job-execution-events
,如下所示:
An example could be to have an application listening to the job-execution-events
channel
for the start and stop of a job. To configure the listening application, you would
configure the input to be job-execution-events
as follows:
spring.cloud.stream.bindings.input.destination=作业执行事件
spring.cloud.stream.bindings.input.destination=job-execution-events
还要求绑定实现位于类路径上。 |
A binder implementation is also required to be on the classpath. |
可在 Spring Cloud Task 项目 here 的示例模块中找到一个示例批处理事件应用程序。 |
A sample batch event application can be found in the samples module of the Spring Cloud Task Project, here. |
Sending Batch Events to Different Channels
Spring Cloud 任务为批处理事件提供的一种选项是更改通道,特定侦听器可将其消息发布到该通道。要执行此操作,请使用以下配置:spring.cloud.stream.bindings.<通道>.destination=<新目的地>
。例如,如果 StepExecutionListener
需要将其消息发布到名为 my-step-execution-events
的另一个通道,而不是默认的 step-execution-events
,则可以添加以下配置:
One of the options that Spring Cloud Task offers for batch events is the ability to alter
the channel to which a specific listener can emit its messages. To do so, use the
following configuration:
spring.cloud.stream.bindings.<the channel>.destination=<new destination>
. For example,
if StepExecutionListener
needs to emit its messages to another channel called
my-step-execution-events
instead of the default step-execution-events
, you can add the
following configuration:
spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events
Disabling Batch Events
要禁用所有批处理事件的侦听器功能,请使用以下配置:
To disable the listener functionality for all batch events, use the following configuration:
spring.cloud.task.batch.events.enabled=false
要禁用特定批处理事件,请使用以下配置:
To disable a specific batch event, use the following configuration:
spring.cloud.task.batch.events.<批处理事件侦听器>.enabled=false
:
spring.cloud.task.batch.events.<batch event listener>.enabled=false
:
以下列表显示了可以禁用的各个侦听器:
The following listing shows individual listeners that you can disable:
spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false
Emit Order for Batch Events
默认情况下,批处理事件具有 Ordered.LOWEST_PRECEDENCE
。要更改此值(例如,为 5),请使用以下配置:
By default, batch events have Ordered.LOWEST_PRECEDENCE
. To change this value (for
example, to 5 ), use the following configuration:
spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5