WebSockets Support

从 4.1 版本开始,Spring Integration 支持 WebSocket。它基于 Spring Framework 的 web-socket`模块的架构、基础设施和 API。因此,Spring WebSocket 的许多组件(例如 `SubProtocolHandler`或 `WebSocketClient)和配置选项(例如 @EnableWebSocketMessageBroker)可以在 Spring Integration 中重复使用。有关详细信息,请参阅 Spring Framework 参考手册中的 Spring Framework WebSocket Support章节。 你需要将此依赖项包含在你的项目中:

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-websocket</artifactId>
    <version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-websocket:{project-version}"

对于服务器端,org.springframework:spring-webmvc 依赖项必须显式包含。 Spring Framework WebSocket 基础设施基于 Spring 消息基础,并提供了一个基于 Spring Integration 使用的相同 MessageChannel 实现和 MessageHandler 实现(以及一些 POJO 方法注释映射)的基本消息框架。因此,即使没有 WebSocket 适配器,Spring Integration 也可以直接参与 WebSocket 流。为此,你可以使用适当的注释来配置一个 Spring Integration @MessagingGateway,如下面的示例所示:

@MessagingGateway
@Controller
public interface WebSocketGateway {

    @MessageMapping("/greeting")
    @SendToUser("/queue/answer")
    @Gateway(requestChannel = "greetingChannel")
    String greeting(String payload);

}

Overview

由于 WebSocket 协议按定义为流式,并且我们可以同时向和来自 WebSocket 发送和接收消息,因此可以处理适当的 WebSocketSession,无论是在客户端还是在服务器端。为了封装连接管理和 WebSocketSession 注册表,需通过 ClientWebSocketContainerServerWebSocketContainer 实现为 IntegrationWebSocketContainer 提供信息。借助 Spring 框架中的 WebSocket API 及其实现(包含许多扩展),在服务器端和客户端均使用相同的类(当然,这是从 Java 的角度来看的)。因此,在两端,大多数连接和 WebSocketSession 注册表选项都是相同的。这让我们可以重复使用许多配置项和基础架构挂钩,以便在服务器端和客户端上构建 WebSocket 应用程序。以下示例显示了组件如何同时服务于这两个目的:

//Client side
@Bean
public WebSocketClient webSocketClient() {
    return new SockJsClient(Collections.singletonList(new WebSocketTransport(new JettyWebSocketClient())));
}

@Bean
public IntegrationWebSocketContainer clientWebSocketContainer() {
    return new ClientWebSocketContainer(webSocketClient(), "ws://my.server.com/endpoint");
}

//Server side
@Bean
public IntegrationWebSocketContainer serverWebSocketContainer() {
    return new ServerWebSocketContainer("/endpoint").withSockJs();
}

IntegrationWebSocketContainer 旨在实现双向消息传递,可以在入站和出站通道适配器之间共享(见下文),在使用单向(发送或接收)WebSocket 消息传递时只能从中一个引用它。它可以在没有任何通道适配器的情况下使用,但是,在这种情况下,IntegrationWebSocketContainer 仅充当 WebSocketSession 注册表。

ServerWebSocketContainer 实现 WebSocketConfigurer 以将内部 IntegrationWebSocketContainer.IntegrationWebSocketHandler 注册为 Endpoint。它在为目标供应商 WebSocket 容器的 ServletWebSocketHandlerRegistry 中提供的 paths 和其他服务器 WebSocket 选项(例如 HandshakeHandlerSockJS fallback)下进行此操作。此注册使用基础结构 WebSocketIntegrationConfigurationInitializer 组件实现,它与 @EnableWebSocket 注解的作用相同。这意味着,通过在应用程序上下文中使用 @EnableIntegration(或任何 Spring Integration 命名空间),您可以忽略 @EnableWebSocket 声明,因为 Spring Integration 基础结构会检测所有 WebSocket 端点。

从 6.1 版本开始,ClientWebSocketContainer 可通过提供的 URI 进行配置,而不是 uriTemplateuriVariables 组合。在有些场景下需要对 URI 的某些部分进行自定义编码时,这种方式非常有用。较为便捷的方式是,参阅 UriComponentsBuilder API。

WebSocket Inbound Channel Adapter

WebSocketInboundChannelAdapter 实现了 WebSocketSession 交互中的接收部分。你必须向它提供一个 IntegrationWebSocketContainer 并且该适配器将自己注册为一个 WebSocketListener 来处理传入的消息和 WebSocketSession 事件。

