Configuring Message Channels

要创建消息频道实例,您可以使用 xml 的 <channel/> 元素或 Java 配置的 DirectChannel 实例,如下所示:

  • Java

  • XML

@Bean
public MessageChannel exampleChannel() {
    return new DirectChannel();
}
<int:channel id="exampleChannel"/>

当您使用没有子元素的 <channel/> 元素时,它创建 DirectChannel 实例(SubscribableChannel)。 要创建发布-订阅频道,请使用 <publish-subscribe-channel/> 元素(Java 中的 PublishSubscribeChannel),如下所示:

  • Java

  • XML

@Bean
public MessageChannel exampleChannel() {
    return new PublishSubscribeChannel();
}
<int:publish-subscribe-channel id="exampleChannel"/>

Alternatively 您还可以提供各种 <queue/> 子元素来创建任何可轮询通道类型(如 Message Channel Implementations 中所述)。以下几节将展示每种通道类型的示例。

DirectChannel Configuration

如前所述,DirectChannel 是默认类型。以下清单显示了谁定义一个:

  • Java

  • XML

@Bean
public MessageChannel directChannel() {
    return new DirectChannel();
}
<int:channel id="directChannel"/>

默认通道具有循环负载均衡并且已启用故障转移(更多详细信息,请参阅 xref:channel/implementations.adoc#channel-implementations-directchannel[DirectChannel)。要禁用这两种功能中的一种或两种,请添加 <dispatcher/> 子元素(DirectChannelLoadBalancingStrategy 构造函数),并按如下所示配置属性:

  • Java

  • XML

@Bean
public MessageChannel failFastChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setFailover(false);
    return channel;
}

@Bean
public MessageChannel failFastChannel() {
    return new DirectChannel(null);
}
<int:channel id="failFastChannel">
    <int:dispatcher failover="false"/>
</channel>

<int:channel id="channelWithFixedOrderSequenceFailover">
    <int:dispatcher load-balancer="none"/>
</int:channel>

从版本 6.3 开始,所有基于 UnicastingDispatcherMessageChannel 实现都可以使用 Predicate<Exception> failoverStrategy 代替普通 failover 选项进行配置。此谓词会根据从当前 MessageHandler 引发的异常来决定是否故障转移到下一个 MessageHandler。更复杂的错误分析应使用 xref:router/implementations.adoc#router-implementations-exception-router[ErrorMessageExceptionTypeRouter 完成。

Datatype Channel Configuration

有时,使用者只能处理特定类型的有效负载,从而迫使您确保输入消息的有效负载类型。首先想到的可能是使用消息过滤器。但是,消息过滤器所能做的只是过滤掉不符合消费者要求的消息。另一种方法是使用基于内容的路由器,并使用特定转换器将数据类型不合规的消息路由到特定的转换器,以强制转换为必需的数据类型。这会起作用,但完成相同操作的一个更简单的方法是应用 Datatype Channel 模式。您可以为每个特定有效负载数据类型使用单独的数据类型通道。

若要创建一个只接受包含特定有效负载类型消息的数据类型通道,请在通道元素的`datatype` 属性中提供该数据类型的完全限定类名,如下面的示例所示:

  • Java

  • XML

@Bean
public MessageChannel numberChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setDatatypes(Number.class);
    return channel;
}
<int:channel id="numberChannel" datatype="java.lang.Number"/>

请注意,类型检查允许为任何可分配给通道数据类型的类型通过。换句话说,前面示例中的 numberChannel 将接受有效负载为 java.lang.Integerjava.lang.Double 的消息。可以将多个类型作为逗号分隔的列表提供,如下面的示例所示:

  • Java

  • XML

@Bean
public MessageChannel numberChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setDatatypes(String.class, Number.class);
    return channel;
}
<int:channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>

因此,前面示例中的 'numberChannel' 专门接受数据类型为 java.lang.Number 的消息。但是,如果消息的有效负载不是必需的类型,会发生什么情况?这取决于您是否定义了名为 integrationConversionService 的 bean,该 bean 是 Spring 的 Conversion Service 实例。如果没有,则会立即引发 Exception。但是,如果您已经定义了 integrationConversionService bean,则会使用该 bean 来尝试将消息的有效负载转换为可接受的类型。

