WebSockets

参考文档的这一部分涵盖了对反应堆栈 WebSocket 消息传递的支持。

This part of the reference documentation covers support for reactive-stack WebSocket messaging.

Introduction to WebSocket

WebSocket 协议 RFC 6455 提供了在客户端和服务器之间建立全双工、双向通信信道的标准化方法,通过单一的 TCP 连接。它与 HTTP 是一种不同的 TCP 协议,但设计为通过 HTTP 运行,使用端口 80 和 443 以及允许重复使用现有的防火墙规则。

The WebSocket protocol, RFC 6455, provides a standardized way to establish a full-duplex, two-way communication channel between client and server over a single TCP connection. It is a different TCP protocol from HTTP but is designed to work over HTTP, using ports 80 and 443 and allowing re-use of existing firewall rules.

WebSocket 交互开始于一个 HTTP 请求,该请求使用 HTTP Upgrade 标头进行升级或(在本例中)切换到 WebSocket 协议。以下示例显示了这种交互:

A WebSocket interaction begins with an HTTP request that uses the HTTP Upgrade header to upgrade or, in this case, to switch to the WebSocket protocol. The following example shows such an interaction:

GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket 1
Connection: Upgrade 2
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
1 The Upgrade header.
2 Using the Upgrade connection.

服务器不再使用常见的 200 状态代码,而是返回类似以下内容的输出:

Instead of the usual 200 status code, a server with WebSocket support returns output similar to the following:

HTTP/1.1 101 Switching Protocols 1
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
1 Protocol switch

握手成功后,HTTP 升级请求中包含的 TCP 套接字保持打开状态,以便客户端和服务器继续发送和接收消息。

After a successful handshake, the TCP socket underlying the HTTP upgrade request remains open for both the client and the server to continue to send and receive messages.

WebSocket 工作原理的完整介绍超出了本文的范围。请参见 RFC 6455、HTML5 中的 WebSocket 章节或 Web 上的各种简介和教程。

A complete introduction of how WebSockets work is beyond the scope of this document. See RFC 6455, the WebSocket chapter of HTML5, or any of the many introductions and tutorials on the Web.

请注意,如果 WebSocket 服务器在 Web 服务器(例如 nginx)后运行,则可能需要对其进行配置,以将 WebSocket 升级请求传递到 WebSocket 服务器。同样,如果应用程序在云环境中运行,请查看云提供商有关 WebSocket 支持的说明。

Note that, if a WebSocket server is running behind a web server (e.g. nginx), you likely need to configure it to pass WebSocket upgrade requests on to the WebSocket server. Likewise, if the application runs in a cloud environment, check the instructions of the cloud provider related to WebSocket support.

HTTP Versus WebSocket

即使 WebSocket 被设计为与 HTTP 兼容且以 HTTP 请求开头,但了解两个协议导致非常不同的体系结构和应用程序编程模型非常重要。

Even though WebSocket is designed to be HTTP-compatible and starts with an HTTP request, it is important to understand that the two protocols lead to very different architectures and application programming models.

在 HTTP 和 REST 中,应用程序建模为多个 URL。要与应用程序交互,客户端以请求-响应样式访问这些 URL。服务器根据 HTTP URL、方法和标头将请求路由到适当的处理程序。

In HTTP and REST, an application is modeled as many URLs. To interact with the application, clients access those URLs, request-response style. Servers route requests to the appropriate handler based on the HTTP URL, method, and headers.

相比之下,在 WebSockets 中,最初连接通常只有一个 URL。随后,所有应用程序消息都在同一条 TCP 连接上流动。这指向一种完全不同的异步事件驱动消息架构。

By contrast, in WebSockets, there is usually only one URL for the initial connect. Subsequently, all application messages flow on that same TCP connection. This points to an entirely different asynchronous, event-driven, messaging architecture.

WebSocket 也是一种低级传输协议,与 HTTP 不同,它不会规定消息内容的任何语义。这意味着除非客户端和服务器就消息语义达成一致,否则无法路由或处理消息。