只能在 IntegrationWebSocketContainer 中注册一个 WebSocketListener

为 WebSocket 子协议,WebSocketInboundChannelAdapter 可以通过 SubProtocolHandlerRegistry 进行配置,SubProtocolHandlerRegistry 为第二个构造函数参数。该适配器委托给 SubProtocolHandlerRegistry 来根据已接受的 WebSocketSession 判断恰当的 SubProtocolHandler,并将一个 WebSocketMessage 转换为一个 Message,具体取决于子协议的实现。

默认情况下,WebSocketInboundChannelAdapter 仅依赖于原始 PassThruSubProtocolHandler 实现,该实现将 WebSocketMessage 转换为 Message

WebSocketInboundChannelAdapter 仅接受和发送包含 SimpMessageType.MESSAGE 或一个空的 simpMessageType 头的 Message 实例到底层集成流。所有其他 Message 类型通过从 SubProtocolHandler 实现(如 StompSubProtocolHandler)发出的 ApplicationEvent 实例来处理。

在服务器端,如果存在 @EnableWebSocketMessageBroker 配置,可以使用 useBroker = true 选项来配置 WebSocketInboundChannelAdapter。在这种情况下,所有 非 MESSAGEMessage 类型都将委托给提供的 AbstractBrokerMessageHandler。另外,如果代理中继使用 destination 前缀进行了配置,那么与代理目标匹配的消息会路由到 AbstractBrokerMessageHandler,而不是 WebSocketInboundChannelAdapteroutputChannel

如果 useBroker = false 且接收的消息类型是 SimpMessageType.CONNECT,那么 WebSocketInboundChannelAdapter 会立即向 WebSocketSession 发送一个 SimpMessageType.CONNECT_ACK 消息,而不会将它发送到通道。

Spring 的 WebSocket 支持只允许配置一个代理中继。因此,不需要 AbstractBrokerMessageHandler 引用。它在应用程序上下文中检测到。

有关更多配置选项,请参见 WebSockets Namespace Support

WebSocket Outbound Channel Adapter

WebSocketOutboundChannelAdapter

  1. 从其 MessageChannel 接受 Spring 集成消息

  2. MessageHeaders 中确定 WebSocketSession id

  3. 从提供的 IntegrationWebSocketContainer 中检索 WebSocketSession

  4. WebSocketMessage 的转换和发送工作委派给从提供的 SubProtocolHandlerRegistry 中获取的合适的 SubProtocolHandler

在客户端,并不需要 WebSocketSessionid 消息头,因为 ClientWebSocketContainer 只处理单个连接和它各自的 WebSocketSession

要使用 STOMP 子协议,你应该使用 StompSubProtocolHandler`配置该适配器。然后你可以使用 `StompHeaderAccessor.create(StompCommand…​)`和 `MessageBuilder`将任何 STOMP 消息类型发送到该适配器,或者只使用 `HeaderEnricher(请参见 Header Enricher)。

本章的剩余部分主要涵盖附加配置选项。

WebSockets Namespace Support

Spring Integration WebSocket 命名空间包括本章剩余部分描述的几个组件。要将它包含在你的配置中,在你的应用程序上下文配置文件中使用以下命名空间声明:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/websocket
    https://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">
    ...
</beans>

<int-websocket:client-container> Attributes

以下清单展示了 <int-websocket:client-container> 元素可用的属性:

<int-websocket:client-container
                  id=""                             1
                  client=""                         2
                  uri=""                            3
                  uri-variables=""                  4
                  origin=""                         5
                  send-time-limit=""                6
                  send-buffer-size-limit=""         7
                  send-buffer-overflow-strategy=""  8
                  auto-startup=""                   9
                  phase="">                        10
                <int-websocket:http-headers>
                  <entry key="" value=""/>
                </int-websocket:http-headers>      11