你甚至可以注册自定义转换器。例如,假设你向上面配置的“numberChannel”发送一个有效负载为`String` 的消息。你可以照如下方式处理此消息:

MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));

通常情况下,这个操作是完全合理的。然而,由于我们使用数据类型通道,此操作的结果将生成类似于以下内容的异常:

Exception in thread "main" org.springframework.integration.MessageDeliveryException:
Channel 'numberChannel'
expected one of the following datataypes [class java.lang.Number],
but received [class java.lang.String]
…

异常发生是因为我们要求有效负载类型为`Number`,但我们发送的是`String`。因此,我们需要一些东西将`String` 转换为`Number`。为此,我们可以实现类似于以下示例的转换器:

public static class StringToIntegerConverter implements Converter<String, Integer> {
    public Integer convert(String source) {
        return Integer.parseInt(source);
    }
}

然后,我们可以用以下示例所示的方式将它注册为一个转换器,让集成转换服务使用:

  • Java

  • XML

@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt {
    return new StringToIntegerConverter();
}
<int:converter ref="strToInt"/>

<bean id="strToInt" class="org.springframework.integration.util.Demo.StringToIntegerConverter"/>

或者,当 StringToIntegerConverter 类用 @Component 注释自动扫描时。

当解析’converter' 元素时,如果尚未定义 integrationConversionService bean,它将创建该 bean。有了这个转换器,send 操作现在将成功,因为数据类型通道使用该转换器将`String` 有效负载转换为`Integer`。

有关有效负载类型转换的更多信息,请参见 Payload Type Conversion

从版本 4.0 开始,DefaultDatatypeChannelMessageConverter 将调用 integrationConversionService,它在应用程序上下文中查找转换服务。若要使用不同的转换技术,你可以在通道上指定 message-converter 属性。这必须是对 MessageConverter 实现的引用。只使用 fromMessage 方法。它为转换器提供访问消息头的信息(如果转换可能需要头中的信息,例如 content-type)。此方法只能返回转换后的有效负载或一个完整的`Message` 对象。如果是后者,转换器必须小心地从入站消息中复制所有头。

或者,你可以声明一个 ID 为 datatypeChannelMessageConverterMessageConverter 类型的 <bean/>,该转换器由带 datatype 的所有通道使用。

QueueChannel Configuration

若要创建一个 QueueChannel,请使用 <queue/> 子元素。你可以按如下方式指定通道的容量:

  • Java

  • XML

@Bean
public PollableChannel queueChannel() {
    return new QueueChannel(25);
}
<int:channel id="queueChannel">
    <queue capacity="25"/>
</int:channel>

如果您没为此 <queue/> 子元素的“capacity”属性提供值,结果队列将是不受限制的。为了避免用尽内存等问题,我们强烈建议您为受限队列设置一个显式值。

Persistent QueueChannel Configuration

由于 QueueChannel 提供了缓冲消息的功能,但默认情况下只在内存中进行缓冲,因此也带来了消息可能会在系统故障时丢失的可能性。为了减轻此风险,QueueChannel 可能会受到 MessageGroupStore 策略接口的持久实现的支持。有关 MessageGroupStoreMessageStore 的更多详细信息,请参见 Message Store

使用 message-store 属性时不允许使用 capacity 属性。

QueueChannel 接收 Message 时,它将该消息添加到消息存储中。当从 QueueChannel 轮询到 Message 时,它将从消息存储中删除。

默认情况下,QueueChannel 将其消息存储在内存队列中,这可能会导致前面提到的消息丢失场景。但是,Spring Integration 提供了持久化存储,例如 JdbcChannelMessageStore

您可以通过添加 message-store 属性为任何 QueueChannel 配置消息存储,如下面的示例所示:

<int:channel id="dbBackedChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

(有关 Java/Kotlin 配置选项,请参阅下面的示例。)

Spring Integration JDBC 模块还提供用于多种流行数据库的模式数据定义语言 (DDL)。这些模式位于该模块的 org.springframework.integration.jdbc.store.channel 包 (spring-integration-jdbc) 中。

一个重要的特性是,对于任何事务性持久性存储(如 JdbcChannelMessageStore),只要轮询程序配置有事务,从存储中删除的消息只能在事务成功完成后永久删除。否则,事务回滚,Message 不会丢失。

