Overview of Spring Integration Framework

Spring Integration 提供了 Spring 编程模型的扩展来支持众所周知的 { Enterprise Integration Patterns}。它启用了基于 Spring 的应用程序内的轻量级消息传递,并通过声明性适配器支持与外部系统的集成。这些适配器提供了比 Spring 对远程操作、消息传递和调度的支持更高级别的抽象。 Spring Integration 的主要目标是提供一个简单的模型,用于构建企业集成解决方案,同时保持关注点分离,这对于生成可维护、可测试的代码至关重要。

Spring Integration Overview

本章提供了对 Spring Integration 核心概念和组件的高级介绍。它包括一些编程技巧,以帮助您充分利用 Spring Integration。

Background

Spring 框架的一个关键主题是控制反转 (IoC)。从广义上讲,这意味着框架代表其上下文中管理的组件处理职责。组件本身被简化了,因为它们免除了那些职责。例如,依赖项注入减轻了组件查找或创建其依赖项的职责。同样地,面向方面的编程通过将通用横切关注点模块化为可重用方面,将业务组件从关注点中解放出来。在每种情况下,最终结果都是一个更容易测试、理解、维护和扩展的系统。

此外,Spring 框架和产品组合为构建企业应用程序提供了一个全面的编程模型。开发人员可以从该模型的一致性中受益,尤其是从它基于公认的最佳实践(例如面向接口编程、相较于继承更倾向于组合)这一事实中受益。Spring 简化的抽象和强大的支持库提高了开发人员的生产力,同时提高了测试和移植级别。

Spring Integration 受这些相同目标和原则的推动。它将 Spring 编程模型扩展到消息传递域,并在 Spring 现有的企业集成支持的基础上构建,以提供更高水平的抽象。它支持消息驱动的架构,其中控制反转适用于运行时问题,例如何时应运行某些业务逻辑以及应发送响应的位置。它支持消息路由和转换,以便集成不同的传输和不同数据格式,而不会影响可测试性。换句话说,消息传递和集成问题由框架处理。业务组件进一步与基础设施隔离,开发人员不再承担复杂的集成职责。

Spring Integration 是 Spring 编程模型的延伸,它提供了广泛的配置选项,其中包括注解、支持名称空间的 XML、带有通用 “bean” 元素的 XML 以及直接使用基础 API。该 API 基于明确策略接口和非侵入式委派适配器。Spring Integration 的设计灵感来自于对 Spring 中常见模式与 link:https://www.enterpriseintegrationpatterns.com/[Enterprise Integration Patterns 中描述的 Gregor Hohpe 和 Bobby Woolf(Addison Wesley,2004 年)中著名的模式之间紧密关联性的认识。阅读过本书的开发人员应该能够立即了解 Spring Integration 概念和术语。

Goals and Principles

Spring Integration 受以下目标驱动:

  • 为实现复杂的企业集成解决方案提供一个简单的模型。

  • 促进基于 Spring 的应用程序中的异步、消息驱动行为。

  • 促进现有 Spring 用户直观、逐步地采用。

Spring Integration 受以下原则指导:

  • 组件应松散耦合,以实现模块化和可测试性。

  • 框架应强制业务逻辑和集成逻辑之间的关注点分离。

  • 扩展点在本质上应该抽象的(但在明确定义的界限内),以提高复用性和可移植性。

Main Components

从垂直角度来看,分层架构促进了关注点分离,而基于接口的层间契约促进了松散耦合。Spring 驱动的应用程序通常这样设计,Spring 框架和产品组合为遵循针对企业应用程序整个堆栈的最佳实践提供了坚实的基础。消息驱动的架构添加了水平视角,但这些相同目标仍然相关。正如“分层架构”是一种极其通用和抽象的范例,消息传递系统通常遵循类似抽象的“管道和过滤器”模型。“过滤器”表示任何能够生成或消耗消息的组件,“管道”在过滤器之间传输消息,以便组件本身保持松散耦合。值得注意的是,这两种高级范例并不互斥。支持“管道”的底层消息传递基础设施仍应封装在一个契约定义为接口的层中。同样,“过滤器”本身也应该在逻辑上高于应用程序服务层的层中进行管理,通过接口与这些服务交互,就像 Web 层一样。

