STOMP Support

Spring 集成 4.2 版本引入了 STOMP(简单文本导向消息协议)客户端支持。它基于 Spring 框架消息模块中的架构、基础设施和 API,stomp 包。Spring 集成使用许多 Spring STOMP 组件(如 @{2} 和 @{3})。有关更多信息,请参阅 Spring 框架参考手册中的 @{4} 章节。 你需要将此依赖项包含在你的项目中:

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stomp</artifactId>
    <version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-stomp:{project-version}"

对于服务器端组件,你需要添加 org.springframework:spring-websocket 和/或 io.projectreactor.netty:reactor-netty 依赖项。

Overview

要配置 STOMP,你应该从 STOMP 客户端对象开始。Spring Framework 提供以下实现:

  • ReactorNettyTcpStompClient:建立在 ReactorNettyTcpClient 项目中 reactor-netty 的基础上。

  • 组件 Bean 名称。MessageHandler 会使用 id 加上 .handler 的 Bean 别名进行注册。如果你没有设置 channel 特性,则会使用此 id 特性的值作为 Bean 名称创建 DirectChannel 并将其注册在应用程序上下文中。

您可以提供任何其他 @{5} 实现。请参阅这些类的 @{6}。

StompClientSupport 类被设计为一个用于为提供的 StompSessionHandler 生成 StompSession 的工厂,所有剩余的工作都是通过到该 StompSessionHandlerStompSession 抽象的回调来完成。使用 Spring Integration 适配器抽象,我们需要提供一些受管理的共享对象来表示我们的应用程序作为具有其唯一会话的 STOMP 客户端。为此,Spring Integration 提供 StompSessionManager 抽象来管理任何提供的 StompSessionHandler 之间的单个 StompSession。这允许使用特定 STOMP 代理的入站或出站管道适配器(或两者)。有关更多信息,请参阅 StompSessionManager(及其实现)JavaDocs。

STOMP Inbound Channel Adapter

StompInboundChannelAdapter 是一个一站式 MessageProducer 组件,它将你的 Spring Integration 应用程序订阅到提供的 STOMP 目标并从它们接收消息(使用连接的 StompSession 上提供的 MessageConverter 从 STOMP 帧转换)。你可以通过对 StompInboundChannelAdapter 使用适当的 @ManagedOperation 注解在运行时更改目标(因此 STOMP 订阅)。

有关更多配置选项,请参阅 @{9} 和 @{7} @{8}。

STOMP Outbound Channel Adapter

StompMessageHandler<int-stomp:outbound-channel-adapter>MessageHandler,用于通过 StompSession(由共享的 StompSessionManager 提供)将传出的 Message<?> 实例发送到 STOMP 目标(预先配置或在运行时使用 SpEL 表达式确定)。

有关更多配置选项,请参阅 @{12} 和 @{10} @{11}。

STOMP Headers Mapping

STOMP 协议提供标头作为其帧的一部分。STOMP 帧的整个结构具有以下格式:

....
COMMAND
header1:value1
header2:value2

Body^@
....

Spring 框架提供 @{13} 来表示这些标头。请参阅 @{20} 了解更多详情。STOMP 帧将转换为 @{14} 实例并从中转换,而这些标头将映射到 @{15} 实例并从中映射。Spring 集成提供了一个 STOMP 适配器的默认 @{16} 实现。该实现是 @{17}。它分别为入站和出站适配器提供 @{18} 和 @{19} 操作。

与许多其他 Spring Integration 模块类似,IntegrationStompHeaders 类已被引入以将标准 STOMP 头部映射到 MessageHeaders,其中 stomp_ 为头部名称前缀。此外,所有带有该前缀的 MessageHeaders 实例在发送到目标时都将映射到 StompHeaders

有关更多信息,请参阅这些类的 @{22} 和 @{23} 中的 @{21} 属性说明。

STOMP Integration Events

