WebSockets

参考文档的这一部分涵盖了对反应堆栈 WebSocket 消息传递的支持。

Introduction to WebSocket

WebSocket 协议 RFC 6455 提供了在客户端和服务器之间建立全双工、双向通信信道的标准化方法,通过单一的 TCP 连接。它与 HTTP 是一种不同的 TCP 协议,但设计为通过 HTTP 运行,使用端口 80 和 443 以及允许重复使用现有的防火墙规则。 WebSocket 交互开始于一个 HTTP 请求,该请求使用 HTTP Upgrade 标头进行升级或(在本例中)切换到 WebSocket 协议。以下示例显示了这种交互:

GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket 1
Connection: Upgrade 2
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
1 The Upgrade header.
2 Using the Upgrade connection.

服务器不再使用常见的 200 状态代码,而是返回类似以下内容的输出:

HTTP/1.1 101 Switching Protocols 1
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
1 Protocol switch

握手成功后,HTTP 升级请求中包含的 TCP 套接字保持打开状态,以便客户端和服务器继续发送和接收消息。 WebSocket 工作原理的完整介绍超出了本文的范围。请参见 RFC 6455、HTML5 中的 WebSocket 章节或 Web 上的各种简介和教程。 请注意,如果 WebSocket 服务器在 Web 服务器(例如 nginx)后运行,则可能需要对其进行配置,以将 WebSocket 升级请求传递到 WebSocket 服务器。同样,如果应用程序在云环境中运行,请查看云提供商有关 WebSocket 支持的说明。

HTTP Versus WebSocket

即使 WebSocket 被设计为与 HTTP 兼容且以 HTTP 请求开头,但了解两个协议导致非常不同的体系结构和应用程序编程模型非常重要。

在 HTTP 和 REST 中,应用程序建模为多个 URL。要与应用程序交互,客户端以请求-响应样式访问这些 URL。服务器根据 HTTP URL、方法和标头将请求路由到适当的处理程序。

相比之下,在 WebSockets 中,最初连接通常只有一个 URL。随后,所有应用程序消息都在同一条 TCP 连接上流动。这指向一种完全不同的异步事件驱动消息架构。

WebSocket 也是一种低级传输协议,与 HTTP 不同,它不会规定消息内容的任何语义。这意味着除非客户端和服务器就消息语义达成一致,否则无法路由或处理消息。

WebSocket 客户端和服务器可以通过 HTTP 握手请求上的 Sec-WebSocket-Protocol 标头协商使用更高级别的消息传递协议(例如,STOMP)。如果没有,则需要想出自己的约定。

When to Use WebSockets

WebSocket 可以使网页变得动态和交互。但是,在许多情况下,AJAX 和 HTTP 流或长轮询的组合可以提供一个简单有效的解决方案。

例如,新闻、邮件和社交媒体需要动态更新,但每隔几分钟执行一次更新可能是完全可以的。另一方面,协作、游戏和金融应用需要接近实时。

延迟本身并不是决定因素。如果消息量相对较低(例如,监视网络故障),那么 HTTP 流或轮询可以提供一个有效的解决方案。低延迟、高频率和大容量的结合使 WebSocket 成为最佳案例。

另外请记住,在 Internet 上,不受你控制的限制性代理可能会阻止 WebSocket 交互,原因可能是因为它们未配置为传递 Upgrade 标头,也可能是因为它们关闭了看起来空闲的长期连接。这意味着对于防火墙内的内部应用程序,使用 WebSocket 比面向公众的应用程序更容易做出决定。

WebSocket API

Spring Framework 提供了 WebSocket API,您可以使用该 API 编写处理 WebSocket 消息的客户端和服务端应用程序。

Server

要创建 WebSocket 服务器,你可以先创建一个 WebSocketHandler。以下示例显示了如何执行此操作:

  • Java

  • Kotlin

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;

public class MyWebSocketHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		// ...
	}
}
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession

class MyWebSocketHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {
		// ...
	}
}

然后你可以将它映射到一个 URL:

  • Java

  • Kotlin

@Configuration
class WebConfig {

	@Bean
	public HandlerMapping handlerMapping() {
		Map<String, WebSocketHandler> map = new HashMap<>();
		map.put("/path", new MyWebSocketHandler());
		int order = -1; // before annotated controllers

		return new SimpleUrlHandlerMapping(map, order);
	}
}
@Configuration
class WebConfig {

