WebSockets Support
从 4.1 版本开始,Spring Integration 支持 WebSocket。它基于 Spring Framework 的 web-socket`模块的架构、基础设施和 API。因此,Spring WebSocket 的许多组件(例如 `SubProtocolHandler`或 `WebSocketClient
)和配置选项(例如 @EnableWebSocketMessageBroker
)可以在 Spring Integration 中重复使用。有关详细信息,请参阅 Spring Framework 参考手册中的 Spring Framework WebSocket Support章节。
你需要将此依赖项包含在你的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-websocket</artifactId>
<version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-websocket:{project-version}"
对于服务器端,org.springframework:spring-webmvc
依赖项必须显式包含。
Spring Framework WebSocket 基础设施基于 Spring 消息基础,并提供了一个基于 Spring Integration 使用的相同 MessageChannel
实现和 MessageHandler
实现(以及一些 POJO 方法注释映射)的基本消息框架。因此,即使没有 WebSocket 适配器,Spring Integration 也可以直接参与 WebSocket 流。为此,你可以使用适当的注释来配置一个 Spring Integration @MessagingGateway
,如下面的示例所示:
@MessagingGateway
@Controller
public interface WebSocketGateway {
@MessageMapping("/greeting")
@SendToUser("/queue/answer")
@Gateway(requestChannel = "greetingChannel")
String greeting(String payload);
}
Overview
由于 WebSocket 协议按定义为流式,并且我们可以同时向和来自 WebSocket 发送和接收消息,因此可以处理适当的 WebSocketSession
,无论是在客户端还是在服务器端。为了封装连接管理和 WebSocketSession
注册表,需通过 ClientWebSocketContainer
和 ServerWebSocketContainer
实现为 IntegrationWebSocketContainer
提供信息。借助 Spring 框架中的 WebSocket API 及其实现(包含许多扩展),在服务器端和客户端均使用相同的类(当然,这是从 Java 的角度来看的)。因此,在两端,大多数连接和 WebSocketSession
注册表选项都是相同的。这让我们可以重复使用许多配置项和基础架构挂钩,以便在服务器端和客户端上构建 WebSocket 应用程序。以下示例显示了组件如何同时服务于这两个目的:
//Client side
@Bean
public WebSocketClient webSocketClient() {
return new SockJsClient(Collections.singletonList(new WebSocketTransport(new JettyWebSocketClient())));
}
@Bean
public IntegrationWebSocketContainer clientWebSocketContainer() {
return new ClientWebSocketContainer(webSocketClient(), "ws://my.server.com/endpoint");
}
//Server side
@Bean
public IntegrationWebSocketContainer serverWebSocketContainer() {
return new ServerWebSocketContainer("/endpoint").withSockJs();
}
IntegrationWebSocketContainer
旨在实现双向消息传递,可以在入站和出站通道适配器之间共享(见下文),在使用单向(发送或接收)WebSocket 消息传递时只能从中一个引用它。它可以在没有任何通道适配器的情况下使用,但是,在这种情况下,IntegrationWebSocketContainer
仅充当 WebSocketSession
注册表。
|
从 6.1 版本开始,ClientWebSocketContainer
可通过提供的 URI
进行配置,而不是 uriTemplate
和 uriVariables
组合。在有些场景下需要对 URI 的某些部分进行自定义编码时,这种方式非常有用。较为便捷的方式是,参阅 UriComponentsBuilder
API。
WebSocket Inbound Channel Adapter
WebSocketInboundChannelAdapter
实现了 WebSocketSession
交互中的接收部分。你必须向它提供一个 IntegrationWebSocketContainer
并且该适配器将自己注册为一个 WebSocketListener
来处理传入的消息和 WebSocketSession
事件。
只能在 |
为 WebSocket 子协议,WebSocketInboundChannelAdapter
可以通过 SubProtocolHandlerRegistry
进行配置,SubProtocolHandlerRegistry
为第二个构造函数参数。该适配器委托给 SubProtocolHandlerRegistry
来根据已接受的 WebSocketSession
判断恰当的 SubProtocolHandler
,并将一个 WebSocketMessage
转换为一个 Message
,具体取决于子协议的实现。
默认情况下, |
WebSocketInboundChannelAdapter
仅接受和发送包含 SimpMessageType.MESSAGE
或一个空的 simpMessageType
头的 Message
实例到底层集成流。所有其他 Message
类型通过从 SubProtocolHandler
实现(如 StompSubProtocolHandler
)发出的 ApplicationEvent
实例来处理。
在服务器端,如果存在 @EnableWebSocketMessageBroker
配置,可以使用 useBroker = true
选项来配置 WebSocketInboundChannelAdapter
。在这种情况下,所有 非 MESSAGE
的 Message
类型都将委托给提供的 AbstractBrokerMessageHandler
。另外,如果代理中继使用 destination 前缀进行了配置,那么与代理目标匹配的消息会路由到 AbstractBrokerMessageHandler
,而不是 WebSocketInboundChannelAdapter
的 outputChannel
。
如果 useBroker = false
且接收的消息类型是 SimpMessageType.CONNECT
,那么 WebSocketInboundChannelAdapter
会立即向 WebSocketSession
发送一个 SimpMessageType.CONNECT_ACK
消息,而不会将它发送到通道。
Spring 的 WebSocket 支持只允许配置一个代理中继。因此,不需要 |
有关更多配置选项,请参见 WebSockets Namespace Support。
WebSocket Outbound Channel Adapter
WebSocketOutboundChannelAdapter
:
-
从其
MessageChannel
接受 Spring 集成消息 -
从
MessageHeaders
中确定WebSocketSession
id
-
从提供的
IntegrationWebSocketContainer
中检索WebSocketSession
-
将
WebSocketMessage
的转换和发送工作委派给从提供的SubProtocolHandlerRegistry
中获取的合适的SubProtocolHandler
。
在客户端,并不需要 WebSocketSession
的 id
消息头,因为 ClientWebSocketContainer
只处理单个连接和它各自的 WebSocketSession
。
要使用 STOMP 子协议,你应该使用 StompSubProtocolHandler`配置该适配器。然后你可以使用 `StompHeaderAccessor.create(StompCommand…)`和 `MessageBuilder`将任何 STOMP 消息类型发送到该适配器,或者只使用 `HeaderEnricher
(请参见 Header Enricher)。
本章的剩余部分主要涵盖附加配置选项。
WebSockets Namespace Support
Spring Integration WebSocket 命名空间包括本章剩余部分描述的几个组件。要将它包含在你的配置中,在你的应用程序上下文配置文件中使用以下命名空间声明:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/websocket
https://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">
...
</beans>
<int-websocket:client-container>
Attributes
以下清单展示了 <int-websocket:client-container>
元素可用的属性:
<int-websocket:client-container
id="" 1
client="" 2
uri="" 3
uri-variables="" 4
origin="" 5
send-time-limit="" 6
send-buffer-size-limit="" 7
send-buffer-overflow-strategy="" 8
auto-startup="" 9
phase=""> 10
<int-websocket:http-headers>
<entry key="" value=""/>
</int-websocket:http-headers> 11
</int-websocket:client-container>
1 | The component bean name. |
2 | The WebSocketClient bean reference. |
3 | 指向目标 WebSocket 服务的 uri 或 uriTemplate 。如果使用它作为具有 URI 变量占位符的 uriTemplate ,则需要提供 uri-variables 属性。 |
4 | uri 属性值中 URI 变量占位符的逗号分隔值。这些值根据它们在 uri 中的顺序替换到占位符中。请参见 UriComponents.expand(Object…​uriVariableValues) 。 |
5 | Origin 握手 HTTP 头部值。 |
6 | WebSocket 会话“发送”超时限制。默认为 10000 。 |
7 | WebSocket 会话“发送”消息大小限制。默认为 524288 。 |
8 | WebSocket 会话发送缓冲区溢出策略确定会话的出站消息缓冲区达到 send-buffer-size-limit 时的行为。有关可能的值和更多详细信息,请参见 ConcurrentWebSocketSessionDecorator.OverflowStrategy 。 |
9 | 布尔值,表示此端点是否应自动启动。默认为 false ,假设此容器是从 WebSocket inbound adapter 启动的。 |
10 | 此端点在其中启动和停止的生命周期阶段。值越小,此端点启动越早,停止越晚。默认为 Integer.MAX_VALUE 。值可以为负数。请参见 SmartLifeCycle 。 |
11 | 一个 Map 的 HttpHeaders ,用于与握手请求一起使用。 |
<int-websocket:server-container>
Attributes
以下清单展示了 <int-websocket:server-container>
元素可用的属性:
<int-websocket:server-container
id="" 1
path="" 2
handshake-handler="" 3
handshake-interceptors="" 4
decorator-factories="" 5
send-time-limit="" 6
send-buffer-size-limit="" 7
send-buffer-overflow-strategy="" 8
allowed-origins=""> 9
<int-websocket:sockjs
client-library-url="" 10
stream-bytes-limit="" 11
session-cookie-needed="" 12
heartbeat-time="" 13
disconnect-delay="" 14
message-cache-size="" 15
websocket-enabled="" 16
scheduler="" 17
message-codec="" 18
transport-handlers="" 19
suppress-cors="true" /> 20
</int-websocket:server-container>
1 | The component bean name. |
2 | 路径(或逗号分隔的路径),将特定的请求映射到 WebSocketHandler 。支持精确路径映射 URI(如 /myPath )和 ant 样式路径模式(如 /myPath/** )。 |
3 | HandshakeHandler bean 引用。默认为 DefaultHandshakeHandler 。 |
4 | HandshakeInterceptor bean 引用的列表。 |
5 | 一个或多个工厂 (WebSocketHandlerDecoratorFactory ) 的列表,用于装饰处理 WebSocket 消息的处理器。这对于一些高级用例可能很有用(例如,允许 Spring Security 在相应的 HTTP 会话过期时强制关闭 WebSocket 会话)。有关更多信息,请参见 Spring Session Project。 |
6 | 请参见 《< int-websocket:client-container >》,websocket-client-container-attributes 中的相同选项。 |
7 | 请参见 《< int-websocket:client-container >》,websocket-client-container-attributes 中的相同选项。 |
8 | WebSocket 会话发送缓冲区溢出策略确定会话的出站消息缓冲区达到 send-buffer-size-limit 时的行为。有关可能的值和更多详细信息,请参见 ConcurrentWebSocketSessionDecorator.OverflowStrategy 。 |
9 | 允许的来源标头值。你可以将多个来源指定为逗号分隔的列表。此项检查主要针对浏览器客户端设计。没有任何内容阻止其他类型的客户端修改来源标头值。当 SockJS 处于启用状态,且允许的来源受限时,不支持不使用来源标头进行跨域请求的传输类型(jsonp-polling 、iframe-xhr-polling 、iframe-eventsource`和 `iframe-htmlfile )。因此,不支持 IE6 和 IE7,只无 cookie 支持 IE8 和 IE9。默认情况下,所有来源均被允许。 |
10 | 没有原生的跨域通信的传输(例如 eventsource 和 htmlfile )必须在一个不可见的 iframe 中从 “foreign” 域获取一个简单的页面,以便 iframe 中的代码可以从一个本地的域运行到 SockJS 服务器。由于 iframe 需要加载 SockJS javascript 客户端库,此属性允许你指定将其加载到的位置。默认情况下,它指向 https://d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js 。但是,你还可以将其设置为指向应用程序提供的 URL。请注意,可以指定相对 URL,在这种情况下,URL 必须相对于 iframe URL。例如,假设一个 SockJS 端点已映射到 /sockjs ,而最终的 iframe URL 是 /sockjs/iframe.html ,则相对 URL 必须以“../../”开头,才能遍历到 SockJS 映射上方的位置。对于基于前缀的 servlet 映射,你可能需要再进行一次遍历。 |
11 | 单个 HTTP 流请求在关闭前可以发送的最小字节数。默认为 128K (即 128*1024 或 131072 字节)。 |
12 | 值 cookie_needed 在来自 SockJs /info 端点的响应中。此属性指出是否需要 JSESSIONID cookie 使应用程序能够正常运行(例如,负载平衡或在 Java Servlet 容器中使用 HTTP 会话)。 |
13 | 在服务器未发送任何消息且之后服务器应向客户端发送心跳帧以防止断开连接的时间量(以毫秒为单位)。默认值为 25,000 (25 秒)。 |
14 | 客户端被视为断开连接前未进行接收连接(即,服务器可以通过该连接将数据发送到客户端的活动连接)的时间量(以毫秒为单位)。默认值为 5000 。 |
15 | 在等待来自客户端的下一个 HTTP 轮询请求时,会话可以缓存的服务器到客户端消息数。默认大小为 100 。 |
16 | 有些负载均衡器不支持 WebSockets。将此选项设置为 “false ” 以禁用服务器端的 WebSocket 传输。默认值为 “true ”。 |
17 | “TaskScheduler ” bean 引用。如果未提供任何值,将创建一个新的 “ThreadPoolTaskScheduler ” 实例。此调度器实例用于调度心跳消息。 |
18 | 用于对 SockJS 消息进行编码和解码的 “SockJsMessageCodec ” bean 引用。默认情况下,使用 “Jackson2SockJsMessageCodec ”,它要求类路径中存在 Jackson 库。 |
19 | “TransportHandler ” bean 引用列表。 |
20 | 是否禁用为 SockJS 请求自动添加 CORS 标头。默认值为 “false ”。 |
<int-websocket:outbound-channel-adapter>
Attributes
以下清单展示了 <int-websocket:outbound-channel-adapter>
元素可用的属性:
<int-websocket:outbound-channel-adapter
id="" 1
channel="" 2
container="" 3
default-protocol-handler="" 4
protocol-handlers="" 5
message-converters="" 6
merge-with-default-converters="" 7
auto-startup="" 8
phase=""/> 9
1 | 组件 Bean 名称。如果你不提供 “channel ” 属性,则会创建并注册“DirectChannel ”,并在应用程序上下文中使用此 “id ” 属性作为 bean 名称。在这种情况下,端点将使用 bean 名称 “id ” 加 “.adapter ” 进行注册。而且 “MessageHandler ” 将使用 bean 别名 “id ” 加 “.handler ” 进行注册。 |
2 | 标识连接到此适配器的通道。 |
3 | 对封装底层连接和 “WebSocketSession ” 处理操作的 “IntegrationWebSocketContainer ” bean 的引用。必须提供。 |
4 | 对 “SubProtocolHandler ” 实例的可选引用。如果客户端未请求子协议,或者它是一个单协议处理程序,则会使用它。如果未提供此引用或 “protocol-handlers ” 列表,则默认情况下会使用 “PassThruSubProtocolHandler ”。 |
5 | 此频道适配器的 “SubProtocolHandler ” bean 引用列表。如果你只提供了一个 bean 引用并且未提供 “default-protocol-handler ”,则该单一 “SubProtocolHandler ” 会用作 “default-protocol-handler ”。如果你未设置此属性或 “default-protocol-handler ”,则默认情况下会使用 “PassThruSubProtocolHandler ”。 |
6 | 此频道适配器的 “MessageConverter ” bean 引用列表。 |
7 | 布尔值,指示是否应在任何自定义转换器之后注册默认转换器。仅当提供了 “message-converters ” 时才会使用此标志。否则,将注册所有默认转换器。默认为 “false ”。默认转换器为(按顺序):“StringMessageConverter ”、“ByteArrayMessageConverter ” 和 “MappingJackson2MessageConverter ”(如果类路径中存在 Jackson 库)。 |
8 | 布尔值,指示此端点是否应自动启动。默认为 “true ”。 |
9 | 此端点应在其中启动和停止的生命周期阶段。值越低,此端点启动得越早,停止得越晚。默认值为 Integer.MIN_VALUE 。值可以为负。请参见 SmartLifeCycle 。 |
<int-websocket:inbound-channel-adapter>
Attributes
以下清单展示了 <int-websocket:outbound-channel-adapter>
元素可用的属性:
<int-websocket:inbound-channel-adapter
id="" 1
channel="" 2
error-channel="" 3
container="" 4
default-protocol-handler="" 5
protocol-handlers="" 6
message-converters="" 7
merge-with-default-converters="" 8
send-timeout="" 9
payload-type="" 10
use-broker="" 11
auto-startup="" 12
phase=""/> 13
1 | 组件 Bean 名称。如果你不设置 “channel ” 属性,则会创建并注册“DirectChannel ”,并在应用程序上下文中使用此 “id ” 属性作为 bean 名称。在这种情况下,端点将使用 bean 名称 “id ” 加 “.adapter ” 进行注册。 |
2 | 标识连接到此适配器的通道。 |
3 | “MessageChannel ” bean 引用,应该向其发送 “ErrorMessage ” 实例。 |
4 | 请参阅 <<`<int-websocket:outbound-channel-adapter>`,websocket-outbound-channel-adapter-attributes>> 上的相同选项。 |
5 | 请参阅 <<`<int-websocket:outbound-channel-adapter>`,websocket-outbound-channel-adapter-attributes>> 上的相同选项。 |
6 | 请参阅 <<`<int-websocket:outbound-channel-adapter>`,websocket-outbound-channel-adapter-attributes>> 上的相同选项。 |
7 | 请参阅 <<`<int-websocket:outbound-channel-adapter>`,websocket-outbound-channel-adapter-attributes>> 上的相同选项。 |
8 | 请参阅 <<`<int-websocket:outbound-channel-adapter>`,websocket-outbound-channel-adapter-attributes>> 上的相同选项。 |
9 | 将消息发送到通道(如果通道可以阻塞)时等待所需的最大时间(以毫秒为单位)。例如,如果 QueueChannel 已达到其最大容量,则 QueueChannel 可以在有空间可用时进行阻塞。 |
10 | Java 类型目标 “payload ” 的完全限定名称,用于从传入 “WebSocketMessage ” 中进行转换。默认为 “java.lang.String ”。 |
11 | 指示该适配器是否向应用程序上下文的 “AbstractBrokerMessageHandler ” 发送 “non-MESSAGE ” “WebSocketMessage ” 实例和带经纪目的地信息的消息。当此属性为 “true ” 时,需要 “Broker Relay ” 配置。此属性仅在服务器端使用。在客户端,它会被忽略。默认为 “false ”。 |
12 | 请参阅 <<`<int-websocket:outbound-channel-adapter>`,websocket-outbound-channel-adapter-attributes>> 上的相同选项。 |
13 | 请参阅 <<`<int-websocket:outbound-channel-adapter>`,websocket-outbound-channel-adapter-attributes>> 上的相同选项。 |
Using ClientStompEncoder
从 4.3.13 版本开始,Spring Integration 提供 ClientStompEncoder
(作为标准 StompEncoder
的扩展)以供在 WebSocket 信道适配器的客户端使用。为了正确准备客户端消息,您必须将 ClientStompEncoder
的实例注入 StompSubProtocolHandler
。默认 StompSubProtocolHandler
的一个问题在于它是为服务器端设计的,因此它会将 SEND
stompCommand
标头更新为 MESSAGE
(这是服务器端 STOMP 协议的要求)。如果客户端不以正确的 SEND
Web 套接字框架发送其消息,则部分 STOMP 代理不会接受它们。在这种情况下,ClientStompEncoder
的目的是覆盖 stompCommand
标头,并将其设置为 SEND
值,然后再将消息编码为 byte[]
。
Dynamic WebSocket Endpoints Registration
从 5.5 版本开始,WebSocket 服务器端点(基于 ServerWebSocketContainer
的信道适配器)现可在运行时注册(和移除) - 映射到 ServerWebSocketContainer
的 paths
通过 HandlerMapping
公开到 DispatcherServlet
中,并可供 WebSocket 客户端访问。Dynamic and Runtime Integration Flows 支持有助于以透明方式注册这些端点:
@Autowired
IntegrationFlowContext integrationFlowContext;
@Autowired
HandshakeHandler handshakeHandler;
...
ServerWebSocketContainer serverWebSocketContainer =
new ServerWebSocketContainer("/dynamic")
.setHandshakeHandler(this.handshakeHandler);
WebSocketInboundChannelAdapter webSocketInboundChannelAdapter =
new WebSocketInboundChannelAdapter(serverWebSocketContainer);
QueueChannel dynamicRequestsChannel = new QueueChannel();
IntegrationFlow serverFlow =
IntegrationFlow.from(webSocketInboundChannelAdapter)
.channel(dynamicRequestsChannel)
.get();
IntegrationFlowContext.IntegrationFlowRegistration dynamicServerFlow =
this.integrationFlowContext.registration(serverFlow)
.addBean(serverWebSocketContainer)
.register();
...
dynamicServerFlow.destroy();
在动态流注册中调用 |
动态 WebSocket 端点只能通过 Spring Integration 机制注册:当使用常规 Spring @EnableWebsocket
时,Spring Integration 配置会退出,不会注册用于动态端点的任何基础结构。