RSocket

  • Fire-and-Forget

  • WebSocket

  • TCP

  • Spring

  • Annotated Responders

  • RSocketExchange

  • ConnectMapping :description: 这篇文档描述了 Spring Framework 对 RSocket 协议的支持。RSocket 是一种用于通过 TCP、WebSocket 和其他字节流传输进行多路复用、双向通信的应用程序协议。RSocket 通过以下三种交互模型之一建立连接:请求 - 响应;请求 - 流;信道。建立初始连接后,客户端和服务器之间的区别就不复存在,因为双方都变得对称了,并且每一方都可以发起请求。本文重点介绍了 RSocket 的主要特性和优点,包括:Reactive Streams 语义;请求节流;会话恢复;大消息的分段和重新组装;保活(心跳)。本文还提供了 Java 实现的信息、RSocket 协议的简要概述、RSocket 请求器和响应器以及 RSocket 接口的详细信息。

本节描述了 Spring Framework 对 RSocket 协议的支持。

This section describes Spring Framework’s support for the RSocket protocol.

Overview

RSocket 是用于通过 TCP、WebSocket 和其他字节流传输进行多路复用、双向通信的应用程序协议,它使用以下交互模型之一:

RSocket is an application protocol for multiplexed, duplex communication over TCP, WebSocket, and other byte stream transports, using one of the following interaction models:

  • Request-Response — send one message and receive one back.

  • Request-Stream — send one message and receive a stream of messages back.

  • Channel — send streams of messages in both directions.

  • Fire-and-Forget — send a one-way message.

建立初始连接后,“客户端”与“服务器”之间的区别就不复存在,因为双方都变得对称,并且每一方都可以发起上述交互之一。这就是为什么在协议调用中,参与方称为“请求方”和“应答方”,而上述交互称为“请求流”或“请求”。

Once the initial connection is made, the "client" vs "server" distinction is lost as both sides become symmetrical and each side can initiate one of the above interactions. This is why in the protocol calls the participating sides "requester" and "responder" while the above interactions are called "request streams" or simply "requests".

以下是 RSocket 协议的主要特性和优点:

These are the key features and benefits of the RSocket protocol:

  • Reactive Streams semantics across network boundary — for streaming requests such as Request-Stream and Channel, back pressure signals travel between requester and responder, allowing a requester to slow down a responder at the source, hence reducing reliance on network layer congestion control, and the need for buffering at the network level or at any level.

  • Request throttling — this feature is named "Leasing" after the LEASE frame that can be sent from each end to limit the total number of requests allowed by other end for a given time. Leases are renewed periodically.

  • Session resumption — this is designed for loss of connectivity and requires some state to be maintained. The state management is transparent for applications, and works well in combination with back pressure which can stop a producer when possible and reduce the amount of state required.

  • Fragmentation and re-assembly of large messages.

  • Keepalive (heartbeats).

RSocket 有多种语言的https://github.com/rsocket[实现]。https://github.com/rsocket/rsocket-java[Java 库]建立在https://projectreactor.io/[Project Reactor]和https://github.com/reactor/reactor-netty[Reactor Netty](用于传输)之上。这意味着您应用程序中 Reactive Streams 发布者的信号可以透明地通过 RSocket 在网络中传播。

RSocket has implementations in multiple languages. The Java library is built on Project Reactor, and Reactor Netty for the transport. That means signals from Reactive Streams Publishers in your application propagate transparently through RSocket across the network.

The Protocol

RSocket 的一项优点就在于它在传输线上有明确的行为,还有一个该协议的https://rsocket.io/about/protocol[规范]以及该协议的几个https://github.com/rsocket/rsocket/tree/master/Extensions[扩展],但它们都很容易阅读。因此,阅读一下该规范是个好主意,虽然该规范和语言实现及更高级的框架 API 没有关系。本节提供一个简明的概览,以确定一些上下文。

One of the benefits of RSocket is that it has well defined behavior on the wire and an easy to read specification along with some protocol extensions. Therefore it is a good idea to read the spec, independent of language implementations and higher level framework APIs. This section provides a succinct overview to establish some context.

连接

Connecting

最初,客户端通过一些低级流传输(例如 TCP 或 WebSocket)连接到服务器,并向服务器发送 SETUP 帧以设置连接参数。

Initially a client connects to a server via some low level streaming transport such as TCP or WebSocket and sends a SETUP frame to the server to set parameters for the connection.

服务器可能会拒绝 SETUP 帧,但通常在发送给服务器(对于客户端)和接收给服务器(对于服务器)后,双方都可以开始发出请求,除非 SETUP 表示使用租赁语义来限制请求的数量,在这种情况下,双方必须等到另一端的 LEASE 帧允许发出请求。

The server may reject the SETUP frame, but generally after it is sent (for the client) and received (for the server), both sides can begin to make requests, unless SETUP indicates use of leasing semantics to limit the number of requests, in which case both sides must wait for a LEASE frame from the other end to permit making requests.

发出请求

Making Requests

建立连接后,双方都可以通过 REQUEST_RESPONSEREQUEST_STREAMREQUEST_CHANNELREQUEST_FNF 之一帧发起请求。这些帧中的每一个都将一条消息从请求方传递给应答方。

Once a connection is established, both sides may initiate a request through one of the frames REQUEST_RESPONSE, REQUEST_STREAM, REQUEST_CHANNEL, or REQUEST_FNF. Each of those frames carries one message from the requester to the responder.

