Configuring Message Channels

要创建消息频道实例,您可以使用 xml 的 <channel/> 元素或 Java 配置的 DirectChannel 实例,如下所示:

To create a message channel instance, you can use the <channel/> element for xml or DirectChannel instance for Java configuration, as follows:

  • Java

  • XML

@Bean
public MessageChannel exampleChannel() {
    return new DirectChannel();
}
<int:channel id="exampleChannel"/>

当您使用没有子元素的 <channel/> 元素时,它创建 DirectChannel 实例(SubscribableChannel)。

When you use the <channel/> element without any sub-elements, it creates a DirectChannel instance (a SubscribableChannel).

要创建发布-订阅频道,请使用 <publish-subscribe-channel/> 元素(Java 中的 PublishSubscribeChannel),如下所示:

To create a publish-subscribe channel, use the <publish-subscribe-channel/> element (the PublishSubscribeChannel in Java), as follows:

  • Java

  • XML

@Bean
public MessageChannel exampleChannel() {
    return new PublishSubscribeChannel();
}
<int:publish-subscribe-channel id="exampleChannel"/>

Alternatively 您还可以提供各种 <queue/> 子元素来创建任何可轮询通道类型(如 Message Channel Implementations 中所述)。以下几节将展示每种通道类型的示例。

You can alternatively provide a variety of <queue/> sub-elements to create any of the pollable channel types (as described in Message Channel Implementations). The following sections shows examples of each channel type.

DirectChannel Configuration

如前所述,DirectChannel 是默认类型。以下清单显示了谁定义一个:

As mentioned earlier, DirectChannel is the default type. The following listing shows who to define one:

  • Java

  • XML

@Bean
public MessageChannel directChannel() {
    return new DirectChannel();
}
<int:channel id="directChannel"/>

默认通道具有循环负载均衡并且已启用故障转移(更多详细信息,请参阅 xref:channel/implementations.adoc#channel-implementations-directchannel[DirectChannel)。要禁用这两种功能中的一种或两种,请添加 <dispatcher/> 子元素(DirectChannelLoadBalancingStrategy 构造函数),并按如下所示配置属性:

A default channel has a round-robin load-balancer and also has failover enabled (see DirectChannel for more detail). To disable one or both of these, add a <dispatcher/> sub-element (a LoadBalancingStrategy constructor of the DirectChannel) and configure the attributes as follows:

  • Java

  • XML

@Bean
public MessageChannel failFastChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setFailover(false);
    return channel;
}

@Bean
public MessageChannel failFastChannel() {
    return new DirectChannel(null);
}
<int:channel id="failFastChannel">
    <int:dispatcher failover="false"/>
</channel>

<int:channel id="channelWithFixedOrderSequenceFailover">
    <int:dispatcher load-balancer="none"/>
</int:channel>

从版本 6.3 开始,所有基于 UnicastingDispatcherMessageChannel 实现都可以使用 Predicate<Exception> failoverStrategy 代替普通 failover 选项进行配置。此谓词会根据从当前 MessageHandler 引发的异常来决定是否故障转移到下一个 MessageHandler。更复杂的错误分析应使用 xref:router/implementations.adoc#router-implementations-exception-router[ErrorMessageExceptionTypeRouter 完成。

Starting with version 6.3, all the MessageChannel implementations based on the UnicastingDispatcher can be configured with a Predicate<Exception> failoverStrategy instead of plain failover option. This predicate makes a decision to failover or not to the next MessageHandler based on an exception thrown from the current one. The more complex error analysis should be done using ErrorMessageExceptionTypeRouter.

Datatype Channel Configuration

有时,使用者只能处理特定类型的有效负载,从而迫使您确保输入消息的有效负载类型。首先想到的可能是使用消息过滤器。但是,消息过滤器所能做的只是过滤掉不符合消费者要求的消息。另一种方法是使用基于内容的路由器,并使用特定转换器将数据类型不合规的消息路由到特定的转换器,以强制转换为必需的数据类型。这会起作用,但完成相同操作的一个更简单的方法是应用 Datatype Channel 模式。您可以为每个特定有效负载数据类型使用单独的数据类型通道。

Sometimes, a consumer can process only a particular type of payload, forcing you to ensure the payload type of the input messages. The first thing that comes to mind may be to use a message filter. However, all that message filter can do is filter out messages that are not compliant with the requirements of the consumer. Another way would be to use a content-based router and route messages with non-compliant data-types to specific transformers to enforce transformation and conversion to the required data type. This would work, but a simpler way to accomplish the same thing is to apply the Datatype Channel pattern. You can use separate datatype channels for each specific payload data type.

若要创建一个只接受包含特定有效负载类型消息的数据类型通道,请在通道元素的`datatype` 属性中提供该数据类型的完全限定类名,如下面的示例所示:

To create a datatype channel that accepts only messages that contain a certain payload type, provide the data type’s fully-qualified class name in the channel element’s datatype attribute, as the following example shows:

  • Java

  • XML

@Bean
public MessageChannel numberChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setDatatypes(Number.class);
    return channel;
}
<int:channel id="numberChannel" datatype="java.lang.Number"/>

请注意,类型检查允许为任何可分配给通道数据类型的类型通过。换句话说,前面示例中的 numberChannel 将接受有效负载为 java.lang.Integerjava.lang.Double 的消息。可以将多个类型作为逗号分隔的列表提供,如下面的示例所示:

Note that the type check passes for any type that is assignable to the channel’s datatype. In other words, the numberChannel in the preceding example would accept messages whose payload is java.lang.Integer or java.lang.Double. Multiple types can be provided as a comma-delimited list, as the following example shows:

  • Java

  • XML

@Bean
public MessageChannel numberChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setDatatypes(String.class, Number.class);
    return channel;
}
<int:channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>

因此,前面示例中的 'numberChannel' 专门接受数据类型为 java.lang.Number 的消息。但是,如果消息的有效负载不是必需的类型,会发生什么情况?这取决于您是否定义了名为 integrationConversionService 的 bean,该 bean 是 Spring 的 Conversion Service 实例。如果没有,则会立即引发 Exception。但是,如果您已经定义了 integrationConversionService bean,则会使用该 bean 来尝试将消息的有效负载转换为可接受的类型。

So the 'numberChannel' in the preceding example accepts only messages with a data type of java.lang.Number. But what happens if the payload of the message is not of the required type? It depends on whether you have defined a bean named integrationConversionService that is an instance of Spring’s Conversion Service. If not, then an Exception would be thrown immediately. However, if you have defined an integrationConversionService bean, it is used in an attempt to convert the message’s payload to the acceptable type.

你甚至可以注册自定义转换器。例如,假设你向上面配置的“numberChannel”发送一个有效负载为`String` 的消息。你可以照如下方式处理此消息:

You can even register custom converters. For example, suppose you send a message with a String payload to the 'numberChannel' we configured above. You might handle the message as follows:

MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));

通常情况下,这个操作是完全合理的。然而,由于我们使用数据类型通道,此操作的结果将生成类似于以下内容的异常:

Typically, this would be a perfectly legal operation. However, since we use Datatype Channel, the result of such operation would generate an exception similar to the following:

Exception in thread "main" org.springframework.integration.MessageDeliveryException:
Channel 'numberChannel'
expected one of the following datataypes [class java.lang.Number],
but received [class java.lang.String]
…

异常发生是因为我们要求有效负载类型为`Number`,但我们发送的是`String`。因此,我们需要一些东西将`String` 转换为`Number`。为此,我们可以实现类似于以下示例的转换器:

The exception happens because we require the payload type to be a Number, but we sent a String. So we need something to convert a String to a Number. For that, we can implement a converter similar to the following example:

public static class StringToIntegerConverter implements Converter<String, Integer> {
    public Integer convert(String source) {
        return Integer.parseInt(source);
    }
}

然后,我们可以用以下示例所示的方式将它注册为一个转换器,让集成转换服务使用:

Then we can register it as a converter with the Integration Conversion Service, as the following example shows:

  • Java

  • XML

@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt {
    return new StringToIntegerConverter();
}
<int:converter ref="strToInt"/>

<bean id="strToInt" class="org.springframework.integration.util.Demo.StringToIntegerConverter"/>

或者,当 StringToIntegerConverter 类用 @Component 注释自动扫描时。

Or on the StringToIntegerConverter class when it is marked with the @Component annotation for auto-scanning.

当解析’converter' 元素时,如果尚未定义 integrationConversionService bean,它将创建该 bean。有了这个转换器,send 操作现在将成功,因为数据类型通道使用该转换器将`String` 有效负载转换为`Integer`。

When the 'converter' element is parsed, it creates the integrationConversionService bean if one is not already defined. With that converter in place, the send operation would now be successful, because the datatype channel uses that converter to convert the String payload to an Integer.

有关有效负载类型转换的更多信息,请参见 Payload Type Conversion

For more information regarding payload type conversion, see Payload Type Conversion.

从版本 4.0 开始,DefaultDatatypeChannelMessageConverter 将调用 integrationConversionService,它在应用程序上下文中查找转换服务。若要使用不同的转换技术,你可以在通道上指定 message-converter 属性。这必须是对 MessageConverter 实现的引用。只使用 fromMessage 方法。它为转换器提供访问消息头的信息(如果转换可能需要头中的信息,例如 content-type)。此方法只能返回转换后的有效负载或一个完整的`Message` 对象。如果是后者,转换器必须小心地从入站消息中复制所有头。

Beginning with version 4.0, the integrationConversionService is invoked by the DefaultDatatypeChannelMessageConverter, which looks up the conversion service in the application context. To use a different conversion technique, you can specify the message-converter attribute on the channel. This must be a reference to a MessageConverter implementation. Only the fromMessage method is used. It provides the converter with access to the message headers (in case the conversion might need information from the headers, such as content-type). The method can return only the converted payload or a full Message object. If the latter, the converter must be careful to copy all the headers from the inbound message.

或者,你可以声明一个 ID 为 datatypeChannelMessageConverterMessageConverter 类型的 <bean/>,该转换器由带 datatype 的所有通道使用。

Alternatively, you can declare a <bean/> of type MessageConverter with an ID of datatypeChannelMessageConverter, and that converter is used by all channels with a datatype.

QueueChannel Configuration

若要创建一个 QueueChannel,请使用 <queue/> 子元素。你可以按如下方式指定通道的容量:

To create a QueueChannel, use the <queue/> sub-element. You may specify the channel’s capacity as follows:

  • Java

  • XML

