WebFlux Support

WebFlux Spring Integration 模块(spring-integration-webflux)允许以响应方式执行 HTTP 请求和处理入站 HTTP 请求。 你需要将此依赖项包含在你的项目中:

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-webflux</artifactId>
    <version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-webflux:{project-version}"

在基于非 Servlet 的服务器配置的情况下,必须包含 io.projectreactor.netty:reactor-netty 依赖项。 WebFlux 支持包含以下网关实现:WebFluxInboundEndpointWebFluxRequestExecutingMessageHandler。此支持完全基于 Spring WebFluxProject Reactor 基础。有关详细信息,请参见 HTTP Support,因为许多选项在响应式和常规 HTTP 组件之间共享。

WebFlux Namespace Support

Spring Integration 提供 webflux 命名空间和相应的模式定义。若要将其包含在配置中,请在应用上下文配置文件中添加以下命名空间声明:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/webflux
    https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
    ...
</beans>

WebFlux Inbound Components

从版本 5.0 开始,提供了 WebHandlerWebFluxInboundEndpoint 实现。此组件类似于基于 MVC 的 HttpRequestHandlingEndpointSupport,后者通过新提取的 BaseHttpInboundEndpoint 与它共享一些常见选项。它在 Spring WebFlux 响应环境中使用(而不是 MVC)。以下示例展示了 WebFlux 端点的简单实现:

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
    return IntegrationFlow
        .from(WebFlux.inboundChannelAdapter("/reactivePost")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
            .statusCodeFunction(m -> HttpStatus.ACCEPTED))
        .channel(c -> c.queue("storeChannel"))
        .get();
}
@Bean
fun inboundChannelAdapterFlow() =
    integrationFlow(
        WebFlux.inboundChannelAdapter("/reactivePost")
            .apply {
                requestMapping { m -> m.methods(HttpMethod.POST) }
                requestPayloadType(ResolvableType.forClassWithGenerics(Flux::class.java, String::class.java))
                statusCodeFunction { m -> HttpStatus.ACCEPTED }
            })
    {
        channel { queue("storeChannel") }
    }
@Configuration
@EnableWebFlux
@EnableIntegration
public class ReactiveHttpConfiguration {

    @Bean
    public WebFluxInboundEndpoint simpleInboundEndpoint() {
        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setPathPatterns("/test");
        endpoint.setRequestMapping(requestMapping);
        endpoint.setRequestChannelName("serviceChannel");
        return endpoint;
    }

    @ServiceActivator(inputChannel = "serviceChannel")
    String service() {
        return "It works!";
    }

}
<int-webflux:inbound-gateway request-channel="requests" path="/sse">
    <int-webflux:request-mapping produces="text/event-stream"/>
</int-webflux:inbound-gateway>

与之前的示例中提到的 HttpRequestHandlingEndpointSupport 配置类似,但这里用 @EnableWebFlux 为集成应用程序添加 WebFlux 基础结构。此外,WebFluxInboundEndpoint 会通过使用由响应式 HTTP 服务器实现提供的支持背压的按需功能来对下游流执行 sendAndReceive 操作。

应答部分也不是阻塞的,并且基于内部 FutureReplyChannel,已平坦映射到应答 Mono 以便按需解析。

您可以使用自定义 ServerCodecConfigurerRequestedContentTypeResolver、甚至 ReactiveAdapterRegistry 来配置 WebFluxInboundEndpoint。后者提供了一种机制,您可以使用这种机制将答复返回为任何响应式类型:Reactor Flux、RxJava ObservableFlowable 等。通过这种方式,我们可以使用 Spring 集成组件实现 Server Sent Events 场景,如下面的示例所示:

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow sseFlow() {
    return IntegrationFlow
            .from(WebFlux.inboundGateway("/sse")
                    .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
            .handle((p, h) -> Flux.just("foo", "bar", "baz"))
            .get();
}
@Bean
fun sseFlow() =
     integrationFlow(
            WebFlux.inboundGateway("/sse")
                       .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
            {
                 handle { (p, h) -> Flux.just("foo", "bar", "baz") }
            }
@Bean
public WebFluxInboundEndpoint webfluxInboundGateway() {
    WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
    RequestMapping requestMapping = new RequestMapping();
    requestMapping.setPathPatterns("/sse");
    requestMapping.setProduces(MediaType.TEXT_EVENT_STREAM_VALUE);
    endpoint.setRequestMapping(requestMapping);
    endpoint.setRequestChannelName("requests");
    return endpoint;
}
<int-webflux:inbound-channel-adapter id="reactiveFullConfig" channel="requests"
                               path="test1"
                               auto-startup="false"
                               phase="101"
                               request-payload-type="byte[]"
                               error-channel="errorChannel"
                               payload-expression="payload"
                               supported-methods="PUT"
                               status-code-expression="'202'"
                               header-mapper="headerMapper"
                               codec-configurer="codecConfigurer"
                               reactive-adapter-registry="reactiveAdapterRegistry"
                               requested-content-type-resolver="requestedContentTypeResolver">
            <int-webflux:request-mapping headers="foo"/>
            <int-webflux:cross-origin origin="foo" method="PUT"/>
            <int-webflux:header name="foo" expression="'foo'"/>
</int-webflux:inbound-channel-adapter>

有关更多可能的配置选项,请参见 Request Mapping SupportCross-origin Resource Sharing (CORS) Support

如果请求正文为空或 payloadExpression 返回 null,则请求参数 (MultiValueMap<String, String>) 会用于要处理的目标消息的 payload

Payload Validation

从 5.2 版开始,WebFluxInboundEndpoint 可以使用 Validator 进行配置。它不同于 HTTP Support 中的 MVC 验证,它用于验证 Publisher 中的元素(其中请求已由 HttpMessageReader 转换为 Publisher)在执行回退和 payloadExpression 函数之前。框架无法假定在构建最终有效载荷后 Publisher 对象会有多复杂。如果有在最终有效载荷(或其 Publisher 元素)中严格限制验证可见性的要求,则验证应顺流而下,而不是流向 WebFlux 端点。在 Spring WebFlux documentation 中了解更多信息。无效有效载荷用 IntegrationWebExchangeBindExceptionWebExchangeBindException 扩展)拒绝,其中包含所有验证 Errors。有关验证的更多信息,请参见 Spring 框架 Reference Manual

WebFlux Outbound Components

WebFluxRequestExecutingMessageHandler(从版本 5.0 开始)的实现类似于 HttpRequestExecutingMessageHandler。它使用来自 Spring Framework WebFlux 模块的 WebClient。要进行配置,请定义类似于以下内容的 Bean:

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow outboundReactive() {
    return f -> f
        .handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
                UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
                        .queryParams(m.getPayload())
                        .build()
                        .toUri())
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(String.class));
}
@Bean
fun outboundReactive() =
    integrationFlow {
        handle(
            WebFlux.outboundGateway<MultiValueMap<String, String>>({ m ->
                UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
                    .queryParams(m.getPayload())
                    .build()
                    .toUri()
            })
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(String::class.java)
        )
    }
