Message Channel Implementations

Spring 集成提供了不同的消息通道实现。以下各节将对每种实现进行简要说明。

PublishSubscribeChannel

PublishSubscribeChannel 实现将发送给它的任何 Message 广播到其所有订阅的处理程序。这最常用于发送事件消息,其主要作用是通知(与文档消息相反,文档消息通常由单个处理程序处理)。请注意,PublishSubscribeChannel 仅用于发送。由于在调用其 send(Message) 方法后直接广播到其订阅者,因此使用者无法轮询消息(它不实现 PollableChannel,因此没有 receive() 方法)。相反,任何订阅者本身都必须是 MessageHandler,并且依次调用订阅者的 handleMessage(Message) 方法。

在 3.0 版之前,调用没有订阅者的 PublishSubscribeChannel 上的 send 方法会返回 false。当与 MessagingTemplate 结合使用时,会抛出 MessageDeliveryException。从 3.0 版开始,该行为已更改,只要存在最低订阅者(并成功处理消息),send 始终被视为成功。可以通过设置默认值为 0minSubscribers 属性来修改此行为。

如果你使用 TaskExecutor,只有正确的订阅者数量的存在才用于此判定,因为消息的实际处理是异步进行的。

QueueChannel

QueueChannel 实现封装了一个队列。与 PublishSubscribeChannel 不同,QueueChannel 具有点对点语义。换句话说,即使该信道有多个消费者,也只有一个消费者应该接收发送到该信道的任何 Message。它提供了一个默认的无参数构造函数(提供 Integer.MAX_VALUE 的基本无限容量),以及一个接受队列容量的构造函数,如下表所示:

public QueueChannel(int capacity)

尚未达到其容量限制的信道会将其消息保存在其内部队列中,并且 send(Message<?>) 方法会立即返回,即使没有接收方准备处理该消息。如果队列已达到容量,则发送方会阻塞直到队列中有可用空间。或者,如果你使用具有其他超时参数的发送方法,则队列会阻塞,直到可用空间或超时时段过去,以先发生的为准。同样,如果队列中有消息可用,receive() 调用会立即返回,但如果队列为空,则接收调用可能会阻塞,直到有消息可用或超时(如果提供)过去。在这两种情况下,都可以通过传递超时值为 0 来强制立即返回,而不管队列的状态如何。但是,请注意,对没有 timeout 参数的 send()receive() 版本的调用会无限期地阻塞。

PriorityChannel

虽然 QueueChannel 强制先进先出 (FIFO) 排序,但 PriorityChannel 是一个另一种实现,它允许根据优先级对信道内的消息进行排序。默认情况下,优先级由每条消息内的 priority 标头确定。但是,对于自定义优先级确定逻辑,可以将类型为 Comparator<Message<?>> 的比较器提供给 PriorityChannel 构造函数。

RendezvousChannel

RendezvousChannel 启用了一个“直接移交”场景,其中发送方会阻塞直到另一方调用信道的 receive() 方法。另一方会阻塞直到发送方发送消息。在内部,此实现与 QueueChannel 非常相似,只不过它使用 SynchronousQueueBlockingQueue 的零容量实现)。在发送方和接收方在不同的线程中运行但异步将消息放入队列中不合适的情况下,此方法效果很好。换句话说,对于 RendezvousChannel,发送方知道某个接收方已接受消息,而对于 QueueChannel,消息会存储到内部队列中,而且可能永远不会接收。

请记住,默认情况下,所有这些基于队列的通道仅将消息存储在内存中。当需要持久性时,你可以在“queue”元素中提供一个“message-store”属性以引用一个持久的 MessageStore 实现,或者你可以用一个持久代理支持的通道来替换本地通道,例如 JMS 支持的通道或通道适配器。后一种选择让你可以利用任何 JMS 提供程序的实现来进行消息持久性,如 JMS Support 中所述。但是,当不需要在队列中进行缓冲时,最简单的方法是依赖 DirectChannel,下一节将对此进行讨论。