随着越来越多的 Spring 项目涉及 “NoSQL” 数据存储并为这些存储提供底层支持,还有许多其他可用于消息存储的实现。如果您找不到满足您特定需求的实现,您还可以提供您自己的 MessageGroupStore 接口实现。

从 4.0 版开始,我们建议将 QueueChannel 实例配置为尽可能使用 ChannelMessageStore。与普通消息存储相比,它们通常针对此用途进行了优化。如果 ChannelMessageStoreChannelPriorityMessageStore,则消息将按 FIFO 顺序接收优先级。优先级的概念由消息存储实现决定。例如,以下示例显示了 MongoDB Channel Message Store 的 Java 配置:

  • Java

  • Java DSL

  • Kotlin DSL

@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
    MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
    store.setPriorityEnabled(true);
    return store;
}

@Bean
public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) {
    return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue"));
}
@Bean
public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) {
    return IntegrationFlow.from((Channels c) ->
            c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup"))
            ....
            .get();
}
@Bean
fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) =
    integrationFlow {
        channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") }
    }

注意 MessageGroupQueue 类。那是一个 BlockingQueue 实现,用于使用 MessageGroupStore 操作。

QueueChannel 环境的另一种自定义选项由 <int:queue> 子元素或其特定构造函数的 ref 属性提供。此属性提供对任何 java.util.Queue 实现的引用。例如,可以按如下方式配置 Hazelcast 分布式 link:https://hazelcast.com/use-cases/imdg/imdg-messaging/[IQueue

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance(new Config()
                                           .setProperty("hazelcast.logging.type", "log4j"));
}

@Bean
public PollableChannel distributedQueue() {
    return new QueueChannel(hazelcastInstance()
                              .getQueue("springIntegrationQueue"));
}

PublishSubscribeChannel Configuration

若要创建 PublishSubscribeChannel,请使用 <publish-subscribe-channel/> 元素。使用此元素时,您还可以指定用于发布消息的 task-executor(如果未指定,它将在发送方的线程中发布),如下所示:

  • Java

  • XML

@Bean
public MessageChannel pubsubChannel() {
    return new PublishSubscribeChannel(someExecutor());
}
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>

如果您在 PublishSubscribeChannel 下游提供重新排序器或聚合器,则可以将通道上的 apply-sequence 属性设置为 true。这样做表示通道应在传递消息之前设置 sequence-sizesequence-number 消息头以及关联 ID。例如,如果有五个订阅者,则 sequence-size 将设置为 5,并且消息将具有从 15sequence-number 头值。

除了 Executor`之外,您还可以配置 `ErrorHandler。默认情况下,PublishSubscribeChannel 使用 MessagePublishingErrorHandler 实现将错误从 errorChannel 标头或发送到全局 errorChannel 实例中的 MessageChannel。如果未配置 Executor,则会忽略 ErrorHandler,并且异常将直接抛出到调用者的线程。

如果您在 PublishSubscribeChannel 下游提供 ResequencerAggregator,则可以将通道上的 apply-sequence 属性设置为 true。这样做表示通道应在传递消息之前设置顺序大小和顺序编号消息头以及关联 ID。例如,如果有五个订阅者,则顺序大小将设置为 5,并且消息将具有从 15 的顺序编号头值。

以下示例展示了如何将 apply-sequence 标头设置为 true

  • Java

  • XML

@Bean
public MessageChannel pubsubChannel() {
    PublishSubscribeChannel channel = new PublishSubscribeChannel();
    channel.setApplySequence(true);
    return channel;
}
<int:publish-subscribe-channel id="pubsubChannel" apply-sequence="true"/>

默认情况下,apply-sequence 值为 false,以便发布-订阅通道可以将完全相同的消息实例发送到多个输出通道。由于 Spring Integration 强制执行有效负载和标头引用的不可变性,因此当标志设置为 true 时,通道将创建具有相同有效负载引用但不同标头值的新 Message 实例。

从 5.4.3 版本开始,PublishSubscribeChannel 还可以使用其 BroadcastingDispatcherrequireSubscribers 选项进行配置,以指示此通道在没有订阅者时不会默默地忽略消息。当没有订阅者且此选项设置为 true 时,将抛出一个带有 “Dispatcher 没有订阅者” 消息的 MessageDispatchingException

