Reactive Streams Support
Spring Integration 在框架的某些地方以及不同的方面为 Reactive Streams 交互提供支持。我们将在这里讨论其中的大部分,并在必要时通过适当的链接提供目标章节以获得详细信息。
Spring Integration provides support for Reactive Streams interaction in some places of the framework and from different aspects. We will discuss most of them here with appropriate links to the target chapters for details whenever necessary.
Preface
总之,Spring Integration 扩展了 Spring 编程模型,以支持众所周知的企业集成模式。Spring Integration 在基于 Spring 的应用程序中实现轻量级消息传递,并通过声明性适配器支持与外部系统的集成。Spring Integration 的主要目标是提供一个简单的模型来构建企业集成解决方案,同时保持必要的关注点分离,以便生成可维护、可测试的代码。在目标应用程序中,通过使用 message
、channel
和 endpoint
等一等公民来实现此目标,这些内容使我们能够构建一个集成流(管道),在大多数情况下,一个端点会将消息生成到一个通道,以便被另一个端点使用。通过这种方式,我们将集成交互模型与目标业务逻辑区分开来。这里,关键部分是在它们之间的通道:流行为取决于其实现,不会更改端点。
To recap, Spring Integration extends the Spring programming model to support the well-known Enterprise Integration Patterns.
Spring Integration enables lightweight messaging within Spring-based applications and supports integration with external systems via declarative adapters.
Spring Integration’s primary goal is to provide a simple model for building enterprise integration solutions while maintaining the separation of concerns that is essential for producing maintainable, testable code.
This goal is achieved in the target application using first class citizens like message
, channel
and endpoint
, which allow us to build an integration flow (pipeline), where (in most cases) one endpoint produces messages into a channel to be consumed by another endpoint.
This way we distinguish an integration interaction model from the target business logic.
The crucial part here is a channel in between: the flow behavior depends on its implementation leaving endpoints untouched.
另一方面,Reactive Streams 是一个使用非阻塞反压进行异步流处理的标准。Reactive Streams 的主要目标是管理流数据跨异步边界的交换 - 例如将元素传递到另一个线程或线程池 - 同时确保接收方不必缓冲任意数量的数据。换句话说,反压是此模型中不可分割的一部分,以便允许协调线程之间的队列受到限制。如 Project Reactor 等 Reactive Streams 实现的目的是在流应用程序的整个处理图中保留这些好处和特征。Reactive Streams 库的最终目标是以透明且流畅的方式为目标应用程序提供类型、一组运算符和支持 API,就像使用可用的编程语言结构一样,但最终解决方案不像使用正常函数链调用那样具有强制性。它分为两个阶段:定义和执行,这发生在稍后的订阅最终反应式发布者的时候,对数据的需求从定义的底部推送到顶部,根据需要应用反压 - 我们请求我们此刻可以处理的尽可能多的事件。反应式应用程序看起来像一个 "stream"
,或者正如我们习惯在 Spring Integration 中使用的 - "flow"
。实际上,自 Java 9 以来 Reactive Streams SPI 在 java.util.concurrent.Flow
类。
On the other hand, the Reactive Streams is a standard for asynchronous stream processing with non-blocking back pressure.
The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary – like passing elements on to another thread or thread-pool – while ensuring that the receiving side is not forced to buffer arbitrary amounts of data.
In other words, back pressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded.
The intention of Reactive Streams implementation, such as Project Reactor, is to preserve these benefits and characteristics across the whole processing graph of a stream application.
The ultimate goal of Reactive Streams libraries is to provide types, set of operators and supporting API for a target application in a transparent and smooth manner as is possible with available programming language structure, but the final solution is not as imperative as it is with a normal function chain invocation.
It is divided into to phases: definition and execution, which happens some time later during subscription to the final reactive publisher, and demand for data is pushed from the bottom of the definition to the top applying back-pressure as needed - we request as many events as we can handle at the moment.
The reactive application looks like a "stream"
or as we got used to in Spring Integration terms - "flow"
.
In fact the Reactive Streams SPI since Java 9 is presented in the java.util.concurrent.Flow
class.
从这里可以看出,Spring Integration 流实际上非常适合撰写 Reactive Stream 应用程序,当我们在端点上应用一些反应式框架运算符时,但事实上问题更加广泛,我们还需要记住,并不是所有端点(例如 JdbcMessageHandler
)都能在反应式流中以透明的方式进行处理。当然,Spring Integration 中 Reactive Streams 支持的主要目标是让整个过程完全反应式、按需启动并为反压做好准备。在目标协议和通道适配器的系统为通道适配器提供 Reactive Streams 交互模型之前,这是不可能的。在下面的章节中,我们将描述 Spring Integration 中提供的组件和方法,以便开发保留集成流结构的反应式应用程序。
From here it may look like Spring Integration flows are really a good fit for writing Reactive Streams applications when we apply some reactive framework operators on endpoints, but in fact the problems is much broader and we need to keep in mind that not all endpoints (e.g. JdbcMessageHandler
) can be processed in a reactive stream transparently.
Of course, the main goal for Reactive Streams support in Spring Integration is to allow the whole process to be fully reactive, on demand initiated and back-pressure ready.
It is not going to be possible until the target protocols and systems for channel adapters provide a Reactive Streams interaction model.
In the sections below we will describe what components and approaches are provided in Spring Integration for developing reactive application preserving integration flow structures.
Spring Integration 中的所有反应式流交互都使用 Project Reactor 类型(例如 |
All the Reactive Streams interaction in Spring Integration implemented with Project Reactor types, such as |
Messaging Gateway
与 Reactive Streams 交互的最简单的点是 @MessagingGateway
,其中我们仅仅使网关方法的返回类型作为 Mono<?>
- 并且在返回的 Mono`实例上发生订阅时,将会执行网关方法调用背后的整个集成流程。有关更多信息,请参阅 Reactor `Mono
。框架内部在完全基于 Reactive Streams 兼容协议的入站网关中使用了类似的 `Mono`回复方法(有关更多信息,请参阅下面的 Reactive Channel Adapters)。发送和接收操作被包装到 `Mono.deffer()`中,并根据需要链接来自 `replyChannel`标头的回复评估。 таким образом, входящий компонент для конкретного реактивного протокола (например, Netty) будет выступать в качестве подписчика и инициатора для реактивного потока, выполняемого в Spring Integration. Если полезная нагрузка запроса является реактивным типом, будет лучше обработать ее в определении реактивного потока, отложив процесс до подписки инициатора. Для этой цели метод обработчика также должен возвращать реактивный тип. Дополнительные сведения см. в следующем разделе.
The simplest point of interaction with Reactive Streams is a @MessagingGateway
where we just make a return type of the gateway method as a Mono<?>
- and the whole integration flow behind a gateway method call is going to be performed when a subscription happens on the returned Mono
instance.
See Reactor Mono
for more information.
A similar Mono
-reply approach is used in the framework internally for inbound gateways which are fully based on Reactive Streams compatible protocols (see Reactive Channel Adapters below for more information).
The send-and-receive operation is wrapped into a Mono.deffer()
with chaining a reply evaluation from the replyChannel
header whenever it is available.
This way an inbound component for the particular reactive protocol (e.g. Netty) is going to be as a subscriber and initiator for a reactive flow performed on the Spring Integration.
If the request payload is a reactive type, it would be better to handle it withing a reactive stream definition deferring a process to the initiator subscription.
For this purpose a handler method must return a reactive type as well.
See the next section for more information.
Reactive Reply Payload
当产生应答的 MessageHandler
返回应答消息的反应式类型负载时,它将使用为 outputChannel
提供的常规 MessageChannel
实现(async
必须设为 true
)以异步方式进行处理,并在输出通道是 ReactiveStreamsSubscribableChannel
实现(例如 FluxMessageChannel
)时,通过按需订阅进行扁平化。使用标准的命令式 MessageChannel
用例,并且如果应答负载是 多值 发布程序(有关详细信息,请参阅 ReactiveAdapter.isMultiValue()
),则将其包装在 Mono.just()
中。因此,Mono
必须在 downstream 显式订阅或由 downstream 的 FluxMessageChannel
进行扁平化。使用 ReactiveStreamsSubscribableChannel
作为 outputChannel
时,不必担心返回类型和订阅;一切都由框架内部平稳处理。
When a reply producing MessageHandler
returns a reactive type payload for a reply message, it is processed in an asynchronous manner with a regular MessageChannel
implementation provided for the outputChannel
(the async
must be set to true
) and flattened with on demand subscription when the output channel is a ReactiveStreamsSubscribableChannel
implementation, e.g. FluxMessageChannel
.
With a standard imperative MessageChannel
use-case, and if a reply payload is a multi-value publisher (see ReactiveAdapter.isMultiValue()
for more information), it is wrapped into a Mono.just()
.
A result of this, the Mono
has to be subscribed explicitly downstream or flattened by the FluxMessageChannel
downstream.
With a ReactiveStreamsSubscribableChannel
for the outputChannel
, there is no need to be concerned about return type and subscription; everything is processed smoothly by the framework internally.
有关更多信息,请参阅 Asynchronous Service Activator。
See Asynchronous Service Activator for more information.
有关详细信息,请 Kotlin Coroutines 查看。
Also see Kotlin Coroutines for more information.
FluxMessageChannel
and ReactiveStreamsConsumer
FluxMessageChannel 是 MessageChannel 和 Publisher<Message<?>> 的组合实现。Flux 作为现成资源在内部创建,用于从 send() 实现中发送传入消息。Publisher.subscribe() 实现委托给该内部 Flux。此外,对于按需上游消耗,FluxMessageChannel 为 ReactiveStreamsSubscribableChannel 合约提供一个实现。当此通道准备好订阅时,为该通道提供的任何上游 Publisher(例如,见以下 Source Polling Channel Adapter 和分流器)都自动订阅。来自这些委托发布者的事件被发送到上面提到的一个内部 Flux 中。
The FluxMessageChannel
is a combined implementation of MessageChannel
and Publisher<Message<?>>
.
A Flux
, as a hot source, is created internally for sinking incoming messages from the send()
implementation.
The Publisher.subscribe()
implementation is delegated to that internal Flux
.
Also, for on demand upstream consumption, the FluxMessageChannel
provides an implementation for the ReactiveStreamsSubscribableChannel
contract.
Any upstream Publisher
(see Source Polling Channel Adapter and splitter below, for example) provided for this channel is auto-subscribed when subscription is ready for this channel.
Events from this delegating publishers are sunk into an internal Flux
mentioned above.
对于 FluxMessageChannel,使用者必须是 org.reactivestreams.Subscriber 实例,以遵守 Reactive Streams 合约。幸运的是,Spring Integration 中所有的 MessageHandler 实现还实现了来自项目 Reactor 的 CoreSubscriber。借助介于两者之间的 ReactiveStreamsConsumer 实现,整个集成流程配置对于目标开发者来说是透明的。在这种情况下,流程行为已从命令式推送模型更改为反应式提取模型。ReactiveStreamsConsumer 还可用于使用 IntegrationReactiveUtils 将任何 MessageChannel 转换为反应源,从而使集成流程部分具有反应性。
A consumer for the FluxMessageChannel
must be a org.reactivestreams.Subscriber
instance for honoring the Reactive Streams contract.
Fortunately, all of the MessageHandler
implementations in Spring Integration also implement a CoreSubscriber
from project Reactor.
And thanks to a ReactiveStreamsConsumer
implementation in between, the whole integration flow configuration is left transparent for target developers.
In this case, the flow behavior is changed from an imperative push model to a reactive pull model.
A ReactiveStreamsConsumer
can also be used to turn any MessageChannel
into a reactive source using IntegrationReactiveUtils
, making an integration flow partially reactive.
有关更多信息,请参见 xref:channel/implementations.adoc#flux-message-channel[FluxMessageChannel
。
See FluxMessageChannel
for more information.
从版本 5.5 开始,ConsumerEndpointSpec 引入了 reactive() 选项,可将流程中的端点作为 ReactiveStreamsConsumer,与输入通道无关。可提供可选的 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>,以通过 Flux.transform() 操作自定义来自输入通道的源 Flux,例如,通过 publishOn()、doOnNext()、retry() 等。此功能通过 messaging 标记(@ServiceActivator、@Splitter 等)的 @Reactive 子标记表示,通过它们的 reactive() 属性表示。
Starting with version 5.5, the ConsumerEndpointSpec
introduces a reactive()
option to make the endpoint in the flow as a ReactiveStreamsConsumer
independently of the input channel.
The optional Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>
can be provided to customise a source Flux
from the input channel via Flux.transform()
operation, e.g. with the publishOn()
, doOnNext()
, retry()
etc.
This functionality is represented as a @Reactive
sub-annotation for all the messaging annotation (@ServiceActivator
, @Splitter
etc.) via their reactive()
attribute.
Source Polling Channel Adapter
通常,SourcePollingChannelAdapter 依赖于由 TaskScheduler 启动的任务。轮询触发器基于所提供选项构建,用于定期计划任务以轮询数据或事件目标源。当 outputChannel 是 ReactiveStreamsSubscribableChannel 时,使用同一个 Trigger 确定下一次执行时间,但 SourcePollingChannelAdapter 并非计划任务,而是基于 nextExecutionTime 值创建一个 Flux<Message<?>>,用于 Flux.generate(),用于 Mono.delay(),以针对前一步骤确定持续时间。然后使用 Flux.flatMapMany() 轮询 maxMessagesPerPoll,并将它们发送到输出 Flux。此生成器 Flux 由所提供的 ReactiveStreamsSubscribableChannel 订阅,并遵守下游的反压。从版本 5.5 开始,当 maxMessagesPerPoll == 0 时,根本不会调用源,并且 flatMapMany() 会通过 Mono.empty() 结果立即完成,直到晚些时候将 maxMessagesPerPoll 更改为非零值(例如,通过控制总线)。通过这种方式,任何 MessageSource 实现都可以变成反应式现成资源。
Usually, the SourcePollingChannelAdapter
relies on the task which is initiated by the TaskScheduler
.
A polling trigger is built from the provided options and used for periodic scheduling a task to poll a target source of data or events.
When an outputChannel
is a ReactiveStreamsSubscribableChannel
, the same Trigger
is used to determine the next time for execution, but instead of scheduling tasks, the SourcePollingChannelAdapter
creates a Flux<Message<?>>
based on the Flux.generate()
for the nextExecutionTime
values and Mono.delay()
for a duration from the previous step.
A Flux.flatMapMany()
is used then to poll maxMessagesPerPoll
and sink them into an output Flux
.
This generator Flux
is subscribed by the provided ReactiveStreamsSubscribableChannel
honoring a back-pressure downstream.
Starting with version 5.5, when maxMessagesPerPoll == 0
, the source is not called at all, and flatMapMany()
is completed immediately via a Mono.empty()
result until the maxMessagesPerPoll
is changed to non-zero value at a later time, e.g. via a Control Bus.
This way, any MessageSource
implementation can be turned into a reactive hot source.
有关更多信息,请参阅 Polling Consumer。
See Polling Consumer for more information.
Event-Driven Channel Adapter
MessageProducerSupport
是事件驱动通道适配器的基类,通常,它的 sendMessage(Message<?>)
用于产生驱动器 API 中的侦听器回调。当消息生产者实现构建消息的 Flux
,而不是基于侦听器的功能时,此回调也可以轻松插入到 doOnNext()
Reactor 运算符中。事实上,当消息生产者的 “outputChannel
” 不是 ReactiveStreamsSubscribableChannel
时,框架中会执行此操作。但是,为了改善最终用户体验,并允许使用更多的反压就绪功能,MessageProducerSupport
提供了一个 subscribeToPublisher(Publisher<? extends Message<?>>)
API,供在将 Publisher<Message<?>>>
作为目标系统的数据源时在目标实现中使用。通常,当针对源数据的 Publisher
调用目标驱动器 API 时,它会从 doStart()
实现中使用。建议将反应式 MessageProducerSupport
实现与 FluxMessageChannel
结合使用,作为 “outputChannel
” 以进行按需订阅和事件消耗下游。当对 Publisher
的订阅被取消时,通道适配器将进入停止状态。对此类通道适配器调用 stop()
会完成从源 Publisher
的生成。可以重新启动通道适配器,并自动订阅新创建的源 Publisher
。
MessageProducerSupport
is the base class for event-driven channel adapters and, typically, its sendMessage(Message<?>)
is used as a listener callback in the producing driver API.
This callback can also be easily plugged into the doOnNext()
Reactor operator when a message producer implementation builds a Flux
of messages instead of listener-based functionality.
In fact, this is done in the framework when an outputChannel
of the message producer is not a ReactiveStreamsSubscribableChannel
.
However, for improved end-user experience, and to allow more back-pressure ready functionality, the MessageProducerSupport
provides a subscribeToPublisher(Publisher<? extends Message<?>>)
API to be used in the target implementation when a Publisher<Message<?>>>
is the source of data from the target system.
Typically, it is used from the doStart()
implementation when target driver API is called for a Publisher
of source data.
It is recommended to combine a reactive MessageProducerSupport
implementation with a FluxMessageChannel
as the outputChannel
for on-demand subscription and event consumption downstream.
The channel adapter goes to a stopped state when a subscription to the Publisher
is cancelled.
Calling stop()
on such a channel adapter completes the producing from the source Publisher
.
The channel adapter can be restarted with automatic subscription to a newly created source Publisher
.
Message Source to Reactive Streams
从 5.3 版本开始,提供了 ReactiveMessageSourceProducer
。它是提供的 MessageSource
和事件驱动生成到配置的 outputChannel
的组合。在内部,它将 MessageSource
包装到反复重新订阅的 Mono
中,生成 Flux<Message<?>>
以在上面提到的 subscribeToPublisher(Publisher<? extends Message<?>>)
中订阅。使用 Schedulers.boundedElastic()
对此 Mono
进行订阅,以避免目标 MessageSource
中可能的阻塞。当消息源返回 null
(没有要提取的数据)时,Mono
会变成 repeatWhenEmpty()
状态,并为后续重新订阅设置 “delay
”,这是基于订阅者上下文中的 IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY
Duration
条目。默认情况下,它为 1 秒。如果 MessageSource
生成了带有头中 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
信息的消息,则在 doOnSuccess()
中对其进行确认(如有必要),并在下游流抛出带有要拒绝的失败消息的 MessagingException
时在 doOnError()
中对其拒绝。此 ReactiveMessageSourceProducer
可用于任何用例,当轮询通道适配器的功能应该转化为针对任何现有 MessageSource<?>
实现的按需反应式解决方案时。
Starting with version 5.3, a ReactiveMessageSourceProducer
is provided.
It is a combination of a provided MessageSource
and event-driven production into the configured outputChannel
.
Internally it wraps a MessageSource
into the repeatedly resubscribed Mono
producing a Flux<Message<?>>
to be subscribed in the subscribeToPublisher(Publisher<? extends Message<?>>)
mentioned above.
The subscription for this Mono
is done using Schedulers.boundedElastic()
to avoid possible blocking in the target MessageSource
.
When the message source returns null
(no data to pull), the Mono
is turned into a repeatWhenEmpty()
state with a delay
for a subsequent re-subscription based on a IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY
Duration
entry from the subscriber context.
By default, it is 1 second.
If the MessageSource
produces messages with a IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
information in the headers, it is acknowledged (if necessary) in the doOnSuccess()
of the original Mono
and rejected in the doOnError()
if the downstream flow throws a MessagingException
with the failed message to reject.
This ReactiveMessageSourceProducer
could be used for any use-case when a polling channel adapter’s features should be turned into a reactive, on demand solution for any existing MessageSource<?>
implementation.
Splitter and Aggregator
当 AbstractMessageSplitter
为其逻辑获取 Publisher
时,该进程会自然地遍历 Publisher
中的项目,并将它们映射到要发送到 outputChannel
的消息中。如果此通道是一个 ReactiveStreamsSubscribableChannel
,则来自 Publisher
的 Flux
包装器会根据该通道按需订阅,并且此分割器行为看起来更像一个 flatMap
Reactor 运算符,当我们将传入事件映射到多值输出 Publisher
时。当在分割器前后使用 FluxMessageChannel
构建整个集成流时,它最有意义,并将 Spring Integration 配置与 Reactive Streams 要求及其事件处理运算符保持一致。对于常规通道,将 Publisher
转换为 Iterable
以进行标准的迭代和生成分割逻辑。
When an AbstractMessageSplitter
gets a Publisher
for its logic, the process goes naturally over the items in the Publisher
to map them into messages for sending to the outputChannel
.
If this channel is a ReactiveStreamsSubscribableChannel
, the Flux
wrapper for the Publisher
is subscribed on demand from that channel and this splitter behavior looks more like a flatMap
Reactor operator, when we map an incoming event into multi-value output Publisher
.
It makes most sense when the whole integration flow is built with a FluxMessageChannel
before and after the splitter, aligning Spring Integration configuration with a Reactive Streams requirements and its operators for event processing.
With a regular channel, a Publisher
is converted into an Iterable
for standard iterate-and-produce splitting logic.
FluxAggregatorMessageHandler
是另一个特定 Reactive Streams 逻辑实现的示例,在 Project Reactor 方面可以将其视为 “””反应式运算符““”。它基于 Flux.groupBy()
和 Flux.window()
(或 buffer()
)运算符。传入的消息会发送到在创建 FluxAggregatorMessageHandler
时启动的 Flux.create()
中,使其成为热源。此 Flux
由 ReactiveStreamsSubscribableChannel
按需订阅,或者当 outputChannel
不是反应式时直接在 FluxAggregatorMessageHandler.start()
中订阅。当在该组件前后使用 FluxMessageChannel
构建整个集成流时,此 MessageHandler
具有自己的强大功能,使整个逻辑都能够反压就绪。
A FluxAggregatorMessageHandler
is another sample of specific Reactive Streams logic implementation which could be treated as a "reactive operator"
in terms of Project Reactor.
It is based on the Flux.groupBy()
and Flux.window()
(or buffer()
) operators.
The incoming messages are sunk into a Flux.create()
initiated when a FluxAggregatorMessageHandler
is created, making it as a hot source.
This Flux
is subscribed to by a ReactiveStreamsSubscribableChannel
on demand, or directly in the FluxAggregatorMessageHandler.start()
when the outputChannel
is not reactive.
This MessageHandler
has its power, when the whole integration flow is built with a FluxMessageChannel
before and after this component, making the whole logic back-pressure ready.
有关更多信息,请参阅 Stream and Flux Splitting和 Flux Aggregator。
See Stream and Flux Splitting and Flux Aggregator for more information.
Java DSL
Java DSL 中的 IntegrationFlow
可以从任何 Publisher
实例开始(请参见 IntegrationFlow.from(Publisher<Message<T>>)
)。此外,使用 IntegrationFlowBuilder.toReactivePublisher()
运算符,IntegrationFlow
也可以变成反应式热源。两种情况下内部都使用了 FluxMessageChannel
;它可以根据其 ReactiveStreamsSubscribableChannel
合约订阅入站 Publisher
,并且本身对于下游订阅者来说就是 Publisher<Message<?>>
。通过动态 `IntegrationFlow
An IntegrationFlow
in Java DSL can start from any Publisher
instance (see IntegrationFlow.from(Publisher<Message<T>>)
).
Also, with an IntegrationFlowBuilder.toReactivePublisher()
operator, the IntegrationFlow
can be turned into a reactive hot source.
A FluxMessageChannel
is used internally in both cases; it can subscribe to an inbound Publisher
according to its ReactiveStreamsSubscribableChannel
contract and it is a Publisher<Message<?>>
by itself for downstream subscribers.
With a dynamic IntegrationFlow
registration we can implement a powerful logic combining Reactive Streams with this integration flow bridging to/from Publisher
.
从 5.5.6 版本开始,存在一个 toReactivePublisher(boolean autoStartOnSubscribe)
运算符变体,用于控制返回的 Publisher<Message<?>>
背后的整个 IntegrationFlow
的生命周期。通常,对响应式发布者的订阅和使用发生在较后的运行时阶段,而不是在响应流合成过程中,甚至是 ApplicationContext
启动时。为了避免在 Publisher<Message<?>>
订阅点管理 IntegrationFlow
的生命周期的样板代码,并为了更好的最终用户体验,引入了带有 autoStartOnSubscribe
标志的这个新运算符。如果为 true
,它会将 IntegrationFlow
及其组件标记为 autoStartup = false
,因此 ApplicationContext
不会自动启动生产并使用流中的消息。相反,IntegrationFlow
的 start()
从内部 Flux.doOnSubscribe()
启动。无论 autoStartOnSubscribe
的值如何,流都会从 Flux.doOnCancel()
和 Flux.doOnTerminate()
停止 - 如果没有东西要消费它们,就没有道理生产消息。
Starting with version 5.5.6, a toReactivePublisher(boolean autoStartOnSubscribe)
operator variant is present to control a lifecycle of the whole IntegrationFlow
behind the returned Publisher<Message<?>>
.
Typically, the subscription and consumption from the reactive publisher happens in the later runtime phase, not during reactive stream composition, or even ApplicationContext
startup.
To avoid boilerplate code for lifecycle management of the IntegrationFlow
at the Publisher<Message<?>>
subscription point and for better end-user experience, this new operator with the autoStartOnSubscribe
flag has been introduced.
It marks (if true
) the IntegrationFlow
and its components for autoStartup = false
, so an ApplicationContext
won’t initiate production and consumption of messages in the flow automatically.
Instead, the start()
for the IntegrationFlow
is initiated from the internal Flux.doOnSubscribe()
.
Independently of the autoStartOnSubscribe
value, the flow is stopped from a Flux.doOnCancel()
and Flux.doOnTerminate()
- it does not make sense to produce messages if there is nothing to consume them.
对于完全相反的用例,当 IntegrationFlow
应该调用响应流并在完成以后继续时,IntegrationFlowDefinition
中提供了 fluxTransform()
运算符。此时流会转变成会传播到提供的 fluxFunction
中的 FluxMessageChannel
,并在 Flux.transform()
运算符中执行。函数的结果会包装到 Mono<Message<?>>
中,用于平面映射到输出 Flux
,该 Flux
会被另一个 FluxMessageChannel
订阅,以便进行下游流。
For the exact opposite use-case, when IntegrationFlow
should call a reactive stream and continue after completion, a fluxTransform()
operator is provided in the IntegrationFlowDefinition
.
The flow at this point is turned into a FluxMessageChannel
which is propagated into a provided fluxFunction
, performed in the Flux.transform()
operator.
A result of the function is wrapped into a Mono<Message<?>>
for flat-mapping into an output Flux
which is subscribed by another FluxMessageChannel
for downstream flow.
有关更多信息,请参阅 Java DSL Chapter。
See Java DSL Chapter for more information.
ReactiveMessageHandler
从 5.3 版本开始,ReactiveMessageHandler
在框架中得到原生支持。此类消息处理程序专为响应式客户端而设计,这些客户端返回一个响应式类型,用于按需订阅低级别操作执行,并且不提供任何回复数据来继续响应流合成。当在命令式集成流中使用 ReactiveMessageHandler
时,handleMessage()
结果会在返回后立即订阅,原因在于此类流中没有可以响应背压的响应流合成。在这种情况下,框架会将此 ReactiveMessageHandler
包装到 ReactiveMessageHandlerAdapter
中 - 这是 MessageHandler
的一个普通实现。然而,当 ReactiveStreamsConsumer
参与流中时(例如,当要使用的通道是 FluxMessageChannel
时),这种 ReactiveMessageHandler
会使用 flatMap()
Reactor 运算符合成到整个响应流中,以便在使用过程中响应背压。
Starting with version 5.3, the ReactiveMessageHandler
is supported natively in the framework.
This type of message handler is designed for reactive clients which return a reactive type for on-demand subscription for low-level operation execution and doesn’t provide any reply data to continue a reactive stream composition.
When a ReactiveMessageHandler
is used in the imperative integration flow, the handleMessage()
result in subscribed immediately after return, just because there is no reactive streams composition in such a flow to honor back-pressure.
In this case the framework wraps this ReactiveMessageHandler
into a ReactiveMessageHandlerAdapter
- a plain implementation of MessageHandler
.
However, when a ReactiveStreamsConsumer
is involved in the flow (e.g. when channel to consume is a FluxMessageChannel
), such a ReactiveMessageHandler
is composed to the whole reactive stream with a flatMap()
Reactor operator to honor back-pressure during consumption.
开箱即用的 ReactiveMessageHandler`实现之一是出站通道适配器的 `ReactiveMongoDbStoringMessageHandler
。有关更多信息,请参阅 MongoDB Reactive Channel Adapters。
One of the out-of-the-box ReactiveMessageHandler
implementation is a ReactiveMongoDbStoringMessageHandler
for Outbound Channel Adapter.
See MongoDB Reactive Channel Adapters for more information.
从 6.1 版本开始,IntegrationFlowDefinition
公开了一个方便的 handleReactive(ReactiveMessageHandler)
终端运算符。任何 ReactiveMessageHandler
实现(甚至只是使用 Mono
API 的普通 lambda)都可以用于此运算符。框架会自动订阅返回的 Mono<Void>
。以下是此运算符可能的配置的一个简单示例:
Starting with version 6.1, the IntegrationFlowDefinition
exposes a convenient handleReactive(ReactiveMessageHandler)
terminal operator.
Any ReactiveMessageHandler
implementation (even just a plain lambda using the Mono
API) can be used for this operator.
The framework subscribes to the returned Mono<Void>
automatically.
Here is a simple sample of possible configuration for this operator:
@Bean
public IntegrationFlow wireTapFlow1() {
return IntegrationFlow.from("tappedChannel1")
.wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo")))
.handleReactive((message) -> Mono.just(message).log().then());
}
此运算符的一个重载版本会接受一个 Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>>
,用于围绕提供的 ReactiveMessageHandler
自定消费端点。
An overloaded version of this operator accepts a Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>>
to customize a consumer endpoint around the provided ReactiveMessageHandler
.
此外,还提供基于 ReactiveMessageHandlerSpec
的变体。在大多数情况下,它们用于特定于协议的通道适配器实现。请参阅下一节,了解指向目标技术的链接以及各个反应式通道适配器。
In addition, a ReactiveMessageHandlerSpec
-based variants are also provided.
In most cases they are used for protocol-specific channel adapter implementations.
See the next section following links to the target technologies with respective reactive channel adapters.
Reactive Channel Adapters
当集成目标协议提供 Reactive Streams 解决方案时,在 Spring Integration 中实现通道适配器就变得非常简单。
When the target protocol for integration provides a Reactive Streams solution, it becomes straightforward to implement channel adapters in Spring Integration.
入站的、事件驱动的通道适配器实现是,将请求(如果需要)包装到一个延迟的 Mono
或 Flux
中,并且仅在协议组件启动对侦听器方法返回的 Mono
的订阅时才执行发送(并产生答复,如果有)。这样,我们在该组件中封装了完全的反应式流解决方案。当然,订阅输出通道的下游集成流应遵循 Reactive Streams 规范,并且要以按需、支持背压的方式执行。
An inbound, event-driven channel adapter implementation is about wrapping a request (if necessary) into a deferred Mono
or Flux
and perform a send (and produce reply, if any) only when a protocol component initiates a subscription into a Mono
returned from the listener method.
This way we have a reactive stream solution encapsulated exactly in this component.
Of course, downstream integration flow subscribed on the output channel should honor Reactive Streams specification and be performed in the on demand, back-pressure ready manner.
这在集成流中使用的 MessageHandler
处理器的本质(或当前实现)中并不可用。当没有反应式实现时,可以再集成端点之前和之后使用线程池和队列或 FluxMessageChannel
(见上文)来处理此限制。
This is not always available by the nature (or with the current implementation) of MessageHandler
processor used in the integration flow.
This limitation can be handled using thread pools and queues or FluxMessageChannel
(see above) before and after integration endpoints when there is no reactive implementation.
一个反应式*事件驱动*入站通道适配器的示例:
An example for a reactive event-driven inbound channel adapter:
public class CustomReactiveMessageProducer extends MessageProducerSupport {
private final CustomReactiveSource customReactiveSource;
public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) {
Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
this.customReactiveSource = customReactiveSource;
}
@Override
protected void doStart() {
Flux<Message<?>> messageFlux =
this.customReactiveSource
.map(event - >
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());
subscribeToPublisher(messageFlux);
}
}
用法如下所示:
Usage would look like:
public class MainFlow {
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(customReactiveMessageProducer)
.channel(outputChannel)
.get();
}
}
或以声明式方式:
Or in a declarative way:
public class MainFlow {
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
.handle(outputChannel)
.get();
}
}
或者,甚至无需通道适配器,我们始终可以用以下方式使用 Java DSL:
Or even without a channel adapter, we can always use the Java DSL in the following way:
public class MainFlow {
@Bean
public IntegrationFlow buildFlow() {
Flux<Message<?>> myFlux = this.customReactiveSource
.map(event ->
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());
return IntegrationFlow.from(myFlux)
.handle(outputChannel)
.get();
}
}
一个反应式出站通道适配器的实现是根据目标协议的反应式 API 启动(或继续)一个反应式流,以与外部系统进行交互。入站有效负载可能本来就是一个反应式类型,也可能是整个集成流的事件,该事件是上方的反应式流的一部分。如果我们处于单向的、即发即忘的情景,则返回的反应式类型可以立即订阅;或者,它会下游传播(请求 - 答复场景),以供在目标业务逻辑中进一步集成流或明确订阅,但下游仍然保留反应式流语义。
A reactive outbound channel adapter implementation is about the initiation (or continuation) of a reactive stream to interaction with an external system according to the provided reactive API for the target protocol. An inbound payload could be a reactive type per se or as an event of the whole integration flow which is a part of the reactive stream on top. A returned reactive type can be subscribed immediately if we are in a one-way, fire-and-forget scenario, or it is propagated downstream (request-reply scenarios) for further integration flow or an explicit subscription in the target business logic, but still downstream preserving reactive streams semantics.
一个反应式出站通道适配器的示例:
An example for a reactive outbound channel adapter:
public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {
private final CustomEntityOperations customEntityOperations;
public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) {
Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
this.customEntityOperations = customEntityOperations;
}
@Override
protected Mono<Void> handleMessageInternal(Message<?> message) {
return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
.flatMap(mode -> {
switch (mode) {
case INSERT:
return handleInsert(message);
case UPDATE:
return handleUpdate(message);
default:
return Mono.error(new IllegalArgumentException());
}
}).then();
}
private Mono<Void> handleInsert(Message<?> message) {
return this.customEntityOperations.insert(message.getPayload())
.then();
}
private Mono<Void> handleUpdate(Message<?> message) {
return this.r2dbcEntityOperations.update(message.getPayload())
.then();
}
public enum Type {
INSERT,
UPDATE,
}
}
我们能够使用这两个通道适配器:
We will be able to use both of the channel adapters:
public class MainFlow {
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;
@Autowired
private CustomReactiveMessageHandler customReactiveMessageHandler;
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(customReactiveMessageProducer)
.transform(someOperation)
.handle(customReactiveMessageHandler)
.get();
}
}
目前,Spring Integration 为 WebFlux、RSocket、MongoDb、R2DBC、ZeroMQ、GraphQL和 Apache Cassandra提供了通道适配器(或网关)实现。Redis Stream Channel Adapters也具有响应能力,并使用来自 Spring Data 的 ReactiveStreamOperations
。有更多响应通道适配器即将推出,例如用于 Kafka的 Apache Kafka,它是基于 `ReactiveKafkaProducerTemplate`和 `ReactiveKafkaConsumerTemplate`以及 Spring for Apache Kafka中的其他通道适配器。对于许多其他非响应通道适配器,建议使用线程池来避免在响应流处理期间发生阻塞。
Currently, Spring Integration provides channel adapter (or gateway) implementations for WebFlux, RSocket, MongoDb, R2DBC, ZeroMQ, GraphQL, Apache Cassandra.
The Redis Stream Channel Adapters are also reactive and uses ReactiveStreamOperations
from Spring Data.
More reactive channel adapters are coming, for example for Apache Kafka in Kafka based on the ReactiveKafkaProducerTemplate
and ReactiveKafkaConsumerTemplate
from Spring for Apache Kafka etc.
For many other non-reactive channel adapters thread pools are recommended to avoid blocking during reactive stream processing.
Reactive to Imperative Context Propagation
当类路径上存在 Context Propagation库时,Project Reactor 可以获取 ThreadLocal`值(例如, Micrometer Observation或 `SecurityContextHolder
)并将它们存储到 Subscriber`上下文中。当我们需要为跟踪填充一个记录 MDC 或让从响应式流调用的服务从作用域中还原观察结果时,也可能执行相反的操作。有关上下文字符传播的特殊运算符,请参阅有关 Project Reactor documentation的更多信息。如果我们的整个解决方案是单个响应式流组合,则上下文字符的存储和还原将顺利进行,因为 `Subscriber`上下文从下游直到组合开始(`Flux`或 `Mono
)都是可见的。但是,如果应用程序在不同的 `Flux`实例间或切换成命令式处理并返回,则与 `Subscriber`关联的上下文可能不可用。对于此类用例,Spring Integration 提供了一个附加功能(从 `6.0.5`版本开始),可以将 Reactor `ContextView`存储到从响应式流产生的 `IntegrationMessageHeaderAccessor.REACTOR_CONTEXT`消息头中,例如,当我们执行直接 `send()`操作时。然后在 `FluxMessageChannel.subscribeTo()`中使用此标头,以便此通道要发出的 `Message`的 Reactor 上下文。目前,此标头是通过 `WebFluxInboundEndpoint`和 `RSocketInboundGateway`组件填充的,但可以在执行响应式到命令式集成的任何解决方案中使用。填充此标头的逻辑如下:
When the Context Propagation library is on the classpath, the Project Reactor can take ThreadLocal
values (e.g. Micrometer Observation or SecurityContextHolder
) and store them into a Subscriber
context.
The opposite operation is also possible, when we need to populate a logging MDC for tracing or let services we call from the reactive stream to restore an observation from the scope.
See more information in Project Reactor documentation about its special operators for context propagation.
The storing and restoring context works smoothly if our whole solution is a single reactive stream composition since a Subscriber
context is visible from downstream up to the beginning of the composition(Flux
or Mono
).
But, if the application switches between different Flux
instances or into imperative processing and back, then the context tied to the Subscriber
might not be available.
For such a use case, Spring Integration provides an additional capability (starting with version 6.0.5
) to store a Reactor ContextView
into the IntegrationMessageHeaderAccessor.REACTOR_CONTEXT
message header produced from the reactive stream, e.g. when we perform direct send()
operation.
This header is used then in the FluxMessageChannel.subscribeTo()
to restore a Reactor context for the Message
that this channel is going to emit.
Currently, this header is populated from the WebFluxInboundEndpoint
and RSocketInboundGateway
components, but can be used in any solution where reactive to imperative integration is performed.
The logic to populate this header is like this:
return requestMono
.flatMap((message) ->
Mono.deferContextual((context) ->
Mono.just(message)
.handle((messageToSend, sink) ->
send(messageWithReactorContextIfAny(messageToSend, context)))));
...
private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
if (!context.isEmpty()) {
return getMessageBuilderFactory()
.fromMessage(message)
.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context)
.build();
}
return message;
}
请注意,我们仍然需要使用 handle()
操作符,以便 Reactor 从上下文中恢复 ThreadLocal
值。即使将其作为标头发送,框架也无法假设是否将其恢复到下游的 ThreadLocal
值上。
Note, that we still need to use a handle()
operator to make Reactor restore ThreadLocal
values from the context.
Even if it is sent as a header, the framework cannot make an assumption if it is going to be to restore onto ThreadLocal
values downstream.
要从另一个 Flux
或 Mono
组合中的 Message
恢复上下文,可以执行此逻辑:
To restore the context from a Message
on the other Flux
or Mono
composition, this logic can be performed:
Mono.just(message)
.handle((messageToHandle, sink) -> ...)
.contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)));