@Bean
public PollableChannel queueChannel() {
    return new QueueChannel(25);
}
<int:channel id="queueChannel">
    <queue capacity="25"/>
</int:channel>

如果您没为此 <queue/> 子元素的“capacity”属性提供值,结果队列将是不受限制的。为了避免用尽内存等问题,我们强烈建议您为受限队列设置一个显式值。

If you do not provide a value for the 'capacity' attribute on this <queue/> sub-element, the resulting queue is unbounded. To avoid issues such as running out of memory, we highly recommend that you set an explicit value for a bounded queue.

Persistent QueueChannel Configuration

由于 QueueChannel 提供了缓冲消息的功能,但默认情况下只在内存中进行缓冲,因此也带来了消息可能会在系统故障时丢失的可能性。为了减轻此风险,QueueChannel 可能会受到 MessageGroupStore 策略接口的持久实现的支持。有关 MessageGroupStoreMessageStore 的更多详细信息,请参见 Message Store

Since a QueueChannel provides the capability to buffer messages but does so in-memory only by default, it also introduces a possibility that messages could be lost in the event of a system failure. To mitigate this risk, a QueueChannel may be backed by a persistent implementation of the MessageGroupStore strategy interface. For more details on MessageGroupStore and MessageStore, see Message Store.

使用 message-store 属性时不允许使用 capacity 属性。

The capacity attribute is not allowed when the message-store attribute is used.

QueueChannel 接收 Message 时,它将该消息添加到消息存储中。当从 QueueChannel 轮询到 Message 时,它将从消息存储中删除。

When a QueueChannel receives a Message, it adds the message to the message store. When a Message is polled from a QueueChannel, it is removed from the message store.

默认情况下,QueueChannel 将其消息存储在内存队列中,这可能会导致前面提到的消息丢失场景。但是,Spring Integration 提供了持久化存储,例如 JdbcChannelMessageStore

By default, a QueueChannel stores its messages in an in-memory queue, which can lead to the lost message scenario mentioned earlier. However, Spring Integration provides persistent stores, such as the JdbcChannelMessageStore.

您可以通过添加 message-store 属性为任何 QueueChannel 配置消息存储,如下面的示例所示:

You can configure a message store for any QueueChannel by adding the message-store attribute, as the following example shows:

<int:channel id="dbBackedChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

(有关 Java/Kotlin 配置选项,请参阅下面的示例。)

(See samples below for Java/Kotlin Configuration options.)

Spring Integration JDBC 模块还提供用于多种流行数据库的模式数据定义语言 (DDL)。这些模式位于该模块的 org.springframework.integration.jdbc.store.channel 包 (spring-integration-jdbc) 中。

The Spring Integration JDBC module also provides a schema Data Definition Language (DDL) for a number of popular databases. These schemas are located in the org.springframework.integration.jdbc.store.channel package of that module (spring-integration-jdbc).

一个重要的特性是,对于任何事务性持久性存储(如 JdbcChannelMessageStore),只要轮询程序配置有事务,从存储中删除的消息只能在事务成功完成后永久删除。否则,事务回滚,Message 不会丢失。

One important feature is that, with any transactional persistent store (such as JdbcChannelMessageStore), as long as the poller has a transaction configured, a message removed from the store can be permanently removed only if the transaction completes successfully. Otherwise, the transaction rolls back, and the Message is not lost.

随着越来越多的 Spring 项目涉及 “NoSQL” 数据存储并为这些存储提供底层支持,还有许多其他可用于消息存储的实现。如果您找不到满足您特定需求的实现,您还可以提供您自己的 MessageGroupStore 接口实现。

Many other implementations of the message store are available as the growing number of Spring projects related to “NoSQL” data stores come to provide underlying support for these stores. You can also provide your own implementation of the MessageGroupStore interface if you cannot find one that meets your particular needs.

从 4.0 版开始,我们建议将 QueueChannel 实例配置为尽可能使用 ChannelMessageStore。与普通消息存储相比,它们通常针对此用途进行了优化。如果 ChannelMessageStoreChannelPriorityMessageStore,则消息将按 FIFO 顺序接收优先级。优先级的概念由消息存储实现决定。例如,以下示例显示了 MongoDB Channel Message Store 的 Java 配置:

Since version 4.0, we recommend that QueueChannel instances be configured to use a ChannelMessageStore, if possible. These are generally optimized for this use, as compared to a general message store. If the ChannelMessageStore is a ChannelPriorityMessageStore, the messages are received in FIFO within priority order. The notion of priority is determined by the message store implementation. For example, the following example shows the Java configuration for the MongoDB Channel Message Store:

  • Java

  • Java DSL

  • Kotlin DSL

@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
    MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
    store.setPriorityEnabled(true);
    return store;
}

@Bean
public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) {
    return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue"));
}
@Bean
public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) {
    return IntegrationFlow.from((Channels c) ->
            c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup"))
            ....
            .get();
}
@Bean
fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) =
    integrationFlow {
        channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") }
    }

注意 MessageGroupQueue 类。那是一个 BlockingQueue 实现,用于使用 MessageGroupStore 操作。

Pay attention to the MessageGroupQueue class. That is a BlockingQueue implementation to use the MessageGroupStore operations.