然后,应答方可能会返回带有响应消息的 PAYLOAD 帧,并且在 REQUEST_CHANNEL 的情况下,请求方也可以发送带有更多请求消息的 PAYLOAD 帧。

The responder may then return PAYLOAD frames with response messages, and in the case of REQUEST_CHANNEL the requester may also send PAYLOAD frames with more request messages.

当一个请求涉及到一个消息流(例如 Request-StreamChannel)时,应答方必须遵守来自请求方的需求信号。需求表示为消息的数量。初始需求在 REQUEST_STREAMREQUEST_CHANNEL 帧中指定。后续需求通过 REQUEST_N 帧发出信号。

When a request involves a stream of messages such as Request-Stream and Channel, the responder must respect demand signals from the requester. Demand is expressed as a number of messages. Initial demand is specified in REQUEST_STREAM and REQUEST_CHANNEL frames. Subsequent demand is signaled via REQUEST_N frames.

双方还可以通过 METADATA_PUSH 帧发送元数据通知,这些通知不属于任何单个请求,而是属于整个连接。

Each side may also send metadata notifications, via the METADATA_PUSH frame, that do not pertain to any individual request but rather to the connection as a whole.

消息格式

Message Format

RSocket 消息包含数据和元数据。元数据可用于发送路由、安全令牌等数据。数据和元数据可以采用不同的格式。每种数据的 Mime 类型在 SETUP 帧中声明,并应用于给定连接上的所有请求。

RSocket messages contain data and metadata. Metadata can be used to send a route, a security token, etc. Data and metadata can be formatted differently. Mime types for each are declared in the SETUP frame and apply to all requests on a given connection.

虽然所有消息都可以具有元数据,但通常,诸如路由之类的元数据是针对每个请求的,因此只包含在请求的第一个消息中,即以下帧之一:“REQUEST_RESPONSE”、“REQUEST_STREAM”、“REQUEST_CHANNEL”或“REQUEST_FNF”。

While all messages can have metadata, typically metadata such as a route are per-request and therefore only included in the first message on a request, i.e. with one of the frames REQUEST_RESPONSE, REQUEST_STREAM, REQUEST_CHANNEL, or REQUEST_FNF.

协议扩展定义了在应用程序中使用的通用元数据格式:

Protocol extensions define common metadata formats for use in applications:

Java Implementation

RSocket 的https://github.com/rsocket/rsocket-java[Java 实现]建立在https://projectreactor.io/[Project Reactor]之上。TCP 和 WebSocket 的传输建立在https://github.com/reactor/reactor-netty[Reactor Netty]之上。Reactor 是一个反应式流库,它简化了实现该协议的工作。对于应用程序来说,与 `Flux`和 `Mono`配合使用具有声明式运算符和透明反压支持,这是一个自然而然的选择。

The Java implementation for RSocket is built on Project Reactor. The transports for TCP and WebSocket are built on Reactor Netty. As a Reactive Streams library, Reactor simplifies the job of implementing the protocol. For applications it is a natural fit to use Flux and Mono with declarative operators and transparent back pressure support.

RSocket Java 中的 API 是有意的最小且基本的。它专注于协议特性,并将应用程序编程模型(例如,RPC 代码生成与其他模型)作为一个更高级别的独立关注点。

The API in RSocket Java is intentionally minimal and basic. It focuses on protocol features and leaves the application programming model (e.g. RPC codegen vs other) as a higher level, independent concern.

主合约https://github.com/rsocket/rsocket-java/tree/master//rsocket-core/src/main/java/io/rsocket/RSocket.java[io.rsocket.RSocket]对四种请求交互类型进行了建模,其中 `Mono`表示一条消息的承诺,`Flux`表示一系列消息,而 `io.rsocket.Payload`则表示实际消息(可访问到数据和元数据,形式是字节缓冲区)。对 `RSocket`契约的对称使用。对于请求,应用程序会收到一个 `RSocket`来执行请求。对于响应,应用程序实现 `RSocket`以处理请求。

The main contract io.rsocket.RSocket models the four request interaction types with Mono representing a promise for a single message, Flux a stream of messages, and io.rsocket.Payload the actual message with access to data and metadata as byte buffers. The RSocket contract is used symmetrically. For requesting, the application is given an RSocket to perform requests with. For responding, the application implements RSocket to handle requests.

这并不是一个全面的介绍。在大多数情况下,Spring 应用程序不必直接使用其 API。然而,独立于 Spring 查看或体验 RSocket 可能很重要。RSocket Java 存储库包含许多https://github.com/rsocket/rsocket-java/tree/master//rsocket-examples[示例应用],展示了其 API 和协议特性。

This is not meant to be a thorough introduction. For the most part, Spring applications will not have to use its API directly. However it may be important to see or experiment with RSocket independent of Spring. The RSocket Java repository contains a number of sample apps that demonstrate its API and protocol features.

Spring Support

spring-messaging 模块包含以下内容:

The spring-messaging module contains the following:

  • RSocketRequester — fluent API to make requests through an io.rsocket.RSocket with data and metadata encoding/decoding.

  • Annotated Responders — `@MessageMapping` and @RSocketExchange annotated handler methods for responding.

  • RSocket Interface — RSocket service declaration as Java interface with @RSocketExchange methods, for use as requester or responder.