ExecutorChannel

若要创建 ExecutorChannel,请添加带有 task-executor 属性的 <dispatcher> 子元素。此属性的值可以引用上下文中的任何 TaskExecutor。例如,这样做可以为调度消息到已订阅的处理程序配置一个线程池。如前所述,这样做会中断发送方和接收方之间的单线程执行上下文,以便活动事务上下文不会与处理程序的调用共享(即,处理程序可能会抛出一个 Exception,但 send 调用已经成功返回)。以下示例展示了如何使用 dispatcher 元素并在 task-executor 属性中指定执行器:

  • Java

  • XML

@Bean
public MessageChannel executorChannel() {
    return new ExecutorChannel(someExecutor());
}
<int:channel id="executorChannel">
    <int:dispatcher task-executor="someExecutor"/>
</int:channel>

load-balancerfailover 选项都可以在 <dispatcher/> 子元素上使用,如 DirectChannel Configuration 中前面所述。应用相同的默认设置。因此,除非针对这两个属性中的一个或两个明确提供了配置,否则通道将采用具有启用故障转移功能的循环负载均衡策略,如下例所示:

<int:channel id="executorChannelWithoutFailover">
    <int:dispatcher task-executor="someExecutor" failover="false"/>
</int:channel>

PriorityChannel Configuration

若要创建 PriorityChannel,请使用 <priority-queue/> 子元素,如下面的示例所示:

  • Java

  • XML

@Bean
public PollableChannel priorityChannel() {
    return new PriorityChannel(20);
}
<int:channel id="priorityChannel">
    <int:priority-queue capacity="20"/>
</int:channel>

默认情况下,通道会查询消息的 priority 标头。不过,您也可以提供一个自定义 Comparator 引用。此外,请注意 PriorityChannel(与其他类型一样)支持 datatype 属性。与 QueueChannel 一样,它还支持 capacity 属性。以下示例对此进行了演示:

  • Java

  • XML

@Bean
public PollableChannel priorityChannel() {
    PriorityChannel channel = new PriorityChannel(20, widgetComparator());
    channel.setDatatypes(example.Widget.class);
    return channel;
}
<int:channel id="priorityChannel" datatype="example.Widget">
    <int:priority-queue comparator="widgetComparator"
                    capacity="10"/>
</int:channel>

从 4.0 版开始,priority-channel 子元素支持 message-store 选项(此时不允许使用 comparatorcapacity)。消息存储必须是 PriorityCapableChannelMessageStore。目前为 RedisJDBCMongoDB 提供了 PriorityCapableChannelMessageStore 的实现。有关更多信息,请参见 QueueChannel ConfigurationMessage Store。您可以在 Backing Message Channels 中找到示例配置。

RendezvousChannel Configuration

当队列子元素为 <rendezvous-queue> 时,将创建 RendezvousChannel。它不提供对前面描述的任何其他配置选项,并且其队列不接受任何容量值,因为它是一个零容量直交队列。以下示例展示了如何声明 RendezvousChannel

  • Java

  • XML

@Bean
public PollableChannel rendezvousChannel() {
    return new RendezvousChannel();
}
<int:channel id="rendezvousChannel"/>
    <int:rendezvous-queue/>
</int:channel>

Scoped Channel Configuration

任何通道都可以配置 scope 属性,如下面的示例所示:

<int:channel id="threadLocalChannel" scope="thread"/>

Channel Interceptor Configuration

消息通道也可以具有拦截器,如 Channel Interceptors 中所述。<interceptors/> 子元素可以添加到 <channel/>(或更具体的元素类型)中。您可以提供 ref 属性以引用实现 ChannelInterceptor 接口的任何 Spring 管理对象,如下例所示:

<int:channel id="exampleChannel">
    <int:interceptors>
        <ref bean="trafficMonitoringInterceptor"/>
    </int:interceptors>
</int:channel>

一般来说,我们建议在单独的位置定义拦截器实现,因为它们通常提供可在多个通道中重复使用的常见行为。

Global Channel Interceptor Configuration