QueueChannel 环境的另一种自定义选项由 <int:queue> 子元素或其特定构造函数的 ref 属性提供。此属性提供对任何 java.util.Queue 实现的引用。例如,可以按如下方式配置 Hazelcast 分布式 link:https://hazelcast.com/use-cases/imdg/imdg-messaging/[IQueue

Another option to customize the QueueChannel environment is provided by the ref attribute of the <int:queue> sub-element or its particular constructor. This attribute supplies the reference to any java.util.Queue implementation. For example, a Hazelcast distributed IQueue can be configured as follows:

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance(new Config()
                                           .setProperty("hazelcast.logging.type", "log4j"));
}

@Bean
public PollableChannel distributedQueue() {
    return new QueueChannel(hazelcastInstance()
                              .getQueue("springIntegrationQueue"));
}

PublishSubscribeChannel Configuration

若要创建 PublishSubscribeChannel,请使用 <publish-subscribe-channel/> 元素。使用此元素时,您还可以指定用于发布消息的 task-executor(如果未指定,它将在发送方的线程中发布),如下所示:

To create a PublishSubscribeChannel, use the <publish-subscribe-channel/> element. When using this element, you can also specify the task-executor used for publishing messages (if none is specified, it publishes in the sender’s thread), as follows:

  • Java

  • XML

@Bean
public MessageChannel pubsubChannel() {
    return new PublishSubscribeChannel(someExecutor());
}
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>

如果您在 PublishSubscribeChannel 下游提供重新排序器或聚合器,则可以将通道上的 apply-sequence 属性设置为 true。这样做表示通道应在传递消息之前设置 sequence-sizesequence-number 消息头以及关联 ID。例如,如果有五个订阅者,则 sequence-size 将设置为 5,并且消息将具有从 15sequence-number 头值。

If you provide a resequencer or aggregator downstream from a PublishSubscribeChannel, you can set the 'apply-sequence' property on the channel to true. Doing so indicates that the channel should set the sequence-size and sequence-number message headers as well as the correlation ID prior to passing along the messages. For example, if there are five subscribers, the sequence-size would be set to 5, and the messages would have sequence-number header values ranging from 1 to 5.

除了 Executor`之外,您还可以配置 `ErrorHandler。默认情况下,PublishSubscribeChannel 使用 MessagePublishingErrorHandler 实现将错误从 errorChannel 标头或发送到全局 errorChannel 实例中的 MessageChannel。如果未配置 Executor,则会忽略 ErrorHandler,并且异常将直接抛出到调用者的线程。

Along with the Executor, you can also configure an ErrorHandler. By default, the PublishSubscribeChannel uses a MessagePublishingErrorHandler implementation to send an error to the MessageChannel from the errorChannel header or into the global errorChannel instance. If an Executor is not configured, the ErrorHandler is ignored and exceptions are thrown directly to the caller’s thread.

如果您在 PublishSubscribeChannel 下游提供 ResequencerAggregator,则可以将通道上的 apply-sequence 属性设置为 true。这样做表示通道应在传递消息之前设置顺序大小和顺序编号消息头以及关联 ID。例如,如果有五个订阅者,则顺序大小将设置为 5,并且消息将具有从 15 的顺序编号头值。

If you provide a Resequencer or Aggregator downstream from a PublishSubscribeChannel, you can set the 'apply-sequence' property on the channel to true. Doing so indicates that the channel should set the sequence-size and sequence-number message headers as well as the correlation ID prior to passing along the messages. For example, if there are five subscribers, the sequence-size would be set to 5, and the messages would have sequence-number header values ranging from 1 to 5.

以下示例展示了如何将 apply-sequence 标头设置为 true

The following example shows how to set the apply-sequence header to true:

  • Java

  • XML

@Bean
public MessageChannel pubsubChannel() {
    PublishSubscribeChannel channel = new PublishSubscribeChannel();
    channel.setApplySequence(true);
    return channel;
}
<int:publish-subscribe-channel id="pubsubChannel" apply-sequence="true"/>

默认情况下,apply-sequence 值为 false,以便发布-订阅通道可以将完全相同的消息实例发送到多个输出通道。由于 Spring Integration 强制执行有效负载和标头引用的不可变性,因此当标志设置为 true 时,通道将创建具有相同有效负载引用但不同标头值的新 Message 实例。

The apply-sequence value is false by default so that a publish-subscribe channel can send the exact same message instances to multiple outbound channels. Since Spring Integration enforces immutability of the payload and header references, when the flag is set to true, the channel creates new Message instances with the same payload reference but different header values.

从 5.4.3 版本开始,PublishSubscribeChannel 还可以使用其 BroadcastingDispatcherrequireSubscribers 选项进行配置,以指示此通道在没有订阅者时不会默默地忽略消息。当没有订阅者且此选项设置为 true 时,将抛出一个带有 “Dispatcher 没有订阅者” 消息的 MessageDispatchingException

Starting with version 5.4.3, the PublishSubscribeChannel can also be configured with the requireSubscribers option of its BroadcastingDispatcher to indicate that this channel will not ignore a message silently when it has no subscribers. A MessageDispatchingException with a Dispatcher has no subscribers message is thrown when there are no subscribers and this option is set to true.

ExecutorChannel