spring-web 模块包含 RSocket 应用程序可能需要的 Jackson CBOR/JSON 和 Protobuf 等 EncoderDecoder 实现。它还包含可插入以进行高效路由匹配的 PathPatternParser

The spring-web module contains Encoder and Decoder implementations such as Jackson CBOR/JSON, and Protobuf that RSocket applications will likely need. It also contains the PathPatternParser that can be plugged in for efficient route matching.

Spring Boot 2.2 支持在 TCP 或 WebSocket 上建立一个 RSocket 服务器,包括在 WebFlux 服务器中通过 WebSocket 公开 RSocket 的选项。对于一个 RSocketRequester.Builder`和 `RSocketStrategies,还有客户端支持和自动配置。有关更多详细信息,请参阅 Spring Boot 参考中的https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.rsocket[RSocket 部分]。

Spring Boot 2.2 supports standing up an RSocket server over TCP or WebSocket, including the option to expose RSocket over WebSocket in a WebFlux server. There is also client support and auto-configuration for an RSocketRequester.Builder and RSocketStrategies. See the RSocket section in the Spring Boot reference for more details.

Spring Security 5.2 提供 RSocket 支持。

Spring Security 5.2 provides RSocket support.

Spring Integration 5.2 提供传入和传出网关,以与 RSocket 客户端和服务器进行交互。有关更多详细信息,请参阅 Spring Integration 参考手册。

Spring Integration 5.2 provides inbound and outbound gateways to interact with RSocket clients and servers. See the Spring Integration Reference Manual for more details.

Spring Cloud Gateway 支持 RSocket 连接。

Spring Cloud Gateway supports RSocket connections.

RSocketRequester

RSocketRequester 提供一个流畅的 API 来执行 RSocket 请求,接受和返回用于数据和元数据的对象,而不是低级数据缓冲区。它可以对称使用,从客户端发出请求,并从服务器发出请求。

RSocketRequester provides a fluent API to perform RSocket requests, accepting and returning objects for data and metadata instead of low level data buffers. It can be used symmetrically, to make requests from clients and to make requests from servers.

Client Requester

在客户端获取 RSocketRequester 是连接到服务器,其中涉及使用连接设置发送 RSocket SETUP 帧。RSocketRequester 提供了一个帮助器来准备 io.rsocket.core.RSocketConnector,其中包括 SETUP 帧的连接设置。

To obtain an RSocketRequester on the client side is to connect to a server which involves sending an RSocket SETUP frame with connection settings. RSocketRequester provides a builder that helps to prepare an io.rsocket.core.RSocketConnector including connection settings for the SETUP frame.

这是使用默认设置连接的最基本方法:

This is the most basic way to connect with default settings:

  • Java

  • Kotlin

RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);

URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
val requester = RSocketRequester.builder().tcp("localhost", 7000)

URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)

以上操作不会立即连接。当发出请求时,将透明地建立共享连接并使用。

The above does not connect immediately. When requests are made, a shared connection is established transparently and used.

Connection Setup

RSocketRequester.Builder 提供以下内容来定制初始 SETUP 帧:

RSocketRequester.Builder provides the following to customize the initial SETUP frame:

  • dataMimeType(MimeType) — set the mime type for data on the connection.

  • metadataMimeType(MimeType) — set the mime type for metadata on the connection.

  • setupData(Object) — data to include in the SETUP.

  • setupRoute(String, Object…​) — route in the metadata to include in the SETUP.

  • setupMetadata(Object, MimeType) — other metadata to include in the SETUP.

对于数据,默认 mime 类型源自配置的第一个 Decoder。对于元数据,默认 mime 类型是https://github.com/rsocket/rsocket/tree/master/Extensions/CompositeMetadata.md[复合元数据],它允许每个请求有多个元数据值和 mime 类型对。通常不需要更改两者。

For data, the default mime type is derived from the first configured Decoder. For metadata, the default mime type is composite metadata which allows multiple metadata value and mime type pairs per request. Typically both don’t need to be changed.

SETUP 帧中的数据和元数据是可选的。在服务器端,@ConnectMapping 方法可用于处理连接的开始和 SETUP 帧的内容。元数据可用于连接级别安全性。

Data and metadata in the SETUP frame is optional. On the server side, @ConnectMapping methods can be used to handle the start of a connection and the content of the SETUP frame. Metadata may be used for connection level security.

Strategies

`RSocketRequester.Builder`接受 `RSocketStrategies`以配置请求方。您需要使用它来提供编码器和解码器,用于数据的 (de)-serialization 化以及元数据值。默认情况下,仅在 `String`的 `spring-core`中注册了 `byte[]`和 `ByteBuffer`的基本编解码器。添加 `spring-web`可以访问可以按如下方式注册的更多内容:

RSocketRequester.Builder accepts RSocketStrategies to configure the requester. You’ll need to use this to provide encoders and decoders for (de)-serialization of data and metadata values. By default only the basic codecs from spring-core for String, byte[], and ByteBuffer are registered. Adding spring-web provides access to more that can be registered as follows:

  • Java

  • Kotlin

RSocketStrategies strategies = RSocketStrategies.builder()
	.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
	.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
	.build();

RSocketRequester requester = RSocketRequester.builder()
	.rsocketStrategies(strategies)
	.tcp("localhost", 7000);
