WebSockets Next extension reference guide
该技术被认为是 {extension-status}。 有关可能状态的完整列表,请查看我们的 FAQ entry. |
`quarkus-websockets-next`扩展提供了用于定义 WebSocket 服务器和客户端端点的现代声明性 API。
The WebSocket protocol
_WebSocket_协议,在 RFC6455中有文档说明,建立了一个通过单一 TCP 连接在客户端和服务器之间创建双向通信通道的标准化方法。与 HTTP 不同,WebSocket 作为一种独立的 TCP 协议运行,但被设计成可以与 HTTP 无缝协作。例如,它重用了相同的端口,并与相同安全机制兼容。
使用 WebSocket 进行交互会以使用“升级”标头的 HTTP 请求开始,以过渡到 WebSocket 协议。服务器不会用 `200 OK`响应,而是用 `101 Switching Protocols`响应来升级到 WebSocket 连接。在握手成功后,最初的 HTTP 升级请求中使用的 TCP 套接字仍然处于打开状态,这允许客户端和服务器持续地双向交换消息。
HTTP and WebSocket architecture styles
尽管 WebSocket 与 HTTP 兼容并在 HTTP 请求中启动,但至关重要的是意识到这两个协议导致不同的架构和编程模型。
使用 HTTP/REST 时,应用程序围绕处理各种 HTTP 方法和路径的资源/端点进行构造。客户端交互通过使用适当的方法和路径发出 HTTP 请求实现,遵循请求-响应模式。服务器根据路径、方法和标头将传入请求路由到相应处理程序,然后以明确定义的响应进行回复。
相反,WebSocket 通常涉及一个端点,用于初始 HTTP 连接,之后所有消息都使用同一条 TCP 连接。它引入了一个完全不同的交互模型:异步且消息驱动的。
与 HTTP 相反,WebSocket 是一个低级传输协议。消息格式、路由或处理要求客户端与服务器之间就消息语义达成事先协议。
对于 WebSocket 客户端和服务器,HTTP 握手请求中的 Sec-WebSocket-Protocol
标头允许多个级别的消息协议协商。如果不存在,服务器和客户端必须建立自己的约定。
Quarkus WebSockets vs. Quarkus WebSockets Next
本指南使用 quarkus-websockets-next
扩展,它是 WebSocket API 的一个实现,与旧版 quarkus-websockets
扩展相比,效率和可用性更高。原始的 quarkus-websockets
扩展仍然可以访问,将获得持续的支持,但不太可能获得功能开发。
与 quarkus-websockets
不同的是,quarkus-websockets-next
扩展 not 不会实现 Jakarta WebSocket 规范。相反,它引入了现代 API,优先考虑使用简单性。此外,它经过专门设计,可与 Quarkus 的反应架构和网络层无缝集成。
Quarkus WebSockets Next 扩展使用的注解不同于 JSR 356 中的注解,尽管有时候它们使用相同的名称。JSR 注解带有 Quarkus WebSockets Next 扩展不遵循的语义。
Project setup
要使用 websockets-next
扩展,您需要将 io.quarkus:quarkus-websockets-next
依赖项添加到项目中。
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-websockets-next</artifactId>
</dependency>
implementation("io.quarkus:quarkus-websockets-next")
Endpoints
服务器和客户端 API 都允许您定义 endpoints,用于接收和发送消息。端点实现为 CDI bean,并支持注入。端点声明了 callback methods,其使用 @OnTextMessage
、@OnBinaryMessage
、@OnPong
、@OnOpen
、@OnClose
和 @OnError
进行注解。这些方法用于处理各种 WebSocket 事件。通常,当已连接的客户端向服务器发送消息时,会调用用 @OnTextMessage
注解的方法,反之亦然。
客户端 API 还包括 connectors,用于配置和创建新的 WebSocket 连接。 |
Server endpoints
服务器端点是使用 @io.quarkus.websockets.next.WebSocket
注解的类。WebSocket#path()
的值为用来定义端点的路径。
package org.acme.websockets;
import io.quarkus.websockets.next.WebSocket;
import jakarta.inject.Inject;
@WebSocket(path = "/chat/{username}") 1
public class ChatWebSocket {
}
因此,客户端可以使用 ws://localhost:8080/chat/your-name
连接到此 Web 套接字端点。如果使用 TLS,URL 是 wss://localhost:8443/chat/your-name
。
endpoint 路径相对于由 |
Client endpoints
客户端端点是使用 @io.quarkus.websockets.next.WebSocketClient
注解的类。WebSocketClient#path()
的值为用来定义此客户端将连接到的端点的路径。
package org.acme.websockets;
import io.quarkus.websockets.next.WebSocketClient;
import jakarta.inject.Inject;
@WebSocketClient(path = "/chat/{username}") 1
public class ChatWebSocket {
}
客户端端点用于接收和发送消息。您需要使用 connectors API 来配置和打开新的 WebSocket 连接。 |
Path parameters
WebSocket 端点的路径可以包含路径参数。语法与 JAX-RS 资源相同: {parameterName}
。
您可以分别使用 io.quarkus.websockets.next.WebSocketConnection#pathParam(String)
方法或 io.quarkus.websockets.next.WebSocketClientConnection#pathParam(String)
访问路径参数值。或者,自动注入用 @io.quarkus.websockets.next.PathParam
注解的端点回调方法参数。
WebSocketConnection#pathParam(String)
example@Inject io.quarkus.websockets.next.WebSocketConnection connection;
// ...
String value = connection.pathParam("parameterName");
路径参数值始终为字符串。如果路径中不存在路径参数,则 WebSocketConnection#pathParam(String)
/WebSocketClientConnection#pathParam(String)
方法会返回 null
。如果存在一个用 @PathParam
注解的端点回调方法参数,而参数名称在端点路径中未定义,则构建会失败。
不支持查询参数。但是,您可以使用 |
CDI scopes
端点被管理为 CDI Bean。默认情况下,使用了 @Singleton
范围。但是,开发者可以指定备用范围来满足其特定要求。
@Singleton
和 @ApplicationScoped
端点在所有 WebSocket 连接之间共享。因此,实现应该无状态或线程安全。
import jakarta.enterprise.context.SessionScoped;
@WebSocket(path = "/ws")
@SessionScoped 1
public class MyWebSocket {
}
1 | 此服务器端点不共享,范围限定于会话。 |
每个 WebSocket 连接都与其自己的 session 上下文相关联。当调用 @OnOpen
方法时,将创建与 WebSocket 连接对应的会话上下文。对 @On[Text|Binary]Message
或 @OnClose
方法的后续调用将使用此相同的会话上下文。会话上下文保持活动状态,直到 @OnClose
方法完成执行,此时会话上下文会终止。
如果 WebSocket 端点未声明 @OnOpen
方法,则仍会创建会话上下文。无论是否具有 @OnClose
方法,会话上下文都将保持活动状态,直到连接终止。
带 @OnTextMessage,
和 @OnBinaryMessage,
@OnOpen@OnClose
注释的方法在方法执行期间(直至生成结果)也将激活请求范围。
Callback methods
WebSocket 端点可以声明:
-
最多一个
@OnTextMessage
方法:处理来自已连接客户端/服务器的文本消息。 -
最多一个
@OnBinaryMessage
方法:处理来自已连接客户端/服务器的二进制消息。 -
最多一个
@OnPongMessage
方法:处理来自已连接客户端/服务器的 pong 消息。 -
最多一个
@OnOpen
方法:在连接打开时调用。 -
最多一个
@OnClose
方法:在连接关闭时执行。 -
任意数量的
@OnError
方法:在发生错误时调用;即,当端点回调抛出运行时错误,或发生转换错误,或返回的io.smallrye.mutiny.Uni
/io.smallrye.mutiny.Multi
接收失败时调用。
并非所有端点都需要包含所有方法。但是,它必须至少包含 @On[Text|Binary]Message
或 @OnOpen
。
如果任何端点违反这些规则,则会在构建时抛出错误。表示子 Websocket 的静态嵌套类遵循相同的准则。
在 WebSocket 端点外部使用 @OnTextMessage
、@OnBinaryMessage
、@OnOpen
和 @OnClose
注释的任何方法都被视为错误,并且会导致构建失败,并显示适当的错误消息。
Processing messages
从客户端接收消息的方法使用 @OnTextMessage
或 @OnBinaryMessage
注释。
对于从客户端接收的每个 text 消息,都会调用 OnTextMessage
。对于客户端接收的每个 binary 消息,都会调用 OnBinaryMessage
。
Invocation rules
调用这些带注释的方法时,链接到 WebSocket 连接的 session 作用域保持活动状态。此外,请求作用域将持续处于活动状态,直至方法完成(或直至它为异步和反应式方法生成其结果)。
与 Quarkus REST 类似,Quarkus WebSocket Next 支持 blocking 和 non-blocking 逻辑,这由方法签名和附加注释(如 @Blocking
和 @NonBlocking
)决定。
以下规则控制执行:
-
非阻塞方法必须在连接的事件循环中执行。
-
用
@RunOnVirtualThread
注释的方法被视为阻塞方法,应在虚拟线程中执行。 -
如果没有用
@RunOnVirtualThread
注释,则阻塞方法必须在一个工作线程中执行。 -
当使用
@RunOnVirtualThread
时,每个调用会产生一个新的虚拟线程。 -
返回
CompletionStage
、Uni
和Multi
的方法被视为非阻塞方法。 -
返回
void
或普通对象的方法被视为阻塞方法。 -
Kotlin
suspend
函数被视为非阻塞函数。
Method parameters
该方法必须精确接受一个消息参数:
-
消息对象(任何类型)。
-
消息类型为 X 的
Multi<X>
。
但是,它还可以接受以下参数:
-
WebSocketConnection
/WebSocketClientConnection
-
HandshakeRequest
-
用
@PathParam
注释的String
参数
消息对象表示发送的数据,可以作为原始内容 (String
、JsonObject
、JsonArray
、Buffer
或 byte[]
) 访问,也可以访问反序列化的高级对象,这是推荐的方法。
接收 Multi
时,会为每个连接调用一次该方法,并且所提供的 Multi
接收此连接传输的项。该方法必须订阅 Multi
以接收这些项(或返回一个 Multi)。
Supported return types
用 @OnTextMessage
或 @OnBinaryMessage
注释的方法可以返回各种类型,以有效地处理 WebSocket 通信:
-
void
:表示阻断方法,其中不会向客户端发送明确的响应。 -
Uni<Void>
:表示在返回Uni
完成处理时,才表示非阻塞方法。不会向客户端返回显式响应。 -
类型为
X
的对象表示阻塞方法,其中返回的对象将序列化并作为响应返还给客户端。 -
Uni<X>
:指定非阻塞方法,其中由非空Uni
发出的项将作为响应发送到客户端。 -
Multi<X>
:表示非阻塞方法,其中由非空Multi
发出的项将按顺序发送给客户端,直至完成或取消。 -
返回
Unit
的 Kotlinsuspend
函数:表示非阻塞方法,此时不会向客户端返回显式响应。 -
返回
X
的 Kotlinsuspend
函数:指定非阻塞方法,其中返回的项将作为响应发送到客户端。
以下是这些方法的一些示例:
@OnTextMessage
void consume(Message m) {
// Process the incoming message. The method is called on an executor thread for each incoming message.
}
@OnTextMessage
Uni<Void> consumeAsync(Message m) {
// Process the incoming message. The method is called on an event loop thread for each incoming message.
// The method completes when the returned Uni emits its item.
}
@OnTextMessage
ResponseMessage process(Message m) {
// Process the incoming message and send a response to the client.
// The method is called for each incoming message.
// Note that if the method returns `null`, no response will be sent to the client.
}
@OnTextMessage
Uni<ResponseMessage> processAsync(Message m) {
// Process the incoming message and send a response to the client.
// The method is called for each incoming message.
// Note that if the method returns `null`, no response will be sent to the client. The method completes when the returned Uni emits its item.
}
@OnTextMessage
Multi<ResponseMessage> stream(Message m) {
// Process the incoming message and send multiple responses to the client.
// The method is called for each incoming message.
// The method completes when the returned Multi emits its completion signal.
// The method cannot return `null` (but an empty multi if no response must be sent)
}
返回 Multi 时,Quarkus 会自动订阅返回的 Multi,并一直写入已发出的项,直到完成、失败或取消。失败或取消将终止连接。
Streams
除了各个消息之外,WebSocket 端点还可以处理消息流。在这种情况下,该方法接收一个 Multi<X>
作为参数。每个 X
实例都使用上面列出的相同规则进行反序列化。
接收 Multi
的方法可以返回另一个 Multi
或 void
。如果该方法返回 Multi
,则不必订阅传入的 multi
:
@OnTextMessage
public Multi<ChatMessage> stream(Multi<ChatMessage> incoming) {
return incoming.log();
}
这种方法允许双向流。
当方法返回 void
时,它必须订阅传入的 Multi
:
@OnTextMessage
public void stream(Multi<ChatMessage> incoming) {
incoming.subscribe().with(item -> log(item));
}
OnOpen and OnClose methods
还可以在客户端连接或断开连接时通知 WebSocket 端点。
这是通过使用 @OnOpen
或 @OnClose
来注释方法完成的:
@OnOpen(broadcast = true)
public ChatMessage onOpen() {
return new ChatMessage(MessageType.USER_JOINED, connection.pathParam("username"), null);
}
@Inject WebSocketConnection connection;
@OnClose
public void onClose() {
ChatMessage departure = new ChatMessage(MessageType.USER_LEFT, connection.pathParam("username"), null);
connection.broadcast().sendTextAndAwait(departure);
}
在客户端连接时触发 @OnOpen
,而在客户端断开连接时调用 @OnClose
。
这些方法可以访问 session-scoped WebSocketConnection
bean。
Parameters
使用 @OnOpen
和 @OnClose
注释的方法可以接受以下参数:
-
WebSocketConnection
/WebSocketClientConnection
-
HandshakeRequest
-
用
@PathParam
注释的String
参数
注释为 @OnClose
的端点方法也可能接受 io.quarkus.websockets.next.CloseReason
参数,此参数可能表示关闭连接的原因。
Supported return types
@OnOpen
和 @OnClose
方法支持不同的返回类型。
对于 @OnOpen
方法,与 @On[Text|Binary]Message
相同的规则适用。因此,注释为 @OnOpen
的方法可以在连接后立即向客户端发送消息。@OnOpen
方法支持的返回类型包括:
-
void
:表示阻止方法,不向已连接的客户端发回显式消息。 -
Uni<Void>
:表示非阻止方法,其中返回的Uni
完成表示处理结束。没有消息发回给客户端。 -
类型为
X
的对象:表示阻止方法,其中返回的对象序列化并发回给客户端。 -
Uni<X>
:指定一个非阻止方法,其中非空Uni
发射的项目被发送给客户端。 -
Multi<X>
:表示非阻塞方法,其中由非空Multi
发出的项将按顺序发送给客户端,直至完成或取消。 -
Kotlin
suspend
功能返回Unit
:表示非阻止方法,其中没有显式消息被发送回给客户端。 -
Kotlin
suspend
功能返回X
:指定一个非阻止方法,其中返回的项目被发送给客户端。
发送到客户端的项目为 serialized ,但 String
、io.vertx.core.json.JsonObject
、io.vertx.core.json.JsonArray
、io.vertx.core.buffer.Buffer
和 byte[]
类型除外。对于 Multi
,Quarkus 订阅返回的 Multi
并将其写入 WebSocket
中发送它们。String
、JsonObject
和 JsonArray
作为文本消息发送。Buffers
和字节数组作为二进制消息发送。
对于 @OnClose
方法,支持的返回类型包括:
-
void
:该方法被认为是阻止的。 -
Uni<Void>
:该方法被认为是非阻止的。 -
Kotlin
suspend
功能返回Unit
:该方法被认为是非阻止的。
在服务器端点上声明的 |
Error handling
当发生错误时,也可以通知 WebSocket 端点。当端点回调抛出运行时错误,或发生转换错误,或返回的 io.smallrye.mutiny.Uni
/io.smallrye.mutiny.Multi
接收失败时,调用注释为 @io.quarkus.websockets.next.OnError
的 WebSocket 端点方法。
该方法必须接受正好一个 error 参数,即从 java.lang.Throwable
分配的参数。该方法还可以接受以下参数:
-
WebSocketConnection
/WebSocketClientConnection
-
HandshakeRequest
-
用
@PathParam
注释的String
参数
一个端点可以声明多个注释为 @io.quarkus.websockets.next.OnError
的方法。但是,每个方法必须声明一个不同的错误参数。选择声明实际异常的最具体超类型的方法。
|
当错误发生且无错误处理程序可以处理失败时,Quarkus 将采用由 `quarkus.websockets-next.server.unhandled-failure-strategy`指定的方法。默认情况下,连接已关闭。另外,可以记录一条错误消息或不执行任何操作。
Serialization and deserialization
WebSocket Next 扩展支持消息的自动序列化和反序列化。
类型为 String
、JsonObject
、JsonArray
、Buffer
、和 `byte[]`的对象会被按原样发送,并绕过序列化和反序列化。当未提供任何编码器时,序列化和反序列化会自动将消息从 JSON 转换过来,或者将消息转换为 JSON。
当需要自定义序列化和反序列化时,你可以提供一个自定义编码器。
Custom codec
要实现一个自定义编码器,你必须提供一个实现 CDI Bean 的:
-
io.quarkus.websockets.next.BinaryMessageCodec
for binary messages -
io.quarkus.websockets.next.TextMessageCodec
for text messages
以下示例展示了如何为 `Item`类实现一个自定义编码器:
@Singleton
public class ItemBinaryMessageCodec implements BinaryMessageCodec<Item> {
@Override
public boolean supports(Type type) {
// Allows selecting the right codec for the right type
return type.equals(Item.class);
}
@Override
public Buffer encode(Item value) {
// Serialization
return Buffer.buffer(value.toString());
}
@Override
public Item decode(Type type, Buffer value) {
// Deserialization
return new Item(value.toString());
}
}
`OnTextMessage`和 `OnBinaryMessage`方法还能明确指定应该使用哪个编码器:
@OnTextMessage(codec = MyInputCodec.class) (1)
Item find(Item item) {
//....
}
-
为消息的反序列化和序列化都指定编码器
当序列化和反序列化必须使用不同编码器时,可以分别为序列化和反序列化指定要使用的编码器:
@OnTextMessage(
codec = MyInputCodec.class, (1)
outputCodec = MyOutputCodec.class (2)
Item find(Item item) {
//....
}
-
为传入消息的反序列化指定编码器
-
为传出消息的序列化指定编码器
Ping/pong messages
ping message可以用作保持连接或验证远程终端。 pong message作为 ping 消息的响应发送,它必须具有相同的有效负载。
服务器/客户端终端自动响应从客户端/服务器发送的 ping 消息。换句话说,在终端上声明 `@OnPingMessage`回调是没有必要的。
服务器可以向已连接的客户端发送 ping 消息。WebSocketConnection
/WebSocketClientConnection`声明用于发送 ping 消息的方法;有一个非阻塞变量 `sendPing(Buffer)`和一个阻塞变量 `sendPingAndAwait(Buffer)
。默认情况下,ping 消息不会自动发送。然而,配置属性 `quarkus.websockets-next.server.auto-ping-interval`和 `quarkus.websockets-next.client.auto-ping-interval`可以用于设置时间间隔,服务器/客户端会在此时间间隔后向已连接的客户端/服务器发送 ping 消息。
quarkus.websockets-next.server.auto-ping-interval=2 1
1 | 每 2 秒向已连接的客户端从服务器发送 ping 消息。 |
@OnPongMessage`注释用于定义一个回调,它使用从客户端/服务器发送的 pong 消息。一个终端必须声明最多一个用 `@OnPongMessage`注释的方法。回调方法必须返回 `void`或 `Uni<Void>
(或返回 `Unit`的 Kotlin `suspend`函数),并且它必须接受一个类型为 `Buffer`的单个参数。
@OnPongMessage
void pong(Buffer data) {
// ....
}
服务器/客户端还可以发送非请求的 pong 消息,这些消息可以用作单向心跳。有一个非阻塞变量: |
Inbound processing mode
WebSocket 终端可以使用 @WebSocket#inboundProcessingMode()`和 `@WebSocketClient.inboundProcessingMode()`分别定义用于处理特定连接的接收事件所用的模式。接收事件可以代表一条消息(文本、二进制、pong)、打开连接和关闭连接。默认情况下,事件按串行处理,并且会确保排序。这意味着,如果一个终端接收事件 `A`和 `B
(按这个特定顺序),那么事件 B`的回调将在事件 `A`的回调完成以后被调用。然而,在某些情况下,最好并发处理事件,即不保证排序,但也无并发限制。对于这种情况,应该使用 `InboundProcessingMode#CONCURRENT
。
Server API
HTTP server configuration
此扩展程序会重新使用 _main_HTTP 服务器。
因此,WebSocket 服务器的配置是在 quarkus.http.
配置部分完成的。
应用程序内配置的 WebSocket 路径与由 quarkus.http.root
定义的根路径(默认值为 /
)连接在一起。此连接确保 WebSocket 端点正确地放置在应用程序的 URL 结构中。
有关更多详细信息,请参阅 HTTP guide。
Sub-websockets endpoints
@WebSocket
端点可封装静态嵌套类,这些类还用 @WebSocket
注释,并表示 sub-websockets。这些子 WebSocket 的最终路径会连接外层类和嵌套类的路径。最终路径是根据 HTTP URL 规则进行标准化的。
子 WebSocket 继承对 @WebSocket
注释中声明的路径参数的访问权限,该注释用于外层类和嵌套类。在以下示例中,外层类内的 consumePrimary
方法可访问 version
参数。同时,嵌套类内的 consumeNested
方法可访问 version
和 id
参数:
@WebSocket(path = "/ws/v{version}")
public class MyPrimaryWebSocket {
@OnTextMessage
void consumePrimary(String s) { ... }
@WebSocket(path = "/products/{id}")
public static class MyNestedWebSocket {
@OnTextMessage
void consumeNested(String s) { ... }
}
}
WebSocket connection
io.quarkus.websockets.next.WebSocketConnection
对象表示 WebSocket 连接。Quarkus 提供了一个 @SessionScoped
CDI Bean,该 Bean 实施此接口,可以在 WebSocket
端点中注入,并用于与已连接的客户端进行交互。
使用 @OnOpen
、@OnTextMessage
、@OnBinaryMessage
和 @OnClose
进行注释的方法可以访问注入的 WebSocketConnection
对象:
@Inject WebSocketConnection connection;
请注意,在这些方法之外, |
该连接可用于向客户端发送消息、访问路径参数、向所有已连接的客户端广播消息,等等。
// Send a message:
connection.sendTextAndAwait("Hello!");
// Broadcast messages:
connection.broadcast().sendTextAndAwait(departure);
// Access path parameters:
String param = connection.pathParam("foo");
WebSocketConnection
提供发送消息的阻塞和非阻塞方法变体:
-
sendTextAndAwait(String message)
:向客户端发送一条文本消息并等待发送该消息。该方法具有阻塞性且只应在执行程序线程中调用。 -
sendText(String message)
:向客户端发送一条文本消息。它返回一个Uni
。该方法是无阻塞的,但您必须订阅它。
List open connections
还可以列出所有打开的连接。Quarkus 提供了一个类型为 io.quarkus.websockets.next.OpenConnections
的 CDI Bean,该 Bean 声明了方便访问连接的方法。
import io.quarkus.logging.Log;
import io.quarkus.websockets.next.OpenConnections;
class MyBean {
@Inject
OpenConnections connections;
void logAllOpenConnections() {
Log.infof("Open connections: %s", connections.listAll()); 1
}
}
1 | OpenConnections#listAll() 返回所有打开连接在给定时间的不可变快照。 |
还有其他一些方便的方法。例如,OpenConnections#findByEndpointId(String)
可以让您轻松地找到某个特定端点的连接。
CDI events
当打开一个新连接时,Quarkus 会使用限定符 @io.quarkus.websockets.next.Open
异步触发类型为 io.quarkus.websockets.next.WebSocketConnection
的 CDI 事件。此外,当一个连接关闭时,会使用限定符 @io.quarkus.websockets.next.Closed
异步触发类型为 WebSocketConnection
的 CDI 事件。
import jakarta.enterprise.event.ObservesAsync;
import io.quarkus.websockets.next.Open;
import io.quarkus.websockets.next.WebSocketConnection;
class MyBean {
void connectionOpened(@ObservesAsync @Open WebSocketConnection connection) { 1
// This observer method is called when a connection is opened...
}
}
1 | 异步观察者方法使用默认阻塞执行程序服务执行。 |
Security
可以使用安全注释来保护 WebSocket 端点回调方法,例如 io.quarkus.security.Authenticated
、jakarta.annotation.security.RolesAllowed
以及 Supported security annotations 文档中列出的其他注释。
例如:
package io.quarkus.websockets.next.test.security;
import jakarta.annotation.security.RolesAllowed;
import jakarta.inject.Inject;
import io.quarkus.security.ForbiddenException;
import io.quarkus.security.identity.SecurityIdentity;
import io.quarkus.websockets.next.OnError;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;
@WebSocket(path = "/end")
public class Endpoint {
@Inject
SecurityIdentity currentIdentity;
@OnOpen
String open() {
return "ready";
}
@RolesAllowed("admin")
@OnTextMessage
String echo(String message) { 1
return message;
}
@OnError
String error(ForbiddenException t) { 2
return "forbidden:" + currentIdentity.getPrincipal().getName();
}
}
1 | 只有当当前安全标识具有 admin 角色时,才可以调用 echo 回调方法。 |
2 | 发生授权故障后调用错误处理程序。 |
`SecurityIdentity`最初是在安全 HTTP 升级期间创建的,并与 websocket 连接关联。
当使用 OpenId Connect 扩展且令牌过期时,Quarkus 将自动关闭连接。 |
Secure HTTP upgrade
当标准安全注释放在端点类中或定义了 HTTP 安全策略时,HTTP 升级将被保护。保护 HTTP 升级的优点是处理更少,授权可以及早执行,并且只执行一次。您应始终更喜欢 HTTP 升级安全性,除非像上面的示例中一样,您需要对错误执行操作。
package io.quarkus.websockets.next.test.security;
import io.quarkus.security.Authenticated;
import jakarta.inject.Inject;
import io.quarkus.security.identity.SecurityIdentity;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;
@Authenticated 1
@WebSocket(path = "/end")
public class Endpoint {
@Inject
SecurityIdentity currentIdentity;
@OnOpen
String open() {
return "ready";
}
@OnTextMessage
String echo(String message) {
return message;
}
}
1 | 最初的 HTTP 握手以匿名用户的 401 状态结束。您还可以使用 `quarkus.websockets-next.server.security.auth-failure-redirect-url`配置属性重定向在授权故障时的握手请求。 |
仅当在 `@WebSocket`注释旁边的端点类上声明安全注释时,HTTP 升级才受到保护。在端点 bean 上放置一个安全注释不会保护 bean 方法,只会保护 HTTP 升级。您必须始终验证端点是否按预期受保护。
quarkus.http.auth.permission.http-upgrade.paths=/end
quarkus.http.auth.permission.http-upgrade.policy=authenticated
Inspect and/or reject HTTP upgrade
要检查 HTTP 升级,您必须提供实现 `io.quarkus.websockets.next.HttpUpgradeCheck`接口的 CDI bean。Quarkus 在应该升级为 WebSocket 连接的每个 HTTP 请求上调用 `HttpUpgradeCheck#perform`方法。在此方法中,您可以执行任何业务逻辑和/或拒绝 HTTP 升级。
package io.quarkus.websockets.next.test;
import io.quarkus.websockets.next.HttpUpgradeCheck;
import io.smallrye.mutiny.Uni;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped 1
public class ExampleHttpUpgradeCheck implements HttpUpgradeCheck {
@Override
public Uni<CheckResult> perform(HttpUpgradeContext ctx) {
if (rejectUpgrade(ctx)) {
return CheckResult.rejectUpgrade(400); 2
}
return CheckResult.permitUpgrade();
}
private boolean rejectUpgrade(HttpUpgradeContext ctx) {
var headers = ctx.httpRequest().headers();
// implement your business logic in here
}
}
1 | 实现 HttpUpgradeCheck`接口的 CDI bean 可以是 `@ApplicationScoped 、`@Singleton`或 `@Dependent`bean,但绝不能是 `@RequestScoped`bean。 |
2 | 拒绝 HTTP 升级。最初的 HTTP 握手以 400 错误请求响应状态代码结束。 |
您可以使用 `HttpUpgradeCheck#appliesTo`方法为应用 `HttpUpgradeCheck`的 WebSocket 端点进行选择。 |
TLS
由于此扩展重复使用 _main_HTTP 服务器,因此所有相关的服务器配置都适用。有关更多详细信息,请参阅 HTTP guide。
Client API
Client connectors
`io.quarkus.websockets.next.WebSocketConnector<CLIENT>`用于为客户端端点配置和创建新连接。提供了一个实现此接口的 CDI bean,并可以将其注入其他 bean 中。实际类型参数用于确定客户端端点。该类型在构建期间得到验证 - 如果它不表示客户端端点,则构建失败。
让我们考虑以下客户端端点:
@WebSocketClient(path = "/endpoint/{name}")
public class ClientEndpoint {
@OnTextMessage
void onMessage(@PathParam String name, String message, WebSocketClientConnection connection) {
// ...
}
}
此客户端端点的连接器使用如下:
@Singleton
public class MyBean {
@ConfigProperty(name = "enpoint.uri")
URI myUri;
@Inject
WebSocketConnector<ClientEndpoint> connector; 1
void openAndSendMessage() {
WebSocketClientConnection connection = connector
.baseUri(uri) 2
.pathParam("name", "Roxanne") 3
.connectAndAwait();
connection.sendTextAndAwait("Hi!"); 4
}
}
1 | 注入 `ClientEndpoint`的连接器。 |
2 | 如果未提供基本 URI ,我们将尝试从 config 中获取值。该密钥由 client id 和 `.base-uri`后缀组成。 |
3 | 设置路径参数值。如果客户端端点路径不包含具有给定名称的参数,则抛出 IllegalArgumentException 。 |
4 | 根据需要使用连接发送消息。 |
如果应用程序尝试为丢失的端点注入连接器,则会抛出错误。 |
Basic connector
如果应用程序开发者不需要客户端端点和连接器的组合,则可以使用 basic connector 。基本连接器是一种创建连接以及使用/发送消息的简单方法,无需定义客户端端点。
@Singleton
public class MyBean {
@Inject
BasicWebSocketConnector connector; 1
void openAndConsume() {
WebSocketClientConnection connection = connector
.baseUri(uri) 2
.path("/ws") 3
.executionModel(ExecutionModel.NON_BLOCKING) 4
.onTextMessage((c, m) -> { 5
// ...
})
.connectAndAwait();
}
}
1 | Inject the connector. |
2 | 必须始终设置基本 URI。 |
3 | 应附加到基本 URI 的附加路径。 |
4 | 设定回调处理程序的执行模型。默认情况下,回调可能会阻塞当前线程。然而在这种情况下,回调在事件循环上执行,并且可能不会阻塞当前线程。 |
5 | lambda 将在从服务器发送的每条文本消息中被调用。 |
基本连接器更接近底层 API,仅供高级用户使用。然而,与其他底层 WebSocket 客户端不同,它仍然是一个 CDI bean,可以在其他 bean 中注入。它还提供了一种配置回调执行模型的方法,确保与其他 Quarkus 组件的最佳集成。
WebSocket client connection
io.quarkus.websockets.next.WebSocketClientConnection
对象表示 WebSocket 连接。Quarkus 提供了一个实现此接口的 @SessionScoped
CDI bean,该 bean 可以在 WebSocketClient
端点中注入并用于与已连接的服务器进行交互。
注释有 @OnOpen
、@OnTextMessage
、@OnBinaryMessage`和 `@OnClose
的方法可以访问注入的 WebSocketClientConnection
对象:
@Inject WebSocketClientConnection connection;
请注意,在这些方法之外, |
该连接可用于向客户端发送消息、访问路径参数等。
// Send a message:
connection.sendTextAndAwait("Hello!");
// Broadcast messages:
connection.broadcast().sendTextAndAwait(departure);
// Access path parameters:
String param = connection.pathParam("foo");
WebSocketClientConnection
提供了阻塞和非阻塞方法变体来发送消息:
-
sendTextAndAwait(String message)
:向客户端发送一条文本消息并等待发送该消息。该方法具有阻塞性且只应在执行程序线程中调用。 -
sendText(String message)
:向客户端发送一条文本消息。它返回一个Uni
。该方法是无阻塞的,但您必须订阅它。
List open client connections
还可以列出所有打开的连接。Quarkus 提供了一个 io.quarkus.websockets.next.OpenClientConnections
类型的 CDI bean,该 bean 声明了访问连接的便捷方法。
import io.quarkus.logging.Log;
import io.quarkus.websockets.next.OpenClientConnections;
class MyBean {
@Inject
OpenClientConnections connections;
void logAllOpenClinetConnections() {
Log.infof("Open client connections: %s", connections.listAll()); 1
}
}
1 | OpenClientConnections#listAll() 返回给定时间所有打开连接的不可变快照。 |
还有其他便捷方法。例如,OpenClientConnections#findByClientId(String)
可以轻松找到特定端点的连接。
CDI events
Quarkus 在打开新连接时以限定符 @io.quarkus.websockets.next.Open
异步触发类型为 io.quarkus.websockets.next.WebSocketClientConnection
的 CDI 事件。此外,在连接关闭时以限定符 @io.quarkus.websockets.next.Closed
异步触发类型为 WebSocketClientConnection
的 CDI 事件。
import jakarta.enterprise.event.ObservesAsync;
import io.quarkus.websockets.next.Open;
import io.quarkus.websockets.next.WebSocketClientConnection;
class MyBean {
void connectionOpened(@ObservesAsync @Open WebSocketClientConnection connection) { 1
// This observer method is called when a connection is opened...
}
}
1 | 异步观察者方法使用默认阻塞执行程序服务执行。 |
Configuring SSL/TLS
要建立一个 TLS 连接,您需要使用 TLS registry 配置一个 named 配置:
quarkus.tls.my-ws-client.trust-store.p12.path=server-truststore.p12
quarkus.tls.my-ws-client.trust-store.p12.password=secret
quarkus.websockets-next.client.tls-configuration-name=my-ws-client # Reference the named configuration
使用 WebSocket 客户端时,需使用 named 配置来避免与其他 TLS 配置发生冲突。客户端将不会使用默认的 TLS 配置。
当您配置一个 named TLS 配置时,默认情况下会启用 TLS。
Traffic logging
Quarkus 可以记录收发消息以进行调试。要启用服务器的流量日志记录,将 quarkus.websockets-next.server.traffic-logging.enabled
配置属性设置为 true
。要启用客户端的流量日志记录,将 quarkus.websockets-next.client.traffic-logging.enabled
配置属性设置为 true
。文本消息的载荷也会被记录。然而,记录字符的数量是有限的。默认限制为 100,但是您可以分别使用 quarkus.websockets-next.server.traffic-logging.text-payload-limit
和 quarkus.websockets-next.client.traffic-logging.text-payload-limit
配置属性更改此限制。
如果已为记录器 |
quarkus.websockets-next.server.traffic-logging.enabled=true 1
quarkus.websockets-next.server.traffic-logging.text-payload-limit=50 2
quarkus.log.category."io.quarkus.websockets.next.traffic".level=DEBUG 3
1 | Enables traffic logging. |
2 | 设置要记录的文本消息有效负载的字符数。 |
3 | 为记录器 io.quarkus.websockets.next.traffic`启用 `DEBUG 级别。 |