若要创建 ExecutorChannel,请添加带有 task-executor 属性的 <dispatcher> 子元素。此属性的值可以引用上下文中的任何 TaskExecutor。例如,这样做可以为调度消息到已订阅的处理程序配置一个线程池。如前所述,这样做会中断发送方和接收方之间的单线程执行上下文,以便活动事务上下文不会与处理程序的调用共享(即,处理程序可能会抛出一个 Exception,但 send 调用已经成功返回)。以下示例展示了如何使用 dispatcher 元素并在 task-executor 属性中指定执行器:

To create an ExecutorChannel, add the <dispatcher> sub-element with a task-executor attribute. The attribute’s value can reference any TaskExecutor within the context. For example, doing so enables configuration of a thread pool for dispatching messages to subscribed handlers. As mentioned earlier, doing so breaks the single-threaded execution context between sender and receiver so that any active transaction context is not shared by the invocation of the handler (that is, the handler may throw an Exception, but the send invocation has already returned successfully). The following example shows how to use the dispatcher element and specify an executor in the task-executor attribute:

  • Java

  • XML

@Bean
public MessageChannel executorChannel() {
    return new ExecutorChannel(someExecutor());
}
<int:channel id="executorChannel">
    <int:dispatcher task-executor="someExecutor"/>
</int:channel>

load-balancerfailover 选项都可以在 <dispatcher/> 子元素上使用,如 DirectChannel Configuration 中前面所述。应用相同的默认设置。因此,除非针对这两个属性中的一个或两个明确提供了配置,否则通道将采用具有启用故障转移功能的循环负载均衡策略,如下例所示:

The load-balancer and failover options are also both available on the <dispatcher/> sub-element, as described earlier in DirectChannel Configuration. The same defaults apply. Consequently, the channel has a round-robin load-balancing strategy with failover enabled unless explicit configuration is provided for one or both of those attributes, as the following example shows:

<int:channel id="executorChannelWithoutFailover">
    <int:dispatcher task-executor="someExecutor" failover="false"/>
</int:channel>

PriorityChannel Configuration

若要创建 PriorityChannel,请使用 <priority-queue/> 子元素,如下面的示例所示:

To create a PriorityChannel, use the <priority-queue/> sub-element, as the following example shows:

  • Java

  • XML

@Bean
public PollableChannel priorityChannel() {
    return new PriorityChannel(20);
}
<int:channel id="priorityChannel">
    <int:priority-queue capacity="20"/>
</int:channel>

默认情况下,通道会查询消息的 priority 标头。不过,您也可以提供一个自定义 Comparator 引用。此外,请注意 PriorityChannel(与其他类型一样)支持 datatype 属性。与 QueueChannel 一样,它还支持 capacity 属性。以下示例对此进行了演示:

By default, the channel consults the priority header of the message. However, you can instead provide a custom Comparator reference. Also, note that the PriorityChannel (like the other types) does support the datatype attribute. As with the QueueChannel, it also supports a capacity attribute. The following example demonstrates all of these:

  • Java

  • XML

@Bean
public PollableChannel priorityChannel() {
    PriorityChannel channel = new PriorityChannel(20, widgetComparator());
    channel.setDatatypes(example.Widget.class);
    return channel;
}
<int:channel id="priorityChannel" datatype="example.Widget">
    <int:priority-queue comparator="widgetComparator"
                    capacity="10"/>
</int:channel>

从 4.0 版开始,priority-channel 子元素支持 message-store 选项(此时不允许使用 comparatorcapacity)。消息存储必须是 PriorityCapableChannelMessageStore。目前为 RedisJDBCMongoDB 提供了 PriorityCapableChannelMessageStore 的实现。有关更多信息,请参见 QueueChannel ConfigurationMessage Store。您可以在 Backing Message Channels 中找到示例配置。

Since version 4.0, the priority-channel child element supports the message-store option (comparator and capacity are not allowed in that case). The message store must be a PriorityCapableChannelMessageStore. Implementations of the PriorityCapableChannelMessageStore are currently provided for Redis, JDBC, and MongoDB. See QueueChannel Configuration and Message Store for more information. You can find sample configuration in Backing Message Channels.

RendezvousChannel Configuration

当队列子元素为 <rendezvous-queue> 时,将创建 RendezvousChannel。它不提供对前面描述的任何其他配置选项,并且其队列不接受任何容量值,因为它是一个零容量直交队列。以下示例展示了如何声明 RendezvousChannel

A RendezvousChannel is created when the queue sub-element is a <rendezvous-queue>. It does not provide any additional configuration options to those described earlier, and its queue does not accept any capacity value, since it is a zero-capacity direct handoff queue. The following example shows how to declare a RendezvousChannel:

  • Java

  • XML

@Bean
public PollableChannel rendezvousChannel() {
    return new RendezvousChannel();
}
<int:channel id="rendezvousChannel"/>
    <int:rendezvous-queue/>
</int:channel>

Scoped Channel Configuration

任何通道都可以配置 scope 属性,如下面的示例所示:

Any channel can be configured with a scope attribute, as the following example shows:

<int:channel id="threadLocalChannel" scope="thread"/>

Channel Interceptor Configuration

消息通道也可以具有拦截器,如 Channel Interceptors 中所述。<interceptors/> 子元素可以添加到 <channel/>(或更具体的元素类型)中。您可以提供 ref 属性以引用实现 ChannelInterceptor 接口的任何 Spring 管理对象,如下例所示:

