WebSockets Next extension reference guide

The WebSocket protocol

_WebSocket_协议,在 RFC6455中有文档说明,建立了一个通过单一 TCP 连接在客户端和服务器之间创建双向通信通道的标准化方法。与 HTTP 不同,WebSocket 作为一种独立的 TCP 协议运行,但被设计成可以与 HTTP 无缝协作。例如,它重用了相同的端口,并与相同安全机制兼容。

使用 WebSocket 进行交互会以使用“升级”标头的 HTTP 请求开始,以过渡到 WebSocket 协议。服务器不会用 `200 OK`响应,而是用 `101 Switching Protocols`响应来升级到 WebSocket 连接。在握手成功后,最初的 HTTP 升级请求中使用的 TCP 套接字仍然处于打开状态,这允许客户端和服务器持续地双向交换消息。

HTTP and WebSocket architecture styles

尽管 WebSocket 与 HTTP 兼容并在 HTTP 请求中启动,但至关重要的是意识到这两个协议导致不同的架构和编程模型。

使用 HTTP/REST 时,应用程序围绕处理各种 HTTP 方法和路径的资源/端点进行构造。客户端交互通过使用适当的方法和路径发出 HTTP 请求实现,遵循请求-响应模式。服务器根据路径、方法和标头将传入请求路由到相应处理程序,然后以明确定义的响应进行回复。

相反,WebSocket 通常涉及一个端点,用于初始 HTTP 连接,之后所有消息都使用同一条 TCP 连接。它引入了一个完全不同的交互模型:异步且消息驱动的。

与 HTTP 相反,WebSocket 是一个低级传输协议。消息格式、路由或处理要求客户端与服务器之间就消息语义达成事先协议。

对于 WebSocket 客户端和服务器,HTTP 握手请求中的 Sec-WebSocket-Protocol 标头允许多个级别的消息协议协商。如果不存在,服务器和客户端必须建立自己的约定。

Quarkus WebSockets vs. Quarkus WebSockets Next

本指南使用 quarkus-websockets-next 扩展,它是 WebSocket API 的一个实现,与旧版 quarkus-websockets 扩展相比,效率和可用性更高。原始的 quarkus-websockets 扩展仍然可以访问,将获得持续的支持,但不太可能获得功能开发。

quarkus-websockets 不同的是,quarkus-websockets-next 扩展 not 不会实现 Jakarta WebSocket 规范。相反,它引入了现代 API,优先考虑使用简单性。此外,它经过专门设计,可与 Quarkus 的反应架构和网络层无缝集成。

Quarkus WebSockets Next 扩展使用的注解不同于 JSR 356 中的注解,尽管有时候它们使用相同的名称。JSR 注解带有 Quarkus WebSockets Next 扩展不遵循的语义。

Project setup

要使用 websockets-next 扩展,您需要将 io.quarkus:quarkus-websockets-next 依赖项添加到项目中。

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-websockets-next</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-websockets-next")

Endpoints

服务器和客户端 API 都允许您定义 endpoints,用于接收和发送消息。端点实现为 CDI bean,并支持注入。端点声明了 callback methods,其使用 @OnTextMessage@OnBinaryMessage@OnPong@OnOpen@OnClose@OnError 进行注解。这些方法用于处理各种 WebSocket 事件。通常,当已连接的客户端向服务器发送消息时,会调用用 @OnTextMessage 注解的方法,反之亦然。

客户端 API 还包括 connectors,用于配置和创建新的 WebSocket 连接。

Server endpoints

服务器端点是使用 @io.quarkus.websockets.next.WebSocket 注解的类。WebSocket#path() 的值为用来定义端点的路径。

package org.acme.websockets;

import io.quarkus.websockets.next.WebSocket;
import jakarta.inject.Inject;

@WebSocket(path = "/chat/{username}") 1
public class ChatWebSocket {

}

因此,客户端可以使用 ws://localhost:8080/chat/your-name 连接到此 Web 套接字端点。如果使用 TLS,URL 是 wss://localhost:8443/chat/your-name

endpoint 路径相对于由 quarkus.http.root-path (默认为 /) 配置的 root context。例如,如果您向 application.properties 中添加 quarkus.http.root-path=/api,则客户端可以使用 http://localhost:8080/api/chat/the-name 连接到此端点。

Client endpoints

客户端端点是使用 @io.quarkus.websockets.next.WebSocketClient 注解的类。WebSocketClient#path() 的值为用来定义此客户端将连接到的端点的路径。

package org.acme.websockets;

import io.quarkus.websockets.next.WebSocketClient;
import jakarta.inject.Inject;

@WebSocketClient(path = "/chat/{username}") 1
public class ChatWebSocket {

}

客户端端点用于接收和发送消息。您需要使用 connectors API 来配置和打开新的 WebSocket 连接。

