Message Channel Implementations

Spring 集成提供了不同的消息通道实现。以下各节将对每种实现进行简要说明。

Spring Integration provides different message channel implementations. The following sections briefly describe each one.

PublishSubscribeChannel

PublishSubscribeChannel 实现将发送给它的任何 Message 广播到其所有订阅的处理程序。这最常用于发送事件消息,其主要作用是通知(与文档消息相反,文档消息通常由单个处理程序处理)。请注意,PublishSubscribeChannel 仅用于发送。由于在调用其 send(Message) 方法后直接广播到其订阅者,因此使用者无法轮询消息(它不实现 PollableChannel,因此没有 receive() 方法)。相反,任何订阅者本身都必须是 MessageHandler,并且依次调用订阅者的 handleMessage(Message) 方法。

The PublishSubscribeChannel implementation broadcasts any Message sent to it to all of its subscribed handlers. This is most often used for sending event messages, whose primary role is notification (as opposed to document messages, which are generally intended to be processed by a single handler). Note that the PublishSubscribeChannel is intended for sending only. Since it broadcasts to its subscribers directly when its send(Message) method is invoked, consumers cannot poll for messages (it does not implement PollableChannel and therefore has no receive() method). Instead, any subscriber must itself be a MessageHandler, and the subscriber’s handleMessage(Message) method is invoked in turn.

在 3.0 版之前,调用没有订阅者的 PublishSubscribeChannel 上的 send 方法会返回 false。当与 MessagingTemplate 结合使用时,会抛出 MessageDeliveryException。从 3.0 版开始,该行为已更改,只要存在最低订阅者(并成功处理消息),send 始终被视为成功。可以通过设置默认值为 0minSubscribers 属性来修改此行为。

Prior to version 3.0, invoking the send method on a PublishSubscribeChannel that had no subscribers returned false. When used in conjunction with a MessagingTemplate, a MessageDeliveryException was thrown. Starting with version 3.0, the behavior has changed such that a send is always considered successful if at least the minimum subscribers are present (and successfully handle the message). This behavior can be modified by setting the minSubscribers property, which defaults to 0.

如果你使用 TaskExecutor,只有正确的订阅者数量的存在才用于此判定,因为消息的实际处理是异步进行的。

If you use a TaskExecutor, only the presence of the correct number of subscribers is used for this determination, because the actual handling of the message is performed asynchronously.

QueueChannel

QueueChannel 实现封装了一个队列。与 PublishSubscribeChannel 不同,QueueChannel 具有点对点语义。换句话说,即使该信道有多个消费者,也只有一个消费者应该接收发送到该信道的任何 Message。它提供了一个默认的无参数构造函数(提供 Integer.MAX_VALUE 的基本无限容量),以及一个接受队列容量的构造函数,如下表所示:

The QueueChannel implementation wraps a queue. Unlike the PublishSubscribeChannel, the QueueChannel has point-to-point semantics. In other words, even if the channel has multiple consumers, only one of them should receive any Message sent to that channel. It provides a default no-argument constructor (providing an essentially unbounded capacity of Integer.MAX_VALUE) as well as a constructor that accepts the queue capacity, as the following listing shows:

public QueueChannel(int capacity)

尚未达到其容量限制的信道会将其消息保存在其内部队列中,并且 send(Message<?>) 方法会立即返回,即使没有接收方准备处理该消息。如果队列已达到容量,则发送方会阻塞直到队列中有可用空间。或者,如果你使用具有其他超时参数的发送方法,则队列会阻塞,直到可用空间或超时时段过去,以先发生的为准。同样,如果队列中有消息可用,receive() 调用会立即返回,但如果队列为空,则接收调用可能会阻塞,直到有消息可用或超时(如果提供)过去。在这两种情况下,都可以通过传递超时值为 0 来强制立即返回,而不管队列的状态如何。但是,请注意,对没有 timeout 参数的 send()receive() 版本的调用会无限期地阻塞。