Message channels may also have interceptors, as described in Channel Interceptors. The <interceptors/> sub-element can be added to a <channel/> (or the more specific element types). You can provide the ref attribute to reference any Spring-managed object that implements the ChannelInterceptor interface, as the following example shows:

<int:channel id="exampleChannel">
    <int:interceptors>
        <ref bean="trafficMonitoringInterceptor"/>
    </int:interceptors>
</int:channel>

一般来说,我们建议在单独的位置定义拦截器实现,因为它们通常提供可在多个通道中重复使用的常见行为。

In general, we recommend defining the interceptor implementations in a separate location, since they usually provide common behavior that can be reused across multiple channels.

Global Channel Interceptor Configuration

通道拦截器提供了一种适用于每个通道的干净简洁的方式来应用横切行为。如果应将相同行为应用于多个通道,则为每个通道配置同一组拦截器并不是最高效的方式。为了避免重复配置,同时还允许拦截器应用于多个通道,Spring Integration 提供全局拦截器。考虑以下一对示例:

Channel interceptors provide a clean and concise way of applying cross-cutting behavior per individual channel. If the same behavior should be applied on multiple channels, configuring the same set of interceptors for each channel would not be the most efficient way. To avoid repeated configuration while also enabling interceptors to apply to multiple channels, Spring Integration provides global interceptors. Consider the following pair of examples:

<int:channel-interceptor pattern="input*, thing2*, thing1, !cat*" order="3">
    <bean class="thing1.thing2SampleInterceptor"/>
</int:channel-interceptor>
<int:channel-interceptor ref="myInterceptor" pattern="input*, thing2*, thing1, !cat*" order="3"/>

<bean id="myInterceptor" class="thing1.thing2SampleInterceptor"/>

每个 <channel-interceptor/> 元素让你可以定义一个全局拦截器,该拦截器将应用于与 pattern 属性定义的任何模式匹配的所有通道中。在前一种情况下,全局拦截器应用于“thing1”通道和以“thing2”或“input”开头(但不是以“thing3”开头)的所有其他通道(自版本 5.0 起)。

Each <channel-interceptor/> element lets you define a global interceptor, which is applied on all channels that match any patterns defined by the pattern attribute. In the preceding case, the global interceptor is applied on the 'thing1' channel and all other channels that begin with 'thing2' or 'input' but not to channels starting with 'thing3' (since version 5.0).

将此语法添加到模式中会引起一个可能(尽管不常见)的问题。如果您有 bean 的名称 !thing1,并在通道拦截器的 pattern 模式中包含模式 !thing1,则它不再匹配。现在,模式匹配所有不名为 thing1 的 bean。在这种情况下,您可以使用 \ 转义模式中的 !。模式 \!thing1 匹配名为 !thing1 的 bean。

The addition of this syntax to the pattern causes one possible (though perhaps unlikely) problem. If you have a bean named !thing1 and you included a pattern of !thing1 in your channel interceptor’s pattern patterns, it no longer matches. The pattern now matches all beans not named thing1. In this case, you can escape the ! in the pattern with \. The pattern \!thing1 matches a bean named !thing1.

order 属性允许你在给定通道上有多个拦截器时管理该拦截器被注入的位置。例如,通道“inputChannel”可以本地配置单个拦截器(见下文),如下例所示:

The order attribute lets you manage where this interceptor is injected when there are multiple interceptors on a given channel. For example, channel 'inputChannel' could have individual interceptors configured locally (see below), as the following example shows:

<int:channel id="inputChannel">
  <int:interceptors>
    <int:wire-tap channel="logger"/>
  </int:interceptors>
</int:channel>

一个合理的问题是“全局拦截器如何与在本地或通过其他全局拦截器定义配置的其他拦截器相关联注入?”当前实现提供了一个用于定义拦截器执行顺序的简单机制。order 属性中的正数确保拦截器在任何现有拦截器之后注入,而负数确保拦截器在现有拦截器之前注入。这意味着,在前面的示例中,全局拦截器在本地配置的“wire-tap”拦截器之后注入(因为它的 order 大于 0)。如果还有另一个具有匹配 pattern 的全局拦截器,则其顺序将通过比较两个拦截器的 order 属性的值来确定。要将全局拦截器注入到现有拦截器之前,请使用 order 属性的负值。

A reasonable question is “how is a global interceptor injected in relation to other interceptors configured locally or through other global interceptor definitions?” The current implementation provides a simple mechanism for defining the order of interceptor execution. A positive number in the order attribute ensures interceptor injection after any existing interceptors, while a negative number ensures that the interceptor is injected before existing interceptors. This means that, in the preceding example, the global interceptor is injected after (since its order is greater than 0) the 'wire-tap' interceptor configured locally. If there were another global interceptor with a matching pattern, its order would be determined by comparing the values of both interceptors' order attributes. To inject a global interceptor before the existing interceptors, use a negative value for the order attribute.

请注意,orderpattern 属性都是可选的。order 的默认值将为 0,并且对于 pattern,默认值为 '*'(匹配所有通道)。

Note that both the order and pattern attributes are optional. The default value for order will be 0 and for pattern, the default is '*' (to match all channels).

Wire Tap

如前所述,Spring Integration 提供了一个简单的 wire tap 拦截器。你可以在 <interceptors/> 元素中的任何通道上配置 wire tap。这样做对于调试特别有用,并且可以结合 Spring Integration 的日志通道适配器一起使用,如下所示:

As mentioned earlier, Spring Integration provides a simple wire tap interceptor. You can configure a wire tap on any channel within an <interceptors/> element. Doing so is especially useful for debugging and can be used in conjunction with Spring Integration’s logging channel adapter as follows:

<int:channel id="in">
    <int:interceptors>
        <int:wire-tap channel="logger"/>
    </int:interceptors>
</int:channel>

<int:logging-channel-adapter id="logger" level="DEBUG"/>

“logging-channel-adapter” 还接受“expression”属性,以便您可以对“payload”和“headers”变量计算 SpEL 表达式。或者,要记录完整消息 toString() 结果,请为“log-full-message”属性提供 true 值。默认情况下,它是 false,因此仅记录有效负载。将它设置为 true 可以记录所有头以及有效负载。“expression”选项提供了最大的灵活性(例如,expression="payload.user.name")。

The 'logging-channel-adapter' also accepts an 'expression' attribute so that you can evaluate a SpEL expression against the 'payload' and 'headers' variables. Alternatively, to log the full message toString() result, provide a value of true for the 'log-full-message' attribute. By default, it is false so that only the payload is logged. Setting it to true enables logging of all headers in addition to the payload. The 'expression' option provides the most flexibility (for example, expression="payload.user.name").

关于窃听器和其他类似组件 (Message Publishing Configuration) 的一个常见误解是,它们本质上是自动异步的。默认情况下,窃听器作为组件不会异步调用。相反,Spring Integration 专注于异步行为配置的单一统一方法:消息通道。使消息流的某些部分同步或异步的是已在此流程中配置的消息通道的类型。这是消息通道抽象的主要好处之一。从该框架的创立之初,我们就一直强调消息通道作为该框架的一等公民的必要性和价值。它不仅仅是 EIP 模式的一个内部、隐式实现。它作为可配置组件完全向最终用户公开。因此,窃听器组件只负责执行以下任务:

One of the common misconceptions about the wire tap and other similar components (Message Publishing Configuration) is that they are automatically asynchronous in nature. By default, wire tap as a component is not invoked asynchronously. Instead, Spring Integration focuses on a single unified approach to configuring asynchronous behavior: the message channel. What makes certain parts of the message flow synchronous or asynchronous is the type of Message Channel that has been configured within that flow. That is one of the primary benefits of the message channel abstraction. From the inception of the framework, we have always emphasized the need and the value of the message channel as a first-class citizen of the framework. It is not just an internal, implicit realization of the EIP pattern. It is fully exposed as a configurable component to the end user. So, the wire tap component is only responsible for performing the following tasks:

  • Intercept a message flow by tapping into a channel (for example, channelA)

  • Grab each message

  • Send the message to another channel (for example, channelB)

它本质上是桥接模式的一个变体,但它封装在一个通道定义中(因此更容易启用和禁用而不中断流)。此外,与桥接不同,它基本上分叉另一个消息流。该流是同步的还是异步的?答案取决于消息通道的类型,即“channelB”。我们有以下选项:直接通道、可轮询通道和执行器通道。后两个突破线程边界,使通过此类通道的通信成为异步,因为将消息从该通道分派到其订阅的处理程序在不同于用于将消息发送到该通道的线程上。这将使你的 wire-tap 流同步或异步。它与框架中的其他组件(例如消息发布器)保持一致,并且通过免除你事先考虑(除了编写线程安全代码之外)特定代码段应实现为同步还是异步,从而增加了协调一致性和简单性。通过消息通道对两段代码(例如组件 A 和组件 B)的实际连接使其协作为同步或异步。你甚至可能希望在将来从同步更改为异步,消息通道让你可以快速执行此操作,而无需接触代码。

It is essentially a variation of the bridge pattern, but it is encapsulated within a channel definition (and hence easier to enable and disable without disrupting a flow). Also, unlike the bridge, it basically forks another message flow. Is that flow synchronous or asynchronous? The answer depends on the type of message channel that 'channelB' is. We have the following options: direct channel, pollable channel, and executor channel. The last two break the thread boundary, making communication over such channels asynchronous, because the dispatching of the message from that channel to its subscribed handlers happens on a different thread than the one used to send the message to that channel. That is what is going to make your wire-tap flow synchronous or asynchronous. It is consistent with other components within the framework (such as message publisher) and adds a level of consistency and simplicity by sparing you from worrying in advance (other than writing thread-safe code) about whether a particular piece of code should be implemented as synchronous or asynchronous. The actual wiring of two pieces of code (say, component A and component B) over a message channel is what makes their collaboration synchronous or asynchronous. You may even want to change from synchronous to asynchronous in the future, and message channel lets you do it swiftly without ever touching the code.

关于 wire tap 的最后一点是,尽管提供了上述理由来说明它默认情况下不是异步的,但你应该记住,通常希望尽快传递消息。因此,将异步通道选项用作 wire tap 的出站通道非常常见。但是,默认情况下不会强制执行异步行为。如果这样做,会破坏许多用例,包括你可能不想破坏事务边界。也许你出于审计目的使用了 wire tap 模式,并且确实希望在原始事务中发送审计消息。例如,你可以将 wire tap 连接到 JMS 出站通道适配器。这样,你可以获得两全其美的效果:1)可以在事务中发送 JMS 消息,同时 2)它仍然是“立即遗忘”操作,从而防止主消息流中出现任何明显的延迟。