Path parameters

WebSocket 端点的路径可以包含路径参数。语法与 JAX-RS 资源相同: {parameterName}

您可以分别使用 io.quarkus.websockets.next.WebSocketConnection#pathParam(String) 方法或 io.quarkus.websockets.next.WebSocketClientConnection#pathParam(String) 访问路径参数值。或者,自动注入用 @io.quarkus.websockets.next.PathParam 注解的端点回调方法参数。

WebSocketConnection#pathParam(String) example
@Inject io.quarkus.websockets.next.WebSocketConnection connection;
// ...
String value = connection.pathParam("parameterName");

路径参数值始终为字符串。如果路径中不存在路径参数,则 WebSocketConnection#pathParam(String)/WebSocketClientConnection#pathParam(String) 方法会返回 null。如果存在一个用 @PathParam 注解的端点回调方法参数,而参数名称在端点路径中未定义,则构建会失败。

不支持查询参数。但是,您可以使用 WebSocketConnection#handshakeRequest().query() 访问查询

CDI scopes

端点被管理为 CDI Bean。默认情况下,使用了 @Singleton 范围。但是,开发者可以指定备用范围来满足其特定要求。

@Singleton@ApplicationScoped 端点在所有 WebSocket 连接之间共享。因此,实现应该无状态或线程安全。

import jakarta.enterprise.context.SessionScoped;

@WebSocket(path = "/ws")
@SessionScoped 1
public class MyWebSocket {

}
1 此服务器端点不共享,范围限定于会话。

每个 WebSocket 连接都与其自己的 session 上下文相关联。当调用 @OnOpen 方法时,将创建与 WebSocket 连接对应的会话上下文。对 @On[Text|Binary]Message@OnClose 方法的后续调用将使用此相同的会话上下文。会话上下文保持活动状态,直到 @OnClose 方法完成执行,此时会话上下文会终止。

如果 WebSocket 端点未声明 @OnOpen 方法,则仍会创建会话上下文。无论是否具有 @OnClose 方法,会话上下文都将保持活动状态,直到连接终止。

@OnTextMessage,@OnBinaryMessage,@OnOpen@OnClose 注释的方法在方法执行期间(直至生成结果)也将激活请求范围。

Callback methods

WebSocket 端点可以声明:

  • 最多一个 @OnTextMessage 方法:处理来自已连接客户端/服务器的文本消息。

  • 最多一个 @OnBinaryMessage 方法:处理来自已连接客户端/服务器的二进制消息。

  • 最多一个 @OnPongMessage 方法:处理来自已连接客户端/服务器的 pong 消息。

  • 最多一个 @OnOpen 方法:在连接打开时调用。

  • 最多一个 @OnClose 方法:在连接关闭时执行。

  • 任意数量的 @OnError 方法:在发生错误时调用;即,当端点回调抛出运行时错误,或发生转换错误,或返回的 io.smallrye.mutiny.Uni/ io.smallrye.mutiny.Multi 接收失败时调用。

并非所有端点都需要包含所有方法。但是,它必须至少包含 @On[Text|Binary]Message@OnOpen

如果任何端点违反这些规则,则会在构建时抛出错误。表示子 Websocket 的静态嵌套类遵循相同的准则。

在 WebSocket 端点外部使用 @OnTextMessage@OnBinaryMessage@OnOpen@OnClose 注释的任何方法都被视为错误,并且会导致构建失败,并显示适当的错误消息。

Processing messages

从客户端接收消息的方法使用 @OnTextMessage@OnBinaryMessage 注释。

对于从客户端接收的每个 text 消息,都会调用 OnTextMessage。对于客户端接收的每个 binary 消息,都会调用 OnBinaryMessage

Invocation rules

调用这些带注释的方法时,链接到 WebSocket 连接的 session 作用域保持活动状态。此外,请求作用域将持续处于活动状态,直至方法完成(或直至它为异步和反应式方法生成其结果)。

与 Quarkus REST 类似,Quarkus WebSocket Next 支持 blockingnon-blocking 逻辑,这由方法签名和附加注释(如 @Blocking@NonBlocking)决定。

以下规则控制执行:

  • 非阻塞方法必须在连接的事件循环中执行。

  • @RunOnVirtualThread 注释的方法被视为阻塞方法,应在虚拟线程中执行。

  • 如果没有用 @RunOnVirtualThread 注释,则阻塞方法必须在一个工作线程中执行。

  • 当使用 @RunOnVirtualThread 时,每个调用会产生一个新的虚拟线程。

  • 返回 CompletionStageUniMulti 的方法被视为非阻塞方法。

  • 返回 void 或普通对象的方法被视为阻塞方法。

  • Kotlin suspend 函数被视为非阻塞函数。

Method parameters

