DSL Basics

`org.springframework.integration.dsl`软件包包含之前提到的 `IntegrationFlowBuilder`API,以及许多 `IntegrationComponentSpec`实现,它们同时也是生成器,并提供用于配置具体端点的 Fluent API。`IntegrationFlowBuilder`基础设施为基于消息的应用程序(如通道、端点、轮询器和通道拦截器)提供常见 enterprise integration patterns(EIP)。

The org.springframework.integration.dsl package contains the IntegrationFlowBuilder API mentioned earlier and a number of IntegrationComponentSpec implementations, which are also builders and provide the fluent API to configure concrete endpoints. The IntegrationFlowBuilder infrastructure provides common enterprise integration patterns (EIP) for message-based applications, such as channels, endpoints, pollers, and channel interceptors.

IMPORTANT

The IntegrationComponentSpec is a FactoryBean implementation, therefore its getObject() method must not be called from bean definitions. The IntegrationComponentSpec implementation must be left as is for bean definitions and the framework will manage its lifecycle. Bean method parameter injection for the target IntegrationComponentSpec type (a FactoryBean value) must be used for IntegrationFlow bean definitions instead of bean method references.

端点在 DSL 中表示为动词以提高可读性。以下列表包括常见的 DSL 方法名称和关联的 EIP 端点:

Endpoints are expressed as verbs in the DSL to improve readability. The following list includes the common DSL method names and the associated EIP endpoint:

  • transform → Transformer

  • filter → Filter

  • handle → ServiceActivator

  • split → Splitter

  • aggregate → Aggregator

  • route → Router

  • bridge → Bridge

从概念上,集成过程通过在一条或多条消息流中组成这些端点来构建。请注意,EIP 并不要正式定义 “消息流” 这个术语,但是将它视为一个使用知名的消息模式的工作单元是有用的。DSL 提供了一个 IntegrationFlow 组件,用于在通道和二者之间的端点间定义一个组合,但现在 IntegrationFlow 只起配置作用,以便在应用程序上下文中填充实际的 bean,并不在运行时使用。但是,可以将 IntegrationFlow 的 bean 作为 Lifecycle 自动注入,以便控制整个流的 start()stop(),整个流委托给与此 IntegrationFlow 相关的所有 Spring Integration 组件。以下示例使用 IntegrationFlow fluent API 来使用 IntegrationFlowBuilder 中的 EIP 方法定义一个 IntegrationFlow bean:

Conceptually, integration processes are constructed by composing these endpoints into one or more message flows. Note that EIP does not formally define the term 'message flow', but it is useful to think of it as a unit of work that uses well known messaging patterns. The DSL provides an IntegrationFlow component to define a composition of channels and endpoints between them, but now IntegrationFlow plays only the configuration role to populate real beans in the application context and is not used at runtime. However, the bean for IntegrationFlow can be autowired as a Lifecycle to control start() and stop() for the whole flow which is delegated to all the Spring Integration components associated with this IntegrationFlow. The following example uses the IntegrationFlow fluent API to define an IntegrationFlow bean by using EIP-methods from IntegrationFlowBuilder:

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlow.from("input")
            .<String, Integer>transform(Integer::parseInt)
            .get();
}

transform 方法接受一个 lambda 作为端点参数,以便对消息有效负载进行操作。此方法的实际参数是一个 GenericTransformer<S, T> 实例。因此,此处可使用任何提供的转换器 (ObjectToJsonTransformerFileToStringTransformer 等)。

The transform method accepts a lambda as an endpoint argument to operate on the message payload. The real argument of this method is a GenericTransformer<S, T> instance. Consequently, any of the provided transformers (ObjectToJsonTransformer, FileToStringTransformer, and other) can be used here.

在后台,IntegrationFlowBuilder 分别将 MessageHandler 和它的端点识别为 MessageTransformingHandlerConsumerEndpointFactoryBean。考虑另一个示例:

Under the covers, IntegrationFlowBuilder recognizes the MessageHandler and the endpoint for it, with MessageTransformingHandler and ConsumerEndpointFactoryBean, respectively. Consider another example:

@Bean
public IntegrationFlow myFlow() {
    return IntegrationFlow.from("input")
                .filter("World"::equals)
                .transform("Hello "::concat)
                .handle(System.out::println)
                .get();
}

前一个示例组合了一系列 Filter → Transformer → Service Activator。此流是 “单向” 的。换句话说,它不提供回复消息,而只将有效负载打印到 STDOUT。端点通过使用直接通道自动连线在一起。

The preceding example composes a sequence of Filter → Transformer → Service Activator. The flow is "'one way'". That is, it does not provide a reply message but only prints the payload to STDOUT. The endpoints are automatically wired together by using direct channels.

Example 1. Lambdas And Message<?> Arguments

在 EIP 方法中使用 lambda 时,“输入” 参数通常是消息有效负载。如果你想要访问整个消息,请使用将 Class<?> 作为第一个参数的重载方法之一。例如,以下代码不起作用:

When using lambdas in EIP methods, the "input" argument is generally the message payload. If you wish to access the entire message, use one of the overloaded methods that take a Class<?> as the first parameter. For example, this won’t work:

.<Message<?>, Foo>transform(m -> newFooFromMessage(m))

此代码将在运行时失败并引发 ClassCastException,因为 lambda 不会保留参数类型,并且框架将尝试将有效负载强制转换为 Message<?>

This will fail at runtime with a ClassCastException because the lambda doesn’t retain the argument type and the framework will attempt to cast the payload to a Message<?>.

请改用以下代码:

Instead, use:

.(Message.class, m -> newFooFromMessage(m))
Example 2. Bean Definitions override

Java DSL 可以在流程定义中为行内定义的对象注册 bean,还可以重新使用现有的注入 bean。如果为行内对象和现有的 bean 定义定义了相同的 bean 名称,则会抛出 BeanDefinitionOverrideException,表示此类配置错误。但是,当你处理 prototype bean 时,没有办法从集成流处理器检测到现有的 bean 定义,因为每次我们从 BeanFactory 调用一个 prototype bean 时,我们都会得到一个新的实例。这样,一个提供的实例就会被用于 IntegrationFlow,无需任何 bean 注册,也无需对比现有的 prototype bean 定义。但是,如果此对象具有显式的 id 并且此名称的 bean 定义位于 prototype 范围内,将会调用 BeanFactory.initializeBean()

The Java DSL can register beans for the object defined in-line in the flow definition, as well as can reuse existing, injected beans. In case of the same bean name defined for in-line object and existing bean definition, a BeanDefinitionOverrideException is thrown indicating that such a configuration is wrong. However, when you deal with prototype beans, there is no way to detect from the integration flow processor an existing bean definition because every time we call a prototype bean from the BeanFactory we get a new instance. This way a provided instance is used in the IntegrationFlow as is without any bean registration and any possible check against existing prototype bean definition. However BeanFactory.initializeBean() is called for this object if it has an explicit id and bean definition for this name is in prototype scope.