通道拦截器提供了一种适用于每个通道的干净简洁的方式来应用横切行为。如果应将相同行为应用于多个通道,则为每个通道配置同一组拦截器并不是最高效的方式。为了避免重复配置,同时还允许拦截器应用于多个通道,Spring Integration 提供全局拦截器。考虑以下一对示例:

<int:channel-interceptor pattern="input*, thing2*, thing1, !cat*" order="3">
    <bean class="thing1.thing2SampleInterceptor"/>
</int:channel-interceptor>
<int:channel-interceptor ref="myInterceptor" pattern="input*, thing2*, thing1, !cat*" order="3"/>

<bean id="myInterceptor" class="thing1.thing2SampleInterceptor"/>

每个 <channel-interceptor/> 元素让你可以定义一个全局拦截器,该拦截器将应用于与 pattern 属性定义的任何模式匹配的所有通道中。在前一种情况下,全局拦截器应用于“thing1”通道和以“thing2”或“input”开头(但不是以“thing3”开头)的所有其他通道(自版本 5.0 起)。

将此语法添加到模式中会引起一个可能(尽管不常见)的问题。如果您有 bean 的名称 !thing1,并在通道拦截器的 pattern 模式中包含模式 !thing1,则它不再匹配。现在,模式匹配所有不名为 thing1 的 bean。在这种情况下,您可以使用 \ 转义模式中的 !。模式 \!thing1 匹配名为 !thing1 的 bean。

order 属性允许你在给定通道上有多个拦截器时管理该拦截器被注入的位置。例如,通道“inputChannel”可以本地配置单个拦截器(见下文),如下例所示:

<int:channel id="inputChannel">
  <int:interceptors>
    <int:wire-tap channel="logger"/>
  </int:interceptors>
</int:channel>

一个合理的问题是“全局拦截器如何与在本地或通过其他全局拦截器定义配置的其他拦截器相关联注入?”当前实现提供了一个用于定义拦截器执行顺序的简单机制。order 属性中的正数确保拦截器在任何现有拦截器之后注入,而负数确保拦截器在现有拦截器之前注入。这意味着,在前面的示例中,全局拦截器在本地配置的“wire-tap”拦截器之后注入(因为它的 order 大于 0)。如果还有另一个具有匹配 pattern 的全局拦截器,则其顺序将通过比较两个拦截器的 order 属性的值来确定。要将全局拦截器注入到现有拦截器之前,请使用 order 属性的负值。

请注意,orderpattern 属性都是可选的。order 的默认值将为 0,并且对于 pattern,默认值为 '*'(匹配所有通道)。

Wire Tap

如前所述,Spring Integration 提供了一个简单的 wire tap 拦截器。你可以在 <interceptors/> 元素中的任何通道上配置 wire tap。这样做对于调试特别有用,并且可以结合 Spring Integration 的日志通道适配器一起使用,如下所示:

<int:channel id="in">
    <int:interceptors>
        <int:wire-tap channel="logger"/>
    </int:interceptors>
</int:channel>

<int:logging-channel-adapter id="logger" level="DEBUG"/>

“logging-channel-adapter” 还接受“expression”属性,以便您可以对“payload”和“headers”变量计算 SpEL 表达式。或者,要记录完整消息 toString() 结果,请为“log-full-message”属性提供 true 值。默认情况下,它是 false,因此仅记录有效负载。将它设置为 true 可以记录所有头以及有效负载。“expression”选项提供了最大的灵活性(例如,expression="payload.user.name")。