A channel that has not reached its capacity limit stores messages in its internal queue, and the send(Message<?>) method returns immediately, even if no receiver is ready to handle the message. If the queue has reached capacity, the sender blocks until room is available in the queue. Alternatively, if you use the send method that has an additional timeout parameter, the queue blocks until either room is available or the timeout period elapses, whichever occurs first. Similarly, a receive() call returns immediately if a message is available on the queue, but, if the queue is empty, then a receive call may block until either a message is available or the timeout, if provided, elapses. In either case, it is possible to force an immediate return regardless of the queue’s state by passing a timeout value of 0. Note, however, that calls to the versions of send() and receive() with no timeout parameter block indefinitely.

PriorityChannel

虽然 QueueChannel 强制先进先出 (FIFO) 排序,但 PriorityChannel 是一个另一种实现,它允许根据优先级对信道内的消息进行排序。默认情况下,优先级由每条消息内的 priority 标头确定。但是,对于自定义优先级确定逻辑,可以将类型为 Comparator<Message<?>> 的比较器提供给 PriorityChannel 构造函数。

Whereas the QueueChannel enforces first-in-first-out (FIFO) ordering, the PriorityChannel is an alternative implementation that allows for messages to be ordered within the channel based upon a priority. By default, the priority is determined by the priority header within each message. However, for custom priority determination logic, a comparator of type Comparator<Message<?>> can be provided to the PriorityChannel constructor.

RendezvousChannel

RendezvousChannel 启用了一个“直接移交”场景,其中发送方会阻塞直到另一方调用信道的 receive() 方法。另一方会阻塞直到发送方发送消息。在内部,此实现与 QueueChannel 非常相似,只不过它使用 SynchronousQueueBlockingQueue 的零容量实现)。在发送方和接收方在不同的线程中运行但异步将消息放入队列中不合适的情况下,此方法效果很好。换句话说,对于 RendezvousChannel,发送方知道某个接收方已接受消息,而对于 QueueChannel,消息会存储到内部队列中,而且可能永远不会接收。

The RendezvousChannel enables a “direct-handoff” scenario, wherein a sender blocks until another party invokes the channel’s receive() method. The other party blocks until the sender sends the message. Internally, this implementation is quite similar to the QueueChannel, except that it uses a SynchronousQueue (a zero-capacity implementation of BlockingQueue). This works well in situations where the sender and receiver operate in different threads, but asynchronously dropping the message in a queue is not appropriate. In other words, with a RendezvousChannel, the sender knows that some receiver has accepted the message, whereas with a QueueChannel, the message would have been stored to the internal queue and potentially never received.

请记住,默认情况下,所有这些基于队列的通道仅将消息存储在内存中。当需要持久性时,你可以在“queue”元素中提供一个“message-store”属性以引用一个持久的 MessageStore 实现,或者你可以用一个持久代理支持的通道来替换本地通道,例如 JMS 支持的通道或通道适配器。后一种选择让你可以利用任何 JMS 提供程序的实现来进行消息持久性,如 JMS Support 中所述。但是,当不需要在队列中进行缓冲时,最简单的方法是依赖 DirectChannel,下一节将对此进行讨论。

Keep in mind that all of these queue-based channels are storing messages in-memory only by default. When persistence is required, you can either provide a 'message-store' attribute within the 'queue' element to reference a persistent MessageStore implementation or you can replace the local channel with one that is backed by a persistent broker, such as a JMS-backed channel or channel adapter. The latter option lets you take advantage of any JMS provider’s implementation for message persistence, as discussed in JMS Support. However, when buffering in a queue is not necessary, the simplest approach is to rely upon the DirectChannel, discussed in the next section.