WebSocket is also a low-level transport protocol, which, unlike HTTP, does not prescribe any semantics to the content of messages. That means that there is no way to route or process a message unless the client and the server agree on message semantics.

WebSocket 客户端和服务器可以通过 HTTP 握手请求上的 Sec-WebSocket-Protocol 标头协商使用更高级别的消息传递协议(例如,STOMP)。如果没有,则需要想出自己的约定。

WebSocket clients and servers can negotiate the use of a higher-level, messaging protocol (for example, STOMP), through the Sec-WebSocket-Protocol header on the HTTP handshake request. In the absence of that, they need to come up with their own conventions.

When to Use WebSockets

WebSocket 可以使网页变得动态和交互。但是,在许多情况下,AJAX 和 HTTP 流或长轮询的组合可以提供一个简单有效的解决方案。

WebSockets can make a web page be dynamic and interactive. However, in many cases, a combination of AJAX and HTTP streaming or long polling can provide a simple and effective solution.

例如,新闻、邮件和社交媒体需要动态更新,但每隔几分钟执行一次更新可能是完全可以的。另一方面,协作、游戏和金融应用需要接近实时。

For example, news, mail, and social feeds need to update dynamically, but it may be perfectly okay to do so every few minutes. Collaboration, games, and financial apps, on the other hand, need to be much closer to real-time.

延迟本身并不是决定因素。如果消息量相对较低(例如,监视网络故障),那么 HTTP 流或轮询可以提供一个有效的解决方案。低延迟、高频率和大容量的结合使 WebSocket 成为最佳案例。

Latency alone is not a deciding factor. If the volume of messages is relatively low (for example, monitoring network failures) HTTP streaming or polling can provide an effective solution. It is the combination of low latency, high frequency, and high volume that make the best case for the use of WebSocket.

另外请记住,在 Internet 上,不受你控制的限制性代理可能会阻止 WebSocket 交互,原因可能是因为它们未配置为传递 Upgrade 标头,也可能是因为它们关闭了看起来空闲的长期连接。这意味着对于防火墙内的内部应用程序,使用 WebSocket 比面向公众的应用程序更容易做出决定。

Keep in mind also that over the Internet, restrictive proxies that are outside of your control may preclude WebSocket interactions, either because they are not configured to pass on the Upgrade header or because they close long-lived connections that appear idle. This means that the use of WebSocket for internal applications within the firewall is a more straightforward decision than it is for public facing applications.

WebSocket API

Spring Framework 提供了 WebSocket API,您可以使用该 API 编写处理 WebSocket 消息的客户端和服务端应用程序。

The Spring Framework provides a WebSocket API that you can use to write client- and server-side applications that handle WebSocket messages.

Server

要创建 WebSocket 服务器,你可以先创建一个 WebSocketHandler。以下示例显示了如何执行此操作:

To create a WebSocket server, you can first create a WebSocketHandler. The following example shows how to do so:

  • Java

  • Kotlin

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;

public class MyWebSocketHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		// ...
	}
}
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession

class MyWebSocketHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {
		// ...
	}
}

然后你可以将它映射到一个 URL:

Then you can map it to a URL:

  • Java

  • Kotlin

@Configuration
class WebConfig {

	@Bean
	public HandlerMapping handlerMapping() {
		Map<String, WebSocketHandler> map = new HashMap<>();
		map.put("/path", new MyWebSocketHandler());
		int order = -1; // before annotated controllers

		return new SimpleUrlHandlerMapping(map, order);
	}
}
@Configuration
class WebConfig {

	@Bean
	fun handlerMapping(): HandlerMapping {
		val map = mapOf("/path" to MyWebSocketHandler())
		val order = -1 // before annotated controllers

		return SimpleUrlHandlerMapping(map, order)
	}
}