One final point regarding the wire tap is that, despite the rationale provided above for not being asynchronous by default, you should keep in mind that it is usually desirable to hand off the message as soon as possible. Therefore, it would be quite common to use an asynchronous channel option as the wire tap’s outbound channel. However, the asynchronous behavior is not enforced by default. There are a number of use cases that would break if we did, including that you might not want to break a transactional boundary. Perhaps you use the wire tap pattern for auditing purposes, and you do want the audit messages to be sent within the original transaction. As an example, you might connect the wire tap to a JMS outbound channel adapter. That way, you get the best of both worlds: 1) the sending of a JMS Message can occur within the transaction while 2) it is still a “fire-and-forget” action, thereby preventing any noticeable delay in the main message flow.

从 v4.0.0 版开始,当拦截器(例如 link:https://docs.spring.io/autorepo/docs/spring-integration/current/api/org/springframework/integration/channel/interceptor/WireTap.html[WireTap 类)引用通道时,避免循环引用非常重要。您需要将这类通道从当前拦截器拦截的通道中排除在外。可以通过适当的模式或以编程方式来完成此操作。如果您有一个自定义 ChannelInterceptor 引用 channel,请考虑实施 VetoCapableInterceptor。通过这种方式,框架会询问拦截器是否可以根据提供的模式拦截每个候选通道。您还可以在拦截器方法中添加运行时保护,以确保通道不是拦截器引用的通道。WireTap 使用了这两种技术。

Starting with version 4.0, it is important to avoid circular references when an interceptor (such as the WireTap class) references a channel. You need to exclude such channels from those being intercepted by the current interceptor. This can be done with appropriate patterns or programmatically. If you have a custom ChannelInterceptor that references a channel, consider implementing VetoCapableInterceptor. That way, the framework asks the interceptor if it is OK to intercept each channel that is a candidate, based on the supplied pattern. You can also add runtime protection in the interceptor methods to ensure that the channel is not one that is referenced by the interceptor. The WireTap uses both of these techniques.

从 4.3 版本开始,WireTap 具有其他构造函数,它们采用 channelName 而不是 MessageChannel 实例。对于 Java 配置和使用通道自动创建逻辑时,这非常方便。目标 MessageChannel bean 稍后从提供的 channelName 中解析,在与拦截器进行首次交互时进行解析。

Starting with version 4.3, the WireTap has additional constructors that take a channelName instead of a MessageChannel instance. This can be convenient for Java configuration and when channel auto-creation logic is being used. The target MessageChannel bean is resolved from the provided channelName later, on the first interaction with the interceptor.

通道解析需要 BeanFactory,所以窃听实例必须是 Spring 管理的 bean。

Channel resolution requires a BeanFactory, so the wire tap instance must be a Spring-managed bean.

这种延迟绑定方法还允许用 Java DSL 配置简化典型的 wire-tap 模式,如下例所示:

This late-binding approach also allows simplification of typical wire-tapping patterns with Java DSL configuration, as the following example shows:

@Bean
public PollableChannel myChannel() {
    return MessageChannels.queue()
            .wireTap("loggingFlow.input")
            .get();
}

@Bean
public IntegrationFlow loggingFlow() {
    return f -> f.log();
}

Conditional Wire Taps

可以通过使用 selector 或 selector-expression 属性使 wire tap 变得有条件。selector 引用 MessageSelector bean,它可以在运行时确定消息是否应转到 tap 通道。类似地,selector-expression 是执行相同目的的布尔 SpEL 表达式:如果表达式计算结果为 true,则消息将发送到 tap 通道。

Wire taps can be made conditional by using the selector or selector-expression attributes. The selector references a MessageSelector bean, which can determine at runtime whether the message should go to the tap channel. Similarly, the selector-expression is a boolean SpEL expression that performs the same purpose: If the expression evaluates to true, the message is sent to the tap channel.

Global Wire Tap Configuration

可以将全局窃听器配置为 Global Channel Interceptor Configuration 的特例。为此,请配置顶级 wire-tap 元素。现在,除了常规 wire-tap 命名空间支持外,还支持 patternorder 属性,并且工作方式与在 channel-interceptor 中完全相同。以下示例演示如何配置全局窃听器:

It is possible to configure a global wire tap as a special case of the Global Channel Interceptor Configuration. To do so, configure a top level wire-tap element. Now, in addition to the normal wire-tap namespace support, the pattern and order attributes are supported and work in exactly the same way as they do for the channel-interceptor. The following example shows how to configure a global wire tap:

  • Java

  • XML

@Bean
@GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3)
public WireTap wireTap(MessageChannel wiretapChannel) {
    return new WireTap(wiretapChannel);
}
<int:wire-tap pattern="input*, thing2*, thing1" order="3" channel="wiretapChannel"/>

全局窃听提供了一种便捷的方式,可以在不修改现有通道配置的情况下外部配置单个通道窃听。为此,将 pattern 属性设置为目标通道名称。例如,您可以使用此技术将测试用例配置为验证通道上的消息。

A global wire tap provides a convenient way to configure a single-channel wire tap externally without modifying the existing channel configuration. To do so, set the pattern attribute to the target channel name. For example, you can use this technique to configure a test case to verify messages on a channel.