RendezvousChannel 对于实现请求-答复操作也很有用。发送方可以创建 RendezvousChannel 的临时匿名实例,然后在构建 Message 时将其设置为“replyChannel”标头。在发送該 Message 之后,发送方可以立即调用 receive(可以选择提供一个超时值)以便阻塞,同时等待答复 Message。这与 Spring Integration 中许多请求-答复组件在内部使用的实现非常相似。

The RendezvousChannel is also useful for implementing request-reply operations. The sender can create a temporary, anonymous instance of RendezvousChannel, which it then sets as the 'replyChannel' header when building a Message. After sending that Message, the sender can immediately call receive (optionally providing a timeout value) in order to block while waiting for a reply Message. This is very similar to the implementation used internally by many of Spring Integration’s request-reply components.

DirectChannel

DirectChannel 具有点对点语义,但其他方面与 PublishSubscribeChannel 而不是前面描述的基于队列的任何信道实现更相似。它实现 SubscribableChannel 接口,而不是 PollableChannel 接口,所以它直接将消息发送给订阅者。但是,作为一个点对点信道,它与 PublishSubscribeChannel 不同,因为它将每个 Message 发送给单个订阅的 MessageHandler

The DirectChannel has point-to-point semantics but otherwise is more similar to the PublishSubscribeChannel than any of the queue-based channel implementations described earlier. It implements the SubscribableChannel interface instead of the PollableChannel interface, so it dispatches messages directly to a subscriber. As a point-to-point channel, however, it differs from the PublishSubscribeChannel in that it sends each Message to a single subscribed MessageHandler.

除了是最简单的点对点信道选项之外,它最重要的一个特点是它使单个线程能够在信道的“双方”执行操作。例如,如果一个处理程序订阅了一个 DirectChannel,则向该信道发送一个 Message 会触发直接在发送者的线程中调用该处理程序的 handleMessage(Message) 方法,然后 send() 方法调用才能返回。

In addition to being the simplest point-to-point channel option, one of its most important features is that it enables a single thread to perform the operations on “both sides” of the channel. For example, if a handler subscribes to a DirectChannel, then sending a Message to that channel triggers invocation of that handler’s handleMessage(Message) method directly in the sender’s thread, before the send() method invocation can return.

提供具有此行为的信道实现的主要动机是支持必须跨信道的事务,同时仍能从信道提供的抽象和松散耦合中受益。如果在事务的范围内调用 send() 调用,则处理程序的调用的结果(例如,更新数据库记录)会在确定该事务的最终结果(提交或回滚)中发挥作用。

The key motivation for providing a channel implementation with this behavior is to support transactions that must span across the channel while still benefiting from the abstraction and loose coupling that the channel provides. If the send() call is invoked within the scope of a transaction, the outcome of the handler’s invocation (for example, updating a database record) plays a role in determining the ultimate result of that transaction (commit or rollback).

由于 DirectChannel 是最简单的选项,不会增加调度和管理轮询程序线程所需的任何额外开销,因此它在 Spring Integration 中是默认通道类型。一般理念是定义应用程序通道,考虑哪些需要提供缓冲或限流输入,并将其修改为基于队列的 PollableChannels。同样地,如果通道需要广播消息,它不应是 DirectChannel,而应该是 PublishSubscribeChannel。稍后,我们将展示如何配置这些通道中的每一个。

Since the DirectChannel is the simplest option and does not add any additional overhead that would be required for scheduling and managing the threads of a poller, it is the default channel type within Spring Integration. The general idea is to define the channels for an application, consider which of those need to provide buffering or to throttle input, and modify those to be queue-based PollableChannels. Likewise, if a channel needs to broadcast messages, it should not be a DirectChannel but rather a PublishSubscribeChannel. Later, we show how each of these channels can be configured.