	@Bean
	fun handlerMapping(): HandlerMapping {
		val map = mapOf("/path" to MyWebSocketHandler())
		val order = -1 // before annotated controllers

		return SimpleUrlHandlerMapping(map, order)
	}
}

如果使用 WebFlux Config 无需进行任何其他操作,或者如果您不使用 WebFlux config,您需要声明一个 WebSocketHandlerAdapter,如下所示:

  • Java

  • Kotlin

@Configuration
class WebConfig {

	// ...

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter();
	}
}
@Configuration
class WebConfig {

	// ...

	@Bean
	fun handlerAdapter() =  WebSocketHandlerAdapter()
}

WebSocketHandler

WebSocketHandlerhandle 方法采用 WebSocketSession,并返回 Mono<Void>,以指示处理会话的应用程序完成。会话通过两个流来处理,一个用于入站消息,另一个用于出站消息。下表描述了处理这些流的两种方法:

WebSocketSession method Description

Flux<WebSocketMessage> receive()

提供对入站消息流的访问,并在关闭连接时完成。

Mono<Void> send(Publisher<WebSocketMessage>)

为传出邮件获取来源,编写邮件,并返回一个 Mono&lt;Void&gt;,该邮件在来源完成后并完成后完成。

WebSocketHandler 必须将入站流和出站流组成一个统一的流,并返回一个表示该流完成请求的 Mono<Void>。根据应用程序需求,当以下情况发生时,统一的流将完成:

  • 输入消息流或输出消息流完成。

  • 输入流完成(即,连接关闭),而输出流是无限的。

  • 通过 `WebSocketSession`的 `close`方法选择一个时间点。

当入站消息流和出站消息流组合在一起时,不需要检查连接是否处于打开状态,因为 Reactive Streams 会发信号结束活动。入站流会收到完成或错误信号,出站流会收到取消信号。

处理程序的最基本实现是处理入站流的实现。以下示例展示了此类实现:

Java
class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		return session.receive()			(1)
				.doOnNext(message -> {
					// ...					(2)
				})
				.concatMap(message -> {
					// ...					(3)
				})
				.then();					(4)
	}
}
1 访问输入消息流。
2 根据每条消息执行某些操作。
3 执行使用消息内容的嵌套异步操作。
4 在接收完成时返回完成的 Mono<Void>
Kotlin
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {
		return session.receive()            (1)
				.doOnNext {
					// ...					(2)
				}
				.concatMap {
					// ...					(3)
				}
				.then()                     (4)
	}
}
5 访问输入消息流。
6 根据每条消息执行某些操作。
7 执行使用消息内容的嵌套异步操作。
8 在接收完成时返回完成的 Mono<Void>

对于嵌套的异步操作,您可能需要调用 message.retain() 上的基础服务器,这些服务器使用池化数据缓冲区(例如,Netty)。否则,数据缓冲区可能会在您有机会读取数据之前释放。有关更多背景信息,请参阅 Data Buffers and Codecs

以下实现将入站流和出站流组合:

Java
class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Flux<WebSocketMessage> output = session.receive()				(1)
				.doOnNext(message -> {
					// ...
				})
				.concatMap(message -> {
					// ...
				})
				.map(value -> session.textMessage("Echo " + value));	(2)

		return session.send(output);									(3)
	}
}
1 处理输出消息流。
2 创建输出消息,生成组合流。
3 返回一个在接收持续进行时不会完成的 Mono<Void>
Kotlin
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {

		val output = session.receive()                      (1)
				.doOnNext {
					// ...
				}
				.concatMap {
					// ...
				}
				.map { session.textMessage("Echo $it") }    (2)

		return session.send(output)                         (3)
	}
}
4 处理输出消息流。
5 创建输出消息,生成组合流。
6 返回一个在接收持续进行时不会完成的 Mono<Void>

入站流和出站流可以是独立的,并且仅在完成时才连接,如下示例所示:

Java
class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Mono<Void> input = session.receive()								1
				.doOnNext(message -> {
					// ...
				})
				.concatMap(message -> {
					// ...
				})
				.then();

		Flux<String> source = ... ;
		Mono<Void> output = session.send(source.map(session::textMessage));	2

		return Mono.zip(input, output).then();								3
	}
}
1 Handle inbound message stream.
2 Send outgoing messages.
3 连接流并返回一个在任何一个流结束时完成的 Mono<Void>
Kotlin
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {

		val input = session.receive()									(1)
				.doOnNext {
					// ...
				}
				.concatMap {
					// ...
				}
				.then()

		val source: Flux<String> = ...
		val output = session.send(source.map(session::textMessage))		(2)

		return Mono.zip(input, output).then()							(3)
	}
}
4 Handle inbound message stream.
5 Send outgoing messages.
6 连接流并返回一个在任何一个流结束时完成的 Mono<Void>

DataBuffer

`DataBuffer`是 WebFlux 中字节缓冲区的表示形式。参考资料的 Spring Core 部分在Data Buffers and Codecs的部分中对此进行了更详细的说明。要了解的主要要点是,在某些服务器(例如 Netty)上,字节缓冲区是池化和引用计数的,并且在使用时必须释放以避免内存泄漏。

在 Netty 上运行时,如果应用程序希望保留输入数据缓冲区以确保它们不被释放,则必须使用 DataBufferUtils.retain(dataBuffer),并在消耗缓冲区时使用 DataBufferUtils.release(dataBuffer)

Handshake

WebSocketHandlerAdapter 委派给一个 WebSocketService。默认情况下,它是 HandshakeWebSocketService 的一个实例,它对 WebSocket 请求执行基本检查,然后为所用的服务器使用 RequestUpgradeStrategy。目前内置对 Reactor Netty、Tomcat、Jetty 和 Undertow 的支持。

HandshakeWebSocketService 公开了 una` sessionAttributePredicate` 属性,它允许设置一个`Predicate<String>` 来从 WebSession 中提取属性并将它们插入到 WebSocketSession 的属性中。

Server Configuration

每个服务器的 RequestUpgradeStrategy 都会公开特定于底层 WebSocket 服务器引擎的配置。在使用 WebFlux Java config 时,您可以根据 WebFlux Config 对应部分所示来自定义此类属性,或者如果您不使用 WebFlux config,请使用以下内容:

  • Java

  • Kotlin

@Configuration
class WebConfig {

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter(webSocketService());
	}

	@Bean
	public WebSocketService webSocketService() {
		TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
		strategy.setMaxSessionIdleTimeout(0L);
		return new HandshakeWebSocketService(strategy);
	}
}
@Configuration
class WebConfig {

	@Bean
	fun handlerAdapter() =
			WebSocketHandlerAdapter(webSocketService())

	@Bean
	fun webSocketService(): WebSocketService {
		val strategy = TomcatRequestUpgradeStrategy().apply {
			setMaxSessionIdleTimeout(0L)
		}
		return HandshakeWebSocketService(strategy)
	}
}

检查你的服务器的升级策略以查看有哪些选项可用。目前只有 Tomcat 和 Jetty 公开了此类选项。

CORS

配置 CORS 并限制对 WebSocket 端点的访问的最简单方法是让你的 WebSocketHandler 实现 CorsConfigurationSource 并返回一个带有允许的源、标题和其他详细信息的`CorsConfiguration`。如果你无法做到这一点,你还可以设置 SimpleUrlHandlercorsConfigurations 属性,以按 URL 模式指定 CORS 设置。如果同时指定了两者,它们会通过 CorsConfiguration 上的`combine` 方法合并。

Client

Spring WebFlux 提供了一个 WebSocketClient 抽象,其中包含 Reactor Netty、Tomcat、Jetty、Undertow 和标准 Java(即 JSR-356)的实现。

Tomcat 客户端实际上是标准 Java 客户端的扩展,在 WebSocketSession 处理中具有一些额外功能,以利用 Tomcat 特定的 API 来暂停接收反压消息。

若要启动 WebSocket 会话,你可以创建一个客户端实例并使用其 execute 方法:

  • Java

  • Kotlin

WebSocketClient client = new ReactorNettyWebSocketClient();

URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
		session.receive()
				.doOnNext(System.out::println)
				.then());
val client = ReactorNettyWebSocketClient()

		val url = URI("ws://localhost:8080/path")
		client.execute(url) { session ->
			session.receive()
					.doOnNext(::println)
			.then()
		}

某些客户端,例如 Jetty,实现 Lifecycle,需要停止并启动,你才能使用它们。所有客户端都具有与基础 WebSocket 客户端配置相关的构造函数选项。