关于窃听器和其他类似组件 (Message Publishing Configuration) 的一个常见误解是,它们本质上是自动异步的。默认情况下,窃听器作为组件不会异步调用。相反,Spring Integration 专注于异步行为配置的单一统一方法:消息通道。使消息流的某些部分同步或异步的是已在此流程中配置的消息通道的类型。这是消息通道抽象的主要好处之一。从该框架的创立之初,我们就一直强调消息通道作为该框架的一等公民的必要性和价值。它不仅仅是 EIP 模式的一个内部、隐式实现。它作为可配置组件完全向最终用户公开。因此,窃听器组件只负责执行以下任务:

  • 通过接入一个通道(例如 channelA)来拦截消息流

  • Grab each message

  • 将消息发送到另一个通道(例如 channelB

它本质上是桥接模式的一个变体,但它封装在一个通道定义中(因此更容易启用和禁用而不中断流)。此外,与桥接不同,它基本上分叉另一个消息流。该流是同步的还是异步的?答案取决于消息通道的类型,即“channelB”。我们有以下选项:直接通道、可轮询通道和执行器通道。后两个突破线程边界,使通过此类通道的通信成为异步,因为将消息从该通道分派到其订阅的处理程序在不同于用于将消息发送到该通道的线程上。这将使你的 wire-tap 流同步或异步。它与框架中的其他组件(例如消息发布器)保持一致,并且通过免除你事先考虑(除了编写线程安全代码之外)特定代码段应实现为同步还是异步,从而增加了协调一致性和简单性。通过消息通道对两段代码(例如组件 A 和组件 B)的实际连接使其协作为同步或异步。你甚至可能希望在将来从同步更改为异步,消息通道让你可以快速执行此操作,而无需接触代码。

关于 wire tap 的最后一点是,尽管提供了上述理由来说明它默认情况下不是异步的,但你应该记住,通常希望尽快传递消息。因此,将异步通道选项用作 wire tap 的出站通道非常常见。但是,默认情况下不会强制执行异步行为。如果这样做,会破坏许多用例,包括你可能不想破坏事务边界。也许你出于审计目的使用了 wire tap 模式,并且确实希望在原始事务中发送审计消息。例如,你可以将 wire tap 连接到 JMS 出站通道适配器。这样,你可以获得两全其美的效果:1)可以在事务中发送 JMS 消息,同时 2)它仍然是“立即遗忘”操作,从而防止主消息流中出现任何明显的延迟。

从 v4.0.0 版开始,当拦截器(例如 link:https://docs.spring.io/autorepo/docs/spring-integration/current/api/org/springframework/integration/channel/interceptor/WireTap.html[WireTap 类)引用通道时,避免循环引用非常重要。您需要将这类通道从当前拦截器拦截的通道中排除在外。可以通过适当的模式或以编程方式来完成此操作。如果您有一个自定义 ChannelInterceptor 引用 channel,请考虑实施 VetoCapableInterceptor。通过这种方式,框架会询问拦截器是否可以根据提供的模式拦截每个候选通道。您还可以在拦截器方法中添加运行时保护,以确保通道不是拦截器引用的通道。WireTap 使用了这两种技术。

从 4.3 版本开始,WireTap 具有其他构造函数,它们采用 channelName 而不是 MessageChannel 实例。对于 Java 配置和使用通道自动创建逻辑时,这非常方便。目标 MessageChannel bean 稍后从提供的 channelName 中解析,在与拦截器进行首次交互时进行解析。

通道解析需要 BeanFactory,所以窃听实例必须是 Spring 管理的 bean。

这种延迟绑定方法还允许用 Java DSL 配置简化典型的 wire-tap 模式,如下例所示:

@Bean
public PollableChannel myChannel() {
    return MessageChannels.queue()
            .wireTap("loggingFlow.input")
            .get();
}

@Bean
public IntegrationFlow loggingFlow() {
    return f -> f.log();
}

Conditional Wire Taps

可以通过使用 selector 或 selector-expression 属性使 wire tap 变得有条件。selector 引用 MessageSelector bean,它可以在运行时确定消息是否应转到 tap 通道。类似地,selector-expression 是执行相同目的的布尔 SpEL 表达式:如果表达式计算结果为 true,则消息将发送到 tap 通道。

Global Wire Tap Configuration

可以将全局窃听器配置为 Global Channel Interceptor Configuration 的特例。为此,请配置顶级 wire-tap 元素。现在,除了常规 wire-tap 命名空间支持外,还支持 patternorder 属性,并且工作方式与在 channel-interceptor 中完全相同。以下示例演示如何配置全局窃听器:

  • Java

  • XML

@Bean
@GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3)
public WireTap wireTap(MessageChannel wiretapChannel) {
    return new WireTap(wiretapChannel);
}
<int:wire-tap pattern="input*, thing2*, thing1" order="3" channel="wiretapChannel"/>

全局窃听提供了一种便捷的方式,可以在不修改现有通道配置的情况下外部配置单个通道窃听。为此,将 pattern 属性设置为目标通道名称。例如,您可以使用此技术将测试用例配置为验证通道上的消息。