DirectChannel 在内部委派给消息分配器以调用其订阅的消息处理程序,并且该分配器可以具有由 load-balancerload-balancer-ref 属性(互斥)公开的负载均衡策略。当多个消息处理程序订阅同一个信道时,消息分配器会使用负载均衡策略来帮助确定如何将消息分配给消息处理程序。为了方便起见,load-balancer 属性公开了指向 LoadBalancingStrategy 预先存在实现的值的枚举。round-robin(在处理程序中轮流负载均衡)和 none(对于希望显式禁用负载均衡的情况)是仅有的可用值。可以在以后的版本中添加其他策略实现。但是,从 3.0 版本开始,你可以提供自己的 LoadBalancingStrategy 实现并使用 load-balancer-ref 属性注入它,该属性应指向实现 LoadBalancingStrategy 的 bean,如下例所示:

The DirectChannel internally delegates to a message dispatcher to invoke its subscribed message handlers, and that dispatcher can have a load-balancing strategy exposed by load-balancer or load-balancer-ref attributes (mutually exclusive). The load balancing strategy is used by the message dispatcher to help determine how messages are distributed amongst message handlers when multiple message handlers subscribe to the same channel. As a convenience, the load-balancer attribute exposes an enumeration of values pointing to pre-existing implementations of LoadBalancingStrategy. A round-robin (load-balances across the handlers in rotation) and none (for the cases where one wants to explicitly disable load balancing) are the only available values. Other strategy implementations may be added in future versions. However, since version 3.0, you can provide your own implementation of the LoadBalancingStrategy and inject it by using the load-balancer-ref attribute, which should point to a bean that implements LoadBalancingStrategy, as the following example shows:

FixedSubscriberChannel 是一个 SubscribableChannel,它只支持不可取消订阅的单个 MessageHandler 订阅者。当不涉及其他订阅者且不需要信道拦截器时,这对于高吞吐量性能用例非常有用。

A FixedSubscriberChannel is a SubscribableChannel that only supports a single MessageHandler subscriber that cannot be unsubscribed. This is useful for high-throughput performance use-cases when no other subscribers are involved and no channel interceptors are needed.

<int:channel id="lbRefChannel">
  <int:dispatcher load-balancer-ref="lb"/>
</int:channel>

<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>

请注意,load-balancerload-balancer-ref 属性是互斥的。

Note that the load-balancer and load-balancer-ref attributes are mutually exclusive.

负载均衡还会与布尔 failover 属性结合使用。如果 failover 的值为 true(默认值),则当前面的处理程序抛出异常时,分配器会回退到任何后续处理程序(视需要而定)。顺序由处理程序上定义的选择性顺序值确定或(如果没有此类值)由处理程序订阅的顺序确定。

The load-balancing also works in conjunction with a boolean failover property. If the failover value is true (the default), the dispatcher falls back to any subsequent handlers (as necessary) when preceding handlers throw exceptions. The order is determined by an optional order value defined on the handlers themselves or, if no such value exists, the order in which the handlers subscribed.

如果在出现错误时某些情况需要分配器总是尝试调用第一个处理程序然后按相同的固定顺序退回,则不应提供负载均衡策略。换句话说,即使未启用负载均衡,分配器仍支持 failover 布尔属性。但是,如果没有负载均衡,对处理程序的调用总从第一个开始,按照它们的顺序。例如,当对主、次、三等进行明确定义时,此方法效果很好。使用命名空间支持时,任何端点上的 order 属性都会确定顺序。

If a certain situation requires that the dispatcher always try to invoke the first handler and then fall back in the same fixed order sequence every time an error occurs, no load-balancing strategy should be provided. In other words, the dispatcher still supports the failover boolean property even when no load-balancing is enabled. Without load-balancing, however, the invocation of handlers always begins with the first, according to their order. For example, this approach works well when there is a clear definition of primary, secondary, tertiary, and so on. When using the namespace support, the order attribute on any endpoint determines the order.

请记住,负载平衡和 failover 仅在通道有多个已订阅消息处理程序时适用。使用名称空间支持时,这意味着多个端点共享 input-channel 属性中定义的同一通道引用。