该方法必须精确接受一个消息参数:

  • 消息对象(任何类型)。

  • 消息类型为 X 的 Multi&lt;X&gt;

但是,它还可以接受以下参数:

  • WebSocketConnection/WebSocketClientConnection

  • HandshakeRequest

  • @PathParam 注释的 String 参数

消息对象表示发送的数据,可以作为原始内容 (StringJsonObjectJsonArrayBufferbyte[]) 访问,也可以访问反序列化的高级对象,这是推荐的方法。

接收 Multi 时,会为每个连接调用一次该方法,并且所提供的 Multi 接收此连接传输的项。该方法必须订阅 Multi 以接收这些项(或返回一个 Multi)。

Supported return types

@OnTextMessage@OnBinaryMessage 注释的方法可以返回各种类型,以有效地处理 WebSocket 通信:

  • void:表示阻断方法,其中不会向客户端发送明确的响应。

  • Uni&lt;Void&gt;:表示在返回 Uni 完成处理时,才表示非阻塞方法。不会向客户端返回显式响应。

  • 类型为 X 的对象表示阻塞方法,其中返回的对象将序列化并作为响应返还给客户端。

  • Uni&lt;X&gt;:指定非阻塞方法,其中由非空 Uni 发出的项将作为响应发送到客户端。

  • Multi&lt;X&gt;:表示非阻塞方法,其中由非空 Multi 发出的项将按顺序发送给客户端,直至完成或取消。

  • 返回 Unit 的 Kotlin suspend 函数:表示非阻塞方法,此时不会向客户端返回显式响应。

  • 返回 X 的 Kotlin suspend 函数:指定非阻塞方法,其中返回的项将作为响应发送到客户端。

以下是这些方法的一些示例:

@OnTextMessage
void consume(Message m) {
// Process the incoming message. The method is called on an executor thread for each incoming message.
}

@OnTextMessage
Uni<Void> consumeAsync(Message m) {
// Process the incoming message. The method is called on an event loop thread for each incoming message.
// The method completes when the returned Uni emits its item.
}

@OnTextMessage
ResponseMessage process(Message m) {
// Process the incoming message and send a response to the client.
// The method is called for each incoming message.
// Note that if the method returns `null`, no response will be sent to the client.
}

@OnTextMessage
Uni<ResponseMessage> processAsync(Message m) {
// Process the incoming message and send a response to the client.
// The method is called for each incoming message.
// Note that if the method returns `null`, no response will be sent to the client. The method completes when the returned Uni emits its item.
}

@OnTextMessage
Multi<ResponseMessage> stream(Message m) {
// Process the incoming message and send multiple responses to the client.
// The method is called for each incoming message.
// The method completes when the returned Multi emits its completion signal.
// The method cannot return `null` (but an empty multi if no response must be sent)
}

返回 Multi 时,Quarkus 会自动订阅返回的 Multi,并一直写入已发出的项,直到完成、失败或取消。失败或取消将终止连接。

Streams

除了各个消息之外,WebSocket 端点还可以处理消息流。在这种情况下,该方法接收一个 Multi<X> 作为参数。每个 X 实例都使用上面列出的相同规则进行反序列化。

接收 Multi 的方法可以返回另一个 Multivoid。如果该方法返回 Multi,则不必订阅传入的 multi

@OnTextMessage
public Multi<ChatMessage> stream(Multi<ChatMessage> incoming) {
    return incoming.log();
}

这种方法允许双向流。

当方法返回 void 时,它必须订阅传入的 Multi

@OnTextMessage
public void stream(Multi<ChatMessage> incoming) {
    incoming.subscribe().with(item -> log(item));
}

Skipping reply

当方法旨在生成一条写到客户端的消息时,它可以发出 null。发出 null 表示不对客户端发送任何消息,从而允许在需要时跳过响应。

JsonObject and JsonArray

Vert.x JsonObjectJsonArray 实例绕过序列化和反序列化机制。消息以文本消息的形式发送。

OnOpen and OnClose methods

还可以在客户端连接或断开连接时通知 WebSocket 端点。

这是通过使用 @OnOpen@OnClose 来注释方法完成的:

@OnOpen(broadcast = true)
public ChatMessage onOpen() {
    return new ChatMessage(MessageType.USER_JOINED, connection.pathParam("username"), null);
}

@Inject WebSocketConnection connection;

@OnClose
public void onClose() {
    ChatMessage departure = new ChatMessage(MessageType.USER_LEFT, connection.pathParam("username"), null);
    connection.broadcast().sendTextAndAwait(departure);
}

在客户端连接时触发 @OnOpen,而在客户端断开连接时调用 @OnClose

这些方法可以访问 session-scoped WebSocketConnection bean。

Parameters

