RSocket Support
RSocket Spring Integration 模块 (spring-integration-rsocket
) 允许执行 RSocket application protocol。
The RSocket Spring Integration module (spring-integration-rsocket
) allows for executions of RSocket application protocol.
你需要将此依赖项包含在你的项目中:
You need to include this dependency into your project:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
<version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-rsocket:{project-version}"
此模块从 5.2 版本开始可用,基于 Spring Messaging 基础,并附有其 RSocket 组件实现,如 RSocketRequester
、RSocketMessageHandler
和 RSocketStrategies
。请参阅 Spring Framework RSocket Support 以获取有关 RSocket 协议、术语和组件的更多信息。
This module is available starting with version 5.2 and is based on the Spring Messaging foundation with its RSocket component implementations, such as RSocketRequester
, RSocketMessageHandler
and RSocketStrategies
.
See Spring Framework RSocket Support for more information about the RSocket protocol, terminology and components.
在通过通道适配器启动集成流处理之前,我们需要在服务器和客户端之间建立 RSocket 连接。为此,Spring Integration RSocket 支持提供了 ServerRSocketConnector
和 ClientRSocketConnector
,是 AbstractRSocketConnector
的实现。
Before starting an integration flow processing via channel adapters, we need to establish an RSocket connection between server and client.
For this purpose, Spring Integration RSocket support provides the ServerRSocketConnector
and ClientRSocketConnector
implementations of the AbstractRSocketConnector
.
根据提供的 io.rsocket.transport.ServerTransport
,ServerRSocketConnector
在主机和端口上公开一个侦听器,以接受来自客户端的连接。内部 RSocketServer
实例可以通过 setServerConfigurer()
进行自定义,以及可配置的其他选项,例如有效负载数据和标头元数据的 RSocketStrategies
和 MimeType
。当客户端请求者提供了 setupRoute
(请参见下面的 ClientRSocketConnector
)时,连接的客户端通过 clientRSocketKeyStrategy
BiFunction<Map<String, Object>, DataBuffer, Object>
确定的键存储为 RSocketRequester
。默认情况下,连接数据用作以 UTF-8 字符集转换为字符串的键。此类 RSocketRequester
注册表可用于应用程序逻辑中来确定特定客户端连接以与其进行交互,或将同一消息发布到所有连接的客户端。当从客户端建立连接时,将从 ServerRSocketConnector
发出 RSocketConnectedEvent
。这类似于 Spring Messaging 模块中 @ConnectMapping
注释提供的内容。映射模式 *
表示接受所有客户端路由。RSocketConnectedEvent
可通过 DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER
头区分不同的路由。
The ServerRSocketConnector
exposes a listener on the host and port according to provided io.rsocket.transport.ServerTransport
for accepting connections from clients.
An internal RSocketServer
instance can be customized with the setServerConfigurer()
, as well as other options that can be configured, e.g. RSocketStrategies
and MimeType
for payload data and headers metadata.
When a setupRoute
is provided from the client requester (see ClientRSocketConnector
below), a connected client is stored as a RSocketRequester
under the key determined by the clientRSocketKeyStrategy
BiFunction<Map<String, Object>, DataBuffer, Object>
.
By default, a connection data is used for the key as a converted value to string with UTF-8 charset.
Such an RSocketRequester
registry can be used in the application logic to determine a particular client connection for interaction with it, or for publishing the same message to all connected clients.
When a connection is established from the client, an RSocketConnectedEvent
is emitted from the ServerRSocketConnector
.
This is similar to what is provided by the @ConnectMapping
annotation in Spring Messaging module.
The mapping pattern *
means accept all the client routes.
The RSocketConnectedEvent
can be used to distinguish different routes via DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER
header.
典型的服务器配置可能如下所示:
A typical server configuration might look like this:
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ServerRSocketConnector serverRSocketConnector() {
ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
+ headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
return serverRSocketConnector;
}
@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
...
}
包括 RSocketStrategies
Bean 和 RSocketConnectedEvent
的 @EventListener
在内的所有选项都是可选的。有关更多信息,请参阅 ServerRSocketConnector
JavaDocs。
All the options, including RSocketStrategies
bean and @EventListener
for RSocketConnectedEvent
, are optional.
See ServerRSocketConnector
JavaDocs for more information.
从版本 5.2.1 开始,ServerRSocketMessageHandler
被提取到一个公共的顶级类中,以便与现有 RSocket 服务器连接。当 ServerRSocketConnector
随 ServerRSocketMessageHandler
的外部实例一起提供时,它不会在内部创建 RSocket 服务器,而仅将所有处理逻辑委派给提供的实例。此外,ServerRSocketMessageHandler
可以通过 messageMappingCompatible
标志进行配置,以处理 RSocket 控制器的 @MessageMapping
,从而完全取代标准 RSocketMessageHandler
提供的功能。当经典 @MessageMapping
方法与 RSocket 通道适配器同时存在于同一应用程序中,且应用程序中存在外部配置的 RSocket 服务器时,这在混合配置中很有用。
Starting with version 5.2.1, the ServerRSocketMessageHandler
is extracted to a public, top-level class for possible connection with an existing RSocket server.
When a ServerRSocketConnector
is supplied with an external instance of ServerRSocketMessageHandler
, it doesn’t create an RSocket server internally and just delegates all the handling logic to the provided instance.
In addition, the ServerRSocketMessageHandler
can be configured with a messageMappingCompatible
flag to handle also @MessageMapping
for an RSocket controller, fully replacing the functionality provided by the standard RSocketMessageHandler
.
This can be useful in mixed configurations, when classic @MessageMapping
methods are present in the same application along with RSocket channel adapters and an externally configured RSocket server is present in the application.
ClientRSocketConnector
根据通过提供的 ClientTransport
连接的 RSocket
,用作 RSocketRequester
的容器。RSocketConnector
可以通过提供的 RSocketConnectorConfigurer
进行自定义。还可以在此组件上配置带有可选模板变量的 setupRoute
以及带元数据的 setupData
。
The ClientRSocketConnector
serves as a holder for RSocketRequester
based on the RSocket
connected via the provided ClientTransport
.
The RSocketConnector
can be customized with the provided RSocketConnectorConfigurer
.
The setupRoute
(with optional templates variables) and setupData
with metadata can be also configured on this component.
典型的客户端配置可能如下所示:
A typical client configuration might look like this:
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ClientRSocketConnector clientRSocketConnector() {
ClientRSocketConnector clientRSocketConnector =
new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
clientRSocketConnector.setSetupRoute("clientConnect/{user}");
clientRSocketConnector.setSetupRouteVariables("myUser");
return clientRSocketConnector;
}
这些选项中的大多数(包括 RSocketStrategies
Bean)都是可选的。请注意,我们如何连接到任意端口上的本地启动的 RSocket 服务器。请参见 ServerRSocketConnector.clientRSocketKeyStrategy
获取 setupData
用例。另请参阅 ClientRSocketConnector
及其 AbstractRSocketConnector
超类 JavaDocs,了解更多信息。
Most of these options (including RSocketStrategies
bean) are optional.
Note how we connect to the locally started RSocket server on the arbitrary port.
See ServerRSocketConnector.clientRSocketKeyStrategy
for setupData
use cases.
Also see ClientRSocketConnector
and its AbstractRSocketConnector
superclass JavaDocs for more information.
ClientRSocketConnector
和 ServerRSocketConnector
负责将入站通道适配器映射到其 path
配置,以便路由传入的 RSocket 请求。参见下一部分了解更多信息。
Both ClientRSocketConnector
and ServerRSocketConnector
are responsible for mapping inbound channel adapters to their path
configuration for routing incoming RSocket requests.
See the next section for more information.
RSocket Inbound Gateway
RSocketInboundGateway
负责接收 RSocket 请求和生成响应(如果有)。它需要一个 path
映射数组,该数组可以是类似于 MVC 请求映射或 @MessageMapping
语义的模式。此外(自版本 5.2.2 起),可以在 RSocketInboundGateway
上配置一组交互模型(参见 RSocketInteractionModel
),以便通过特定帧类型将 RSocket 请求限制到这个端点。默认情况下,支持所有交互模型。这样的 Bean 根据其 IntegrationRSocketEndpoint
实现(ReactiveMessageHandler
的扩展),由 ServerRSocketConnector
或 ClientRSocketConnector
自动检测,用于对传入请求执行内部 IntegrationRSocketMessageHandler
中的路由逻辑。可以为 RSocketInboundGateway
提供一个 AbstractRSocketConnector
,以便进行显式端点注册。这样,将在该 AbstractRSocketConnector
上禁用自动检测选项。还可以将 RSocketStrategies
注入到 RSocketInboundGateway
,或者从提供的 AbstractRSocketConnector
中获取它们,从而覆盖任何显式注入。根据提供的 requestElementType
,使用这些 RSocketStrategies
中的解码器解码请求有效负载。如果传入的 Message
中未提供 RSocketPayloadReturnValueHandler.RESPONSE_HEADER
头,RSocketInboundGateway
会将请求作为 fireAndForget
RSocket 交互模型处理。在这种情况下,RSocketInboundGateway
会对 outputChannel
执行纯 send
操作。否则,将使用 RSocketPayloadReturnValueHandler.RESPONSE_HEADER
头中的 MonoProcessor
值向 RSocket 发送答复。为此,RSocketInboundGateway
对 outputChannel
执行 sendAndReceiveMessageReactive
操作。要向下游发送的消息的 payload
始终是 MessagingRSocket
逻辑中的 Flux
。当处于 fireAndForget
RSocket 交互模型中时,消息将进行纯转换的 payload
。答复 payload
可以是普通对象或 Publisher
- RSocketInboundGateway
会根据 RSocketStrategies
中提供的编码器将它们正确地转换为 RSocket 响应。
The RSocketInboundGateway
is responsible for receiving RSocket requests and producing responses (if any).
It requires an array of path
mapping which could be as patterns similar to MVC request mapping or @MessageMapping
semantics.
In addition, (since version 5.2.2), a set of interaction models (see RSocketInteractionModel
) can be configured on the RSocketInboundGateway
to restrict RSocket requests to this endpoint by the particular frame type.
By default, all the interaction models are supported.
Such a bean, according its IntegrationRSocketEndpoint
implementation (extension of a ReactiveMessageHandler
), is auto detected either by the ServerRSocketConnector
or ClientRSocketConnector
for a routing logic in the internal IntegrationRSocketMessageHandler
for incoming requests.
An AbstractRSocketConnector
can be provided to the RSocketInboundGateway
for explicit endpoint registration.
This way, the auto-detection option is disabled on that AbstractRSocketConnector
.
The RSocketStrategies
can also be injected into the RSocketInboundGateway
or they are obtained from the provided AbstractRSocketConnector
overriding any explicit injection.
Decoders are used from those RSocketStrategies
to decode a request payload according to the provided requestElementType
.
If an RSocketPayloadReturnValueHandler.RESPONSE_HEADER
header is not provided in incoming the Message
, the RSocketInboundGateway
treats a request as a fireAndForget
RSocket interaction model.
In this case, an RSocketInboundGateway
performs a plain send
operation into the outputChannel
.
Otherwise, a MonoProcessor
value from the RSocketPayloadReturnValueHandler.RESPONSE_HEADER
header is used for sending a reply to the RSocket.
For this purpose, an RSocketInboundGateway
performs a sendAndReceiveMessageReactive
operation on the outputChannel
.
The payload
of the message to send downstream is always a Flux
according to MessagingRSocket
logic.
When in a fireAndForget
RSocket interaction model, the message has a plain converted payload
.
The reply payload
could be a plain object or a Publisher
- the RSocketInboundGateway
converts both of them properly into an RSocket response according to the encoders provided in the RSocketStrategies
.
从版本 5.3 开始,将 decodeFluxAsUnit
选项(默认 false
)添加到 RSocketInboundGateway
。默认情况下,传入 Flux
会以每件事件单独解码的方式进行转换。这是目前 @MessageMapping
语义中存在的确切行为。要还原先前的行为或根据应用程序要求将整个 Flux
解码为单个单元,则必须将 decodeFluxAsUnit
设置为 true
。但是,目标解码逻辑取决于所选的 Decoder
,例如 StringDecoder
要求流中存在换行符(默认情况下),以指示一个字节缓冲区的结束。
Starting with version 5.3, a decodeFluxAsUnit
option (default false
) is added to the RSocketInboundGateway
.
By default, incoming Flux
is transformed the way that each its event is decoded separately.
This is an exact behavior present currently with @MessageMapping
semantics.
To restore a previous behavior or decode the whole Flux
as single unit according application requirements, the decodeFluxAsUnit
has to be set to true
.
However, the target decoding logic depends on the Decoder
selected, e.g. a StringDecoder
requires a new line separator (by default) to be present in the stream to indicate a byte buffer end.
请参阅 Configuring RSocket Endpoints with Java 以了解如何配置 RSocketInboundGateway
端点以及如何处理下游有效负载的示例。
See Configuring RSocket Endpoints with Java for samples how to configure an RSocketInboundGateway
endpoint and deal with payloads downstream.
RSocket Outbound Gateway
RSocketOutboundGateway
是一个 AbstractReplyProducingMessageHandler
,用于向 RSocket 执行请求并根据 RSocket 答复(如果有)生成答复。低级 RSocket 协议交互委托给从提供的 ClientRSocketConnector
解析的 RSocketRequester
,或者委托给服务器端请求消息中的 RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER
头。可以通过 RSocketConnectedEvent
或使用 ServerRSocketConnector.getClientRSocketRequester()
API 解析服务器端的目标 RSocketRequester
,方法是根据通过 ServerRSocketConnector.setClientRSocketKeyStrategy()
选择的用于连接请求映射的业务密钥。参见 ServerRSocketConnector
JavaDocs 了解更多信息。
The RSocketOutboundGateway
is an AbstractReplyProducingMessageHandler
to perform requests into RSocket and produce replies based on the RSocket replies (if any).
A low level RSocket protocol interaction is delegated into an RSocketRequester
resolved from the provided ClientRSocketConnector
or from the RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER
header in the request message on the server side.
A target RSocketRequester
on the server side can be resolved from an RSocketConnectedEvent
or using ServerRSocketConnector.getClientRSocketRequester()
API according some business key selected for connect request mappings via ServerRSocketConnector.setClientRSocketKeyStrategy()
.
See ServerRSocketConnector
JavaDocs for more information.
必须显式地配置要发送请求的 route
(以及路径变量),或者通过一个针对请求消息求值的 SpEL 表达式。
The route
to send request has to be configured explicitly (together with path variables) or via a SpEL expression which is evaluated against request message.
可以通过 RSocketInteractionModel
选项或相应的表达式设置来提供 RSocket 交互模型。默认情况下,requestResponse
用于普通网关用例。
The RSocket interaction model can be provided via RSocketInteractionModel
option or respective expression setting.
By default, a requestResponse
is used for common gateway use-cases.
当请求消息有效负载是一个 Publisher
时,可以提供一个 publisherElementType
选项,以根据目标 RSocketRequester
中提供的 RSocketStrategies
对其元素进行编码。此选项的表达式可以求值为 ParameterizedTypeReference
。参见 RSocketRequester.RequestSpec.data()
JavaDocs,了解更多有关数据及其类型的信息。
When request message payload is a Publisher
, a publisherElementType
option can be provided to encode its elements according an RSocketStrategies
supplied in the target RSocketRequester
.
An expression for this option can evaluate to a ParameterizedTypeReference
.
See the RSocketRequester.RequestSpec.data()
JavaDocs for more information about data and its type.
还可以在 RSocket 请求中添加 metadata
。为此,可以在 RSocketOutboundGateway
上针对请求消息配置一个 metadataExpression
。此类表达式必须求值为 Map<Object, MimeType>
。
An RSocket request can also be enhanced with a metadata
.
For this purpose a metadataExpression
against request message can be configured on the RSocketOutboundGateway
.
Such an expression must evaluate to a Map<Object, MimeType>
.
当 interactionModel
不是 fireAndForget
时,必须提供 expectedResponseType
。默认情况下,它是一个 String.class
。此选项的表达式可以求值为 ParameterizedTypeReference
。参见 RSocketRequester.RetrieveSpec.retrieveMono()
和 RSocketRequester.RetrieveSpec.retrieveFlux()
JavaDocs,了解更多有关答复数据及其类型的信息。
When interactionModel
is not fireAndForget
, an expectedResponseType
must be supplied.
It is a String.class
by default.
An expression for this option can evaluate to a ParameterizedTypeReference
.
See the RSocketRequester.RetrieveSpec.retrieveMono()
and RSocketRequester.RetrieveSpec.retrieveFlux()
JavaDocs for more information about reply data and its type.
RSocketOutboundGateway
的答复 payload
始终是一个 Mono
(即使对于 fireAndForget
交互模型它也是 Mono<Void>
),这使得此组件始终为 async
。此 Mono
会在普通通道中订阅进 outputChannel
,或按需由 FluxMessageChannel
处理。requestStream
或 requestChannel
交互模型的 Flux
响应也会被包装到答复 Mono
中。它可以通过具有直通服务激活器的 FluxMessageChannel
展平下游:
A reply payload
from the RSocketOutboundGateway
is a Mono
(even for a fireAndForget
interaction model it is Mono<Void>
) always making this component as async
.
Such a Mono
is subscribed before producing into the outputChannel
for regular channels or processed on demand by the FluxMessageChannel
.
A Flux
response for the requestStream
or requestChannel
interaction model is also wrapped into a reply Mono
.
It can be flattened downstream by the FluxMessageChannel
with a passthrough service activator:
@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
return payload;
}
或者在目标应用程序逻辑中显式订阅。
Or subscribed explicitly in the target application logic.
还可以配置(或通过表达式求值)预期响应类型为 void
,将此网关视为出站通道适配器。但是,仍然必须配置 outputChannel
(即使它只是一个 NullChannel
),才能启动对返回的 Mono
的订阅。
The expected response type can also be configured (or evaluated via expression) to void
treating this gateway as an outbound channel adapter.
However, the outputChannel
still has to be configured (even if it just a NullChannel
) to initiate a subscription to the returned Mono
.
请参阅 Configuring RSocket Endpoints with Java 以了解如何配置 RSocketOutboundGateway
端点以及如何处理下游有效负载的示例。
See Configuring RSocket Endpoints with Java for samples how to configure an RSocketOutboundGateway
endpoint a deal with payloads downstream.
RSocket Namespace Support
Spring Integration 提供了一个 rsocket
命名空间和相应的模式定义。要将其包含在配置中,请在应用程序上下文配置文件中添加以下命名空间声明:
Spring Integration provides an rsocket
namespace and the corresponding schema definition.
To include it in your configuration, add the following namespace declaration in your application context configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/rsocket
https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
...
</beans>
Inbound
要通过 XML 配置 Spring Integration RSocket 入站通道适配器,你需要使用 int-rsocket
命名空间中的一个合适的 inbound-gateway
组件。以下示例演示了如何配置它:
To configure Spring Integration RSocket inbound channel adapters with XML, you need to use an appropriate inbound-gateway
components from the int-rsocket
namespace.
The following example shows how to configure it:
<int-rsocket:inbound-gateway id="inboundGateway"
path="testPath"
interaction-models="requestStream,requestChannel"
rsocket-connector="clientRSocketConnector"
request-channel="requestChannel"
rsocket-strategies="rsocketStrategies"
request-element-type="byte[]"/>
ClientRSocketConnector
和 ServerRSocketConnector
应配置为通用的 <bean>
定义。
A ClientRSocketConnector
and ServerRSocketConnector
should be configured as generic <bean>
definitions.
Outbound
<int-rsocket:outbound-gateway id="outboundGateway"
client-rsocket-connector="clientRSocketConnector"
auto-startup="false"
interaction-model="fireAndForget"
route-expression="'testRoute'"
request-channel="requestChannel"
publisher-element-type="byte[]"
expected-response-type="java.util.Date"
metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>
有关所有这些 XML 属性的描述,请参见 spring-integration-rsocket.xsd
。
See spring-integration-rsocket.xsd
for description for all those XML attributes.
Configuring RSocket Endpoints with Java
以下示例展示了如何使用 Java 配置 RSocket 入站端点:
The following example shows how to configure an RSocket inbound endpoint with Java:
@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
return rsocketInboundGateway;
}
@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
return payload.next().map(String::toUpperCase);
}
此配置中假设了一个 ClientRSocketConnector
或 ServerRSocketConnector
,它用于自动检测 “echo” 路径上的此类端点。注意 @Transformer
签名及其对 RSocket 请求的完全响应式处理和产生响应式答复。
A ClientRSocketConnector
or ServerRSocketConnector
is assumed in this configuration with meaning for auto-detection of such an endpoint on the “echo” path.
Pay attention to the @Transformer
signature with its fully reactive processing of the RSocket requests and producing reactive replies.
以下示例展示了如何使用 Java DSL 配置 RSocket 入站网关:
The following example shows how to configure a RSocket inbound gateway with the Java DSL:
@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
return IntegrationFlow
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel))
.<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
.get();
}
此配置中假设了一个 ClientRSocketConnector
或 ServerRSocketConnector
,它用于自动检测 “/uppercase” 路径上的此类端点和作为 “request channel” 的预期交互模型。
A ClientRSocketConnector
or ServerRSocketConnector
is assumed in this configuration with meaning for auto-detection of such an endpoint on the “/uppercase” path and expected interaction model as “request channel”.
以下示例展示了如何使用 Java 配置 RSocket 出站网关:
The following example shows how to configure a RSocket outbound gateway with Java:
@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
RSocketOutboundGateway rsocketOutboundGateway =
new RSocketOutboundGateway(
new FunctionExpression<Message<?>>((m) ->
m.getHeaders().get("route_header")));
rsocketOutboundGateway.setInteractionModelExpression(
new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
return rsocketOutboundGateway;
}
setClientRSocketConnector()
仅在客户端需要。在服务器端,必须在请求消息中提供带有 RSocketRequester
值的 RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER
标头。
The setClientRSocketConnector()
is required only for the client side.
On the server side, the RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER
header with an RSocketRequester
value must be supplied in the request message.
以下示例展示了如何使用 Java DSL 配置 RSocket 出站网关:
The following example shows how to configure a RSocket outbound gateway with the Java DSL:
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlow
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.interactionModel(RSocketInteractionModel.requestResponse)
.expectedResponseType(String.class)
.clientRSocketConnector(clientRSocketConnector))
.get();
}
有关如何在上述流程开始时使用提及的 Function
接口的更多信息,请参阅 IntegrationFlow
as a Gateway。
See IntegrationFlow
as a Gateway for more information how to use a mentioned Function
interface in the beginning of the flow above.