Message

在 Spring Integration 中,消息是对任何 Java 对象的通用包装,与框架在处理该对象时使用的元数据结合使用。它由有效负载和标头组成。有效负载可以是任何类型,标头包含一些常用的信息,例如 ID、时间戳、关联 ID 和返回地址。标头还用于将值传递到连接的传输和从传输中传递值。例如,当从收到的文件中创建消息时,文件名可能会存储在标头中,以便下游组件访问。同样,如果消息的内容最终将由 outbound 邮件适配器发送,则不同的属性(收件人、发件人、抄送、主题等)可能会由上游组件配置为消息标头值。开发人员还可以在标头中存储任意键值对。

message
Figure 1. Message

Message Channel

消息通道表示管道和过滤器架构的“管道”。生产者将消息发送到通道,消费者从通道接收消息。因此,消息通道解耦了消息传递组件,并且还提供了拦截和监视消息的便捷点。

channel
Figure 2. Message Channel

消息通道可能会遵循点对点或发布-订阅语义。使用点对点通道,每个发送到通道的消息最多只能由一个消费者接收。另一方面,发布-订阅通道尝试向通道上的所有订阅者广播每条消息。Spring Integration 支持这两种模型。

虽然 “point-to-point” 和“发布-订阅”定义了最终接收每条消息的消费者数量的两个选项,但还有另一个重要考虑:通道是否应该缓冲消息?在 Spring Integration 中,可轮询通道能够在队列中缓冲消息。缓冲的优点是可以节流入站消息,从而防止消费者过载。但是,正如其名称所暗示的,这也增加了一些复杂性,因为消费者只能从这样的通道接收消息,前提是配置了一个轮询器。另一方面,连接到可订阅通道的消费者仅仅是消息驱动的。Message Channel Implementations 详细讨论了 Spring Integration 中可用的各种通道实现。

Message Endpoint

Spring Integration 的主要目标之一是通过控制反转来简化企业集成解决方案的开发。这意味着您不必直接实现消费者和生产者,甚至不必在消息通道上构建消息并调用发送或接收操作。相反,您应该能够专注于基于普通对象的实现来您的特定域模型。然后,通过提供声明性配置,您可以将您的域特定代码“连接”到 Spring Integration 提供的消息传递基础设施。负责这些连接的组件是消息端点。这并不意味着您应该直接连接您现有的应用程序代码。任何现实世界的企业集成解决方案都需要一定数量的代码来关注集成问题,例如路由和转换。重要的是在集成逻辑和业务逻辑之间实现关注点分离。换句话说,与 Web 应用程序的模型-视图-控制器 (MVC) 范例一样,目标应该是提供一个薄但专门的层,将入站请求转换为服务层调用,然后将服务层返回值转换为出站回复。下一节概述了处理这些职责的消息端点类型,而在即将到来的章节中,您将了解 Spring Integration 的声明性配置选项如何提供一种非侵入性的方式来使用每个端点。

Message Endpoints

消息端点表示管道和过滤器架构的 {“filter”}。如前所述,端点的主要角色是将应用程序代码连接到消息传递框架,并以非侵入性的方式进行连接。换句话说,应用程序代码理想情况下应该不了解消息对象或消息通道。这类似于 MVC 范式中控制器的角色。就像控制器处理 HTTP 请求一样,消息端点处理消息。就像控制器映射到 URL 模式一样,消息端点映射到消息通道。两个示例的目标相同:让应用程序代码与基础设施隔离开。这些概念以及所有随后的模式都将在 { Enterprise Integration Patterns} 一书中进行详细讨论。在此,我们仅对 Spring Integration 支持的主要端点类型以及与这些类型关联的角色提供高级别描述。后面的章节会进行详细说明并提供示例代码以及配置示例。

Message Transformer