使用 @OnOpen@OnClose 注释的方法可以接受以下参数:

  • WebSocketConnection/WebSocketClientConnection

  • HandshakeRequest

  • @PathParam 注释的 String 参数

注释为 @OnClose 的端点方法也可能接受 io.quarkus.websockets.next.CloseReason 参数,此参数可能表示关闭连接的原因。

Supported return types

@OnOpen@OnClose 方法支持不同的返回类型。

对于 @OnOpen 方法,与 @On[Text|Binary]Message 相同的规则适用。因此,注释为 @OnOpen 的方法可以在连接后立即向客户端发送消息。@OnOpen 方法支持的返回类型包括:

  • void:表示阻止方法,不向已连接的客户端发回显式消息。

  • Uni&lt;Void&gt;:表示非阻止方法,其中返回的 Uni 完成表示处理结束。没有消息发回给客户端。

  • 类型为 X 的对象:表示阻止方法,其中返回的对象序列化并发回给客户端。

  • Uni&lt;X&gt;:指定一个非阻止方法,其中非空 Uni 发射的项目被发送给客户端。

  • Multi&lt;X&gt;:表示非阻塞方法,其中由非空 Multi 发出的项将按顺序发送给客户端,直至完成或取消。

  • Kotlin suspend 功能返回 Unit:表示非阻止方法,其中没有显式消息被发送回给客户端。

  • Kotlin suspend 功能返回 X:指定一个非阻止方法,其中返回的项目被发送给客户端。

发送到客户端的项目为 serialized ,但 Stringio.vertx.core.json.JsonObjectio.vertx.core.json.JsonArrayio.vertx.core.buffer.Bufferbyte[] 类型除外。对于 Multi,Quarkus 订阅返回的 Multi 并将其写入 WebSocket 中发送它们。StringJsonObjectJsonArray 作为文本消息发送。Buffers 和字节数组作为二进制消息发送。

对于 @OnClose 方法,支持的返回类型包括:

  • void:该方法被认为是阻止的。

  • Uni&lt;Void&gt;:该方法被认为是非阻止的。

  • Kotlin suspend 功能返回 Unit:该方法被认为是非阻止的。

在服务器端点上声明的 @OnClose 方法不能通过返回对象将项目发送给已连接的客户端。它们只能通过使用 WebSocketConnection 对象将消息发送给其他客户端。

Error handling

当发生错误时,也可以通知 WebSocket 端点。当端点回调抛出运行时错误,或发生转换错误,或返回的 io.smallrye.mutiny.Uni/io.smallrye.mutiny.Multi 接收失败时,调用注释为 @io.quarkus.websockets.next.OnError 的 WebSocket 端点方法。

该方法必须接受正好一个 error 参数,即从 java.lang.Throwable 分配的参数。该方法还可以接受以下参数:

  • WebSocketConnection/WebSocketClientConnection

  • HandshakeRequest

  • @PathParam 注释的 String 参数

一个端点可以声明多个注释为 @io.quarkus.websockets.next.OnError 的方法。但是,每个方法必须声明一个不同的错误参数。选择声明实际异常的最具体超类型的方法。

@io.quarkus.websockets.next.OnError 注释还可以用于声明全局错误处理程序,即未在 WebSocket 端点上声明的方法。此类方法可能不接受 @PathParam 参数。在端点上声明的错误处理程序优先于全局错误处理程序。

当错误发生且无错误处理程序可以处理失败时,Quarkus 将采用由 `quarkus.websockets-next.server.unhandled-failure-strategy`指定的方法。默认情况下,连接已关闭。另外,可以记录一条错误消息或不执行任何操作。

Serialization and deserialization

WebSocket Next 扩展支持消息的自动序列化和反序列化。

类型为 StringJsonObjectJsonArrayBuffer、和 `byte[]`的对象会被按原样发送,并绕过序列化和反序列化。当未提供任何编码器时,序列化和反序列化会自动将消息从 JSON 转换过来,或者将消息转换为 JSON。

当需要自定义序列化和反序列化时,你可以提供一个自定义编码器。

Custom codec

要实现一个自定义编码器,你必须提供一个实现 CDI Bean 的:

  • io.quarkus.websockets.next.BinaryMessageCodec for binary messages

  • io.quarkus.websockets.next.TextMessageCodec for text messages

以下示例展示了如何为 `Item`类实现一个自定义编码器:

@Singleton
public class ItemBinaryMessageCodec implements BinaryMessageCodec<Item> {

    @Override
    public boolean supports(Type type) {
        // Allows selecting the right codec for the right type
        return type.equals(Item.class);
    }

    @Override
    public Buffer encode(Item value) {
        // Serialization
        return Buffer.buffer(value.toString());
    }

    @Override
    public Item decode(Type type, Buffer value) {
        // Deserialization
        return new Item(value.toString());
    }
}