Keep in mind that load-balancing and failover apply only when a channel has more than one subscribed message handler. When using the namespace support, this means that more than one endpoint shares the same channel reference defined in the input-channel attribute.

从 5.2 版本开始,如果 failover 为 true,当前处理程序的故障以及已失败的消息会分别记录在 debuginfo 下(如果已配置)。

Starting with version 5.2, when failover is true, a failure of the current handler together with the failed message is logged under debug or info if configured respectively.

ExecutorChannel

ExecutorChannel 是一个点对点信道,它支持与 DirectChannel 相同的分配器配置(负载均衡策略和 failover 布尔属性)。这两种分发信道类型之间的主要区别在于,ExecutorChannel 委派给 TaskExecutor 的实例来执行分发。这意味着 send 方法通常不会阻塞,但这也意味着处理程序调用可能不会在发送方的线程中发生。因此,它不支持跨发送方和接收方处理程序的事务。

The ExecutorChannel is a point-to-point channel that supports the same dispatcher configuration as DirectChannel (load-balancing strategy and the failover boolean property). The key difference between these two dispatching channel types is that the ExecutorChannel delegates to an instance of TaskExecutor to perform the dispatch. This means that the send method typically does not block, but it also means that the handler invocation may not occur in the sender’s thread. It therefore does not support transactions that span the sender and receiving handler.

发件人有时可以阻止。例如,在使用 TaskExecutor 并采用限制客户端(如 ThreadPoolExecutor.CallerRunsPolicy)的拒绝策略时,当线程池处于最大容量并且执行程序工作队列已满时,发件人线程随时可以执行该方法。由于那种情况只会以不可预测的方式发生,所以你不应该依靠它来进行事务。

The sender can sometimes block. For example, when using a TaskExecutor with a rejection policy that throttles the client (such as the ThreadPoolExecutor.CallerRunsPolicy), the sender’s thread can execute the method any time the thread pool is at its maximum capacity and the executor’s work queue is full. Since that situation would only occur in a non-predictable way, you should not rely upon it for transactions.

PartitionedChannel

从 6.1 版本开始,提供了一个 PartitionedChannel 实现。这是 AbstractExecutorChannel 的扩展,它表示点对点分发逻辑,其中根据从发送到此信道的消息评估的分区键处理实际消耗。此信道与上面提到的 ExecutorChannel 类似,但不同之处在于具有相同分区键的消息始终在同一线程中处理,从而保留了排序。它不需要外部 TaskExecutor,但可以配置自定义 ThreadFactory(例如,Thread.ofVirtual().name("partition-", 0).factory())。此工厂用于将单线程执行器填充到 MessageDispatcher 委托,每个分区一个。默认情况下,IntegrationMessageHeaderAccessor.CORRELATION_ID 消息标头用作分区键。此信道可以配置为一个简单的 bean:

Starting with version 6.1, a PartitionedChannel implementation is provided. This is an extension of AbstractExecutorChannel and represents point-to-point dispatching logic where the actual consumption is processed on a specific thread, determined by the partition key evaluated from a message sent to this channel. This channel is similar to the ExecutorChannel mentioned above, but with the difference that messages with the same partition key are always handled in the same thread, preserving ordering. It does not require an external TaskExecutor, but can be configured with a custom ThreadFactory (e.g. Thread.ofVirtual().name("partition-", 0).factory()). This factory is used to populate single-thread executors into a MessageDispatcher delegate, per partition. By default, the IntegrationMessageHeaderAccessor.CORRELATION_ID message header is used as the partition key. This channel can be configured as a simple bean:

@Bean
PartitionedChannel somePartitionedChannel() {
    return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}

该信道将有 3 个分区,专门的线程;将使用 partitionKey 标头确定消息将在哪个分区中处理。有关更多信息,请参阅 PartitionedChannel 类 Javadoc。