消息转换器负责转换消息的内容或结构并返回经过修改的消息。最常见的一种转换器可能是将消息的有效负载从一种格式转换为另一种格式(例如从 XML 转换为 java.lang.String)的转换器。同样,转换器可以添加、删除或修改消息的头值。

Message Filter

消息过滤器确定是否应将消息传递至输出通道。这只需一个提供布尔值测试方法即可,该方法可检查特定的有效负载内容类型、属性值、头信息的存在或其他条件。如果接受该消息,则将其发送至输出通道。如果没有,则将丢弃该消息(或对于更严格的实现,可能会引发 Exception)。消息过滤器通常与发布-订阅通道结合使用,其中多个使用者可以接收到相同的消息,并使用过滤器的条件缩小要处理的消息的集合。

小心不要将 “filter” 在管道和过滤器架构模式中的通用用法与这种特定的端点类型混淆,后者有选择地缩小在两个通道之间流动的消息。“filter” 的管道和过滤器概念与 Spring Integration 的消息端点更匹配:任何可以连接到消息通道以发送或接收消息的组件。

Message Router

消息路由器负责确定消息接下来应接收哪个(如果有)通道。通常,此决策基于消息的内容或消息头中的可用元数据。消息路由器通常用作服务激活器或其他能够发送答复消息的端点上静态配置输出通道的动态替代方案。同样,消息路由器提供了对先前描述的多订阅者使用的方法消息过滤器使用的响应式消息过滤器主动的替代方案。

router
Figure 3. Message Router

Splitter

拆分器是另一种类型的消息端点,其责任是从其输入通道接收一条消息,将该消息拆分为多条消息,并将每条消息发送其输出通道。此操作通常用于将“组合”有效负载对象拆分为包含细分有效负载的消息组。

Aggregator

合并器基本上是拆分器的镜像,是一种接收多条消息并将它们合并为单条消息的消息端点类型。事实上,合并器通常是包含拆分器的管道中的下游使用者。从技术上讲,合并器比拆分器更复杂,因为它需要维护状态(要合并的消息)、在完整的消息组可用时决定以及在必要时超时。此外,在超时的情况下,合并器需要知道是发送部分结果、丢弃部分结果,还是将部分结果发送到单独的通道。Spring Integration 提供了 CorrelationStrategyReleaseStrategy 以及可配置的超时设置、是否在超时后发送部分结果,以及丢弃通道。

Service Activator

服务激活器是用于将服务实例连接到消息传递系统的通用端点。必须配置输入消息通道,并且如果要调用的服务方法能返回值,则还可以提供输出消息通道。

输出通道是可选的,因为每条消息还可以提供自己的“返回地址”头。这个相同的规则适用于所有使用者端点。

服务激活器调用某个服务对象的某个操作来处理请求消息,提取请求消息的有效负载并进行转换(如果方法不期望消息类型参数)。每当服务对象的方法返回一个值时,该返回值同样会被转换为答复消息,如果需要(如果它已不是消息类型)。该答复消息被发送到输出通道。如果尚未配置输出通道,则会将答复发送至消息的“返回地址”中指定的通道(如果可用)。

请求-答复服务激活器端点将目标对象的函数连接到输入和输出消息通道。

handler endpoint
Figure 4. Service Activator

如前所述,在 Message Channel 中,通道可能是可轮询的或可订阅的。在前面的图表中,这由 “clock” 符号和实线箭头(轮询)和虚线箭头(订阅)描述。

Channel Adapter

通道适配器是一个将消息通道连接到其他系统或传送方式的端点。通道适配器可以是入站还是出站的。通常,通道适配器会在消息与从其他系统接收或发送至其他系统的任何对象或资源(文件、HTTP 请求、JMS 消息等)之间进行一些映射。根据传送方式的不同,通道适配器也可以填充或提取消息头值。Spring Integration 提供了大量通道适配器,这些通道适配器在后续章节中有所说明。

source endpoint
Figure 5. An inbound channel adapter endpoint connects a source system to a MessageChannel.

