Message Channel Implementations
Spring 集成提供了不同的消息通道实现。以下各节将对每种实现进行简要说明。
Spring Integration provides different message channel implementations. The following sections briefly describe each one.
PublishSubscribeChannel
PublishSubscribeChannel
实现将发送给它的任何 Message
广播到其所有订阅的处理程序。这最常用于发送事件消息,其主要作用是通知(与文档消息相反,文档消息通常由单个处理程序处理)。请注意,PublishSubscribeChannel
仅用于发送。由于在调用其 send(Message)
方法后直接广播到其订阅者,因此使用者无法轮询消息(它不实现 PollableChannel
,因此没有 receive()
方法)。相反,任何订阅者本身都必须是 MessageHandler
,并且依次调用订阅者的 handleMessage(Message)
方法。
The PublishSubscribeChannel
implementation broadcasts any Message
sent to it to all of its subscribed handlers.
This is most often used for sending event messages, whose primary role is notification (as opposed to document messages, which are generally intended to be processed by a single handler).
Note that the PublishSubscribeChannel
is intended for sending only.
Since it broadcasts to its subscribers directly when its send(Message)
method is invoked, consumers cannot poll for messages (it does not implement PollableChannel
and therefore has no receive()
method).
Instead, any subscriber must itself be a MessageHandler
, and the subscriber’s handleMessage(Message)
method is invoked in turn.
在 3.0 版之前,调用没有订阅者的 PublishSubscribeChannel
上的 send
方法会返回 false
。当与 MessagingTemplate
结合使用时,会抛出 MessageDeliveryException
。从 3.0 版开始,该行为已更改,只要存在最低订阅者(并成功处理消息),send
始终被视为成功。可以通过设置默认值为 0
的 minSubscribers
属性来修改此行为。
Prior to version 3.0, invoking the send
method on a PublishSubscribeChannel
that had no subscribers returned false
.
When used in conjunction with a MessagingTemplate
, a MessageDeliveryException
was thrown.
Starting with version 3.0, the behavior has changed such that a send
is always considered successful if at least the minimum subscribers are present (and successfully handle the message).
This behavior can be modified by setting the minSubscribers
property, which defaults to 0
.
如果你使用 |
If you use a |
QueueChannel
QueueChannel
实现封装了一个队列。与 PublishSubscribeChannel
不同,QueueChannel
具有点对点语义。换句话说,即使该信道有多个消费者,也只有一个消费者应该接收发送到该信道的任何 Message
。它提供了一个默认的无参数构造函数(提供 Integer.MAX_VALUE
的基本无限容量),以及一个接受队列容量的构造函数,如下表所示:
The QueueChannel
implementation wraps a queue.
Unlike the PublishSubscribeChannel
, the QueueChannel
has point-to-point semantics.
In other words, even if the channel has multiple consumers, only one of them should receive any Message
sent to that channel.
It provides a default no-argument constructor (providing an essentially unbounded capacity of Integer.MAX_VALUE
) as well as a constructor that accepts the queue capacity, as the following listing shows:
public QueueChannel(int capacity)
尚未达到其容量限制的信道会将其消息保存在其内部队列中,并且 send(Message<?>)
方法会立即返回,即使没有接收方准备处理该消息。如果队列已达到容量,则发送方会阻塞直到队列中有可用空间。或者,如果你使用具有其他超时参数的发送方法,则队列会阻塞,直到可用空间或超时时段过去,以先发生的为准。同样,如果队列中有消息可用,receive()
调用会立即返回,但如果队列为空,则接收调用可能会阻塞,直到有消息可用或超时(如果提供)过去。在这两种情况下,都可以通过传递超时值为 0 来强制立即返回,而不管队列的状态如何。但是,请注意,对没有 timeout
参数的 send()
和 receive()
版本的调用会无限期地阻塞。
A channel that has not reached its capacity limit stores messages in its internal queue, and the send(Message<?>)
method returns immediately, even if no receiver is ready to handle the message.
If the queue has reached capacity, the sender blocks until room is available in the queue.
Alternatively, if you use the send method that has an additional timeout parameter, the queue blocks until either room is available or the timeout period elapses, whichever occurs first.
Similarly, a receive()
call returns immediately if a message is available on the queue, but, if the queue is empty, then a receive call may block until either a message is available or the timeout, if provided, elapses.
In either case, it is possible to force an immediate return regardless of the queue’s state by passing a timeout value of 0.
Note, however, that calls to the versions of send()
and receive()
with no timeout
parameter block indefinitely.
PriorityChannel
虽然 QueueChannel
强制先进先出 (FIFO) 排序,但 PriorityChannel
是一个另一种实现,它允许根据优先级对信道内的消息进行排序。默认情况下,优先级由每条消息内的 priority
标头确定。但是,对于自定义优先级确定逻辑,可以将类型为 Comparator<Message<?>>
的比较器提供给 PriorityChannel
构造函数。
Whereas the QueueChannel
enforces first-in-first-out (FIFO) ordering, the PriorityChannel
is an alternative implementation that allows for messages to be ordered within the channel based upon a priority.
By default, the priority is determined by the priority
header within each message.
However, for custom priority determination logic, a comparator of type Comparator<Message<?>>
can be provided to the PriorityChannel
constructor.
RendezvousChannel
RendezvousChannel
启用了一个“直接移交”场景,其中发送方会阻塞直到另一方调用信道的 receive()
方法。另一方会阻塞直到发送方发送消息。在内部,此实现与 QueueChannel
非常相似,只不过它使用 SynchronousQueue
(BlockingQueue
的零容量实现)。在发送方和接收方在不同的线程中运行但异步将消息放入队列中不合适的情况下,此方法效果很好。换句话说,对于 RendezvousChannel
,发送方知道某个接收方已接受消息,而对于 QueueChannel
,消息会存储到内部队列中,而且可能永远不会接收。
The RendezvousChannel
enables a “direct-handoff” scenario, wherein a sender blocks until another party invokes the channel’s receive()
method.
The other party blocks until the sender sends the message.
Internally, this implementation is quite similar to the QueueChannel
, except that it uses a SynchronousQueue
(a zero-capacity implementation of BlockingQueue
).
This works well in situations where the sender and receiver operate in different threads, but asynchronously dropping the message in a queue is not appropriate.
In other words, with a RendezvousChannel
, the sender knows that some receiver has accepted the message, whereas with a QueueChannel
, the message would have been stored to the internal queue and potentially never received.
请记住,默认情况下,所有这些基于队列的通道仅将消息存储在内存中。当需要持久性时,你可以在“queue”元素中提供一个“message-store”属性以引用一个持久的 |
Keep in mind that all of these queue-based channels are storing messages in-memory only by default.
When persistence is required, you can either provide a 'message-store' attribute within the 'queue' element to reference a persistent |
RendezvousChannel
对于实现请求-答复操作也很有用。发送方可以创建 RendezvousChannel
的临时匿名实例,然后在构建 Message
时将其设置为“replyChannel”标头。在发送該 Message
之后,发送方可以立即调用 receive
(可以选择提供一个超时值)以便阻塞,同时等待答复 Message
。这与 Spring Integration 中许多请求-答复组件在内部使用的实现非常相似。
The RendezvousChannel
is also useful for implementing request-reply operations.
The sender can create a temporary, anonymous instance of RendezvousChannel
, which it then sets as the 'replyChannel' header when building a Message
.
After sending that Message
, the sender can immediately call receive
(optionally providing a timeout value) in order to block while waiting for a reply Message
.
This is very similar to the implementation used internally by many of Spring Integration’s request-reply components.
DirectChannel
DirectChannel
具有点对点语义,但其他方面与 PublishSubscribeChannel
而不是前面描述的基于队列的任何信道实现更相似。它实现 SubscribableChannel
接口,而不是 PollableChannel
接口,所以它直接将消息发送给订阅者。但是,作为一个点对点信道,它与 PublishSubscribeChannel
不同,因为它将每个 Message
发送给单个订阅的 MessageHandler
。
The DirectChannel
has point-to-point semantics but otherwise is more similar to the PublishSubscribeChannel
than any of the queue-based channel implementations described earlier.
It implements the SubscribableChannel
interface instead of the PollableChannel
interface, so it dispatches messages directly to a subscriber.
As a point-to-point channel, however, it differs from the PublishSubscribeChannel
in that it sends each Message
to a single subscribed MessageHandler
.
除了是最简单的点对点信道选项之外,它最重要的一个特点是它使单个线程能够在信道的“双方”执行操作。例如,如果一个处理程序订阅了一个 DirectChannel
,则向该信道发送一个 Message
会触发直接在发送者的线程中调用该处理程序的 handleMessage(Message)
方法,然后 send()
方法调用才能返回。
In addition to being the simplest point-to-point channel option, one of its most important features is that it enables a single thread to perform the operations on “both sides” of the channel.
For example, if a handler subscribes to a DirectChannel
, then sending a Message
to that channel triggers invocation of that handler’s handleMessage(Message)
method directly in the sender’s thread, before the send()
method invocation can return.
提供具有此行为的信道实现的主要动机是支持必须跨信道的事务,同时仍能从信道提供的抽象和松散耦合中受益。如果在事务的范围内调用 send()
调用,则处理程序的调用的结果(例如,更新数据库记录)会在确定该事务的最终结果(提交或回滚)中发挥作用。
The key motivation for providing a channel implementation with this behavior is to support transactions that must span across the channel while still benefiting from the abstraction and loose coupling that the channel provides.
If the send()
call is invoked within the scope of a transaction, the outcome of the handler’s invocation (for example, updating a database record) plays a role in determining the ultimate result of that transaction (commit or rollback).
由于 |
Since the |
DirectChannel
在内部委派给消息分配器以调用其订阅的消息处理程序,并且该分配器可以具有由 load-balancer
或 load-balancer-ref
属性(互斥)公开的负载均衡策略。当多个消息处理程序订阅同一个信道时,消息分配器会使用负载均衡策略来帮助确定如何将消息分配给消息处理程序。为了方便起见,load-balancer
属性公开了指向 LoadBalancingStrategy
预先存在实现的值的枚举。round-robin
(在处理程序中轮流负载均衡)和 none
(对于希望显式禁用负载均衡的情况)是仅有的可用值。可以在以后的版本中添加其他策略实现。但是,从 3.0 版本开始,你可以提供自己的 LoadBalancingStrategy
实现并使用 load-balancer-ref
属性注入它,该属性应指向实现 LoadBalancingStrategy
的 bean,如下例所示:
The DirectChannel
internally delegates to a message dispatcher to invoke its subscribed message handlers, and that dispatcher can have a load-balancing strategy exposed by load-balancer
or load-balancer-ref
attributes (mutually exclusive).
The load balancing strategy is used by the message dispatcher to help determine how messages are distributed amongst message handlers when multiple message handlers subscribe to the same channel.
As a convenience, the load-balancer
attribute exposes an enumeration of values pointing to pre-existing implementations of LoadBalancingStrategy
.
A round-robin
(load-balances across the handlers in rotation) and none
(for the cases where one wants to explicitly disable load balancing) are the only available values.
Other strategy implementations may be added in future versions.
However, since version 3.0, you can provide your own implementation of the LoadBalancingStrategy
and inject it by using the load-balancer-ref
attribute, which should point to a bean that implements LoadBalancingStrategy
, as the following example shows:
FixedSubscriberChannel
是一个 SubscribableChannel
,它只支持不可取消订阅的单个 MessageHandler
订阅者。当不涉及其他订阅者且不需要信道拦截器时,这对于高吞吐量性能用例非常有用。
A FixedSubscriberChannel
is a SubscribableChannel
that only supports a single MessageHandler
subscriber that cannot be unsubscribed.
This is useful for high-throughput performance use-cases when no other subscribers are involved and no channel interceptors are needed.
<int:channel id="lbRefChannel">
<int:dispatcher load-balancer-ref="lb"/>
</int:channel>
<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>
请注意,load-balancer
和 load-balancer-ref
属性是互斥的。
Note that the load-balancer
and load-balancer-ref
attributes are mutually exclusive.
负载均衡还会与布尔 failover
属性结合使用。如果 failover
的值为 true(默认值),则当前面的处理程序抛出异常时,分配器会回退到任何后续处理程序(视需要而定)。顺序由处理程序上定义的选择性顺序值确定或(如果没有此类值)由处理程序订阅的顺序确定。
The load-balancing also works in conjunction with a boolean failover
property.
If the failover
value is true (the default), the dispatcher falls back to any subsequent handlers (as necessary) when preceding handlers throw exceptions.
The order is determined by an optional order value defined on the handlers themselves or, if no such value exists, the order in which the handlers subscribed.
如果在出现错误时某些情况需要分配器总是尝试调用第一个处理程序然后按相同的固定顺序退回,则不应提供负载均衡策略。换句话说,即使未启用负载均衡,分配器仍支持 failover
布尔属性。但是,如果没有负载均衡,对处理程序的调用总从第一个开始,按照它们的顺序。例如,当对主、次、三等进行明确定义时,此方法效果很好。使用命名空间支持时,任何端点上的 order
属性都会确定顺序。
If a certain situation requires that the dispatcher always try to invoke the first handler and then fall back in the same fixed order sequence every time an error occurs, no load-balancing strategy should be provided.
In other words, the dispatcher still supports the failover
boolean property even when no load-balancing is enabled.
Without load-balancing, however, the invocation of handlers always begins with the first, according to their order.
For example, this approach works well when there is a clear definition of primary, secondary, tertiary, and so on.
When using the namespace support, the order
attribute on any endpoint determines the order.
请记住,负载平衡和 |
Keep in mind that load-balancing and |
从 5.2 版本开始,如果 failover
为 true,当前处理程序的故障以及已失败的消息会分别记录在 debug
或 info
下(如果已配置)。
Starting with version 5.2, when failover
is true, a failure of the current handler together with the failed message is logged under debug
or info
if configured respectively.
ExecutorChannel
ExecutorChannel
是一个点对点信道,它支持与 DirectChannel
相同的分配器配置(负载均衡策略和 failover
布尔属性)。这两种分发信道类型之间的主要区别在于,ExecutorChannel
委派给 TaskExecutor
的实例来执行分发。这意味着 send
方法通常不会阻塞,但这也意味着处理程序调用可能不会在发送方的线程中发生。因此,它不支持跨发送方和接收方处理程序的事务。
The ExecutorChannel
is a point-to-point channel that supports the same dispatcher configuration as DirectChannel
(load-balancing strategy and the failover
boolean property).
The key difference between these two dispatching channel types is that the ExecutorChannel
delegates to an instance of TaskExecutor
to perform the dispatch.
This means that the send method typically does not block, but it also means that the handler invocation may not occur in the sender’s thread.
It therefore does not support transactions that span the sender and receiving handler.
发件人有时可以阻止。例如,在使用 TaskExecutor
并采用限制客户端(如 ThreadPoolExecutor.CallerRunsPolicy
)的拒绝策略时,当线程池处于最大容量并且执行程序工作队列已满时,发件人线程随时可以执行该方法。由于那种情况只会以不可预测的方式发生,所以你不应该依靠它来进行事务。
The sender can sometimes block.
For example, when using a TaskExecutor
with a rejection policy that throttles the client (such as the ThreadPoolExecutor.CallerRunsPolicy
), the sender’s thread can execute the method any time the thread pool is at its maximum capacity and the executor’s work queue is full.
Since that situation would only occur in a non-predictable way, you should not rely upon it for transactions.
PartitionedChannel
从 6.1 版本开始,提供了一个 PartitionedChannel
实现。这是 AbstractExecutorChannel
的扩展,它表示点对点分发逻辑,其中根据从发送到此信道的消息评估的分区键处理实际消耗。此信道与上面提到的 ExecutorChannel
类似,但不同之处在于具有相同分区键的消息始终在同一线程中处理,从而保留了排序。它不需要外部 TaskExecutor
,但可以配置自定义 ThreadFactory
(例如,Thread.ofVirtual().name("partition-", 0).factory()
)。此工厂用于将单线程执行器填充到 MessageDispatcher
委托,每个分区一个。默认情况下,IntegrationMessageHeaderAccessor.CORRELATION_ID
消息标头用作分区键。此信道可以配置为一个简单的 bean:
Starting with version 6.1, a PartitionedChannel
implementation is provided.
This is an extension of AbstractExecutorChannel
and represents point-to-point dispatching logic where the actual consumption is processed on a specific thread, determined by the partition key evaluated from a message sent to this channel.
This channel is similar to the ExecutorChannel
mentioned above, but with the difference that messages with the same partition key are always handled in the same thread, preserving ordering.
It does not require an external TaskExecutor
, but can be configured with a custom ThreadFactory
(e.g. Thread.ofVirtual().name("partition-", 0).factory()
).
This factory is used to populate single-thread executors into a MessageDispatcher
delegate, per partition.
By default, the IntegrationMessageHeaderAccessor.CORRELATION_ID
message header is used as the partition key.
This channel can be configured as a simple bean:
@Bean
PartitionedChannel somePartitionedChannel() {
return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}
该信道将有 3
个分区,专门的线程;将使用 partitionKey
标头确定消息将在哪个分区中处理。有关更多信息,请参阅 PartitionedChannel
类 Javadoc。
The channel will have 3
partitions - dedicated threads; will use the partitionKey
header to determine in which partition the message will be handled.
See PartitionedChannel
class Javadocs for more information.
FluxMessageChannel
FluxMessageChannel
是一种 org.reactivestreams.Publisher
实现,用于将发送的消息“下沉”到内部 reactor.core.publisher.Flux
中以供下游的反应式订阅者按需消费。此信道实现既不是 SubscribableChannel
,也不是 PollableChannel
,因此只能使用 org.reactivestreams.Subscriber
实例从该信道消费,从而遵循反应式流的背压性质。另一方面,FluxMessageChannel
使用 subscribeTo(Publisher<Message<?>>)
合约实现 ReactiveStreamsSubscribableChannel
,该合约允许从反应式源发布器接收事件,从而将反应式流桥接到了集成流中。为了实现整个集成流的完全反应式行为,此类信道必须放置在流中的所有端点之间。
The FluxMessageChannel
is an org.reactivestreams.Publisher
implementation for "sinking"
sent messages into an internal reactor.core.publisher.Flux
for on demand consumption by reactive subscribers downstream.
This channel implementation is neither a SubscribableChannel
, nor a PollableChannel
, so only org.reactivestreams.Subscriber
instances can be used to consume from this channel honoring back-pressure nature of reactive streams.
On the other hand, the FluxMessageChannel
implements a ReactiveStreamsSubscribableChannel
with its subscribeTo(Publisher<Message<?>>)
contract allowing receiving events from reactive source publishers, bridging a reactive stream into the integration flow.
To achieve fully reactive behavior for the whole integration flow, such a channel must be placed between all the endpoints in the flow.
请参见 Reactive Streams Support 以了解有关与 Reactive Streams 交互的更多信息。
See Reactive Streams Support for more information about interaction with Reactive Streams.
Scoped Channel
Spring Integration 1.0 提供了 ThreadLocalChannel 实现,但已经从 2.0 中移除。现在,处理相同要求的更常规方式是向频道添加一个范围属性。属性的值可以是上下文中可用的范围的名称。例如,在 Web 环境中,某些范围可用,任何自定义范围实现都可以向上下文注册。以下示例显示了应用于频道(包括注册范围本身)的线程本地范围:
Spring Integration 1.0 provided a ThreadLocalChannel
implementation, but that has been removed as of 2.0.
Now the more general way to handle the same requirement is to add a scope
attribute to a channel.
The value of the attribute can be the name of a scope that is available within the context.
For example, in a web environment, certain scopes are available, and any custom scope implementations can be registered with the context.
The following example shows a thread-local scope being applied to a channel, including the registration of the scope itself:
<int:channel id="threadScopedChannel" scope="thread">
<int:queue />
</int:channel>
<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
<property name="scopes">
<map>
<entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
</map>
</property>
</bean>
先前示例中定义的频道也在内部向队列委派,但频道绑定到当前线程,因此队列的内容也类似地绑定。这样,发送到频道的线程稍后就可以接收相同的消息,但其他线程无法访问它们。虽然很少需要线程范围的频道,但当 DirectChannel 实例被用于强制执行单个操作线程但任何答复消息都应发送到“终端”频道时,它们会很有用。如果该终端频道是线程范围内的,则原始发送线程可以从终端频道收集其答复。
The channel defined in the previous example also delegates to a queue internally, but the channel is bound to the current thread, so the contents of the queue are similarly bound.
That way, the thread that sends to the channel can later receive those same messages, but no other thread would be able to access them.
While thread-scoped channels are rarely needed, they can be useful in situations where DirectChannel
instances are being used to enforce a single thread of operation but any reply messages should be sent to a “terminal” channel.
If that terminal channel is thread-scoped, the original sending thread can collect its replies from the terminal channel.
现在,由于任何频道都可以有范围,因此除了线程本地以外,您还可以定义自己的范围。
Now, since any channel can be scoped, you can define your own scopes in addition to thread-Local.