The reactive()
Endpoint
从版本 5.5 开始,ConsumerEndpointSpec
提供一个 reactive()
配置属性,其中包含一个可选的自定义项 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>
。此选项将目标端点配置为 ReactiveStreamsConsumer
实例,与输入通道类型无关,此输入通道类型通过 IntegrationReactiveUtils.messageChannelToFlux()
转换为 Flux
。提供的功能从 Flux.transform()
运算符中使用,以从输入通道自定义 (publishOn()
, log()
, doOnNext()
等) 响应流源。
以下示例演示如何独立于最终订阅者和 DirectChannel
的生成器,从输入通道更改发布线程:
@Bean
public IntegrationFlow reactiveEndpointFlow() {
return IntegrationFlow
.from("inputChannel")
.transformWith(t -> t
.<String, Integer>transformer(Integer::parseInt)
.reactive(flux -> flux.publishOn(Schedulers.parallel()))
)
.get();
}
有关更多信息,请参阅 Reactive Streams Support。