消息源可以是可轮询的(例如,POP3)或消息驱动的(例如,IMAP Idle)。在前面的图表中,这由 “clock” 符号和实线箭头(轮询)和虚线箭头(消息驱动)描绘。

target endpoint
Figure 6. An outbound channel adapter endpoint connects a MessageChannel to a target system.

如前所述,Message Channel 中通道可能是可轮询的或可订阅的。在前面的图表中,这由 “clock” 符号和实线箭头(轮询)和虚线箭头(订阅)描述。

Endpoint Bean Names

用户端点(任何有 inputChannel 的端点)包含两个 bean,使用者和消息处理程序。使用者对消息处理程序有引用,并且在消息到达时调用消息处理程序。

考虑以下 XML 示例:

<int:service-activator id = "someService" ... />

鉴于上述示例,bean 名称如下:

  • Consumer: someService (the id)

  • Handler: someService.handler

当使用 Enterprise Integration Pattern (EIP) 注释时,名称取决于诸多因素。考虑以下带注释的 POJO 示例:

@Component
public class SomeComponent {

    @ServiceActivator(inputChannel = ...)
    public String someMethod(...) {
        ...
    }

}

鉴于上述示例,bean 名称如下:

  • Consumer: someComponent.someMethod.serviceActivator

  • Handler: someComponent.someMethod.serviceActivator.handler

从版本 5.0.4 开始,您可以使用 @EndpointId 注释修改这些名称,如下例所示:

@Component
public class SomeComponent {

    @EndpointId("someService")
    @ServiceActivator(inputChannel = ...)
    public String someMethod(...) {
        ...
    }

}

鉴于上述示例,bean 名称如下:

  • Consumer: someService

  • Handler: someService.handler

@EndpointId 根据 XML 配置中的 id 属性创建名称。考虑以下带注释的 bean 示例:

@Configuration
public class SomeConfiguration {

    @Bean
    @ServiceActivator(inputChannel = ...)
    public MessageHandler someHandler() {
        ...
    }

}

鉴于上述示例,bean 名称如下:

  • Consumer: someConfiguration.someHandler.serviceActivator

  • 处理程序:someHandler@Bean 名称)

从版本 5.0.4 开始,您可以使用 @EndpointId 注释修改这些名称,如下例所示:

@Configuration
public class SomeConfiguration {

    @Bean("someService.handler")             1
    @EndpointId("someService")               2
    @ServiceActivator(inputChannel = ...)
    public MessageHandler someHandler() {
        ...
    }

}
1 处理程序:someService.handler(bean 名称)
2 消费者:someService(端点 ID)

@EndpointId 注释根据 XML 配置中的 id 属性创建名称,只要您使用向 @Bean 名称附加 .handler 的约定。

存在一个创建一个第三个 bean 的特殊情况:出于体系架构原因,如果 MessageHandler @Bean 未定义 AbstractReplyProducingMessageHandler,该框架会将提供的 bean 封装在 ReplyProducingMessageHandlerWrapper 中。此封装器支持请求处理程序建议处理并发出正常的“未产生答复”调试日志消息。其 bean 名称是处理程序 bean 名称加上 .wrapper(当有 @EndpointId 时——否则,它是正常生成的处理程序名称)。

类似地,{Pollable Message Sources} 创建两个 bean,一个 {SourcePollingChannelAdapter}(SPCA)和一个 {MessageSource}。

考虑以下 XML 配置:

<int:inbound-channel-adapter id = "someAdapter" ... />

根据前面提到的 XML 配置,bean 名称如下:

  • SPCA: someAdapter (the id)

  • Handler: someAdapter.source

考虑以下 Java 配置,以定义 POJO 到 @EndpointId

@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public String pojoSource() {
    ...
}

根据前面提到的 Java 配置示例,bean 名称如下:

  • SPCA: someAdapter

  • Handler: someAdapter.source

考虑以下 Java 配置,以定义 bean 到 @EndpointID

@Bean("someAdapter.source")
@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public MessageSource<?> source() {
    return () -> {
        ...
    };
}