如果使用 WebFlux Config 无需进行任何其他操作,或者如果您不使用 WebFlux config,您需要声明一个 WebSocketHandlerAdapter,如下所示:

If using the WebFlux Config there is nothing further to do, or otherwise if not using the WebFlux config you’ll need to declare a WebSocketHandlerAdapter as shown below:

  • Java

  • Kotlin

@Configuration
class WebConfig {

	// ...

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter();
	}
}
@Configuration
class WebConfig {

	// ...

	@Bean
	fun handlerAdapter() =  WebSocketHandlerAdapter()
}

WebSocketHandler

WebSocketHandlerhandle 方法采用 WebSocketSession,并返回 Mono<Void>,以指示处理会话的应用程序完成。会话通过两个流来处理,一个用于入站消息,另一个用于出站消息。下表描述了处理这些流的两种方法:

The handle method of WebSocketHandler takes WebSocketSession and returns Mono<Void> to indicate when application handling of the session is complete. The session is handled through two streams, one for inbound and one for outbound messages. The following table describes the two methods that handle the streams:

WebSocketSession method Description

Flux<WebSocketMessage> receive()

Provides access to the inbound message stream and completes when the connection is closed.

Mono<Void> send(Publisher<WebSocketMessage>)

Takes a source for outgoing messages, writes the messages, and returns a Mono<Void> that completes when the source completes and writing is done.

WebSocketHandler 必须将入站流和出站流组成一个统一的流,并返回一个表示该流完成请求的 Mono<Void>。根据应用程序需求,当以下情况发生时,统一的流将完成:

A WebSocketHandler must compose the inbound and outbound streams into a unified flow and return a Mono<Void> that reflects the completion of that flow. Depending on application requirements, the unified flow completes when:

  • Either the inbound or the outbound message stream completes.

  • The inbound stream completes (that is, the connection closed), while the outbound stream is infinite.

  • At a chosen point, through the close method of WebSocketSession.

当入站消息流和出站消息流组合在一起时,不需要检查连接是否处于打开状态,因为 Reactive Streams 会发信号结束活动。入站流会收到完成或错误信号,出站流会收到取消信号。

When inbound and outbound message streams are composed together, there is no need to check if the connection is open, since Reactive Streams signals end activity. The inbound stream receives a completion or error signal, and the outbound stream receives a cancellation signal.

处理程序的最基本实现是处理入站流的实现。以下示例展示了此类实现:

The most basic implementation of a handler is one that handles the inbound stream. The following example shows such an implementation:

Java
class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		return session.receive()			(1)
				.doOnNext(message -> {
					// ...					(2)
				})
				.concatMap(message -> {
					// ...					(3)
				})
				.then();					(4)
	}
}
1 Access the stream of inbound messages.
2 Do something with each message.
3 Perform nested asynchronous operations that use the message content.
4 Return a Mono<Void> that completes when receiving completes.
Kotlin
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {
		return session.receive()            (1)
				.doOnNext {
					// ...					(2)
				}
				.concatMap {
					// ...					(3)
				}
				.then()                     (4)
	}
}
5 Access the stream of inbound messages.
6 Do something with each message.
7 Perform nested asynchronous operations that use the message content.
8 Return a Mono<Void> that completes when receiving completes.

对于嵌套的异步操作,您可能需要调用 message.retain() 上的基础服务器,这些服务器使用池化数据缓冲区(例如,Netty)。否则,数据缓冲区可能会在您有机会读取数据之前释放。有关更多背景信息,请参阅 Data Buffers and Codecs

For nested, asynchronous operations, you may need to call message.retain() on underlying servers that use pooled data buffers (for example, Netty). Otherwise, the data buffer may be released before you have had a chance to read the data. For more background, see Data Buffers and Codecs.

以下实现将入站流和出站流组合:

The following implementation combines the inbound and outbound streams:

Java
class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Flux<WebSocketMessage> output = session.receive()				(1)
				.doOnNext(message -> {
					// ...
				})
				.concatMap(message -> {
					// ...
				})
				.map(value -> session.textMessage("Echo " + value));	(2)

		return session.send(output);									(3)
	}
}
1 Handle the inbound message stream.
2 Create the outbound message, producing a combined flow.
3 Return a Mono<Void> that does not complete while we continue to receive.
Kotlin
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {

		val output = session.receive()                      (1)
				.doOnNext {
					// ...
				}
				.concatMap {
					// ...
				}
				.map { session.textMessage("Echo $it") }    (2)

		return session.send(output)                         (3)
	}
}
4 Handle the inbound message stream.
5 Create the outbound message, producing a combined flow.
6 Return a Mono<Void> that does not complete while we continue to receive.

入站流和出站流可以是独立的,并且仅在完成时才连接,如下示例所示:

Inbound and outbound streams can be independent and be joined only for completion, as the following example shows:

Java
class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Mono<Void> input = session.receive()								1
				.doOnNext(message -> {
					// ...
				})
				.concatMap(message -> {
					// ...
				})
				.then();

		Flux<String> source = ... ;
		Mono<Void> output = session.send(source.map(session::textMessage));	2

		return Mono.zip(input, output).then();								3
	}
}
1 Handle inbound message stream.
2 Send outgoing messages.
3 Join the streams and return a Mono<Void> that completes when either stream ends.
Kotlin
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {

		val input = session.receive()									(1)
				.doOnNext {
					// ...
				}
				.concatMap {
					// ...
				}
				.then()

		val source: Flux<String> = ...
		val output = session.send(source.map(session::textMessage))		(2)

		return Mono.zip(input, output).then()							(3)
	}
}
4 Handle inbound message stream.
5 Send outgoing messages.
6 Join the streams and return a Mono<Void> that completes when either stream ends.

DataBuffer

`DataBuffer`是 WebFlux 中字节缓冲区的表示形式。参考资料的 Spring Core 部分在Data Buffers and Codecs的部分中对此进行了更详细的说明。要了解的主要要点是,在某些服务器(例如 Netty)上,字节缓冲区是池化和引用计数的,并且在使用时必须释放以避免内存泄漏。

DataBuffer is the representation for a byte buffer in WebFlux. The Spring Core part of the reference has more on that in the section on Data Buffers and Codecs. The key point to understand is that on some servers like Netty, byte buffers are pooled and reference counted, and must be released when consumed to avoid memory leaks.

在 Netty 上运行时,如果应用程序希望保留输入数据缓冲区以确保它们不被释放,则必须使用 DataBufferUtils.retain(dataBuffer),并在消耗缓冲区时使用 DataBufferUtils.release(dataBuffer)

When running on Netty, applications must use DataBufferUtils.retain(dataBuffer) if they wish to hold on input data buffers in order to ensure they are not released, and subsequently use DataBufferUtils.release(dataBuffer) when the buffers are consumed.

Handshake

WebSocketHandlerAdapter 委派给一个 WebSocketService。默认情况下,它是 HandshakeWebSocketService 的一个实例,它对 WebSocket 请求执行基本检查,然后为所用的服务器使用 RequestUpgradeStrategy。目前内置对 Reactor Netty、Tomcat、Jetty 和 Undertow 的支持。

WebSocketHandlerAdapter delegates to a WebSocketService. By default, that is an instance of HandshakeWebSocketService, which performs basic checks on the WebSocket request and then uses RequestUpgradeStrategy for the server in use. Currently, there is built-in support for Reactor Netty, Tomcat, Jetty, and Undertow.

HandshakeWebSocketService 公开了 una` sessionAttributePredicate` 属性,它允许设置一个`Predicate<String>` 来从 WebSession 中提取属性并将它们插入到 WebSocketSession 的属性中。

HandshakeWebSocketService exposes a sessionAttributePredicate property that allows setting a Predicate<String> to extract attributes from the WebSession and insert them into the attributes of the WebSocketSession.

Server Configuration