val strategies = RSocketStrategies.builder()
		.encoders { it.add(Jackson2CborEncoder()) }
		.decoders { it.add(Jackson2CborDecoder()) }
		.build()

val requester = RSocketRequester.builder()
		.rsocketStrategies(strategies)
		.tcp("localhost", 7000)

RSocketStrategies 被设计为可复用。在某些场景中(例如,同一个应用程序中的客户端和服务器),最好在 Spring 配置中声明它。

RSocketStrategies is designed for re-use. In some scenarios, e.g. client and server in the same application, it may be preferable to declare it in Spring configuration.

Client Responders

RSocketRequester.Builder 可用于配置来自服务器的请求的响应器。

RSocketRequester.Builder can be used to configure responders to requests from the server.

您可以使用基于与服务器上使用的相同基础架构的注释处理程序进行客户端响应,但以编程方式注册,如下所示:

You can use annotated handlers for client-side responding based on the same infrastructure that’s used on a server, but registered programmatically as follows:

Java
RSocketStrategies strategies = RSocketStrategies.builder()
	.routeMatcher(new PathPatternRouteMatcher())  (1)
	.build();

SocketAcceptor responder =
	RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> connector.acceptor(responder)) (3)
	.tcp("localhost", 7000);
1 Use PathPatternRouteMatcher, if spring-web is present, for efficient route matching.
2 Create a responder from a class with @MessageMapping and/or @ConnectMapping methods.
3 Register the responder.
Kotlin
val strategies = RSocketStrategies.builder()
		.routeMatcher(PathPatternRouteMatcher())  (1)
		.build()

val responder =
	RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)

val requester = RSocketRequester.builder()
		.rsocketConnector { it.acceptor(responder) } (3)
		.tcp("localhost", 7000)
4 Use PathPatternRouteMatcher, if spring-web is present, for efficient route matching.
5 Create a responder from a class with @MessageMapping and/or @ConnectMapping methods.
6 Register the responder.

请注意,上述内容只是一个快捷方式,旨在对客户端响应程序进行编程注册。对于客户端响应程序位于 Spring 配置中的其他场景,您仍然可以将 RSocketMessageHandler 声明为 Spring Bean,然后按如下方式应用:

Note the above is only a shortcut designed for programmatic registration of client responders. For alternative scenarios, where client responders are in Spring configuration, you can still declare RSocketMessageHandler as a Spring bean and then apply as follows:

  • Java

  • Kotlin

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> connector.acceptor(handler.responder()))
	.tcp("localhost", 7000);
import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val requester = RSocketRequester.builder()
		.rsocketConnector { it.acceptor(handler.responder()) }
		.tcp("localhost", 7000)

对于上述内容,您可能还需要在 RSocketMessageHandler 中使用 setHandlerPredicate 来切换到用于检测客户端响应程序的不同策略,例如基于 @RSocketClientResponder 等自定义注释而不是默认的 @Controller。这在客户端和服务器或同一个应用程序中的多个客户端的场景中是必需的。

For the above you may also need to use setHandlerPredicate in RSocketMessageHandler to switch to a different strategy for detecting client responders, e.g. based on a custom annotation such as @RSocketClientResponder vs the default @Controller. This is necessary in scenarios with client and server, or multiple clients in the same application.

另请参阅 Annotated Responders,以了解有关编程模型的更多信息。

See also Annotated Responders, for more on the programming model.

Advanced

RSocketRequesterBuilder 提供了一个回调来公开底层的 io.rsocket.core.RSocketConnector,用于为保活间隔、会话恢复、拦截器等提供更多配置选项。您可以按如下方式配置该级别的选项:

RSocketRequesterBuilder provides a callback to expose the underlying io.rsocket.core.RSocketConnector for further configuration options for keepalive intervals, session resumption, interceptors, and more. You can configure options at that level as follows:

  • Java

  • Kotlin

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> {
		// ...
	})
	.tcp("localhost", 7000);
val requester = RSocketRequester.builder()
		.rsocketConnector {
			//...
		}
		.tcp("localhost", 7000)

Server Requester

从服务器向已连接客户端发出请求,只涉及从服务器获取已连接客户端的请求者。

To make requests from a server to connected clients is a matter of obtaining the requester for the connected client from the server.

Annotated Responders 中,@ConnectMapping@MessageMapping 方法支持 RSocketRequester 参数。使用它来访问连接的请求者。请记住,@ConnectMapping 方法本质上是 SETUP 帧的处理程序,必须在开始请求之前处理。因此,开始时的请求必须与处理分离。例如:

In Annotated Responders, @ConnectMapping and @MessageMapping methods support an RSocketRequester argument. Use it to access the requester for the connection. Keep in mind that @ConnectMapping methods are essentially handlers of the SETUP frame which must be handled before requests can begin. Therefore, requests at the very start must be decoupled from handling. For example:

Java
@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
	requester.route("status").data("5")
		.retrieveFlux(StatusReport.class)
		.subscribe(bar -> { (1)
			// ...
		});
	return ... (2)
}
1 Start the request asynchronously, independent from handling.
2 Perform handling and return completion Mono<Void>.
Kotlin
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
	GlobalScope.launch {
		requester.route("status").data("5").retrieveFlow<StatusReport>().collect { (1)
			// ...
		}
	}
	/// ... 2
}
3 Start the request asynchronously, independent from handling.
4 Perform handling in the suspending function.

Requests

获得 clientserver 请求者后,你可以如下进行请求:

Once you have a client or server requester, you can make requests as follows:

Java
ViewBox viewBox = ... ;

Flux<AirportLocation> locations = requester.route("locate.radars.within") (1)
		.data(viewBox) (2)
		.retrieveFlux(AirportLocation.class); (3)
1 Specify a route to include in the metadata of the request message.
2 Provide data for the request message.
3 Declare the expected response.
Kotlin
val viewBox: ViewBox = ...

val locations = requester.route("locate.radars.within") (1)
		.data(viewBox) (2)
		.retrieveFlow<AirportLocation>() (3)
4 Specify a route to include in the metadata of the request message.
5 Provide data for the request message.
6 Declare the expected response.

交互类型是由输入和输出基数隐式确定的。上述示例为 Request-Stream,因为它发送了一个值并接收了一个值流。在大多数情况下,只要输入和输出选择与 RSocket 交互类型以及响应者预期的输入和输出类型相匹配,您就不需要考虑这一点。无效组合的唯一示例是一对多。

The interaction type is determined implicitly from the cardinality of the input and output. The above example is a Request-Stream because one value is sent and a stream of values is received. For the most part you don’t need to think about this as long as the choice of input and output matches an RSocket interaction type and the types of input and output expected by the responder. The only example of an invalid combination is many-to-one.

data(Object) 方法还接受任何 Reactive Streams Publisher,包括 FluxMono,以及 ReactiveAdapterRegistry 中注册的任何其他值生成器。对于生成相同类型值的 Multi 值 Publisher(如 Flux),请考虑使用一个重载的 data 方法,以避免每个元素都进行类型检查和 Encoder 查找:

The data(Object) method also accepts any Reactive Streams Publisher, including Flux and Mono, as well as any other producer of value(s) that is registered in the ReactiveAdapterRegistry. For a multi-value Publisher such as Flux which produces the same types of values, consider using one of the overloaded data methods to avoid having type checks and Encoder lookup on every element:

data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);

data(Object) 步骤是可选的。对于不发送数据的请求,请跳过此步骤:

The data(Object) step is optional. Skip it for requests that don’t send data:

  • Java

  • Kotlin