鉴于上述示例,bean 名称如下:

  • SPCA: someAdapter

  • 处理程序:someAdapter.source(只要你使用在 `@Bean`名称后追加 `.source`的惯例)

Configuration and @EnableIntegration

在本文档中,您会看到 Spring Integration 流程中元素声明的 XML 命名空间支持引用的内容。此支持是由一系列命名空间解析器提供的,这些解析器会生成适当的 Bean 定义来实现特定组件。例如,许多终结点由 MessageHandler bean 和 ConsumerEndpointFactoryBean 组成,在其中注入处理程序和输入通道名称。

Spring Integration 命名空间元素首次遇到时,框架会自动声明过多 bean(任务调度程序、隐式通道创建者等),这些 bean 用于支持运行时环境。

版本 4.0 引入了 @EnableIntegration 注解,以允许注册 Spring Integration 基础结构 Bean(参见 Javadoc)。仅在使用 Java 配置时才需要此注释——例如,使用 Spring Boot 或 Spring Integration Messaging 注解支持以及 Spring Integration Java DSL,而没有 XML 集成配置。

当您拥有不具有 Spring Integration 组件的父上下文和使用 Spring Integration 的两个或多个子上下文时,@EnableIntegration 注释也十分有用。它使这些公共组件只能在父上下文中声明一次。

@EnableIntegration 注释会向应用程序上下文注册许多基础设施组件。特别是,它:

  • 注册一些内置 bean,例如 errorChannel`及其 `LoggingHandler,用于轮询器的 taskSchedulerjsonPath SpEL 函数以及其他 bean。

  • 添加数个 BeanFactoryPostProcessor`实例来增强全局和缺省集成环境的 `BeanFactory

  • 添加数个 `BeanPostProcessor`实例来增强或转换并包装特殊 bean 以用于集成。

  • 添加注释处理器来解析消息注释,并为应用程序上下文注册组件。

{@IntegrationComponentScan} 注解还允许类路径扫描。此注释发挥的作用与标准 Spring Framework {@ComponentScan} 注释的作用类似,但它仅限于 Spring Integration 特定的组件和注释,这是标准 Spring Framework 组件扫描机制无法触及的。有关示例,请参见 {@MessagingGateway Annotation}。

@EnablePublisher 注释会注册一个 PublisherAnnotationBeanPostProcessor bean,并为那些未提供 channel 属性的 @Publisher 注释配置 default-publisher-channel。如果找到多个 @EnablePublisher 注释,则对于默认通道,它们必须具有相同的值。有关更多信息,请参见 Annotation-driven Configuration with the @Publisher Annotation

@GlobalChannelInterceptor 注释已引入,用于标记全局通道拦截的 ChannelInterceptor bean。此注释类似于 <int:channel-interceptor> XML 元素(请参阅 Global Channel Interceptor Configuration)。@GlobalChannelInterceptor 注释可以放置在类级别(带有 @Component 构造型注释)或 @Configuration 类中的 @Bean 方法上。在任何情况下,bean 都必须实现 ChannelInterceptor

从版本 5.1 开始,全局通道拦截器适用于动态注册的通道,例如通过使用 beanFactory.initializeBean() 或在使用 Java DSL 时通过 IntegrationFlowContext 初始化的 bean。以前,在应用程序上下文刷新后创建 bean 时不应用拦截器。

@IntegrationConverter 注释将 ConverterGenericConverterConverterFactory bean 标记为 integrationConversionService 的候选转换器。此注释类似于 <int:converter> XML 元素(请参阅 Payload Type Conversion)。您可以将 @IntegrationConverter 注释放置在类级别(带有 @Component 构造型注释)或 @Configuration 类中的 @Bean 方法上。

有关消息传递注释的更多信息,请参见 Annotation Support

Programming Considerations

您应尽可能使用普通的旧 Java 对象 (POJO),并且只有在绝对必要时,才会在代码中公开框架。有关更多信息,请参见 POJO Method invocation