RendezvousChannel 对于实现请求-答复操作也很有用。发送方可以创建 RendezvousChannel 的临时匿名实例,然后在构建 Message 时将其设置为“replyChannel”标头。在发送該 Message 之后,发送方可以立即调用 receive(可以选择提供一个超时值)以便阻塞,同时等待答复 Message。这与 Spring Integration 中许多请求-答复组件在内部使用的实现非常相似。

DirectChannel

DirectChannel 具有点对点语义,但其他方面与 PublishSubscribeChannel 而不是前面描述的基于队列的任何信道实现更相似。它实现 SubscribableChannel 接口,而不是 PollableChannel 接口,所以它直接将消息发送给订阅者。但是,作为一个点对点信道,它与 PublishSubscribeChannel 不同,因为它将每个 Message 发送给单个订阅的 MessageHandler

除了是最简单的点对点信道选项之外,它最重要的一个特点是它使单个线程能够在信道的“双方”执行操作。例如,如果一个处理程序订阅了一个 DirectChannel,则向该信道发送一个 Message 会触发直接在发送者的线程中调用该处理程序的 handleMessage(Message) 方法,然后 send() 方法调用才能返回。

提供具有此行为的信道实现的主要动机是支持必须跨信道的事务,同时仍能从信道提供的抽象和松散耦合中受益。如果在事务的范围内调用 send() 调用,则处理程序的调用的结果(例如,更新数据库记录)会在确定该事务的最终结果(提交或回滚)中发挥作用。

由于 DirectChannel 是最简单的选项,不会增加调度和管理轮询程序线程所需的任何额外开销,因此它在 Spring Integration 中是默认通道类型。一般理念是定义应用程序通道,考虑哪些需要提供缓冲或限流输入,并将其修改为基于队列的 PollableChannels。同样地,如果通道需要广播消息,它不应是 DirectChannel,而应该是 PublishSubscribeChannel。稍后,我们将展示如何配置这些通道中的每一个。

DirectChannel 在内部委派给消息分配器以调用其订阅的消息处理程序,并且该分配器可以具有由 load-balancerload-balancer-ref 属性(互斥)公开的负载均衡策略。当多个消息处理程序订阅同一个信道时,消息分配器会使用负载均衡策略来帮助确定如何将消息分配给消息处理程序。为了方便起见,load-balancer 属性公开了指向 LoadBalancingStrategy 预先存在实现的值的枚举。round-robin(在处理程序中轮流负载均衡)和 none(对于希望显式禁用负载均衡的情况)是仅有的可用值。可以在以后的版本中添加其他策略实现。但是,从 3.0 版本开始,你可以提供自己的 LoadBalancingStrategy 实现并使用 load-balancer-ref 属性注入它,该属性应指向实现 LoadBalancingStrategy 的 bean,如下例所示:

FixedSubscriberChannel 是一个 SubscribableChannel,它只支持不可取消订阅的单个 MessageHandler 订阅者。当不涉及其他订阅者且不需要信道拦截器时,这对于高吞吐量性能用例非常有用。

<int:channel id="lbRefChannel">
  <int:dispatcher load-balancer-ref="lb"/>
</int:channel>

<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>

请注意,load-balancerload-balancer-ref 属性是互斥的。

负载均衡还会与布尔 failover 属性结合使用。如果 failover 的值为 true(默认值),则当前面的处理程序抛出异常时,分配器会回退到任何后续处理程序(视需要而定)。顺序由处理程序上定义的选择性顺序值确定或(如果没有此类值)由处理程序订阅的顺序确定。

如果在出现错误时某些情况需要分配器总是尝试调用第一个处理程序然后按相同的固定顺序退回,则不应提供负载均衡策略。换句话说,即使未启用负载均衡,分配器仍支持 failover 布尔属性。但是,如果没有负载均衡,对处理程序的调用总从第一个开始,按照它们的顺序。例如,当对主、次、三等进行明确定义时,此方法效果很好。使用命名空间支持时,任何端点上的 order 属性都会确定顺序。

请记住,负载平衡和 failover 仅在通道有多个已订阅消息处理程序时适用。使用名称空间支持时,这意味着多个端点共享 input-channel 属性中定义的同一通道引用。

