WebFlux Support
The WebFlux Spring Integration module (spring-integration-webflux) allows for the execution of HTTP requests and the processing of inbound HTTP requests in a reactive manner.
You need to include this dependency in your project:

Maven:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-webflux</artifactId>
    <version>{project-version}</version>
</dependency>

Gradle:

compile "org.springframework.integration:spring-integration-webflux:{project-version}"
The io.projectreactor.netty:reactor-netty dependency must be included when the server configuration is not Servlet-based.
The WebFlux support consists of the following gateway implementations: WebFluxInboundEndpoint and WebFluxRequestExecutingMessageHandler.
The support is fully based on the Spring WebFlux and Project Reactor foundations.
See HTTP Support for more information, since many options are shared between reactive and regular HTTP components.
WebFlux Namespace Support
Spring Integration provides a webflux namespace and the corresponding schema definition.
To include it in your configuration, add the following namespace declaration in your application context configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/webflux
https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
...
</beans>
WebFlux Inbound Components
Starting with version 5.0, the WebFluxInboundEndpoint implementation of WebHandler is provided.
This component is similar to the MVC-based HttpRequestHandlingEndpointSupport, with which it shares some common options through the newly extracted BaseHttpInboundEndpoint.
It is used in the Spring WebFlux reactive environment (instead of MVC).
The following example shows a simple implementation of a WebFlux endpoint:
Java DSL:

@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
    return IntegrationFlow
        .from(WebFlux.inboundChannelAdapter("/reactivePost")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
            .statusCodeFunction(m -> HttpStatus.ACCEPTED))
        .channel(c -> c.queue("storeChannel"))
        .get();
}

Kotlin DSL:

@Bean
fun inboundChannelAdapterFlow() =
    integrationFlow(
        WebFlux.inboundChannelAdapter("/reactivePost")
            .apply {
                requestMapping { m -> m.methods(HttpMethod.POST) }
                requestPayloadType(ResolvableType.forClassWithGenerics(Flux::class.java, String::class.java))
                statusCodeFunction { m -> HttpStatus.ACCEPTED }
            }
    ) {
        channel { queue("storeChannel") }
    }

Java:

@Configuration
@EnableWebFlux
@EnableIntegration
public class ReactiveHttpConfiguration {

    @Bean
    public WebFluxInboundEndpoint simpleInboundEndpoint() {
        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setPathPatterns("/test");
        endpoint.setRequestMapping(requestMapping);
        endpoint.setRequestChannelName("serviceChannel");
        return endpoint;
    }

    @ServiceActivator(inputChannel = "serviceChannel")
    String service() {
        return "It works!";
    }

}

XML:

<int-webflux:inbound-gateway request-channel="requests" path="/sse">
    <int-webflux:request-mapping produces="text/event-stream"/>
</int-webflux:inbound-gateway>
The configuration is similar to that of the HttpRequestHandlingEndpointSupport (mentioned before the example), except that we use @EnableWebFlux to add the WebFlux infrastructure to our integration application.
Also, the WebFluxInboundEndpoint performs sendAndReceive operations to the downstream flow by using the back-pressure, on-demand capabilities provided by the reactive HTTP server implementation.
The reply part is non-blocking as well and is based on the internal |
You can configure the WebFluxInboundEndpoint with a custom ServerCodecConfigurer, a RequestedContentTypeResolver, and even a ReactiveAdapterRegistry.
The latter provides a mechanism you can use to return a reply as any reactive type: Reactor Flux, RxJava Observable, Flowable, and others.
This way, we can implement Server Sent Events scenarios with Spring Integration components, as the following example shows:
Java DSL:

@Bean
public IntegrationFlow sseFlow() {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/sse")
            .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
        .handle((p, h) -> Flux.just("foo", "bar", "baz"))
        .get();
}

Kotlin DSL:

@Bean
fun sseFlow() =
    integrationFlow(
        WebFlux.inboundGateway("/sse")
            // Kotlin lambda syntax (the Java-style `m -> ...` arrow does not compile in Kotlin)
            .requestMapping { m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE) }
    ) {
        // The handler takes two parameters (payload, headers); neither is used here
        handle<Any> { _, _ -> Flux.just("foo", "bar", "baz") }
    }

Java:

@Bean
public WebFluxInboundEndpoint webfluxInboundGateway() {
    WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
    RequestMapping requestMapping = new RequestMapping();
    requestMapping.setPathPatterns("/sse");
    requestMapping.setProduces(MediaType.TEXT_EVENT_STREAM_VALUE);
    endpoint.setRequestMapping(requestMapping);
    endpoint.setRequestChannelName("requests");
    return endpoint;
}

XML:

<int-webflux:inbound-channel-adapter id="reactiveFullConfig" channel="requests"
        path="test1"
        auto-startup="false"
        phase="101"
        request-payload-type="byte[]"
        error-channel="errorChannel"
        payload-expression="payload"
        supported-methods="PUT"
        status-code-expression="'202'"
        header-mapper="headerMapper"
        codec-configurer="codecConfigurer"
        reactive-adapter-registry="reactiveAdapterRegistry"
        requested-content-type-resolver="requestedContentTypeResolver">
    <int-webflux:request-mapping headers="foo"/>
    <int-webflux:cross-origin origin="foo" method="PUT"/>
    <int-webflux:header name="foo" expression="'foo'"/>
</int-webflux:inbound-channel-adapter>
See Request Mapping Support and Cross-origin Resource Sharing (CORS) Support for more possible configuration options.
When the request body is empty or payloadExpression returns null, the request parameters (MultiValueMap<String, String>) are used as the payload of the target message to process.
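The fallback rule can be pictured with a small plain-Java sketch. This is not the framework's implementation: the class PayloadFallbackSketch and the method resolvePayload are hypothetical names, a plain Map<String, List<String>> stands in for MultiValueMap<String, String>, and a null candidate models both an empty body and a payloadExpression that returned null.

```java
import java.util.List;
import java.util.Map;

public class PayloadFallbackSketch {

    // Hypothetical helper: 'candidate' is the request body or the payloadExpression result.
    // A null candidate models an empty body or an expression that returned null.
    static Object resolvePayload(Object candidate, Map<String, List<String>> requestParams) {
        // Fall back to the request parameters only when there is nothing else to use
        return candidate != null ? candidate : requestParams;
    }

    public static void main(String[] args) {
        Map<String, List<String>> params = Map.of("q", List.of("spring"));
        System.out.println(resolvePayload("hello", params)); // body present: body is the payload
        System.out.println(resolvePayload(null, params));    // body empty: parameters are the payload
    }
}
```

Under this model, a GET request to an endpoint with only query parameters produces a message whose payload is the parameter map.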
Payload Validation
Starting with version 5.2, the WebFluxInboundEndpoint can be configured with a Validator.
Unlike the MVC validation in the HTTP Support, it is used to validate the elements of the Publisher to which a request has been converted by the HttpMessageReader, before the fallback and payloadExpression functions are applied.
The framework cannot assume how complex the Publisher object can be after the final payload is built.
If validation must be restricted to exactly the final payload (or its Publisher elements), the validation should be performed downstream instead of in the WebFlux endpoint.
See the Spring WebFlux documentation for more information.
An invalid payload is rejected with an IntegrationWebExchangeBindException (a WebExchangeBindException extension) containing all the validation Errors.
See the Spring Framework Reference Manual for more about validation.
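As a configuration fragment, the following sketch shows one way a Validator might be attached to the endpoint. It follows the bean style of the earlier Java examples. The class names, the /validated path, the validatedChannel channel name, and the NonBlankValidator rule are illustrative assumptions, and the fragment requires the Spring Integration WebFlux dependencies on the classpath.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.validation.Errors;
import org.springframework.validation.Validator;

@Configuration
public class ValidatedInboundConfiguration {

    @Bean
    public WebFluxInboundEndpoint validatedInboundEndpoint() {
        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setPathPatterns("/validated");        // hypothetical path
        endpoint.setRequestMapping(requestMapping);
        endpoint.setRequestPayloadTypeClass(String.class);
        endpoint.setValidator(new NonBlankValidator());       // applied to each Publisher element
        endpoint.setRequestChannelName("validatedChannel");   // hypothetical channel
        return endpoint;
    }

    // Illustrative validator: rejects blank String elements
    static class NonBlankValidator implements Validator {

        @Override
        public boolean supports(Class<?> clazz) {
            return String.class.isAssignableFrom(clazz);
        }

        @Override
        public void validate(Object target, Errors errors) {
            if (((String) target).isBlank()) {
                errors.reject("payload.blank", "Payload must not be blank");
            }
        }
    }
}
```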
WebFlux Outbound Components
The WebFluxRequestExecutingMessageHandler (starting with version 5.0) implementation is similar to HttpRequestExecutingMessageHandler.
It uses a WebClient from the Spring Framework WebFlux module.
To configure it, define a bean similar to the following:
Java DSL:

@Bean
public IntegrationFlow outboundReactive() {
    return f -> f
        .handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
                UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
                    .queryParams(m.getPayload())
                    .build()
                    .toUri())
            .httpMethod(HttpMethod.GET)
            .expectedResponseType(String.class));
}

Kotlin DSL:

@Bean
fun outboundReactive() =
    integrationFlow {
        handle(
            WebFlux.outboundGateway<MultiValueMap<String, String>>({ m ->
                UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
                    .queryParams(m.getPayload())
                    .build()
                    .toUri()
            })
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(String::class.java)
        )
    }

Java:

@ServiceActivator(inputChannel = "reactiveHttpOutRequest")
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {
    WebFluxRequestExecutingMessageHandler handler =
        new WebFluxRequestExecutingMessageHandler("http://localhost:8080/foo", client);
    handler.setHttpMethod(HttpMethod.POST);
    handler.setExpectedResponseType(String.class);
    return handler;
}

XML:

<int-webflux:outbound-gateway id="reactiveExample1"
    request-channel="requests"
    url="http://localhost/test"
    http-method-expression="headers.httpMethod"
    extract-request-payload="false"
    expected-response-type-expression="payload"
    charset="UTF-8"
    reply-timeout="1234"
    reply-channel="replies"/>

<int-webflux:outbound-channel-adapter id="reactiveExample2"
    url="http://localhost/example"
    http-method="GET"
    channel="requests"
    charset="UTF-8"
    extract-payload="false"
    expected-response-type="java.lang.String"
    order="3"
    auto-startup="false"/>
The WebClient exchange() operation returns a Mono<ClientResponse>, which is mapped (by using several Mono.map() steps) to an AbstractIntegrationMessageBuilder as the output from the WebFluxRequestExecutingMessageHandler.
When a ReactiveChannel is used as the outputChannel, the Mono<ClientResponse> evaluation is deferred until a downstream subscription is made.
Otherwise, it is treated as async mode, and the Mono response is adapted to a SettableListenableFuture for an asynchronous reply from the WebFluxRequestExecutingMessageHandler.
The target payload of the output message depends on the WebFluxRequestExecutingMessageHandler configuration.
The setExpectedResponseType(Class<?>) or setExpectedResponseTypeExpression(Expression) setting identifies the target type of the response body element conversion.
If replyPayloadToFlux is set to true, the response body is converted to a Flux with the provided expectedResponseType for each element, and this Flux is sent as the payload downstream.
Afterward, you can use a splitter to iterate over this Flux in a reactive manner.
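The following configuration fragment sketches how replyPayloadToFlux and a splitter might be combined with the Java DSL. The class name, the /items URL, and the itemsChannel channel name are illustrative assumptions, and the fragment requires the Spring Integration WebFlux dependencies on the classpath.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.webflux.dsl.WebFlux;
import org.springframework.web.reactive.function.client.WebClient;

@Configuration
public class FluxReplyConfiguration {

    @Bean
    public IntegrationFlow fluxReplyFlow(WebClient webClient) {
        return f -> f
                // The reply payload is a Flux<String> rather than a single String
                .handle(WebFlux.outboundGateway("http://localhost:8080/items", webClient)
                        .httpMethod(HttpMethod.GET)
                        .expectedResponseType(String.class)
                        .replyPayloadToFlux(true))
                // The default splitter emits one message per Flux element
                .split()
                // A reactive output channel keeps the iteration subscription-driven
                .channel(MessageChannels.flux("itemsChannel"));
    }
}
```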
In addition, a BodyExtractor<?, ClientHttpResponse> can be injected into the WebFluxRequestExecutingMessageHandler instead of the expectedResponseType and replyPayloadToFlux properties.
It can be used for low-level access to the ClientHttpResponse and more control over body and HTTP headers conversion.
Spring Integration provides ClientHttpResponseBodyExtractor as an identity function to produce (downstream) the whole ClientHttpResponse and any other possible custom logic.
Starting with version 5.2, the WebFluxRequestExecutingMessageHandler supports reactive Publisher, Resource, and MultiValueMap types as the request message payload.
A respective BodyInserter is used internally to populate the WebClient.RequestBodySpec.
When the payload is a reactive Publisher, a configured publisherElementType or publisherElementTypeExpression can be used to determine the publisher's element type.
The expression must resolve to a Class<?>, a String (which is resolved to the target Class<?>), or a ParameterizedTypeReference.
Starting with version 5.5, the WebFluxRequestExecutingMessageHandler exposes an extractResponseBody flag (which is true by default) to return just the response body, or to return the whole ResponseEntity as the reply message payload, independently of the provided expectedResponseType or replyPayloadToFlux.
If a body is not present in the ResponseEntity, this flag is ignored and the whole ResponseEntity is returned.
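As a configuration fragment, the flag might be switched off as sketched below so that downstream components receive the whole ResponseEntity (status, headers, and body). The bean name, class name, and the entityReplies channel name are illustrative assumptions. The URL and input channel are reused from the earlier outbound example.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.webflux.outbound.WebFluxRequestExecutingMessageHandler;
import org.springframework.web.reactive.function.client.WebClient;

@Configuration
public class FullEntityReplyConfiguration {

    @Bean
    @ServiceActivator(inputChannel = "reactiveHttpOutRequest")
    public WebFluxRequestExecutingMessageHandler fullEntityOutbound(WebClient client) {
        WebFluxRequestExecutingMessageHandler handler =
                new WebFluxRequestExecutingMessageHandler("http://localhost:8080/foo", client);
        handler.setHttpMethod(HttpMethod.GET);
        handler.setExpectedResponseType(String.class);
        // Reply with the whole ResponseEntity instead of only its body
        handler.setExtractResponseBody(false);
        handler.setOutputChannelName("entityReplies"); // hypothetical reply channel
        return handler;
    }
}
```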
See HTTP Outbound Components for more possible configuration options.
WebFlux Header Mappings
Since WebFlux components are fully based on the HTTP protocol, there is no difference in the HTTP headers mapping. See HTTP Header Mappings for more possible options and components to use for mapping headers.
WebFlux Request Attributes
Starting with version 6.0, the WebFluxRequestExecutingMessageHandler can be configured to evaluate request attributes via setAttributeVariablesExpression().
This SpEL expression must evaluate to a Map.
Such a map is then propagated to the WebClient.RequestBodySpec.attributes(Consumer<Map<String, Object>> attributesConsumer) HTTP request configuration callback.
This is helpful when information in the form of key-value pairs needs to be passed from the Message to the request, so that a downstream filter can access these attributes for further processing.
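The following configuration fragment sketches how a request attribute might be derived from the message and then read in a WebClient filter. It assumes that setAttributeVariablesExpression() accepts a parsed SpEL Expression. The attribute name messageId, the bean and class names, and the logging filter are illustrative assumptions. The URL and input channel are reused from the earlier outbound example.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.http.HttpMethod;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.webflux.outbound.WebFluxRequestExecutingMessageHandler;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;

@Configuration
public class RequestAttributesConfiguration {

    @Bean
    public WebClient attributeAwareWebClient() {
        // Illustrative filter: reads the attribute that the handler attached to the request
        ExchangeFilterFunction logMessageId = (request, next) -> {
            request.attribute("messageId")
                    .ifPresent(id -> System.out.println("messageId=" + id));
            return next.exchange(request);
        };
        return WebClient.builder().filter(logMessageId).build();
    }

    @Bean
    @ServiceActivator(inputChannel = "reactiveHttpOutRequest")
    public WebFluxRequestExecutingMessageHandler attributesOutbound(WebClient attributeAwareWebClient) {
        WebFluxRequestExecutingMessageHandler handler =
                new WebFluxRequestExecutingMessageHandler("http://localhost:8080/foo", attributeAwareWebClient);
        handler.setHttpMethod(HttpMethod.GET);
        handler.setExpectedResponseType(String.class);
        // SpEL inline map: evaluated against the request Message, must produce a Map
        handler.setAttributeVariablesExpression(
                new SpelExpressionParser().parseExpression("{messageId: headers.id.toString()}"));
        return handler;
    }
}
```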