如果您确实向您的类公开了该框架,则需要考虑一些注意事项,尤其是在应用程序启动期间:

  • 如果你的组件是 ApplicationContextAware,通常不应该在 setApplicationContext()`方法中使用 `ApplicationContext。相反,存储一个引用并在上下文生命周期的后期推迟使用它们。

  • 如果你的组件是一个 InitializingBean`或使用 `@PostConstruct`方法,不要从这些初始化方法发送任何消息。在调用这些方法时应用程序上下文尚未初始化,发送这些消息很可能会失败。如果你需要在启动期间发送消息,请实现 `ApplicationListener`并等待 `ContextRefreshedEvent。或者,实现 SmartLifecycle,将你的 bean 置于后期,并从 `start()`方法发送消息。

Considerations When Using Packaged (for example, Shaded) Jars

Spring Integration 通过使用 Spring Framework 的 SpringFactories 机制来加载多个 IntegrationConfigurationInitializer 类来引导某些功能。这包括 -core jar 以及某些其他 jar,包括 -http-jmx。此流程的信息存储在每个 jar 中的 META-INF/spring.factories 文件中。

一些开发人员更喜欢使用众所周知的工具(如 Apache Maven Shade Plugin)将应用程序及其所有依赖项重新打包到一个 Jar 中。

默认情况下,在生成遮盖 jar 时,遮盖插件不会合并 spring.factories 文件。

spring.factories 以外,其他 META-INF 文件 (spring.handlersspring.schemas) 用于 XML 配置。这些文件也需要合并。

Spring Boot’s executable jar mechanism 采取不同的方法,它嵌套 JAR,从而保留类路径上的每个 spring.factories 文件。因此,对于 Spring Boot 应用程序,如果你使用其默认的可执行 JAR 格式,则不需要做任何其他事情。

即使您不使用 Spring Boot,您仍然可以使用 Boot 提供的工具通过添加上述文件的转换器来增强 shade 插件。以下示例展示了如何配置插件:

Example 1. pom.xml
...
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <configuration>
                <keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
                <createDependencyReducedPom>true</createDependencyReducedPom>
            </configuration>
            <dependencies>
                <dependency> 1
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring.boot.version}</version>
                </dependency>
            </dependencies>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers> 2
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>META-INF/spring.handlers</resource>
                            </transformer>
                            <transformer
                                implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                <resource>META-INF/spring.factories</resource>
                            </transformer>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>META-INF/spring.schemas</resource>
                            </transformer>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
...

具体而言,

1 添加 `spring-boot-maven-plugin`作为依赖关系。
2 Configure the transformers.

您可以为 ${spring.boot.version} 添加属性,或使用显式版本。

Programming Tips and Tricks

此部分文件记录了一些从 Spring Integration 中获得最大收益的方法。

XML Schemas

在使用 XML 配置时,为避免收到错误的模式验证错误,您应该使用“支持 Spring”的 IDE,例如 Spring Tool Suite (STS)、带有 Spring IDE 插件的 Eclipse 或 IntelliJ IDEA。这些 IDE 知道如何从类路径(通过 jar 中的 META-INF/spring.schemas 文件)解析正确的 XML 模式。在使用带有插件的 STS 或 Eclipse 时,您必须在项目中启用“Spring Project Nature”。

为了兼容性原因,某些旧模块(存在于 1.0 版本中的)在互联网上托管的模式是 1.0 版本。如果您的 IDE 使用这些模式,则可能会看到错误的错误。

每个这些在线模式都有类似于以下内容的警告:

这个模式是 Spring Integration Core 的 1.0 版本。我们无法将其更新为当前模式,因为这将破坏任何使用 1.0.3 或更低版本的应用程序。对于后续版本,“未加版本”的模式是从类路径解析并从 jar 中获取的。请参阅 GitHub: [role="bare"][role="bare"]https://github.com/spring-projects/spring-integration/tree/main/spring-integration-core/src/main/resources/org/springframework/integration/config

受影响的模块为

  • core (spring-integration.xsd)

  • file

  • http

  • jms

  • mail

  • security

  • stream

  • ws

  • xml