很多 STOMP 操作都是异步的,包括错误处理。例如,STOMP 具有 @{24} 服务器帧,当客户端帧通过添加 @{25} 标头请求它时,它会返回该帧。Spring 集成会发出 @{26} 实例,以提供对这些异步事件的访问,您可以通过实现 @{27} 或使用 @{28} (请参阅 @{29})来获取该实例。

具体来说,当 stompSessionListenableFuture 由于无法连接到 STOMP 代理而接收到 onFailure() 时,AbstractStompSessionManager 会发出 StompExceptionEvent。另一个示例是 StompMessageHandler。它处理 ERROR STOMP 帧,这些帧是对该 StompMessageHandler 发送的不当(不被接受)消息的服务器响应。

StompMessageHandlerStompReceiptEvent 作为 StompSession.Receiptable 回调的一部分发出,作为发送到 StompSession 的消息的异步回答。StompReceiptEvent 可以是正面的或负面的,具体取决于是否在 receiptTimeLimit 周期内从服务器接收到 RECEIPT 帧,您可以在 StompClientSupport 实例上配置它。它的默认值为 15 * 1000(以毫秒为单位,即 15 秒)。

仅当要发送的消息的 RECEIPT STOMP 头文件不为 null 时才添加 StompSession.Receiptable 回调。您可以通过 StompSession 中的 autoReceipt 选项以及 StompSessionManager 中的 autoReceipt 选项启用 RECEIPT 头文件的自动生成。

请参阅 @{31} 了解有关如何配置 Spring 集成以接受这些 @{30} 实例的更多信息。

STOMP Adapters Java Configuration

以下示例展示了 STOMP 适配器的全面 Java 配置:

@Configuration
@EnableIntegration
public class StompConfiguration {

    @Bean
    public ReactorNettyTcpStompClient stompClient() {
        ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
        stompClient.setMessageConverter(new PassThruMessageConverter());
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.afterPropertiesSet();
        stompClient.setTaskScheduler(taskScheduler);
        stompClient.setReceiptTimeLimit(5000);
        return stompClient;
    }

    @Bean
    public StompSessionManager stompSessionManager() {
        ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
        stompSessionManager.setAutoReceipt(true);
        return stompSessionManager;
    }

    @Bean
    public PollableChannel stompInputChannel() {
        return new QueueChannel();
    }