Mono<AirportLocation> location = requester.route("find.radar.EWR"))
	.retrieveMono(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveAndAwait

val location = requester.route("find.radar.EWR")
	.retrieveAndAwait<AirportLocation>()

如果使用https://github.com/rsocket/rsocket/tree/master/Extensions/CompositeMetadata.md[复合元数据](默认值)并且注册的 `Encoder`支持这些值,则可以添加额外的元数据值。例如:

Extra metadata values can be added if using composite metadata (the default) and if the values are supported by a registered Encoder. For example:

  • Java

  • Kotlin

String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");

Flux<AirportLocation> locations = requester.route("locate.radars.within")
		.metadata(securityToken, mimeType)
		.data(viewBox)
		.retrieveFlux(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveFlow

val requester: RSocketRequester = ...

val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")

val locations = requester.route("locate.radars.within")
		.metadata(securityToken, mimeType)
		.data(viewBox)
		.retrieveFlow<AirportLocation>()

对于 Fire-and-Forget,请使用返回 Mono<Void>send() 方法。请注意,Mono 仅指示已成功发送消息,而不是已处理消息。

For Fire-and-Forget use the send() method that returns Mono<Void>. Note that the Mono indicates only that the message was successfully sent, and not that it was handled.

对于 Metadata-Push,请使用返回值为 Mono<Void>sendMetadata() 方法。

For Metadata-Push use the sendMetadata() method with a Mono<Void> return value.

Annotated Responders

RSocket 响应器可以作为 @MessageMapping@ConnectMapping 方法实现。@MessageMapping 方法处理单个请求,而 @ConnectMapping 方法处理连接級事件(安装和元数据推送)。支持注解响应器,以便从服务器端或客户端响应。

RSocket responders can be implemented as @MessageMapping and @ConnectMapping methods. @MessageMapping methods handle individual requests while @ConnectMapping methods handle connection-level events (setup and metadata push). Annotated responders are supported symmetrically, for responding from the server side and for responding from the client side.

Server Responders

要在服务器端使用注解响应器,请将 RSocketMessageHandler 添加到 Spring 配置中,以检测具有 @MessageMapping@ConnectMapping 方法的 @Controller Bean:

To use annotated responders on the server side, add RSocketMessageHandler to your Spring configuration to detect @Controller beans with @MessageMapping and @ConnectMapping methods:

  • Java

  • Kotlin

@Configuration
static class ServerConfig {

	@Bean
	public RSocketMessageHandler rsocketMessageHandler() {
		RSocketMessageHandler handler = new RSocketMessageHandler();
		handler.routeMatcher(new PathPatternRouteMatcher());
		return handler;
	}
}
@Configuration
class ServerConfig {

	@Bean
	fun rsocketMessageHandler() = RSocketMessageHandler().apply {
		routeMatcher = PathPatternRouteMatcher()
	}
}

然后通过 Java RSocket API 启动 RSocket 服务器,并为响应器插入 RSocketMessageHandler,如下所示:

Then start an RSocket server through the Java RSocket API and plug the RSocketMessageHandler for the responder as follows:

  • Java

  • Kotlin

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

CloseableChannel server =
	RSocketServer.create(handler.responder())
		.bind(TcpServerTransport.create("localhost", 7000))
		.block();
import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val server = RSocketServer.create(handler.responder())
		.bind(TcpServerTransport.create("localhost", 7000))
		.awaitSingle()

RSocketMessageHandler 默认支持 compositerouting 元数据。如果你需要切换到不同 MIME 类型或注册其他元数据 MIME 类型,可以设置其 MetadataExtractor

RSocketMessageHandler supports composite and routing metadata by default. You can set its MetadataExtractor if you need to switch to a different mime type or register additional metadata mime types.

您需要设置元数据和数据格式支持所需的 EncoderDecoder 实例。您可能需要 spring-web 模块用于编解码器实现。

You’ll need to set the Encoder and Decoder instances required for metadata and data formats to support. You’ll likely need the spring-web module for codec implementations.

默认情况下,SimpleRouteMatcher 用于通过 AntPathMatcher 匹配路由。我们建议将 spring-web 中的 PathPatternRouteMatcher 插入用于高效路由匹配。RSocket 路由可以是分层的,但不是 URL 路径。默认情况下,两个路由匹配器都使用“.”作为分隔符,没有像 HTTP URL 中那样的 URL 解码。

By default SimpleRouteMatcher is used for matching routes via AntPathMatcher. We recommend plugging in the PathPatternRouteMatcher from spring-web for efficient route matching. RSocket routes can be hierarchical but are not URL paths. Both route matchers are configured to use "." as separator by default and there is no URL decoding as with HTTP URLs.

RSocketMessageHandler 可通过 RSocketStrategies 进行配置,当您需要在同一进程中的客户端和服务器之间共享配置时,这可能很有用:

RSocketMessageHandler can be configured via RSocketStrategies which may be useful if you need to share configuration between a client and a server in the same process:

  • Java

  • Kotlin

@Configuration
static class ServerConfig {

	@Bean
	public RSocketMessageHandler rsocketMessageHandler() {
		RSocketMessageHandler handler = new RSocketMessageHandler();
		handler.setRSocketStrategies(rsocketStrategies());
		return handler;
	}

	@Bean
	public RSocketStrategies rsocketStrategies() {
		return RSocketStrategies.builder()
			.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
			.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
			.routeMatcher(new PathPatternRouteMatcher())
			.build();
	}
}
@Configuration
class ServerConfig {

	@Bean
	fun rsocketMessageHandler() = RSocketMessageHandler().apply {
		rSocketStrategies = rsocketStrategies()
	}

	@Bean
	fun rsocketStrategies() = RSocketStrategies.builder()
			.encoders { it.add(Jackson2CborEncoder()) }
			.decoders { it.add(Jackson2CborDecoder()) }
			.routeMatcher(PathPatternRouteMatcher())
			.build()
}

Client Responders

客户机端的带注释响应器需要在`RSocketRequester.Builder`中配置。有关详细信息,请参见Client Responders

Annotated responders on the client side need to be configured in the RSocketRequester.Builder. For details, see Client Responders.

@MessageMapping

一旦serverclient响应器配置就绪,就可以使用`@MessageMapping`方法,如下所示:

Once server or client responder configuration is in place, @MessageMapping methods can be used as follows:

  • Java

  • Kotlin

@Controller
public class RadarsController {

	@MessageMapping("locate.radars.within")
	public Flux<AirportLocation> radars(MapRequest request) {
		// ...
	}
}
@Controller
class RadarsController {

	@MessageMapping("locate.radars.within")
	fun radars(request: MapRequest): Flow<AirportLocation> {
		// ...
	}
}

上述 @MessageMapping 方法响应具有路由“locate.radars.within”的请求流交互。它支持一个灵活的方法签名,其中提供使用以下方法参数的选项:

The above @MessageMapping method responds to a Request-Stream interaction having the route "locate.radars.within". It supports a flexible method signature with the option to use the following method arguments:

Method Argument Description

@Payload

The payload of the request. This can be a concrete value of asynchronous types like Mono or Flux.

Note: Use of the annotation is optional. A method argument that is not a simple type and is not any of the other supported arguments, is assumed to be the expected payload.

RSocketRequester

Requester for making requests to the remote end.

@DestinationVariable

Value extracted from the route based on variables in the mapping pattern, e.g. @MessageMapping("find.radar.{id}").

@Header

Metadata value registered for extraction as described in MetadataExtractor.

@Headers Map<String, Object>

All metadata values registered for extraction as described in MetadataExtractor.

返回值预期是多个对象,其作为响应有效负载进行序列化。这可以是异步类型(如 MonoFlux)、一个具体值、void 或无值异步类型(如 Mono<Void>)。

The return value is expected to be one or more Objects to be serialized as response payloads. That can be asynchronous types like Mono or Flux, a concrete value, or either void or a no-value asynchronous type such as Mono<Void>.

@MessageMapping 方法支持的 RSocket 交互类型是由输入(即 @Payload 参数)和输出的基数确定的,其中基数表示以下内容:

The RSocket interaction type that an @MessageMapping method supports is determined from the cardinality of the input (i.e. @Payload argument) and of the output, where cardinality means the following:

Cardinality Description

1

Either an explicit value, or a single-value asynchronous type such as Mono<T>.

Many

A multi-value asynchronous type such as Flux<T>.

0

For input this means the method does not have an @Payload argument.

For output this is void or a no-value asynchronous type such as Mono<Void>.

下表显示了所有输入和输出基数组合以及相应的交互类型:

The table below shows all input and output cardinality combinations and the corresponding interaction type(s):

Input Cardinality Output Cardinality Interaction Types

0, 1

0

Fire-and-Forget, Request-Response

0, 1

1

Request-Response

0, 1

Many

Request-Stream

Many

0, 1, Many

Request-Channel

@RSocketExchange

作为 @MessageMapping 的替代方法,您还可以使用 @RSocketExchange 方法处理请求。此类方法在 [RSocket Interface,rsocket-interface] 中声明,可以用作请求者通过 RSocketServiceProxyFactory 或由响应器实现。

As an alternative to @MessageMapping, you can also handle requests with @RSocketExchange methods. Such methods are declared on an rsocket-interface and can be used as a requester via RSocketServiceProxyFactory or implemented by a responder.

例如,将请求作为响应器处理:

For example, to handle requests as a responder:

  • Java

  • Kotlin

public interface RadarsService {

	@RSocketExchange("locate.radars.within")
	Flux<AirportLocation> radars(MapRequest request);
}

@Controller
public class RadarsController implements RadarsService {

	public Flux<AirportLocation> radars(MapRequest request) {
		// ...
	}
}
interface RadarsService {

	@RSocketExchange("locate.radars.within")
	fun radars(request: MapRequest): Flow<AirportLocation>
}

@Controller
class RadarsController : RadarsService {

	override fun radars(request: MapRequest): Flow<AirportLocation> {
		// ...
	}
}

@RSocketExhange@MessageMapping 之间有一些区别,因为前者需要适用于请求者和响应者的使用。例如,虽然 @MessageMapping 可以声明为处理任意数量的路由,且每个路由可以是一个模式,但 @RSocketExchange 必须使用一个具体的路由声明。还有一些与元数据相关的受支持方法参数的差异,有关受支持参数的列表,请参见 <<@MessageMapping,rsocket-annot-messagemapping>> 和 [RSocket Interface,rsocket-interface]

There some differences between @RSocketExhange and @MessageMapping since the former needs to remain suitable for requester and responder use. For example, while @MessageMapping can be declared to handle any number of routes and each route can be a pattern, @RSocketExchange must be declared with a single, concrete route. There are also small differences in the supported method parameters related to metadata, see <<@MessageMapping,rsocket-annot-messagemapping>> and rsocket-interface for a list of supported parameters.

@RSocketExchange 可以在类型级别使用,为给定的 RSocket 服务接口指定所有路由的公共前缀。

@RSocketExchange can be used at the type level to specify a common prefix for all routes for a given RSocket service interface.

@ConnectMapping

@ConnectMapping 在 RSocket 连接开始时处理 SETUP 帧,以及后续的元数据推送通知通过 METADATA_PUSH 帧,即 io.rsocket.RSocket 中的 metadataPush(Payload)

@ConnectMapping handles the SETUP frame at the start of an RSocket connection, and any subsequent metadata push notifications through the METADATA_PUSH frame, i.e. metadataPush(Payload) in io.rsocket.RSocket.

@ConnectMapping`方法支持与@MessageMapping相同的自变量,但基于来自`SETUP`和`METADATA_PUSH`帧的元数据和数据。@ConnectMapping`可以有一个模式,可以将处理范围缩小到具有元数据中路由的特定连接,或者如果没有声明模式,则所有连接都匹配。

@ConnectMapping methods support the same arguments as @MessageMapping but based on metadata and data from the SETUP and METADATA_PUSH frames. @ConnectMapping can have a pattern to narrow handling to specific connections that have a route in the metadata, or if no patterns are declared then all connections match.

`@ConnectMapping`方法不能返回数据,并且必须声明为以`void`或`Mono<Void>`作为返回值。如果处理为新连接返回错误,则拒绝该连接。不得保留处理以向`RSocketRequester`发出连接请求。有关详细信息,请参见Server Requester

@ConnectMapping methods cannot return data and must be declared with void or Mono<Void> as the return value. If handling returns an error for a new connection then the connection is rejected. Handling must not be held up to make requests to the RSocketRequester for the connection. See Server Requester for details.

MetadataExtractor

响应者必须解释元数据。https://github.com/rsocket/rsocket/tree/master/Extensions/CompositeMetadata.md[复合元数据]允许独立格式化的元数据值(例如,用于路由、安全性、跟踪),每个值都有自己的 mime 类型。应用程序需要一种方法来配置要支持的元数据 mime 类型,以及一种方法来访问提取的值。

Responders must interpret metadata. Composite metadata allows independently formatted metadata values (e.g. for routing, security, tracing) each with its own mime type. Applications need a way to configure metadata mime types to support, and a way to access extracted values.

MetadataExtractor 是一个用于获取已序列化的元数据并返回解码后的键值对的约定,然后可以通过名称(例如通过带注释的处理程序方法中的 @Header)访问这些键值对。

MetadataExtractor is a contract to take serialized metadata and return decoded name-value pairs that can then be accessed like headers by name, for example via @Header in annotated handler methods.

`DefaultMetadataExtractor`可以接收 `Decoder`实例来解码元数据。开箱即用,它内置了对https://github.com/rsocket/rsocket/tree/master/Extensions/Routing.md["message/x.rsocket.routing.v0"]的支持,对它解码为`String`并将它保存在“route”键下。对于任何其他 mime 类型,您需要提供一个 `Decoder`并注册 mime 类型,如下所示:

DefaultMetadataExtractor can be given Decoder instances to decode metadata. Out of the box it has built-in support for "message/x.rsocket.routing.v0" which it decodes to String and saves under the "route" key. For any other mime type you’ll need to provide a Decoder and register the mime type as follows:

  • Java

  • Kotlin

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
import org.springframework.messaging.rsocket.metadataToExtract

val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Foo>(fooMimeType, "foo")

组合元数据非常适合于组合独立元数据值。不过,请求程序可能不支持组合元数据,或者可能选择不使用它。为此,DefaultMetadataExtractor 可能需要自定义逻辑来将解码值映射到输出映射。下面是一个将 JSON 用于元数据的示例:

Composite metadata works well to combine independent metadata values. However the requester might not support composite metadata, or may choose not to use it. For this, DefaultMetadataExtractor may needs custom logic to map the decoded value to the output map. Here is an example where JSON is used for metadata:

  • Java

  • Kotlin

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
	MimeType.valueOf("application/vnd.myapp.metadata+json"),
	new ParameterizedTypeReference<Map<String,String>>() {},
	(jsonMap, outputMap) -> {
		outputMap.putAll(jsonMap);
	});
import org.springframework.messaging.rsocket.metadataToExtract

val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
	outputMap.putAll(jsonMap)
}