Finding Class Names for Java and DSL Configuration

使用 XML 配置和 Spring Integration Namespace 支持,XML 解析器隐藏了目标 Bean 的声明方式,以及它们如何连接在一起。对于 Java 配置,了解针对最终用户应用程序的 Framework API 非常重要。

EIP 实现的一等公民是 MessageChannelEndpoint(参见本章前面的 Main Components)。它们的实现(契约)为:

  • org.springframework.messaging.Message: See Message;

  • org.springframework.messaging.MessageChannel: See Message Channels;

  • org.springframework.integration.endpoint.AbstractEndpoint: See Poller.

前两个足够简单,可以理解如何实现、配置和使用。最后一个需要更多的关注

AbstractEndpoint 在 Spring Framework 中广泛用于不同的组件实现。其主要实现是:

  • 当我们订阅一个 SubscribableChannel`以侦听消息时,使用 `EventDrivenConsumer

  • 当我们从一个 PollableChannel`轮询消息时,使用 `PollingConsumer

当您使用消息传递注释或 Java DSL 时,您不必担心这些组件,因为 Framework 会自动使用适当的注释和 BeanPostProcessor 实现生成它们。在手动构建组件时,您应该使用 ConsumerEndpointFactoryBean 帮助确定要创建的目标 AbstractEndpoint 消费者实现,具体基于提供的 inputChannel 属性。

另一方面,ConsumerEndpointFactoryBean 委托给框架中的另一个一等公民-org.springframework.messaging.MessageHandler。实现此界面的目的是处理来自通道的端点使用的消息。Spring Integration 中的所有 EIP 组件均为 MessageHandler 实现(例如,AggregatingMessageHandlerMessageTransformingHandlerAbstractMessageSplitter 等)。目标协议出站适配器(FileWritingMessageHandlerHttpRequestExecutingMessageHandlerAbstractMqttMessageHandler 等)也是 MessageHandler 实现。使用 Java 配置开发 Spring Integration 应用程序时,您应该查看 Spring Integration 模块以查找适合用于 @ServiceActivator 配置的 MessageHandler 实现。例如,要发送 XMPP 消息(参见 XMPP Support),您应该配置类似以下内容:

@Bean
@ServiceActivator(inputChannel = "input")
public MessageHandler sendChatMessageHandler(XMPPConnection xmppConnection) {
    ChatMessageSendingMessageHandler handler = new ChatMessageSendingMessageHandler(xmppConnection);

    DefaultXmppHeaderMapper xmppHeaderMapper = new DefaultXmppHeaderMapper();
    xmppHeaderMapper.setRequestHeaderNames("*");
    handler.setHeaderMapper(xmppHeaderMapper);

    return handler;
}

MessageHandler 实现表示消息流的出站和处理部分。

入站消息流方面有其自身组件,它们分为轮询和侦听行为。侦听(消息驱动的)组件简单,通常只需要一个目标类实现即可生成消息。侦听组件可以是一次性的 MessageProducerSupport 实现(例如 AbstractMqttMessageDrivenChannelAdapterImapIdleChannelAdapter),或请求答复 MessagingGatewaySupport 实现(例如 AmqpInboundGatewayAbstractWebServiceInboundGateway)。

轮询入站端点适合那些不提供侦听器 API 或不适用于此类行为的协议,包括基于文件的协议(如 FTP)、任何数据库(RDBMS 或 NoSQL)等。

这些入站端点包含两个组件:轮询程序配置,用于定期启动轮询任务;以及消息源类,用于从目标协议读取数据并为下游集成流生成消息。轮询程序配置的第一个类是 SourcePollingChannelAdapter。它是另一个 AbstractEndpoint 实现,但专门用于轮询以启动集成流。通常,使用消息批注或 Java DSL 时,不必担心此类。框架会根据 @InboundChannelAdapter 配置或 Java DSL 构建器规范为此类生成一个 bean。