`OnTextMessage`和 `OnBinaryMessage`方法还能明确指定应该使用哪个编码器:

@OnTextMessage(codec = MyInputCodec.class) (1)
Item find(Item item) {
        //....
}
  1. 为消息的反序列化和序列化都指定编码器

当序列化和反序列化必须使用不同编码器时,可以分别为序列化和反序列化指定要使用的编码器:

@OnTextMessage(
        codec = MyInputCodec.class, (1)
        outputCodec = MyOutputCodec.class (2)
Item find(Item item) {
        //....
}
  1. 为传入消息的反序列化指定编码器

  2. 为传出消息的序列化指定编码器

Ping/pong messages

ping message可以用作保持连接或验证远程终端。 pong message作为 ping 消息的响应发送,它必须具有相同的有效负载。

服务器/客户端终端自动响应从客户端/服务器发送的 ping 消息。换句话说,在终端上声明 `@OnPingMessage`回调是没有必要的。

服务器可以向已连接的客户端发送 ping 消息。WebSocketConnection/WebSocketClientConnection`声明用于发送 ping 消息的方法;有一个非阻塞变量 `sendPing(Buffer)`和一个阻塞变量 `sendPingAndAwait(Buffer)。默认情况下,ping 消息不会自动发送。然而,配置属性 `quarkus.websockets-next.server.auto-ping-interval`和 `quarkus.websockets-next.client.auto-ping-interval`可以用于设置时间间隔,服务器/客户端会在此时间间隔后向已连接的客户端/服务器发送 ping 消息。

quarkus.websockets-next.server.auto-ping-interval=2 1
1 每 2 秒向已连接的客户端从服务器发送 ping 消息。

@OnPongMessage`注释用于定义一个回调,它使用从客户端/服务器发送的 pong 消息。一个终端必须声明最多一个用 `@OnPongMessage`注释的方法。回调方法必须返回 `void`或 `Uni<Void>(或返回 `Unit`的 Kotlin `suspend`函数),并且它必须接受一个类型为 `Buffer`的单个参数。

@OnPongMessage
void pong(Buffer data) {
    // ....
}

服务器/客户端还可以发送非请求的 pong 消息,这些消息可以用作单向心跳。有一个非阻塞变量: WebSocketConnection#sendPong(Buffer),还有一个阻塞变量: WebSocketConnection#sendPongAndAwait(Buffer)

Inbound processing mode

WebSocket 终端可以使用 @WebSocket#inboundProcessingMode()`和 `@WebSocketClient.inboundProcessingMode()`分别定义用于处理特定连接的接收事件所用的模式。接收事件可以代表一条消息(文本、二进制、pong)、打开连接和关闭连接。默认情况下,事件按串行处理,并且会确保排序。这意味着,如果一个终端接收事件 `A`和 `B(按这个特定顺序),那么事件 B`的回调将在事件 `A`的回调完成以后被调用。然而,在某些情况下,最好并发处理事件,即不保证排序,但也无并发限制。对于这种情况,应该使用 `InboundProcessingMode#CONCURRENT

Server API

HTTP server configuration

此扩展程序会重新使用 _main_HTTP 服务器。

因此,WebSocket 服务器的配置是在 quarkus.http. 配置部分完成的。

应用程序内配置的 WebSocket 路径与由 quarkus.http.root 定义的根路径(默认值为 /)连接在一起。此连接确保 WebSocket 端点正确地放置在应用程序的 URL 结构中。

有关更多详细信息,请参阅 HTTP guide

Sub-websockets endpoints

@WebSocket 端点可封装静态嵌套类,这些类还用 @WebSocket 注释,并表示 sub-websockets。这些子 WebSocket 的最终路径会连接外层类和嵌套类的路径。最终路径是根据 HTTP URL 规则进行标准化的。

子 WebSocket 继承对 @WebSocket 注释中声明的路径参数的访问权限,该注释用于外层类和嵌套类。在以下示例中,外层类内的 consumePrimary 方法可访问 version 参数。同时,嵌套类内的 consumeNested 方法可访问 versionid 参数:

@WebSocket(path = "/ws/v{version}")
public class MyPrimaryWebSocket {

    @OnTextMessage
    void consumePrimary(String s)    { ... }

    @WebSocket(path = "/products/{id}")
    public static class MyNestedWebSocket {

      @OnTextMessage
      void consumeNested(String s)    { ... }

    }
}

WebSocket connection

io.quarkus.websockets.next.WebSocketConnection 对象表示 WebSocket 连接。Quarkus 提供了一个 @SessionScoped CDI Bean,该 Bean 实施此接口,可以在 WebSocket 端点中注入,并用于与已连接的客户端进行交互。

使用 @OnOpen@OnTextMessage@OnBinaryMessage@OnClose 进行注释的方法可以访问注入的 WebSocketConnection 对象:

@Inject WebSocketConnection connection;

请注意,在这些方法之外,WebSocketConnection 对象不可用。但是,可以 list all open connections

该连接可用于向客户端发送消息、访问路径参数、向所有已连接的客户端广播消息,等等。

// Send a message:
connection.sendTextAndAwait("Hello!");

// Broadcast messages:
connection.broadcast().sendTextAndAwait(departure);

// Access path parameters:
String param = connection.pathParam("foo");

WebSocketConnection 提供发送消息的阻塞和非阻塞方法变体:

  • sendTextAndAwait(String message):向客户端发送一条文本消息并等待发送该消息。该方法具有阻塞性且只应在执行程序线程中调用。

  • sendText(String message):向客户端发送一条文本消息。它返回一个 Uni。该方法是无阻塞的,但您必须订阅它。

List open connections

还可以列出所有打开的连接。Quarkus 提供了一个类型为 io.quarkus.websockets.next.OpenConnections 的 CDI Bean,该 Bean 声明了方便访问连接的方法。

import io.quarkus.logging.Log;
import io.quarkus.websockets.next.OpenConnections;

class MyBean {

  @Inject
  OpenConnections connections;

  void logAllOpenConnections() {
     Log.infof("Open connections: %s", connections.listAll()); 1
  }
}
1 OpenConnections#listAll() 返回所有打开连接在给定时间的不可变快照。

还有其他一些方便的方法。例如,OpenConnections#findByEndpointId(String) 可以让您轻松地找到某个特定端点的连接。

CDI events

当打开一个新连接时,Quarkus 会使用限定符 @io.quarkus.websockets.next.Open 异步触发类型为 io.quarkus.websockets.next.WebSocketConnection 的 CDI 事件。此外,当一个连接关闭时,会使用限定符 @io.quarkus.websockets.next.Closed 异步触发类型为 WebSocketConnection 的 CDI 事件。

import jakarta.enterprise.event.ObservesAsync;
import io.quarkus.websockets.next.Open;
import io.quarkus.websockets.next.WebSocketConnection;

class MyBean {

  void connectionOpened(@ObservesAsync @Open WebSocketConnection connection) { 1
     // This observer method is called when a connection is opened...
  }
}
1 异步观察者方法使用默认阻塞执行程序服务执行。

Security

可以使用安全注释来保护 WebSocket 端点回调方法,例如 io.quarkus.security.Authenticatedjakarta.annotation.security.RolesAllowed 以及 Supported security annotations 文档中列出的其他注释。

例如:

package io.quarkus.websockets.next.test.security;

import jakarta.annotation.security.RolesAllowed;
import jakarta.inject.Inject;

import io.quarkus.security.ForbiddenException;
import io.quarkus.security.identity.SecurityIdentity;
import io.quarkus.websockets.next.OnError;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;

@WebSocket(path = "/end")
public class Endpoint {

    @Inject
    SecurityIdentity currentIdentity;

    @OnOpen
    String open() {
        return "ready";
    }

    @RolesAllowed("admin")
    @OnTextMessage
    String echo(String message) { 1
        return message;
    }

    @OnError
    String error(ForbiddenException t) { 2
        return "forbidden:" + currentIdentity.getPrincipal().getName();
    }
}
1 只有当当前安全标识具有 admin 角色时,才可以调用 echo 回调方法。
2 发生授权故障后调用错误处理程序。

`SecurityIdentity`最初是在安全 HTTP 升级期间创建的,并与 websocket 连接关联。

当使用 OpenId Connect 扩展且令牌过期时,Quarkus 将自动关闭连接。

Secure HTTP upgrade

当标准安全注释放在端点类中或定义了 HTTP 安全策略时,HTTP 升级将被保护。保护 HTTP 升级的优点是处理更少,授权可以及早执行,并且只执行一次。您应始终更喜欢 HTTP 升级安全性,除非像上面的示例中一样,您需要对错误执行操作。

Use standard security annotation to secure an HTTP upgrade
package io.quarkus.websockets.next.test.security;

import io.quarkus.security.Authenticated;
import jakarta.inject.Inject;

import io.quarkus.security.identity.SecurityIdentity;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;

@Authenticated 1
@WebSocket(path = "/end")
public class Endpoint {

    @Inject
    SecurityIdentity currentIdentity;

    @OnOpen
    String open() {
        return "ready";
    }

    @OnTextMessage
    String echo(String message) {
        return message;
    }
}
1 最初的 HTTP 握手以匿名用户的 401 状态结束。您还可以使用 `quarkus.websockets-next.server.security.auth-failure-redirect-url`配置属性重定向在授权故障时的握手请求。

仅当在 `@WebSocket`注释旁边的端点类上声明安全注释时,HTTP 升级才受到保护。在端点 bean 上放置一个安全注释不会保护 bean 方法,只会保护 HTTP 升级。您必须始终验证端点是否按预期受保护。

Use HTTP Security policy to secure an HTTP upgrade
quarkus.http.auth.permission.http-upgrade.paths=/end
quarkus.http.auth.permission.http-upgrade.policy=authenticated

Inspect and/or reject HTTP upgrade

要检查 HTTP 升级,您必须提供实现 `io.quarkus.websockets.next.HttpUpgradeCheck`接口的 CDI bean。Quarkus 在应该升级为 WebSocket 连接的每个 HTTP 请求上调用 `HttpUpgradeCheck#perform`方法。在此方法中,您可以执行任何业务逻辑和/或拒绝 HTTP 升级。

Example HttpUpgradeCheck
package io.quarkus.websockets.next.test;

import io.quarkus.websockets.next.HttpUpgradeCheck;
import io.smallrye.mutiny.Uni;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped 1
public class ExampleHttpUpgradeCheck implements HttpUpgradeCheck {

    @Override
    public Uni<CheckResult> perform(HttpUpgradeContext ctx) {
        if (rejectUpgrade(ctx)) {
            return CheckResult.rejectUpgrade(400); 2
        }
        return CheckResult.permitUpgrade();
    }

    private boolean rejectUpgrade(HttpUpgradeContext ctx) {
        var headers = ctx.httpRequest().headers();
        // implement your business logic in here
    }
}
1 实现 HttpUpgradeCheck`接口的 CDI bean 可以是 `@ApplicationScoped、`@Singleton`或 `@Dependent`bean,但绝不能是 `@RequestScoped`bean。
2 拒绝 HTTP 升级。最初的 HTTP 握手以 400 错误请求响应状态代码结束。

您可以使用 `HttpUpgradeCheck#appliesTo`方法为应用 `HttpUpgradeCheck`的 WebSocket 端点进行选择。

TLS

由于此扩展重复使用 _main_HTTP 服务器,因此所有相关的服务器配置都适用。有关更多详细信息,请参阅 HTTP guide

Client API

Client connectors

`io.quarkus.websockets.next.WebSocketConnector<CLIENT>`用于为客户端端点配置和创建新连接。提供了一个实现此接口的 CDI bean,并可以将其注入其他 bean 中。实际类型参数用于确定客户端端点。该类型在构建期间得到验证 - 如果它不表示客户端端点,则构建失败。

让我们考虑以下客户端端点:

Client endpoint
@WebSocketClient(path = "/endpoint/{name}")
public class ClientEndpoint {

    @OnTextMessage
    void onMessage(@PathParam String name, String message, WebSocketClientConnection connection) {
        // ...
    }
}

此客户端端点的连接器使用如下:

Connector
@Singleton
public class MyBean {

    @ConfigProperty(name = "enpoint.uri")
    URI myUri;

    @Inject
    WebSocketConnector<ClientEndpoint> connector; 1

    void openAndSendMessage() {
        WebSocketClientConnection connection = connector
            .baseUri(uri) 2
            .pathParam("name", "Roxanne") 3
            .connectAndAwait();
        connection.sendTextAndAwait("Hi!"); 4
    }
}
1 注入 `ClientEndpoint`的连接器。
2 如果未提供基本 URI,我们将尝试从 config 中获取值。该密钥由 client id 和 `.base-uri`后缀组成。
3 设置路径参数值。如果客户端端点路径不包含具有给定名称的参数,则抛出 IllegalArgumentException
4 根据需要使用连接发送消息。

如果应用程序尝试为丢失的端点注入连接器,则会抛出错误。

Basic connector

如果应用程序开发者不需要客户端端点和连接器的组合,则可以使用 basic connector 。基本连接器是一种创建连接以及使用/发送消息的简单方法,无需定义客户端端点。

Basic connector
@Singleton
public class MyBean {

    @Inject
    BasicWebSocketConnector connector; 1

    void openAndConsume() {
        WebSocketClientConnection connection = connector
            .baseUri(uri) 2
            .path("/ws") 3
            .executionModel(ExecutionModel.NON_BLOCKING) 4
            .onTextMessage((c, m) -> { 5
               // ...
            })
            .connectAndAwait();
    }
}
1 Inject the connector.
2 必须始终设置基本 URI。
3 应附加到基本 URI 的附加路径。
4 设定回调处理程序的执行模型。默认情况下,回调可能会阻塞当前线程。然而在这种情况下,回调在事件循环上执行,并且可能不会阻塞当前线程。
5 lambda 将在从服务器发送的每条文本消息中被调用。

基本连接器更接近底层 API,仅供高级用户使用。然而,与其他底层 WebSocket 客户端不同,它仍然是一个 CDI bean,可以在其他 bean 中注入。它还提供了一种配置回调执行模型的方法,确保与其他 Quarkus 组件的最佳集成。

WebSocket client connection

io.quarkus.websockets.next.WebSocketClientConnection 对象表示 WebSocket 连接。Quarkus 提供了一个实现此接口的 @SessionScoped CDI bean,该 bean 可以在 WebSocketClient 端点中注入并用于与已连接的服务器进行交互。

注释有 @OnOpen@OnTextMessage@OnBinaryMessage`和 `@OnClose 的方法可以访问注入的 WebSocketClientConnection 对象:

@Inject WebSocketClientConnection connection;

请注意,在这些方法之外,WebSocketClientConnection 对象是不可用的。然而,可以 list all open client connections

该连接可用于向客户端发送消息、访问路径参数等。

// Send a message:
connection.sendTextAndAwait("Hello!");

// Broadcast messages:
connection.broadcast().sendTextAndAwait(departure);

// Access path parameters:
String param = connection.pathParam("foo");

WebSocketClientConnection 提供了阻塞和非阻塞方法变体来发送消息:

  • sendTextAndAwait(String message):向客户端发送一条文本消息并等待发送该消息。该方法具有阻塞性且只应在执行程序线程中调用。

  • sendText(String message):向客户端发送一条文本消息。它返回一个 Uni。该方法是无阻塞的,但您必须订阅它。

List open client connections

还可以列出所有打开的连接。Quarkus 提供了一个 io.quarkus.websockets.next.OpenClientConnections 类型的 CDI bean,该 bean 声明了访问连接的便捷方法。

import io.quarkus.logging.Log;
import io.quarkus.websockets.next.OpenClientConnections;

class MyBean {

  @Inject
  OpenClientConnections connections;

  void logAllOpenClinetConnections() {
     Log.infof("Open client connections: %s", connections.listAll()); 1
  }
}
1 OpenClientConnections#listAll() 返回给定时间所有打开连接的不可变快照。

还有其他便捷方法。例如,OpenClientConnections#findByClientId(String) 可以轻松找到特定端点的连接。

CDI events

Quarkus 在打开新连接时以限定符 @io.quarkus.websockets.next.Open 异步触发类型为 io.quarkus.websockets.next.WebSocketClientConnection 的 CDI 事件。此外,在连接关闭时以限定符 @io.quarkus.websockets.next.Closed 异步触发类型为 WebSocketClientConnection 的 CDI 事件。

import jakarta.enterprise.event.ObservesAsync;
import io.quarkus.websockets.next.Open;
import io.quarkus.websockets.next.WebSocketClientConnection;

class MyBean {

  void connectionOpened(@ObservesAsync @Open WebSocketClientConnection connection) { 1
     // This observer method is called when a connection is opened...
  }
}
1 异步观察者方法使用默认阻塞执行程序服务执行。

Configuring SSL/TLS

要建立一个 TLS 连接,您需要使用 TLS registry 配置一个 named 配置:

quarkus.tls.my-ws-client.trust-store.p12.path=server-truststore.p12
quarkus.tls.my-ws-client.trust-store.p12.password=secret

quarkus.websockets-next.client.tls-configuration-name=my-ws-client # Reference the named configuration

使用 WebSocket 客户端时,需使用 named 配置来避免与其他 TLS 配置发生冲突。客户端将不会使用默认的 TLS 配置。

当您配置一个 named TLS 配置时,默认情况下会启用 TLS。

Traffic logging

Quarkus 可以记录收发消息以进行调试。要启用服务器的流量日志记录,将 quarkus.websockets-next.server.traffic-logging.enabled 配置属性设置为 true。要启用客户端的流量日志记录,将 quarkus.websockets-next.client.traffic-logging.enabled 配置属性设置为 true。文本消息的载荷也会被记录。然而,记录字符的数量是有限的。默认限制为 100,但是您可以分别使用 quarkus.websockets-next.server.traffic-logging.text-payload-limitquarkus.websockets-next.client.traffic-logging.text-payload-limit 配置属性更改此限制。

如果已为记录器 io.quarkus.websockets.next.traffic`启用了 `DEBUG 级别,才会记录消息。

Example server configuration
quarkus.websockets-next.server.traffic-logging.enabled=true 1
quarkus.websockets-next.server.traffic-logging.text-payload-limit=50 2

quarkus.log.category."io.quarkus.websockets.next.traffic".level=DEBUG 3
1 Enables traffic logging.
2 设置要记录的文本消息有效负载的字符数。
3 为记录器 io.quarkus.websockets.next.traffic`启用 `DEBUG 级别。

Configuration reference

Unresolved include directive in modules/ROOT/pages/websockets-next-reference.adoc - include::../../../target/quarkus-generated-doc/config/quarkus-websockets-next.adoc[]