当通过 RSocketStrategies 配置 MetadataExtractor 时,您可以让 RSocketStrategies.Builder 使用配置的解码器创建提取器,并仅仅使用回调以如下方式自定义注册:

When configuring MetadataExtractor through RSocketStrategies, you can let RSocketStrategies.Builder create the extractor with the configured decoders, and simply use a callback to customize registrations as follows:

  • Java

  • Kotlin

RSocketStrategies strategies = RSocketStrategies.builder()
	.metadataExtractorRegistry(registry -> {
		registry.metadataToExtract(fooMimeType, Foo.class, "foo");
		// ...
	})
	.build();
import org.springframework.messaging.rsocket.metadataToExtract

val strategies = RSocketStrategies.builder()
		.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
			registry.metadataToExtract<Foo>(fooMimeType, "foo")
			// ...
		}
		.build()

RSocket Interface

Spring Framework 允许你使用`@RSocketExchange`方法将 RSocket 服务定义为 Java 接口。你可以将此类接口传递给`RSocketServiceProxyFactory`,以创建通过RSocketRequester执行请求的代理。你还可以实现接口作为处理请求的响应器。

The Spring Framework lets you define an RSocket service as a Java interface with @RSocketExchange methods. You can pass such an interface to RSocketServiceProxyFactory to create a proxy which performs requests through an RSocketRequester. You can also implement the interface as a responder that handles requests.

