RSocket
-
Fire-and-Forget
-
WebSocket
-
TCP
-
Spring
-
Annotated Responders
-
RSocketExchange
-
ConnectMapping :description: 这篇文档描述了 Spring Framework 对 RSocket 协议的支持。RSocket 是一种用于通过 TCP、WebSocket 和其他字节流传输进行多路复用、双向通信的应用程序协议。RSocket 通过以下三种交互模型之一建立连接:请求 - 响应;请求 - 流;信道。建立初始连接后,客户端和服务器之间的区别就不复存在,因为双方都变得对称了,并且每一方都可以发起请求。本文重点介绍了 RSocket 的主要特性和优点,包括:Reactive Streams 语义;请求节流;会话恢复;大消息的分段和重新组装;保活(心跳)。本文还提供了 Java 实现的信息、RSocket 协议的简要概述、RSocket 请求器和响应器以及 RSocket 接口的详细信息。
本节描述了 Spring Framework 对 RSocket 协议的支持。
Overview
RSocket 是用于通过 TCP、WebSocket 和其他字节流传输进行多路复用、双向通信的应用程序协议,它使用以下交互模型之一:
-
Request-Response
— 发送一条消息并收到一条回复消息。 -
Request-Stream
— 发送一条消息并收到一系列回复消息。 -
Channel
— 双向发送一系列消息。 -
Fire-and-Forget
— send a one-way message.
建立初始连接后,“客户端”与“服务器”之间的区别就不复存在,因为双方都变得对称,并且每一方都可以发起上述交互之一。这就是为什么在协议调用中,参与方称为“请求方”和“应答方”,而上述交互称为“请求流”或“请求”。
以下是 RSocket 协议的主要特性和优点:
-
Reactive Streams 语义跨网络边界 — 对于流式请求(例如
Request-Stream
和Channel
),反压信号在请求方和应答方之间传输,这让请求方可以减慢源头的应答方速度,从而减少对网络层拥塞控制的依赖,以及对缓冲区在网络级别或任何级别上的需求。 -
请求节流 — 此功能以
LEASE
帧命名(每端都可以发送此帧),可以限制一段时间内允许的请求总数。租约会定期续订。 -
会话恢复 — 此功能专为连接丢失而设计,需要维护一些状态。状态管理对应用程序来说是透明的,当可能时它可以停止生产者并减少所需的状态量,与反压配合使用效果很好。
-
对大消息进行分段和重新组装。
-
Keepalive (heartbeats).
RSocket 有多种语言的https://github.com/rsocket[实现]。https://github.com/rsocket/rsocket-java[Java 库]建立在https://projectreactor.io/[Project Reactor]和https://github.com/reactor/reactor-netty[Reactor Netty](用于传输)之上。这意味着您应用程序中 Reactive Streams 发布者的信号可以透明地通过 RSocket 在网络中传播。
The Protocol
RSocket 的一项优点就在于它在传输线上有明确的行为,还有一个该协议的https://rsocket.io/about/protocol[规范]以及该协议的几个https://github.com/rsocket/rsocket/tree/master/Extensions[扩展],但它们都很容易阅读。因此,阅读一下该规范是个好主意,虽然该规范和语言实现及更高级的框架 API 没有关系。本节提供一个简明的概览,以确定一些上下文。
连接
最初,客户端通过一些低级流传输(例如 TCP 或 WebSocket)连接到服务器,并向服务器发送 SETUP
帧以设置连接参数。
服务器可能会拒绝 SETUP
帧,但通常在发送给服务器(对于客户端)和接收给服务器(对于服务器)后,双方都可以开始发出请求,除非 SETUP
表示使用租赁语义来限制请求的数量,在这种情况下,双方必须等到另一端的 LEASE
帧允许发出请求。
发出请求
建立连接后,双方都可以通过 REQUEST_RESPONSE
、REQUEST_STREAM
、REQUEST_CHANNEL
或 REQUEST_FNF
之一帧发起请求。这些帧中的每一个都将一条消息从请求方传递给应答方。
然后,应答方可能会返回带有响应消息的 PAYLOAD
帧,并且在 REQUEST_CHANNEL
的情况下,请求方也可以发送带有更多请求消息的 PAYLOAD
帧。
当一个请求涉及到一个消息流(例如 Request-Stream
和 Channel
)时,应答方必须遵守来自请求方的需求信号。需求表示为消息的数量。初始需求在 REQUEST_STREAM
和 REQUEST_CHANNEL
帧中指定。后续需求通过 REQUEST_N
帧发出信号。
双方还可以通过 METADATA_PUSH
帧发送元数据通知,这些通知不属于任何单个请求,而是属于整个连接。
消息格式
RSocket 消息包含数据和元数据。元数据可用于发送路由、安全令牌等数据。数据和元数据可以采用不同的格式。每种数据的 Mime 类型在 SETUP
帧中声明,并应用于给定连接上的所有请求。
虽然所有消息都可以具有元数据,但通常,诸如路由之类的元数据是针对每个请求的,因此只包含在请求的第一个消息中,即以下帧之一:“REQUEST_RESPONSE
”、“REQUEST_STREAM
”、“REQUEST_CHANNEL
”或“REQUEST_FNF
”。
协议扩展定义了在应用程序中使用的通用元数据格式:
Java Implementation
RSocket 的https://github.com/rsocket/rsocket-java[Java 实现]建立在https://projectreactor.io/[Project Reactor]之上。TCP 和 WebSocket 的传输建立在https://github.com/reactor/reactor-netty[Reactor Netty]之上。Reactor 是一个反应式流库,它简化了实现该协议的工作。对于应用程序来说,与 `Flux`和 `Mono`配合使用具有声明式运算符和透明反压支持,这是一个自然而然的选择。
RSocket Java 中的 API 是有意的最小且基本的。它专注于协议特性,并将应用程序编程模型(例如,RPC 代码生成与其他模型)作为一个更高级别的独立关注点。
主合约https://github.com/rsocket/rsocket-java/tree/master//rsocket-core/src/main/java/io/rsocket/RSocket.java[io.rsocket.RSocket]对四种请求交互类型进行了建模,其中 `Mono`表示一条消息的承诺,`Flux`表示一系列消息,而 `io.rsocket.Payload`则表示实际消息(可访问到数据和元数据,形式是字节缓冲区)。对 `RSocket`契约的对称使用。对于请求,应用程序会收到一个 `RSocket`来执行请求。对于响应,应用程序实现 `RSocket`以处理请求。
这并不是一个全面的介绍。在大多数情况下,Spring 应用程序不必直接使用其 API。然而,独立于 Spring 查看或体验 RSocket 可能很重要。RSocket Java 存储库包含许多https://github.com/rsocket/rsocket-java/tree/master//rsocket-examples[示例应用],展示了其 API 和协议特性。
Spring Support
spring-messaging
模块包含以下内容:
-
RSocketRequester — 流畅的 API,用于通过
io.rsocket.RSocket
请求,并对数据和元数据进行编码/解码。 -
Annotated Responders —
@MessageMapping`和 `@RSocketExchange
带注释的处理程序方法,用于响应。 -
RSocket Interface — 使用带有
@RSocketExchange
方法的 Java 接口作为 RSocket 服务声明,用作请求方或应答方。
spring-web
模块包含 RSocket 应用程序可能需要的 Jackson CBOR/JSON 和 Protobuf 等 Encoder
和 Decoder
实现。它还包含可插入以进行高效路由匹配的 PathPatternParser
。
Spring Boot 2.2 支持在 TCP 或 WebSocket 上建立一个 RSocket 服务器,包括在 WebFlux 服务器中通过 WebSocket 公开 RSocket 的选项。对于一个 RSocketRequester.Builder`和 `RSocketStrategies
,还有客户端支持和自动配置。有关更多详细信息,请参阅 Spring Boot 参考中的https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.rsocket[RSocket 部分]。
Spring Security 5.2 提供 RSocket 支持。
Spring Integration 5.2 提供传入和传出网关,以与 RSocket 客户端和服务器进行交互。有关更多详细信息,请参阅 Spring Integration 参考手册。
Spring Cloud Gateway 支持 RSocket 连接。
RSocketRequester
RSocketRequester
提供一个流畅的 API 来执行 RSocket 请求,接受和返回用于数据和元数据的对象,而不是低级数据缓冲区。它可以对称使用,从客户端发出请求,并从服务器发出请求。
Client Requester
在客户端获取 RSocketRequester
是连接到服务器,其中涉及使用连接设置发送 RSocket SETUP
帧。RSocketRequester
提供了一个帮助器来准备 io.rsocket.core.RSocketConnector
,其中包括 SETUP
帧的连接设置。
这是使用默认设置连接的最基本方法:
-
Java
-
Kotlin
RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);
URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
val requester = RSocketRequester.builder().tcp("localhost", 7000)
URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)
以上操作不会立即连接。当发出请求时,将透明地建立共享连接并使用。
Connection Setup
RSocketRequester.Builder
提供以下内容来定制初始 SETUP
帧:
-
dataMimeType(MimeType)
— 设置连接上数据的 MIME 类型。 -
@(1) - 在连接上设定元数据的 MIME 类型。
-
@(2) - 要 include 在 @(3) 中的数据。
-
@(4) - 要 include 在 @(5) 中的元数据中的路由。
-
@(6) - 要 include 在 @(7) 中的其他元数据。
对于数据,默认 mime 类型源自配置的第一个 Decoder
。对于元数据,默认 mime 类型是https://github.com/rsocket/rsocket/tree/master/Extensions/CompositeMetadata.md[复合元数据],它允许每个请求有多个元数据值和 mime 类型对。通常不需要更改两者。
SETUP
帧中的数据和元数据是可选的。在服务器端,@ConnectMapping 方法可用于处理连接的开始和 SETUP
帧的内容。元数据可用于连接级别安全性。
Strategies
`RSocketRequester.Builder`接受 `RSocketStrategies`以配置请求方。您需要使用它来提供编码器和解码器,用于数据的 (de)-serialization 化以及元数据值。默认情况下,仅在 `String`的 `spring-core`中注册了 `byte[]`和 `ByteBuffer`的基本编解码器。添加 `spring-web`可以访问可以按如下方式注册的更多内容:
-
Java
-
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000);
val strategies = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.build()
val requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000)
RSocketStrategies
被设计为可复用。在某些场景中(例如,同一个应用程序中的客户端和服务器),最好在 Spring 配置中声明它。
Client Responders
RSocketRequester.Builder
可用于配置来自服务器的请求的响应器。
您可以使用基于与服务器上使用的相同基础架构的注释处理程序进行客户端响应,但以编程方式注册,如下所示:
- Java
-
RSocketStrategies strategies = RSocketStrategies.builder() .routeMatcher(new PathPatternRouteMatcher()) (1) .build(); SocketAcceptor responder = RSocketMessageHandler.responder(strategies, new ClientHandler()); (2) RSocketRequester requester = RSocketRequester.builder() .rsocketConnector(connector -> connector.acceptor(responder)) (3) .tcp("localhost", 7000);
1 | 如果存在 @(9),则使用 @(8) 以实现高效的路由匹配。 |
2 | 使用具有 @(10) 和/或 @(11) 方法的类创建一个响应器。 |
3 | Register the responder.
|
4 | 如果存在 @(9),则使用 @(8) 以实现高效的路由匹配。 |
5 | 使用具有 @(10) 和/或 @(11) 方法的类创建一个响应器。 |
6 | Register the responder. |
请注意,上述内容只是一个快捷方式,旨在对客户端响应程序进行编程注册。对于客户端响应程序位于 Spring 配置中的其他场景,您仍然可以将 RSocketMessageHandler
声明为 Spring Bean,然后按如下方式应用:
-
Java
-
Kotlin
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(handler.responder()))
.tcp("localhost", 7000);
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(handler.responder()) }
.tcp("localhost", 7000)
对于上述内容,您可能还需要在 RSocketMessageHandler
中使用 setHandlerPredicate
来切换到用于检测客户端响应程序的不同策略,例如基于 @RSocketClientResponder
等自定义注释而不是默认的 @Controller
。这在客户端和服务器或同一个应用程序中的多个客户端的场景中是必需的。
另请参阅 Annotated Responders,以了解有关编程模型的更多信息。
Advanced
RSocketRequesterBuilder
提供了一个回调来公开底层的 io.rsocket.core.RSocketConnector
,用于为保活间隔、会话恢复、拦截器等提供更多配置选项。您可以按如下方式配置该级别的选项:
-
Java
-
Kotlin
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> {
// ...
})
.tcp("localhost", 7000);
val requester = RSocketRequester.builder()
.rsocketConnector {
//...
}
.tcp("localhost", 7000)
Server Requester
从服务器向已连接客户端发出请求,只涉及从服务器获取已连接客户端的请求者。
在 Annotated Responders 中,@ConnectMapping
和 @MessageMapping
方法支持 RSocketRequester
参数。使用它来访问连接的请求者。请记住,@ConnectMapping
方法本质上是 SETUP
帧的处理程序,必须在开始请求之前处理。因此,开始时的请求必须与处理分离。例如:
- Java
-
@ConnectMapping Mono<Void> handle(RSocketRequester requester) { requester.route("status").data("5") .retrieveFlux(StatusReport.class) .subscribe(bar -> { (1) // ... }); return ... (2) }
1 | 异步启动请求,独立于处理。 |
2 | 执行处理并返回完成 @(12)。
|
3 | 异步启动请求,独立于处理。 |
4 | 在暂停函数中执行处理。 |
Requests
- Java
-
ViewBox viewBox = ... ; Flux<AirportLocation> locations = requester.route("locate.radars.within") (1) .data(viewBox) (2) .retrieveFlux(AirportLocation.class); (3)
1 | 指定一个路由以 include 在请求消息的元数据中。 |
2 | 提供请求消息的数据。 |
3 | Declare the expected response.
|
4 | 指定一个路由以 include 在请求消息的元数据中。 |
5 | 提供请求消息的数据。 |
6 | Declare the expected response. |
交互类型是由输入和输出基数隐式确定的。上述示例为 Request-Stream
,因为它发送了一个值并接收了一个值流。在大多数情况下,只要输入和输出选择与 RSocket 交互类型以及响应者预期的输入和输出类型相匹配,您就不需要考虑这一点。无效组合的唯一示例是一对多。
data(Object)
方法还接受任何 Reactive Streams Publisher
,包括 Flux
和 Mono
,以及 ReactiveAdapterRegistry
中注册的任何其他值生成器。对于生成相同类型值的 Multi 值 Publisher
(如 Flux
),请考虑使用一个重载的 data
方法,以避免每个元素都进行类型检查和 Encoder
查找:
data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
data(Object)
步骤是可选的。对于不发送数据的请求,请跳过此步骤:
-
Java
-
Kotlin
Mono<AirportLocation> location = requester.route("find.radar.EWR"))
.retrieveMono(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveAndAwait
val location = requester.route("find.radar.EWR")
.retrieveAndAwait<AirportLocation>()
如果使用https://github.com/rsocket/rsocket/tree/master/Extensions/CompositeMetadata.md[复合元数据](默认值)并且注册的 `Encoder`支持这些值,则可以添加额外的元数据值。例如:
-
Java
-
Kotlin
String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlux(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveFlow
val requester: RSocketRequester = ...
val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")
val locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlow<AirportLocation>()
对于 Fire-and-Forget
,请使用返回 Mono<Void>
的 send()
方法。请注意,Mono
仅指示已成功发送消息,而不是已处理消息。
对于 Metadata-Push
,请使用返回值为 Mono<Void>
的 sendMetadata()
方法。
Annotated Responders
RSocket 响应器可以作为 @MessageMapping
和 @ConnectMapping
方法实现。@MessageMapping
方法处理单个请求,而 @ConnectMapping
方法处理连接級事件(安装和元数据推送)。支持注解响应器,以便从服务器端或客户端响应。
Server Responders
要在服务器端使用注解响应器,请将 RSocketMessageHandler
添加到 Spring 配置中,以检测具有 @MessageMapping
和 @ConnectMapping
方法的 @Controller
Bean:
-
Java
-
Kotlin
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.routeMatcher(new PathPatternRouteMatcher());
return handler;
}
}
@Configuration
class ServerConfig {
@Bean
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
routeMatcher = PathPatternRouteMatcher()
}
}
然后通过 Java RSocket API 启动 RSocket 服务器,并为响应器插入 RSocketMessageHandler
,如下所示:
-
Java
-
Kotlin
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
CloseableChannel server =
RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.block();
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val server = RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.awaitSingle()
RSocketMessageHandler
默认支持 composite 和 routing 元数据。如果你需要切换到不同 MIME 类型或注册其他元数据 MIME 类型,可以设置其 MetadataExtractor。
您需要设置元数据和数据格式支持所需的 Encoder
和 Decoder
实例。您可能需要 spring-web
模块用于编解码器实现。
默认情况下,SimpleRouteMatcher
用于通过 AntPathMatcher
匹配路由。我们建议将 spring-web
中的 PathPatternRouteMatcher
插入用于高效路由匹配。RSocket 路由可以是分层的,但不是 URL 路径。默认情况下,两个路由匹配器都使用“.”作为分隔符,没有像 HTTP URL 中那样的 URL 解码。
RSocketMessageHandler
可通过 RSocketStrategies
进行配置,当您需要在同一进程中的客户端和服务器之间共享配置时,这可能很有用:
-
Java
-
Kotlin
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setRSocketStrategies(rsocketStrategies());
return handler;
}
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.routeMatcher(new PathPatternRouteMatcher())
.build();
}
}
@Configuration
class ServerConfig {
@Bean
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
rSocketStrategies = rsocketStrategies()
}
@Bean
fun rsocketStrategies() = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.routeMatcher(PathPatternRouteMatcher())
.build()
}
Client Responders
客户机端的带注释响应器需要在`RSocketRequester.Builder`中配置。有关详细信息,请参见Client Responders。
@MessageMapping
-
Java
-
Kotlin
@Controller
public class RadarsController {
@MessageMapping("locate.radars.within")
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
@Controller
class RadarsController {
@MessageMapping("locate.radars.within")
fun radars(request: MapRequest): Flow<AirportLocation> {
// ...
}
}
上述 @MessageMapping
方法响应具有路由“locate.radars.within”的请求流交互。它支持一个灵活的方法签名,其中提供使用以下方法参数的选项:
Method Argument | Description |
---|---|
|
请求有效负载。这可以是异步类型的具体值,例如 |
|
用于向远程终端发出请求的请求者。 |
|
从路由中提取的值,基于映射模式中的变量,例如 |
|
元数据值注册为提取,如 MetadataExtractor中所述。 |
|
所有元数据值注册为提取,如 MetadataExtractor中所述。 |
返回值预期是多个对象,其作为响应有效负载进行序列化。这可以是异步类型(如 Mono
或 Flux
)、一个具体值、void
或无值异步类型(如 Mono<Void>
)。
@MessageMapping
方法支持的 RSocket 交互类型是由输入(即 @Payload
参数)和输出的基数确定的,其中基数表示以下内容:
Cardinality | Description |
---|---|
1 |
显式值或单值异步类型,例如 |
Many |
多值异步类型,例如 |
0 |
对于输入,这意味着该方法没有 |
下表显示了所有输入和输出基数组合以及相应的交互类型:
Input Cardinality | Output Cardinality | Interaction Types |
---|---|---|
0, 1 |
0 |
Fire-and-Forget, Request-Response |
0, 1 |
1 |
Request-Response |
0, 1 |
Many |
Request-Stream |
Many |
0, 1, Many |
Request-Channel |
@RSocketExchange
作为 @MessageMapping
的替代方法,您还可以使用 @RSocketExchange
方法处理请求。此类方法在 [RSocket Interface,rsocket-interface] 中声明,可以用作请求者通过 RSocketServiceProxyFactory
或由响应器实现。
例如,将请求作为响应器处理:
-
Java
-
Kotlin
public interface RadarsService {
@RSocketExchange("locate.radars.within")
Flux<AirportLocation> radars(MapRequest request);
}
@Controller
public class RadarsController implements RadarsService {
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
interface RadarsService {
@RSocketExchange("locate.radars.within")
fun radars(request: MapRequest): Flow<AirportLocation>
}
@Controller
class RadarsController : RadarsService {
override fun radars(request: MapRequest): Flow<AirportLocation> {
// ...
}
}
@RSocketExhange
和 @MessageMapping
之间有一些区别,因为前者需要适用于请求者和响应者的使用。例如,虽然 @MessageMapping
可以声明为处理任意数量的路由,且每个路由可以是一个模式,但 @RSocketExchange
必须使用一个具体的路由声明。还有一些与元数据相关的受支持方法参数的差异,有关受支持参数的列表,请参见 <<@MessageMapping,rsocket-annot-messagemapping>> 和 [RSocket Interface,rsocket-interface]。
@RSocketExchange
可以在类型级别使用,为给定的 RSocket 服务接口指定所有路由的公共前缀。
@ConnectMapping
@ConnectMapping
在 RSocket 连接开始时处理 SETUP
帧,以及后续的元数据推送通知通过 METADATA_PUSH
帧,即 io.rsocket.RSocket
中的 metadataPush(Payload)
。
@ConnectMapping`方法支持与@MessageMapping相同的自变量,但基于来自`SETUP`和`METADATA_PUSH`帧的元数据和数据。
@ConnectMapping`可以有一个模式,可以将处理范围缩小到具有元数据中路由的特定连接,或者如果没有声明模式,则所有连接都匹配。
`@ConnectMapping`方法不能返回数据,并且必须声明为以`void`或`Mono<Void>`作为返回值。如果处理为新连接返回错误,则拒绝该连接。不得保留处理以向`RSocketRequester`发出连接请求。有关详细信息,请参见Server Requester。
MetadataExtractor
响应者必须解释元数据。https://github.com/rsocket/rsocket/tree/master/Extensions/CompositeMetadata.md[复合元数据]允许独立格式化的元数据值(例如,用于路由、安全性、跟踪),每个值都有自己的 mime 类型。应用程序需要一种方法来配置要支持的元数据 mime 类型,以及一种方法来访问提取的值。
MetadataExtractor
是一个用于获取已序列化的元数据并返回解码后的键值对的约定,然后可以通过名称(例如通过带注释的处理程序方法中的 @Header
)访问这些键值对。
`DefaultMetadataExtractor`可以接收 `Decoder`实例来解码元数据。开箱即用,它内置了对https://github.com/rsocket/rsocket/tree/master/Extensions/Routing.md["message/x.rsocket.routing.v0"]的支持,对它解码为`String`并将它保存在“route”键下。对于任何其他 mime 类型,您需要提供一个 `Decoder`并注册 mime 类型,如下所示:
-
Java
-
Kotlin
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
import org.springframework.messaging.rsocket.metadataToExtract
val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Foo>(fooMimeType, "foo")
组合元数据非常适合于组合独立元数据值。不过,请求程序可能不支持组合元数据,或者可能选择不使用它。为此,DefaultMetadataExtractor
可能需要自定义逻辑来将解码值映射到输出映射。下面是一个将 JSON 用于元数据的示例:
-
Java
-
Kotlin
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
MimeType.valueOf("application/vnd.myapp.metadata+json"),
new ParameterizedTypeReference<Map<String,String>>() {},
(jsonMap, outputMap) -> {
outputMap.putAll(jsonMap);
});
import org.springframework.messaging.rsocket.metadataToExtract
val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
outputMap.putAll(jsonMap)
}
当通过 RSocketStrategies
配置 MetadataExtractor
时,您可以让 RSocketStrategies.Builder
使用配置的解码器创建提取器,并仅仅使用回调以如下方式自定义注册:
-
Java
-
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.metadataExtractorRegistry(registry -> {
registry.metadataToExtract(fooMimeType, Foo.class, "foo");
// ...
})
.build();
import org.springframework.messaging.rsocket.metadataToExtract
val strategies = RSocketStrategies.builder()
.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
registry.metadataToExtract<Foo>(fooMimeType, "foo")
// ...
}
.build()
RSocket Interface
Spring Framework 允许你使用`@RSocketExchange`方法将 RSocket 服务定义为 Java 接口。你可以将此类接口传递给`RSocketServiceProxyFactory`,以创建通过RSocketRequester执行请求的代理。你还可以实现接口作为处理请求的响应器。
首先使用 @RSocketExchange
方法创建接口:
interface RadarService {
@RSocketExchange("radars")
Flux<AirportLocation> getRadars(@Payload MapRequest request);
// more RSocket exchange methods...
}
现在您可以创建代理,当调用方法时,该代理将执行请求:
RSocketRequester requester = ... ;
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();
RadarService service = factory.createClient(RadarService.class);
你还可以实现接口以处理请求作为响应器。请参阅Annotated Responders。
Method Parameters
经过注释的 RSocket 交换方法支持具有以下方法参数的灵活方法签名:
Method argument | Description |
---|---|
|
将路由变量添加到 |
|
设置请求的输入有效负载。这可以是具体值或任何可以通过 |
|
输入有效负载中元数据项的值。这可以是任意 |
|
元数据项的 |