消息源组件对目标应用程序开发更为重要,并且它们全部实现 MessageSource 接口(例如 MongoDbMessageSourceAbstractTwitterMessageSource)。牢记这一点,我们用于从 JDBC 读取 RDBMS 表数据的配置可以类似以下内容:

@Bean
@InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixedDelay="5000"))
public MessageSource<?> storedProc(DataSource dataSource) {
    return new JdbcPollingChannelAdapter(dataSource, "SELECT * FROM foo where status = 0");
}

您可以在特定的 Spring 集成模块中查找目标协议所需的所有入站和出站类(在大多数情况下,在各自的包中)。例如,spring-integration-websocket 适配器为:

  • o.s.i.websocket.inbound.WebSocketInboundChannelAdapter:实现 `MessageProducerSupport`以侦听套接字上的帧并生成消息到通道。

  • o.s.i.websocket.outbound.WebSocketOutboundMessageHandler:单向 `AbstractMessageHandler`实现,将传入消息转换为合适的帧并在 WebSocket 上发送。

如果您熟悉 Spring 集成的 XML 配置,从 4.3 版开始,我们将在 XSD 元素定义中提供有关用于声明适配器或网关 bean 的目标类的信息,如以下示例所示:

<xsd:element name="outbound-async-gateway">
    <xsd:annotation>
		<xsd:documentation>
Configures a Consumer Endpoint for the 'o.s.i.amqp.outbound.AsyncAmqpOutboundGateway'
that will publish an AMQP Message to the provided Exchange and expect a reply Message.
The sending thread returns immediately; the reply is sent asynchronously; uses 'AsyncRabbitTemplate.sendAndReceive()'.
       </xsd:documentation>
	</xsd:annotation>

POJO Method invocation

正如在 Programming Considerations 中所讨论的,我们建议使用 POJO 编程风格,如下例所示:

@ServiceActivator
public String myService(String payload) { ... }

在这种情况下,框架将提取 String 有效负载、调用您的方法,并将结果包装成消息,以便发送到流中的下一个组件(原始标题将复制到新消息中)。事实上,如果您使用 XML 配置,则甚至不需要 @ServiceActivator 批注,如以下配对示例所示:

<int:service-activator ... ref="myPojo" method="myService" />
public String myService(String payload) { ... }

只要类上的公共方法中没有歧义,就可以省略 method 属性。

您还可以在 POJO 方法中获取标题信息,如以下示例所示:

@ServiceActivator
public String myService(@Payload String payload, @Header("foo") String fooHeader) { ... }

您还可以取消引用消息上的属性,如以下示例所示:

@ServiceActivator
public String myService(@Payload("payload.foo") String foo, @Header("bar.baz") String barbaz) { ... }

由于可以使用多种 POJO 方法调用,因此 5.0 之前的版本会使用 SpEL(Spring 表达式语言)来调用 POJO 方法。与在方法中通常执行的实际工作相比,SpEL(甚至是已解释的)对于这些操作通常是“足够快”的。但是,从 5.0 版本开始,只要有可能,默认情况下将使用 org.springframework.messaging.handler.invocation.InvocableHandlerMethod。此技术的执行速度通常比已解释的 SpEL 更快,并且与其他 Spring 消息传递项目一致。InvocableHandlerMethod 类似于在 Spring MVC 中用来调用控制器方法的技术。在使用 SpEL 时,仍然总有一些方法会被调用。示例包括前面讨论过的带取消引用属性的批注参数。这是因为 SpEL 具有导航属性路径的能力。

还有一些我们尚未考虑的边缘情况也不适用于 InvocableHandlerMethod 实例。因此,在这些情况下,我们会自动退回到使用 SpEL。

如果您愿意,还可以使用 UseSpelInvoker 批注设置 POJO 方法,使其始终使用 SpEL,如以下示例所示:

@UseSpelInvoker(compilerMode = "IMMEDIATE")
public void bar(String bar) { ... }

如果省略 compilerMode 属性,则 spring.expression.compiler.mode 系统属性会确定编译器模式。有关已编译 SpEL 的更多信息,请参见 SpEL compilation