从 5.2 版本开始,如果 failover 为 true,当前处理程序的故障以及已失败的消息会分别记录在 debuginfo 下(如果已配置)。

ExecutorChannel

ExecutorChannel 是一个点对点信道,它支持与 DirectChannel 相同的分配器配置(负载均衡策略和 failover 布尔属性)。这两种分发信道类型之间的主要区别在于,ExecutorChannel 委派给 TaskExecutor 的实例来执行分发。这意味着 send 方法通常不会阻塞,但这也意味着处理程序调用可能不会在发送方的线程中发生。因此,它不支持跨发送方和接收方处理程序的事务。

发件人有时可以阻止。例如,在使用 TaskExecutor 并采用限制客户端(如 ThreadPoolExecutor.CallerRunsPolicy)的拒绝策略时,当线程池处于最大容量并且执行程序工作队列已满时,发件人线程随时可以执行该方法。由于那种情况只会以不可预测的方式发生,所以你不应该依靠它来进行事务。

PartitionedChannel

从 6.1 版本开始,提供了一个 PartitionedChannel 实现。这是 AbstractExecutorChannel 的扩展,它表示点对点分发逻辑,其中根据从发送到此信道的消息评估的分区键处理实际消耗。此信道与上面提到的 ExecutorChannel 类似,但不同之处在于具有相同分区键的消息始终在同一线程中处理,从而保留了排序。它不需要外部 TaskExecutor,但可以配置自定义 ThreadFactory(例如,Thread.ofVirtual().name("partition-", 0).factory())。此工厂用于将单线程执行器填充到 MessageDispatcher 委托,每个分区一个。默认情况下,IntegrationMessageHeaderAccessor.CORRELATION_ID 消息标头用作分区键。此信道可以配置为一个简单的 bean:

@Bean
PartitionedChannel somePartitionedChannel() {
    return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}

该信道将有 3 个分区,专门的线程;将使用 partitionKey 标头确定消息将在哪个分区中处理。有关更多信息,请参阅 PartitionedChannel 类 Javadoc。

FluxMessageChannel

FluxMessageChannel 是一种 org.reactivestreams.Publisher 实现,用于将发送的消息“下沉”到内部 reactor.core.publisher.Flux 中以供下游的反应式订阅者按需消费。此信道实现既不是 SubscribableChannel,也不是 PollableChannel,因此只能使用 org.reactivestreams.Subscriber 实例从该信道消费,从而遵循反应式流的背压性质。另一方面,FluxMessageChannel 使用 subscribeTo(Publisher<Message<?>>) 合约实现 ReactiveStreamsSubscribableChannel,该合约允许从反应式源发布器接收事件,从而将反应式流桥接到了集成流中。为了实现整个集成流的完全反应式行为,此类信道必须放置在流中的所有端点之间。

请参见 Reactive Streams Support 以了解有关与 Reactive Streams 交互的更多信息。

Scoped Channel

Spring Integration 1.0 提供了 ThreadLocalChannel 实现,但已经从 2.0 中移除。现在,处理相同要求的更常规方式是向频道添加一个范围属性。属性的值可以是上下文中可用的范围的名称。例如,在 Web 环境中,某些范围可用,任何自定义范围实现都可以向上下文注册。以下示例显示了应用于频道(包括注册范围本身)的线程本地范围:

<int:channel id="threadScopedChannel" scope="thread">
     <int:queue />
</int:channel>

<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
    <property name="scopes">
        <map>
            <entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
        </map>
    </property>
</bean>

先前示例中定义的频道也在内部向队列委派,但频道绑定到当前线程,因此队列的内容也类似地绑定。这样,发送到频道的线程稍后就可以接收相同的消息,但其他线程无法访问它们。虽然很少需要线程范围的频道,但当 DirectChannel 实例被用于强制执行单个操作线程但任何答复消息都应发送到“终端”频道时,它们会很有用。如果该终端频道是线程范围内的,则原始发送线程可以从终端频道收集其答复。

现在,由于任何频道都可以有范围,因此除了线程本地以外,您还可以定义自己的范围。