</int-websocket:client-container>
1 The component bean name.
2 The WebSocketClient bean reference.
3 指向目标 WebSocket 服务的 uriuriTemplate。如果使用它作为具有 URI 变量占位符的 uriTemplate,则需要提供 uri-variables 属性。
4 uri 属性值中 URI 变量占位符的逗号分隔值。这些值根据它们在 uri 中的顺序替换到占位符中。请参见 UriComponents.expand(Object&#8230;&#8203;uriVariableValues)
5 Origin 握手 HTTP 头部值。
6 WebSocket 会话“发送”超时限制。默认为 10000
7 WebSocket 会话“发送”消息大小限制。默认为 524288
8 WebSocket 会话发送缓冲区溢出策略确定会话的出站消息缓冲区达到 send-buffer-size-limit 时的行为。有关可能的值和更多详细信息,请参见 ConcurrentWebSocketSessionDecorator.OverflowStrategy
9 布尔值,表示此端点是否应自动启动。默认为 false,假设此容器是从 WebSocket inbound adapter 启动的。
10 此端点在其中启动和停止的生命周期阶段。值越小,此端点启动越早,停止越晚。默认为 Integer.MAX_VALUE。值可以为负数。请参见 SmartLifeCycle
11 一个 MapHttpHeaders,用于与握手请求一起使用。

<int-websocket:server-container> Attributes

以下清单展示了 <int-websocket:server-container> 元素可用的属性:

<int-websocket:server-container
          id=""                             1
          path=""                           2
          handshake-handler=""              3
          handshake-interceptors=""         4
          decorator-factories=""            5
          send-time-limit=""                6
          send-buffer-size-limit=""         7
          send-buffer-overflow-strategy=""  8
          allowed-origins="">               9
          <int-websocket:sockjs
            client-library-url=""          10
            stream-bytes-limit=""          11
            session-cookie-needed=""       12
            heartbeat-time=""              13
            disconnect-delay=""            14
            message-cache-size=""          15
            websocket-enabled=""           16
            scheduler=""                   17
            message-codec=""               18
            transport-handlers=""          19
            suppress-cors="true" />        20
</int-websocket:server-container>
1 The component bean name.
2 路径(或逗号分隔的路径),将特定的请求映射到 WebSocketHandler。支持精确路径映射 URI(如 /myPath)和 ant 样式路径模式(如 /myPath/**)。
3 HandshakeHandler bean 引用。默认为 DefaultHandshakeHandler
4 HandshakeInterceptor bean 引用的列表。
5 一个或多个工厂 (WebSocketHandlerDecoratorFactory) 的列表,用于装饰处理 WebSocket 消息的处理器。这对于一些高级用例可能很有用(例如,允许 Spring Security 在相应的 HTTP 会话过期时强制关闭 WebSocket 会话)。有关更多信息,请参见 Spring Session Project
6 请参见 《< int-websocket:client-container >》,websocket-client-container-attributes 中的相同选项。
7 请参见 《< int-websocket:client-container >》,websocket-client-container-attributes 中的相同选项。
8 WebSocket 会话发送缓冲区溢出策略确定会话的出站消息缓冲区达到 send-buffer-size-limit 时的行为。有关可能的值和更多详细信息,请参见 ConcurrentWebSocketSessionDecorator.OverflowStrategy
9 允许的来源标头值。你可以将多个来源指定为逗号分隔的列表。此项检查主要针对浏览器客户端设计。没有任何内容阻止其他类型的客户端修改来源标头值。当 SockJS 处于启用状态,且允许的来源受限时,不支持不使用来源标头进行跨域请求的传输类型(jsonp-pollingiframe-xhr-pollingiframe-eventsource`和 `iframe-htmlfile)。因此,不支持 IE6 和 IE7,只无 cookie 支持 IE8 和 IE9。默认情况下,所有来源均被允许。
10 没有原生的跨域通信的传输(例如 eventsourcehtmlfile)必须在一个不可见的 iframe 中从 “foreign” 域获取一个简单的页面,以便 iframe 中的代码可以从一个本地的域运行到 SockJS 服务器。由于 iframe 需要加载 SockJS javascript 客户端库,此属性允许你指定将其加载到的位置。默认情况下,它指向 https://d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js。但是,你还可以将其设置为指向应用程序提供的 URL。请注意,可以指定相对 URL,在这种情况下,URL 必须相对于 iframe URL。例如,假设一个 SockJS 端点已映射到 /sockjs,而最终的 iframe URL 是 /sockjs/iframe.html,则相对 URL 必须以“../../”开头,才能遍历到 SockJS 映射上方的位置。对于基于前缀的 servlet 映射,你可能需要再进行一次遍历。
11 单个 HTTP 流请求在关闭前可以发送的最小字节数。默认为 128K(即 128*1024 或 131072 字节)。
12 cookie_needed 在来自 SockJs /info 端点的响应中。此属性指出是否需要 JSESSIONID cookie 使应用程序能够正常运行(例如,负载平衡或在 Java Servlet 容器中使用 HTTP 会话)。
13 在服务器未发送任何消息且之后服务器应向客户端发送心跳帧以防止断开连接的时间量(以毫秒为单位)。默认值为 25,000(25 秒)。
14 客户端被视为断开连接前未进行接收连接(即,服务器可以通过该连接将数据发送到客户端的活动连接)的时间量(以毫秒为单位)。默认值为 5000
15 在等待来自客户端的下一个 HTTP 轮询请求时,会话可以缓存的服务器到客户端消息数。默认大小为 100
16 有些负载均衡器不支持 WebSockets。将此选项设置为 “false” 以禁用服务器端的 WebSocket 传输。默认值为 “true”。
17 TaskScheduler” bean 引用。如果未提供任何值,将创建一个新的 “ThreadPoolTaskScheduler” 实例。此调度器实例用于调度心跳消息。
18 用于对 SockJS 消息进行编码和解码的 “SockJsMessageCodec” bean 引用。默认情况下,使用 “Jackson2SockJsMessageCodec”,它要求类路径中存在 Jackson 库。
19 TransportHandler” bean 引用列表。
20 是否禁用为 SockJS 请求自动添加 CORS 标头。默认值为 “false”。

<int-websocket:outbound-channel-adapter> Attributes

以下清单展示了 <int-websocket:outbound-channel-adapter> 元素可用的属性:

<int-websocket:outbound-channel-adapter
                          id=""                             1
                          channel=""                        2
                          container=""                      3
                          default-protocol-handler=""       4
                          protocol-handlers=""              5
                          message-converters=""             6
                          merge-with-default-converters=""  7
                          auto-startup=""                   8
                          phase=""/>                        9
1 组件 Bean 名称。如果你不提供 “channel” 属性,则会创建并注册“DirectChannel”,并在应用程序上下文中使用此 “id” 属性作为 bean 名称。在这种情况下,端点将使用 bean 名称 “id” 加 “.adapter” 进行注册。而且 “MessageHandler” 将使用 bean 别名 “id” 加 “.handler” 进行注册。
2 标识连接到此适配器的通道。
3 对封装底层连接和 “WebSocketSession” 处理操作的 “IntegrationWebSocketContainer” bean 的引用。必须提供。
4 对 “SubProtocolHandler” 实例的可选引用。如果客户端未请求子协议,或者它是一个单协议处理程序,则会使用它。如果未提供此引用或 “protocol-handlers” 列表,则默认情况下会使用 “PassThruSubProtocolHandler”。
5 此频道适配器的 “SubProtocolHandler” bean 引用列表。如果你只提供了一个 bean 引用并且未提供 “default-protocol-handler”,则该单一 “SubProtocolHandler” 会用作 “default-protocol-handler”。如果你未设置此属性或 “default-protocol-handler”,则默认情况下会使用 “PassThruSubProtocolHandler”。
6 此频道适配器的 “MessageConverter” bean 引用列表。
7 布尔值,指示是否应在任何自定义转换器之后注册默认转换器。仅当提供了 “message-converters” 时才会使用此标志。否则,将注册所有默认转换器。默认为 “false”。默认转换器为(按顺序):“StringMessageConverter”、“ByteArrayMessageConverter” 和 “MappingJackson2MessageConverter”(如果类路径中存在 Jackson 库)。
8 布尔值,指示此端点是否应自动启动。默认为 “true”。
9 此端点应在其中启动和停止的生命周期阶段。值越低,此端点启动得越早,停止得越晚。默认值为 Integer.MIN_VALUE。值可以为负。请参见 SmartLifeCycle

<int-websocket:inbound-channel-adapter> Attributes

以下清单展示了 <int-websocket:outbound-channel-adapter> 元素可用的属性:

<int-websocket:inbound-channel-adapter
                            id=""  1
                            channel=""  2
                            error-channel=""  3
                            container=""  4
                            default-protocol-handler=""  5
                            protocol-handlers=""  6
                            message-converters=""  7
                            merge-with-default-converters=""  8
                            send-timeout=""  9
                            payload-type=""  10
                            use-broker=""  11
                            auto-startup=""  12
                            phase=""/>  13
1 组件 Bean 名称。如果你不设置 “channel” 属性,则会创建并注册“DirectChannel”,并在应用程序上下文中使用此 “id” 属性作为 bean 名称。在这种情况下,端点将使用 bean 名称 “id” 加 “.adapter” 进行注册。
2 标识连接到此适配器的通道。
3 MessageChannel” bean 引用,应该向其发送 “ErrorMessage” 实例。
4 请参阅 <<`<int-websocket:outbound-channel-adapter>`,websocket-outbound-channel-adapter-attributes>> 上的相同选项。
5 请参阅 <<`<int-websocket:outbound-channel-adapter>`,websocket-outbound-channel-adapter-attributes>> 上的相同选项。
6 请参阅 <<`<int-websocket:outbound-channel-adapter>`,websocket-outbound-channel-adapter-attributes>> 上的相同选项。
7 请参阅 <<`<int-websocket:outbound-channel-adapter>`,websocket-outbound-channel-adapter-attributes>> 上的相同选项。
8 请参阅 <<`<int-websocket:outbound-channel-adapter>`,websocket-outbound-channel-adapter-attributes>> 上的相同选项。
9 将消息发送到通道(如果通道可以阻塞)时等待所需的最大时间(以毫秒为单位)。例如,如果 QueueChannel 已达到其最大容量,则 QueueChannel 可以在有空间可用时进行阻塞。
10 Java 类型目标 “payload” 的完全限定名称,用于从传入 “WebSocketMessage” 中进行转换。默认为 “java.lang.String”。
11 指示该适配器是否向应用程序上下文的 “AbstractBrokerMessageHandler” 发送 “non-MESSAGE” “WebSocketMessage” 实例和带经纪目的地信息的消息。当此属性为 “true” 时,需要 “Broker Relay” 配置。此属性仅在服务器端使用。在客户端,它会被忽略。默认为 “false”。
12 请参阅 <<`<int-websocket:outbound-channel-adapter>`,websocket-outbound-channel-adapter-attributes>> 上的相同选项。
13 请参阅 <<`<int-websocket:outbound-channel-adapter>`,websocket-outbound-channel-adapter-attributes>> 上的相同选项。

Using ClientStompEncoder

从 4.3.13 版本开始,Spring Integration 提供 ClientStompEncoder (作为标准 StompEncoder 的扩展)以供在 WebSocket 信道适配器的客户端使用。为了正确准备客户端消息,您必须将 ClientStompEncoder 的实例注入 StompSubProtocolHandler。默认 StompSubProtocolHandler 的一个问题在于它是为服务器端设计的,因此它会将 SEND stompCommand 标头更新为 MESSAGE(这是服务器端 STOMP 协议的要求)。如果客户端不以正确的 SEND Web 套接字框架发送其消息,则部分 STOMP 代理不会接受它们。在这种情况下,ClientStompEncoder 的目的是覆盖 stompCommand 标头,并将其设置为 SEND 值,然后再将消息编码为 byte[]

Dynamic WebSocket Endpoints Registration

从 5.5 版本开始,WebSocket 服务器端点(基于 ServerWebSocketContainer 的信道适配器)现可在运行时注册(和移除) - 映射到 ServerWebSocketContainerpaths 通过 HandlerMapping 公开到 DispatcherServlet 中,并可供 WebSocket 客户端访问。Dynamic and Runtime Integration Flows 支持有助于以透明方式注册这些端点:

@Autowired
IntegrationFlowContext integrationFlowContext;

@Autowired
HandshakeHandler handshakeHandler;
...
ServerWebSocketContainer serverWebSocketContainer =
       new ServerWebSocketContainer("/dynamic")
               .setHandshakeHandler(this.handshakeHandler);

WebSocketInboundChannelAdapter webSocketInboundChannelAdapter =
       new WebSocketInboundChannelAdapter(serverWebSocketContainer);

QueueChannel dynamicRequestsChannel = new QueueChannel();

IntegrationFlow serverFlow =
       IntegrationFlow.from(webSocketInboundChannelAdapter)
               .channel(dynamicRequestsChannel)
               .get();

IntegrationFlowContext.IntegrationFlowRegistration dynamicServerFlow =
       this.integrationFlowContext.registration(serverFlow)
               .addBean(serverWebSocketContainer)
               .register();
...
dynamicServerFlow.destroy();

在动态流注册中调用 .addBean(serverWebSocketContainer) 以将 ServerWebSocketContainer 实例添加到 ApplicationContext 中以进行端点注册非常重要。当动态流注册被销毁时,关联的 ServerWebSocketContainer 实例也会被销毁,以及相应的端点注册(包括 URL 路径映射)。

动态 WebSocket 端点只能通过 Spring Integration 机制注册:当使用常规 Spring @EnableWebsocket 时,Spring Integration 配置会退出,不会注册用于动态端点的任何基础结构。