    @Bean
    public StompInboundChannelAdapter stompInboundChannelAdapter() {
        StompInboundChannelAdapter adapter =
        		new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
        adapter.setOutputChannel(stompInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "stompOutputChannel")
    public MessageHandler stompMessageHandler() {
        StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
        handler.setDestination("/topic/myTopic");
        return handler;
    }

    @Bean
    public PollableChannel stompEvents() {
        return new QueueChannel();
    }

    @Bean
    public ApplicationListener<ApplicationEvent> stompEventListener() {
        ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
        producer.setEventTypes(StompIntegrationEvent.class);
        producer.setOutputChannel(stompEvents());
        return producer;
    }

}

STOMP Namespace Support

Spring Integration STOMP 名称空间实现了入站和出站通道适配器组件。要在您的配置中包含它,请在您的应用程序上下文配置文件中提供以下名称空间声明:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/stomp
    https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
    ...
</beans>

Understanding the <int-stomp:outbound-channel-adapter> Element

以下清单展示了 STOMP 出站通道适配器的可用属性:

<int-stomp:outbound-channel-adapter
                           id=""                      1
                           channel=""                 2
                           stomp-session-manager=""   3
                           header-mapper=""           4
                           mapped-headers=""          5
                           destination=""             6
                           destination-expression=""  7
                           auto-startup=""            8
                           phase=""/>                 9
1 在这种情况下,该端点会使用 id 加上 .adapter 作为 Bean 名称进行注册。
2 标识此适配器所连接的通道(如果 id 存在)。请参见 id。可选。
3 StompSessionManager Bean 的引用,其中封装了低级别连接和 StompSession 处理操作。必需。
4 引用实现 HeaderMapper&lt;StompHeaders&gt; 的 bean,它将 Spring Integration MessageHeaders 映射至自 STOMP 帧标头、或者自 STOMP 帧标头映射至 Spring Integration MessageHeaders。它与 mapped-headers 相互排斥。它的默认值是 StompHeaderMapper
5 以逗号分隔的 STOMP 标头名称列表,用于映射至 STOMP 帧标头。仅在未设置 header-mapper 引用时才能提供它。此列表中的值还可以是针对标头名称(例如 myheader**myheader)进行匹配的简单模式。特殊标记 (STOMP_OUTBOUND_HEADERS) 表示所有标准 STOMP 标头(内容长度、收据、心跳等)。它们默认包括在内。如果您想添加自己的标头并希望也映射标准标头,则必须包括此标记或使用 header-mapperHeaderMapper 实现提供您自己的 HeaderMapper 实现。
6 STOMP 消息发送到的目标的名称。它与 destination-expression 相互排斥。
7 在运行时针对每个 Spring Integration Message 进行评估的 SpEL 表达式,作为根对象。它与 destination 相互排斥。
8 布尔值,指示此端点是否应自动启动。它默认为 true
9 此端点应在其中启动和停止的生命周期阶段。值越低,此端点启动得越早,停止得越晚。默认值为 Integer.MIN_VALUE。值可以为负。请参见 SmartLifeCycle

Understanding the <int-stomp:inbound-channel-adapter> Element

以下清单展示了 STOMP 入站通道适配器的可用属性:

<int-stomp:inbound-channel-adapter
                           id=""                     1
                           channel=""                2
                           error-channel=""          3
                           stomp-session-manager=""  4
                           header-mapper=""          5
                           mapped-headers=""         6
                           destinations=""           7
                           send-timeout=""           8
                           payload-type=""           9
                           auto-startup=""           10
                           phase=""/>                11
1 组件 Bean 名称。如果您未设置 channel 属性,系统将创建一个 DirectChannel,并在应用程序上下文中使用此 id 属性的值作为 Bean 名称进行注册。在这种情况下,此端点将使用 Bean 名称 id 加上 .adapter 进行注册。
2 标识连接到此适配器的通道。
3 MessageChannel Bean 引用,ErrorMessage 实例应发送至其中。
4 请参见 `<<<int-stomp:outbound-channel-adapter>`,stomp-outbound-channel-adapter>>` 上的相同选项。
5 以逗号分隔的 STOMP 标头名称列表,用于从 STOMP 帧标头进行映射。仅当未设置 header-mapper 引用时,您才能提供这个列表。此列表中的值还可以是针对标头名称(例如 myheader**myheader)进行匹配的简单模式。特殊标记 (STOMP_INBOUND_HEADERS) 表示所有标准 STOMP 标头(内容长度、收据、心跳等)。它们默认包括在内。如果您想添加自己的标头并希望也映射标准标头,则您还必须包括此标记或使用 header-mapperHeaderMapper 实现提供您自己的 HeaderMapper 实现。
6 请参见 `<<<int-stomp:outbound-channel-adapter>`,stomp-outbound-channel-adapter>>` 上的相同选项。
7 以逗号分隔的 STOMP 目标名称列表,用于订阅。可以通过 addDestination()removeDestination() @ManagedOperation 注释在运行时修改目标(因此也修改订阅)的列表。
8 将消息发送到通道(如果通道可以阻塞)时等待所需的最大时间(以毫秒为单位)。例如,如果 QueueChannel 已达到其最大容量,则 QueueChannel 可以在有空间可用时进行阻塞。
9 用于从传入 STOMP 帧转换的 payload 目标的 Java 类型的完全限定名。它默认为 String.class
10 请参见 `<<<int-stomp:outbound-channel-adapter>`,stomp-outbound-channel-adapter>>` 上的相同选项。
11 请参见 `<<<int-stomp:outbound-channel-adapter>`,stomp-outbound-channel-adapter>>` 上的相同选项。