@ServiceActivator(inputChannel = "reactiveHttpOutRequest")
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {
    WebFluxRequestExecutingMessageHandler handler =
        new WebFluxRequestExecutingMessageHandler("http://localhost:8080/foo", client);
    handler.setHttpMethod(HttpMethod.POST);
    handler.setExpectedResponseType(String.class);
    return handler;
}
<int-webflux:outbound-gateway id="reactiveExample1"
    request-channel="requests"
    url="http://localhost/test"
    http-method-expression="headers.httpMethod"
    extract-request-payload="false"
    expected-response-type-expression="payload"
    charset="UTF-8"
    reply-timeout="1234"
    reply-channel="replies"/>

<int-webflux:outbound-channel-adapter id="reactiveExample2"
    url="http://localhost/example"
    http-method="GET"
    channel="requests"
    charset="UTF-8"
    extract-payload="false"
    expected-response-type="java.lang.String"
    order="3"
    auto-startup="false"/>

WebClient exchange() 操作会返回 Mono<ClientResponse> ,它(通过使用多个 Mono.map() 步骤)映射到 AbstractIntegrationMessageBuilder 作为 WebFluxRequestExecutingMessageHandler 的输出。通过使用 ReactiveChannel 作为 outputChannelMono<ClientResponse> 评估会被延迟,直至执行下游订阅。否则,它会被视为 async 模式,并且 Mono 响应会针对 WebFluxRequestExecutingMessageHandler 的异步回复而调整为 SettableListenableFuture 。输出消息的目标有效负载取决于 WebFluxRequestExecutingMessageHandler 配置。setExpectedResponseType(Class<?>)setExpectedResponseTypeExpression(Expression) 确定响应主体元素转换的目标类型。如果 replyPayloadToFlux 设置为 true ,则响应主体将利用针对每个元素提供的 expectedResponseType 转换为 Flux ,并且这个 Flux 被作为下游的有效负载发送。之后,你可以在响应方式下使用 splitter 遍历此 Flux

此外,可以将 BodyExtractor<?, ClientHttpResponse> 注入到 WebFluxRequestExecutingMessageHandler,而不是将 expectedResponseTypereplyPayloadToFlux 属性。它可用于低级别访问 ClientHttpResponse 和更好地控制报文和 HTTP 标头转换。Spring Integration 提供 ClientHttpResponseBodyExtractor 作为标识函数,以生成(下游)整个 ClientHttpResponse 和任何其他可能的自定义逻辑。

从版本 5.2 开始,WebFluxRequestExecutingMessageHandler 支持响应式 PublisherResourceMultiValueMap 类型作为请求消息有效载荷。内部使用相应 BodyInserter 填充到 WebClient.RequestBodySpec 中。当有效载荷是响应式 Publisher 时,可以将配置的 publisherElementTypepublisherElementTypeExpression 用于确定发行者元素类型的类型。该表达式必须解析为 Class<?>String,该表达式会解析为目标 Class<?>ParameterizedTypeReference

从版本 5.5 开始,WebFluxRequestExecutingMessageHandler 公开了 extractResponseBody 标志(默认情况下为 true),用于仅返回响应正文,或者将整个 ResponseEntity 作为回复消息有效载荷返回,而不管提供的 expectedResponseTypereplyPayloadToFlux。如果 ResponseEntity 中不存在报文,则会忽略此标志,且将返回整个 ResponseEntity

有关更多可能的配置选项,请参见 HTTP Outbound Components

WebFlux Header Mappings

由于 WebFlux 组件完全基于 HTTP 协议,因此在 HTTP 标头映射中没有差别。有关用于映射标头的更多可能选项和组件的信息,请参见 HTTP Header Mappings

WebFlux Request Attributes

从版本 6.0 开始,WebFluxRequestExecutingMessageHandler 可以配置为通过 setAttributeVariablesExpression() 评估请求属性。此 SpEL 表达式必须在 Map 中求值。然后将该映射传播到 WebClient.RequestBodySpec.attributes(Consumer<Map<String, Object>> attributesConsumer) HTTP 请求配置回调。如果需要将键值对象形式的信息从 Message 传递到请求,并且下游筛选器可以访问这些属性以进行进一步处理,则这将非常有用。