The reactive() Endpoint

从版本 5.5 开始,ConsumerEndpointSpec 提供一个 reactive() 配置属性,其中包含一个可选的自定义项 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>。此选项将目标端点配置为 ReactiveStreamsConsumer 实例,与输入通道类型无关,此输入通道类型通过 IntegrationReactiveUtils.messageChannelToFlux() 转换为 Flux。提供的功能从 Flux.transform() 运算符中使用,以从输入通道自定义 (publishOn(), log(), doOnNext() 等) 响应流源。

Starting with version 5.5, the ConsumerEndpointSpec provides a reactive() configuration property with an optional customizer Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>. This option configures the target endpoint as a ReactiveStreamsConsumer instance, independently of the input channel type, which is converted to a Flux via IntegrationReactiveUtils.messageChannelToFlux(). The provided function is used from the Flux.transform() operator to customize (publishOn(), log(), doOnNext() etc.) a reactive stream source from the input channel.

以下示例演示如何独立于最终订阅者和 DirectChannel 的生成器,从输入通道更改发布线程:

The following example demonstrates how to change the publishing thread from the input channel independently of the final subscriber and producer to that DirectChannel:

@Bean
public IntegrationFlow reactiveEndpointFlow() {
    return IntegrationFlow
            .from("inputChannel")
            .transformWith(t -> t
                              .<String, Integer>transformer(Integer::parseInt)
                              .reactive(flux -> flux.publishOn(Schedulers.parallel()))
            )
            .get();
}

有关更多信息,请参阅 Reactive Streams Support

See Reactive Streams Support for more information.