The channel will have 3 partitions - dedicated threads; will use the partitionKey header to determine in which partition the message will be handled. See PartitionedChannel class Javadocs for more information.

FluxMessageChannel

FluxMessageChannel 是一种 org.reactivestreams.Publisher 实现,用于将发送的消息“下沉”到内部 reactor.core.publisher.Flux 中以供下游的反应式订阅者按需消费。此信道实现既不是 SubscribableChannel,也不是 PollableChannel,因此只能使用 org.reactivestreams.Subscriber 实例从该信道消费,从而遵循反应式流的背压性质。另一方面,FluxMessageChannel 使用 subscribeTo(Publisher<Message<?>>) 合约实现 ReactiveStreamsSubscribableChannel,该合约允许从反应式源发布器接收事件,从而将反应式流桥接到了集成流中。为了实现整个集成流的完全反应式行为,此类信道必须放置在流中的所有端点之间。

The FluxMessageChannel is an org.reactivestreams.Publisher implementation for "sinking" sent messages into an internal reactor.core.publisher.Flux for on demand consumption by reactive subscribers downstream. This channel implementation is neither a SubscribableChannel, nor a PollableChannel, so only org.reactivestreams.Subscriber instances can be used to consume from this channel honoring back-pressure nature of reactive streams. On the other hand, the FluxMessageChannel implements a ReactiveStreamsSubscribableChannel with its subscribeTo(Publisher<Message<?>>) contract allowing receiving events from reactive source publishers, bridging a reactive stream into the integration flow. To achieve fully reactive behavior for the whole integration flow, such a channel must be placed between all the endpoints in the flow.

请参见 Reactive Streams Support 以了解有关与 Reactive Streams 交互的更多信息。

See Reactive Streams Support for more information about interaction with Reactive Streams.

Scoped Channel

Spring Integration 1.0 提供了 ThreadLocalChannel 实现,但已经从 2.0 中移除。现在,处理相同要求的更常规方式是向频道添加一个范围属性。属性的值可以是上下文中可用的范围的名称。例如,在 Web 环境中,某些范围可用,任何自定义范围实现都可以向上下文注册。以下示例显示了应用于频道(包括注册范围本身)的线程本地范围:

Spring Integration 1.0 provided a ThreadLocalChannel implementation, but that has been removed as of 2.0. Now the more general way to handle the same requirement is to add a scope attribute to a channel. The value of the attribute can be the name of a scope that is available within the context. For example, in a web environment, certain scopes are available, and any custom scope implementations can be registered with the context. The following example shows a thread-local scope being applied to a channel, including the registration of the scope itself:

<int:channel id="threadScopedChannel" scope="thread">
     <int:queue />
</int:channel>

<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
    <property name="scopes">
        <map>
            <entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
        </map>
    </property>
</bean>

先前示例中定义的频道也在内部向队列委派,但频道绑定到当前线程,因此队列的内容也类似地绑定。这样,发送到频道的线程稍后就可以接收相同的消息,但其他线程无法访问它们。虽然很少需要线程范围的频道,但当 DirectChannel 实例被用于强制执行单个操作线程但任何答复消息都应发送到“终端”频道时,它们会很有用。如果该终端频道是线程范围内的,则原始发送线程可以从终端频道收集其答复。

The channel defined in the previous example also delegates to a queue internally, but the channel is bound to the current thread, so the contents of the queue are similarly bound. That way, the thread that sends to the channel can later receive those same messages, but no other thread would be able to access them. While thread-scoped channels are rarely needed, they can be useful in situations where DirectChannel instances are being used to enforce a single thread of operation but any reply messages should be sent to a “terminal” channel. If that terminal channel is thread-scoped, the original sending thread can collect its replies from the terminal channel.

现在,由于任何频道都可以有范围,因此除了线程本地以外,您还可以定义自己的范围。

Now, since any channel can be scoped, you can define your own scopes in addition to thread-Local.