首先使用 @RSocketExchange 方法创建接口:

Start by creating the interface with @RSocketExchange methods:

interface RadarService {

	@RSocketExchange("radars")
	Flux<AirportLocation> getRadars(@Payload MapRequest request);

	// more RSocket exchange methods...

}

现在您可以创建代理,当调用方法时,该代理将执行请求:

Now you can create a proxy that performs requests when methods are called:

RSocketRequester requester = ... ;
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();

RadarService service = factory.createClient(RadarService.class);

你还可以实现接口以处理请求作为响应器。请参阅Annotated Responders

You can also implement the interface to handle requests as a responder. See Annotated Responders.

Method Parameters

经过注释的 RSocket 交换方法支持具有以下方法参数的灵活方法签名:

Annotated, RSocket exchange methods support flexible method signatures with the following method parameters:

Method argument Description

@DestinationVariable

Add a route variable to pass to RSocketRequester along with the route from the @RSocketExchange annotation in order to expand template placeholders in the route. This variable can be a String or any Object, which is then formatted via toString().

@Payload

Set the input payload(s) for the request. This can be a concrete value, or any producer of values that can be adapted to a Reactive Streams Publisher via ReactiveAdapterRegistry

Object, if followed by MimeType

The value for a metadata entry in the input payload. This can be any Object as long as the next argument is the metadata entry MimeType. The value can be a concrete value or any producer of a single value that can be adapted to a Reactive Streams Publisher via ReactiveAdapterRegistry.

MimeType

The MimeType for a metadata entry. The preceding method argument is expected to be the metadata value.

Return Values

带注释的 RSocket 交换方法支持返回具体值或能够通过 ReactiveAdapterRegistry 调整到 Reactive Streams Publisher 的任何值生成器。

Annotated, RSocket exchange methods support return values that are concrete value(s), or any producer of value(s) that can be adapted to a Reactive Streams Publisher via ReactiveAdapterRegistry.

默认情况下,具有同步(阻塞)方法签名的 RSocket 服务方法的行为取决于 RSocket ClientTransport 的响应超时设置以及 RSocket 保活设置。RSocketServiceProxyFactory.Builder 公开了一个 blockTimeout 选项,还允许您配置阻塞响应的最大时间,但是我们建议在 RSocket 层配置超时值,以获得更好的控制。

By default, the behavior of RSocket service methods with synchronous (blocking) method signature depends on response timeout settings of the underlying RSocket ClientTransport as well as RSocket keep-alive settings. RSocketServiceProxyFactory.Builder does expose a blockTimeout option that also lets you configure the maximum time to block for a response, but we recommend configuring timeout values at the RSocket level for more control.