WebSockets
参考文档的这一部分涵盖了对反应堆栈 WebSocket 消息传递的支持。
This part of the reference documentation covers support for reactive-stack WebSocket messaging.
Introduction to WebSocket
WebSocket 协议 RFC 6455 提供了在客户端和服务器之间建立全双工、双向通信信道的标准化方法,通过单一的 TCP 连接。它与 HTTP 是一种不同的 TCP 协议,但设计为通过 HTTP 运行,使用端口 80 和 443 以及允许重复使用现有的防火墙规则。
The WebSocket protocol, RFC 6455, provides a standardized way to establish a full-duplex, two-way communication channel between client and server over a single TCP connection. It is a different TCP protocol from HTTP but is designed to work over HTTP, using ports 80 and 443 and allowing re-use of existing firewall rules.
WebSocket 交互开始于一个 HTTP 请求,该请求使用 HTTP Upgrade
标头进行升级或(在本例中)切换到 WebSocket 协议。以下示例显示了这种交互:
A WebSocket interaction begins with an HTTP request that uses the HTTP Upgrade
header
to upgrade or, in this case, to switch to the WebSocket protocol. The following example
shows such an interaction:
GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket 1
Connection: Upgrade 2
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
1 | The Upgrade header. |
2 | Using the Upgrade connection. |
服务器不再使用常见的 200 状态代码,而是返回类似以下内容的输出:
Instead of the usual 200 status code, a server with WebSocket support returns output similar to the following:
HTTP/1.1 101 Switching Protocols 1
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
1 | Protocol switch |
握手成功后,HTTP 升级请求中包含的 TCP 套接字保持打开状态,以便客户端和服务器继续发送和接收消息。
After a successful handshake, the TCP socket underlying the HTTP upgrade request remains open for both the client and the server to continue to send and receive messages.
WebSocket 工作原理的完整介绍超出了本文的范围。请参见 RFC 6455、HTML5 中的 WebSocket 章节或 Web 上的各种简介和教程。
A complete introduction of how WebSockets work is beyond the scope of this document. See RFC 6455, the WebSocket chapter of HTML5, or any of the many introductions and tutorials on the Web.
请注意,如果 WebSocket 服务器在 Web 服务器(例如 nginx)后运行,则可能需要对其进行配置,以将 WebSocket 升级请求传递到 WebSocket 服务器。同样,如果应用程序在云环境中运行,请查看云提供商有关 WebSocket 支持的说明。
Note that, if a WebSocket server is running behind a web server (e.g. nginx), you likely need to configure it to pass WebSocket upgrade requests on to the WebSocket server. Likewise, if the application runs in a cloud environment, check the instructions of the cloud provider related to WebSocket support.
HTTP Versus WebSocket
即使 WebSocket 被设计为与 HTTP 兼容且以 HTTP 请求开头,但了解两个协议导致非常不同的体系结构和应用程序编程模型非常重要。
Even though WebSocket is designed to be HTTP-compatible and starts with an HTTP request, it is important to understand that the two protocols lead to very different architectures and application programming models.
在 HTTP 和 REST 中,应用程序建模为多个 URL。要与应用程序交互,客户端以请求-响应样式访问这些 URL。服务器根据 HTTP URL、方法和标头将请求路由到适当的处理程序。
In HTTP and REST, an application is modeled as many URLs. To interact with the application, clients access those URLs, request-response style. Servers route requests to the appropriate handler based on the HTTP URL, method, and headers.
相比之下,在 WebSockets 中,最初连接通常只有一个 URL。随后,所有应用程序消息都在同一条 TCP 连接上流动。这指向一种完全不同的异步事件驱动消息架构。
By contrast, in WebSockets, there is usually only one URL for the initial connect. Subsequently, all application messages flow on that same TCP connection. This points to an entirely different asynchronous, event-driven, messaging architecture.
WebSocket 也是一种低级传输协议,与 HTTP 不同,它不会规定消息内容的任何语义。这意味着除非客户端和服务器就消息语义达成一致,否则无法路由或处理消息。
WebSocket is also a low-level transport protocol, which, unlike HTTP, does not prescribe any semantics to the content of messages. That means that there is no way to route or process a message unless the client and the server agree on message semantics.
WebSocket 客户端和服务器可以通过 HTTP 握手请求上的 Sec-WebSocket-Protocol
标头协商使用更高级别的消息传递协议(例如,STOMP)。如果没有,则需要想出自己的约定。
WebSocket clients and servers can negotiate the use of a higher-level, messaging protocol
(for example, STOMP), through the Sec-WebSocket-Protocol
header on the HTTP handshake request.
In the absence of that, they need to come up with their own conventions.
When to Use WebSockets
WebSocket 可以使网页变得动态和交互。但是,在许多情况下,AJAX 和 HTTP 流或长轮询的组合可以提供一个简单有效的解决方案。
WebSockets can make a web page be dynamic and interactive. However, in many cases, a combination of AJAX and HTTP streaming or long polling can provide a simple and effective solution.
例如,新闻、邮件和社交媒体需要动态更新,但每隔几分钟执行一次更新可能是完全可以的。另一方面,协作、游戏和金融应用需要接近实时。
For example, news, mail, and social feeds need to update dynamically, but it may be perfectly okay to do so every few minutes. Collaboration, games, and financial apps, on the other hand, need to be much closer to real-time.
延迟本身并不是决定因素。如果消息量相对较低(例如,监视网络故障),那么 HTTP 流或轮询可以提供一个有效的解决方案。低延迟、高频率和大容量的结合使 WebSocket 成为最佳案例。
Latency alone is not a deciding factor. If the volume of messages is relatively low (for example, monitoring network failures) HTTP streaming or polling can provide an effective solution. It is the combination of low latency, high frequency, and high volume that make the best case for the use of WebSocket.
另外请记住,在 Internet 上,不受你控制的限制性代理可能会阻止 WebSocket 交互,原因可能是因为它们未配置为传递 Upgrade
标头,也可能是因为它们关闭了看起来空闲的长期连接。这意味着对于防火墙内的内部应用程序,使用 WebSocket 比面向公众的应用程序更容易做出决定。
Keep in mind also that over the Internet, restrictive proxies that are outside of your control
may preclude WebSocket interactions, either because they are not configured to pass on the
Upgrade
header or because they close long-lived connections that appear idle. This
means that the use of WebSocket for internal applications within the firewall is a more
straightforward decision than it is for public facing applications.
WebSocket API
Spring Framework 提供了 WebSocket API,您可以使用该 API 编写处理 WebSocket 消息的客户端和服务端应用程序。
The Spring Framework provides a WebSocket API that you can use to write client- and server-side applications that handle WebSocket messages.
Server
要创建 WebSocket 服务器,你可以先创建一个 WebSocketHandler
。以下示例显示了如何执行此操作:
To create a WebSocket server, you can first create a WebSocketHandler
.
The following example shows how to do so:
-
Java
-
Kotlin
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
public class MyWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
// ...
}
}
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
class MyWebSocketHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
// ...
}
}
然后你可以将它映射到一个 URL:
Then you can map it to a URL:
-
Java
-
Kotlin
@Configuration
class WebConfig {
@Bean
public HandlerMapping handlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/path", new MyWebSocketHandler());
int order = -1; // before annotated controllers
return new SimpleUrlHandlerMapping(map, order);
}
}
@Configuration
class WebConfig {
@Bean
fun handlerMapping(): HandlerMapping {
val map = mapOf("/path" to MyWebSocketHandler())
val order = -1 // before annotated controllers
return SimpleUrlHandlerMapping(map, order)
}
}
如果使用 WebFlux Config 无需进行任何其他操作,或者如果您不使用 WebFlux config,您需要声明一个 WebSocketHandlerAdapter
,如下所示:
If using the WebFlux Config there is nothing
further to do, or otherwise if not using the WebFlux config you’ll need to declare a
WebSocketHandlerAdapter
as shown below:
-
Java
-
Kotlin
@Configuration
class WebConfig {
// ...
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
@Configuration
class WebConfig {
// ...
@Bean
fun handlerAdapter() = WebSocketHandlerAdapter()
}
WebSocketHandler
WebSocketHandler
的 handle
方法采用 WebSocketSession
,并返回 Mono<Void>
,以指示处理会话的应用程序完成。会话通过两个流来处理,一个用于入站消息,另一个用于出站消息。下表描述了处理这些流的两种方法:
The handle
method of WebSocketHandler
takes WebSocketSession
and returns Mono<Void>
to indicate when application handling of the session is complete. The session is handled
through two streams, one for inbound and one for outbound messages. The following table
describes the two methods that handle the streams:
WebSocketSession method |
Description |
---|---|
|
Provides access to the inbound message stream and completes when the connection is closed. |
|
Takes a source for outgoing messages, writes the messages, and returns a |
WebSocketHandler 必须将入站流和出站流组成一个统一的流,并返回一个表示该流完成请求的 Mono<Void>
。根据应用程序需求,当以下情况发生时,统一的流将完成:
A WebSocketHandler
must compose the inbound and outbound streams into a unified flow and
return a Mono<Void>
that reflects the completion of that flow. Depending on application
requirements, the unified flow completes when:
-
Either the inbound or the outbound message stream completes.
-
The inbound stream completes (that is, the connection closed), while the outbound stream is infinite.
-
At a chosen point, through the
close
method ofWebSocketSession
.
当入站消息流和出站消息流组合在一起时,不需要检查连接是否处于打开状态,因为 Reactive Streams 会发信号结束活动。入站流会收到完成或错误信号,出站流会收到取消信号。
When inbound and outbound message streams are composed together, there is no need to check if the connection is open, since Reactive Streams signals end activity. The inbound stream receives a completion or error signal, and the outbound stream receives a cancellation signal.
处理程序的最基本实现是处理入站流的实现。以下示例展示了此类实现:
The most basic implementation of a handler is one that handles the inbound stream. The following example shows such an implementation:
- Java
-
class ExampleHandler implements WebSocketHandler { @Override public Mono<Void> handle(WebSocketSession session) { return session.receive() (1) .doOnNext(message -> { // ... (2) }) .concatMap(message -> { // ... (3) }) .then(); (4) } }
1 | Access the stream of inbound messages. |
2 | Do something with each message. |
3 | Perform nested asynchronous operations that use the message content. |
4 | Return a Mono<Void> that completes when receiving completes.
|
5 | Access the stream of inbound messages. |
6 | Do something with each message. |
7 | Perform nested asynchronous operations that use the message content. |
8 | Return a Mono<Void> that completes when receiving completes. |
对于嵌套的异步操作,您可能需要调用 |
For nested, asynchronous operations, you may need to call |
以下实现将入站流和出站流组合:
The following implementation combines the inbound and outbound streams:
- Java
-
class ExampleHandler implements WebSocketHandler { @Override public Mono<Void> handle(WebSocketSession session) { Flux<WebSocketMessage> output = session.receive() (1) .doOnNext(message -> { // ... }) .concatMap(message -> { // ... }) .map(value -> session.textMessage("Echo " + value)); (2) return session.send(output); (3) } }
1 | Handle the inbound message stream. |
2 | Create the outbound message, producing a combined flow. |
3 | Return a Mono<Void> that does not complete while we continue to receive.
|
4 | Handle the inbound message stream. |
5 | Create the outbound message, producing a combined flow. |
6 | Return a Mono<Void> that does not complete while we continue to receive. |
入站流和出站流可以是独立的,并且仅在完成时才连接,如下示例所示:
Inbound and outbound streams can be independent and be joined only for completion, as the following example shows:
- Java
-
class ExampleHandler implements WebSocketHandler { @Override public Mono<Void> handle(WebSocketSession session) { Mono<Void> input = session.receive() 1 .doOnNext(message -> { // ... }) .concatMap(message -> { // ... }) .then(); Flux<String> source = ... ; Mono<Void> output = session.send(source.map(session::textMessage)); 2 return Mono.zip(input, output).then(); 3 } }
1 | Handle inbound message stream. |
2 | Send outgoing messages. |
3 | Join the streams and return a Mono<Void> that completes when either stream ends.
|
4 | Handle inbound message stream. |
5 | Send outgoing messages. |
6 | Join the streams and return a Mono<Void> that completes when either stream ends. |
DataBuffer
`DataBuffer`是 WebFlux 中字节缓冲区的表示形式。参考资料的 Spring Core 部分在Data Buffers and Codecs的部分中对此进行了更详细的说明。要了解的主要要点是,在某些服务器(例如 Netty)上,字节缓冲区是池化和引用计数的,并且在使用时必须释放以避免内存泄漏。
DataBuffer
is the representation for a byte buffer in WebFlux. The Spring Core part of
the reference has more on that in the section on
Data Buffers and Codecs. The key point to understand is that on some
servers like Netty, byte buffers are pooled and reference counted, and must be released
when consumed to avoid memory leaks.
在 Netty 上运行时,如果应用程序希望保留输入数据缓冲区以确保它们不被释放,则必须使用 DataBufferUtils.retain(dataBuffer)
,并在消耗缓冲区时使用 DataBufferUtils.release(dataBuffer)
。
When running on Netty, applications must use DataBufferUtils.retain(dataBuffer)
if they
wish to hold on input data buffers in order to ensure they are not released, and
subsequently use DataBufferUtils.release(dataBuffer)
when the buffers are consumed.
Handshake
WebSocketHandlerAdapter
委派给一个 WebSocketService
。默认情况下,它是 HandshakeWebSocketService
的一个实例,它对 WebSocket 请求执行基本检查,然后为所用的服务器使用 RequestUpgradeStrategy
。目前内置对 Reactor Netty、Tomcat、Jetty 和 Undertow 的支持。
WebSocketHandlerAdapter
delegates to a WebSocketService
. By default, that is an instance
of HandshakeWebSocketService
, which performs basic checks on the WebSocket request and
then uses RequestUpgradeStrategy
for the server in use. Currently, there is built-in
support for Reactor Netty, Tomcat, Jetty, and Undertow.
HandshakeWebSocketService
公开了 una` sessionAttributePredicate` 属性,它允许设置一个`Predicate<String>` 来从 WebSession
中提取属性并将它们插入到 WebSocketSession
的属性中。
HandshakeWebSocketService
exposes a sessionAttributePredicate
property that allows
setting a Predicate<String>
to extract attributes from the WebSession
and insert them
into the attributes of the WebSocketSession
.
Server Configuration
每个服务器的 RequestUpgradeStrategy
都会公开特定于底层 WebSocket 服务器引擎的配置。在使用 WebFlux Java config 时,您可以根据 WebFlux Config 对应部分所示来自定义此类属性,或者如果您不使用 WebFlux config,请使用以下内容:
The RequestUpgradeStrategy
for each server exposes configuration specific to the
underlying WebSocket server engine. When using the WebFlux Java config you can customize
such properties as shown in the corresponding section of the
WebFlux Config, or otherwise if
not using the WebFlux config, use the below:
-
Java
-
Kotlin
@Configuration
class WebConfig {
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter(webSocketService());
}
@Bean
public WebSocketService webSocketService() {
TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
strategy.setMaxSessionIdleTimeout(0L);
return new HandshakeWebSocketService(strategy);
}
}
@Configuration
class WebConfig {
@Bean
fun handlerAdapter() =
WebSocketHandlerAdapter(webSocketService())
@Bean
fun webSocketService(): WebSocketService {
val strategy = TomcatRequestUpgradeStrategy().apply {
setMaxSessionIdleTimeout(0L)
}
return HandshakeWebSocketService(strategy)
}
}
检查你的服务器的升级策略以查看有哪些选项可用。目前只有 Tomcat 和 Jetty 公开了此类选项。
Check the upgrade strategy for your server to see what options are available. Currently, only Tomcat and Jetty expose such options.
CORS
配置 CORS 并限制对 WebSocket 端点的访问的最简单方法是让你的 WebSocketHandler
实现 CorsConfigurationSource
并返回一个带有允许的源、标题和其他详细信息的`CorsConfiguration`。如果你无法做到这一点,你还可以设置 SimpleUrlHandler
的 corsConfigurations
属性,以按 URL 模式指定 CORS 设置。如果同时指定了两者,它们会通过 CorsConfiguration
上的`combine` 方法合并。
The easiest way to configure CORS and restrict access to a WebSocket endpoint is to
have your WebSocketHandler
implement CorsConfigurationSource
and return a
CorsConfiguration
with allowed origins, headers, and other details. If you cannot do
that, you can also set the corsConfigurations
property on the SimpleUrlHandler
to
specify CORS settings by URL pattern. If both are specified, they are combined by using the
combine
method on CorsConfiguration
.
Client
Spring WebFlux 提供了一个 WebSocketClient
抽象,其中包含 Reactor Netty、Tomcat、Jetty、Undertow 和标准 Java(即 JSR-356)的实现。
Spring WebFlux provides a WebSocketClient
abstraction with implementations for
Reactor Netty, Tomcat, Jetty, Undertow, and standard Java (that is, JSR-356).
Tomcat 客户端实际上是标准 Java 客户端的扩展,在 |
The Tomcat client is effectively an extension of the standard Java one with some extra
functionality in the |
若要启动 WebSocket 会话,你可以创建一个客户端实例并使用其 execute
方法:
To start a WebSocket session, you can create an instance of the client and use its execute
methods:
-
Java
-
Kotlin
WebSocketClient client = new ReactorNettyWebSocketClient();
URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
session.receive()
.doOnNext(System.out::println)
.then());
val client = ReactorNettyWebSocketClient()
val url = URI("ws://localhost:8080/path")
client.execute(url) { session ->
session.receive()
.doOnNext(::println)
.then()
}
某些客户端,例如 Jetty,实现 Lifecycle
,需要停止并启动,你才能使用它们。所有客户端都具有与基础 WebSocket 客户端配置相关的构造函数选项。
Some clients, such as Jetty, implement Lifecycle
and need to be stopped and started
before you can use them. All clients have constructor options related to configuration
of the underlying WebSocket client.