每个服务器的 RequestUpgradeStrategy 都会公开特定于底层 WebSocket 服务器引擎的配置。在使用 WebFlux Java config 时,您可以根据 WebFlux Config 对应部分所示来自定义此类属性,或者如果您不使用 WebFlux config,请使用以下内容:

The RequestUpgradeStrategy for each server exposes configuration specific to the underlying WebSocket server engine. When using the WebFlux Java config you can customize such properties as shown in the corresponding section of the WebFlux Config, or otherwise if not using the WebFlux config, use the below:

  • Java

  • Kotlin

@Configuration
class WebConfig {

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter(webSocketService());
	}

	@Bean
	public WebSocketService webSocketService() {
		TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
		strategy.setMaxSessionIdleTimeout(0L);
		return new HandshakeWebSocketService(strategy);
	}
}
@Configuration
class WebConfig {

	@Bean
	fun handlerAdapter() =
			WebSocketHandlerAdapter(webSocketService())

	@Bean
	fun webSocketService(): WebSocketService {
		val strategy = TomcatRequestUpgradeStrategy().apply {
			setMaxSessionIdleTimeout(0L)
		}
		return HandshakeWebSocketService(strategy)
	}
}

检查你的服务器的升级策略以查看有哪些选项可用。目前只有 Tomcat 和 Jetty 公开了此类选项。

Check the upgrade strategy for your server to see what options are available. Currently, only Tomcat and Jetty expose such options.

CORS

配置 CORS 并限制对 WebSocket 端点的访问的最简单方法是让你的 WebSocketHandler 实现 CorsConfigurationSource 并返回一个带有允许的源、标题和其他详细信息的`CorsConfiguration`。如果你无法做到这一点,你还可以设置 SimpleUrlHandlercorsConfigurations 属性,以按 URL 模式指定 CORS 设置。如果同时指定了两者,它们会通过 CorsConfiguration 上的`combine` 方法合并。

The easiest way to configure CORS and restrict access to a WebSocket endpoint is to have your WebSocketHandler implement CorsConfigurationSource and return a CorsConfiguration with allowed origins, headers, and other details. If you cannot do that, you can also set the corsConfigurations property on the SimpleUrlHandler to specify CORS settings by URL pattern. If both are specified, they are combined by using the combine method on CorsConfiguration.

Client

Spring WebFlux 提供了一个 WebSocketClient 抽象,其中包含 Reactor Netty、Tomcat、Jetty、Undertow 和标准 Java(即 JSR-356)的实现。

Spring WebFlux provides a WebSocketClient abstraction with implementations for Reactor Netty, Tomcat, Jetty, Undertow, and standard Java (that is, JSR-356).

Tomcat 客户端实际上是标准 Java 客户端的扩展,在 WebSocketSession 处理中具有一些额外功能,以利用 Tomcat 特定的 API 来暂停接收反压消息。

The Tomcat client is effectively an extension of the standard Java one with some extra functionality in the WebSocketSession handling to take advantage of the Tomcat-specific API to suspend receiving messages for back pressure.

若要启动 WebSocket 会话,你可以创建一个客户端实例并使用其 execute 方法:

To start a WebSocket session, you can create an instance of the client and use its execute methods:

  • Java

  • Kotlin

WebSocketClient client = new ReactorNettyWebSocketClient();

URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
		session.receive()
				.doOnNext(System.out::println)
				.then());
val client = ReactorNettyWebSocketClient()

		val url = URI("ws://localhost:8080/path")
		client.execute(url) { session ->
			session.receive()
					.doOnNext(::println)
			.then()
		}

某些客户端,例如 Jetty,实现 Lifecycle,需要停止并启动,你才能使用它们。所有客户端都具有与基础 WebSocket 客户端配置相关的构造函数选项。

Some clients, such as Jetty, implement Lifecycle and need to be stopped and started before you can use them. All clients have constructor options related to configuration of the underlying WebSocket client.