Channel Adapter

信道适配器是一个消息端点,能够连接单个发送者或接收者到消息信道。Spring Integration 提供了许多适配器来支持各种传输,例如 JMS、文件、HTTP、web 服务、邮件等等。本参考指南的后续章节将讨论每种适配器。然而,本章专注于简单但灵活的方法调用信道适配器支持。既有入站适配器也有出站适配器,每个适配器都可以使用核心命名空间中提供的 XML 元素进行配置。这些提供了扩展 Spring Integration 的简便方法,只要您拥有一个方法可以作为源或目标被调用。

A channel adapter is a message endpoint that enables connecting a single sender or receiver to a message channel. Spring Integration provides a number of adapters to support various transports, such as JMS, file, HTTP, web services, mail, and more. Upcoming chapters of this reference guide discuss each adapter. However, this chapter focuses on the simple but flexible method-invoking channel adapter support. There are both inbound and outbound adapters, and each may be configured with XML elements provided in the core namespace. These provide an easy way to extend Spring Integration, as long as you have a method that can be invoked as either a source or a destination.

Configuring An Inbound Channel Adapter

“inbound-channel-adapter”元素(Java 配置中的“SourcePollingChannelAdapter”)可以在 Spring 管理的对象上调用任何方法,并在将方法输出转换为“Message”后将非空返回值发送到“MessageChannel”。当适配器的订阅被激活时,轮询器会尝试从源接收消息。轮询器根据提供的配置使用“TaskScheduler”进行计划。要为单个信道适配器配置轮询间隔或 cron 表达式,您可以提供一个带有计划属性之一(例如“fixed-rate”或“cron”)的“poller”元素。以下示例定义了两个“inbound-channel-adapter”实例:

An inbound-channel-adapter element (a SourcePollingChannelAdapter in Java configuration) can invoke any method on a Spring-managed object and send a non-null return value to a MessageChannel after converting the method’s output to a Message. When the adapter’s subscription is activated, a poller tries to receive messages from the source. The poller is scheduled with the TaskScheduler according to the provided configuration. To configure the polling interval or cron expression for an individual channel adapter, you can provide a 'poller' element with one of the scheduling attributes, such as 'fixed-rate' or 'cron'. The following example defines two inbound-channel-adapter instances:

  • Java DSL

  • Java

  • Kotlin DSL

  • XML

@Bean
public IntegrationFlow source1() {
    return IntegrationFlow.from(() -> new GenericMessage<>(...),
                             e -> e.poller(p -> p.fixedRate(5000)))
                ...
                .get();
}

@Bean
public IntegrationFlow source2() {
    return IntegrationFlow.from(() -> new GenericMessage<>(...),
                             e -> e.poller(p -> p.cron("30 * 9-17 * * MON-FRI")))
                ...
                .get();
}
public class SourceService {

    @InboundChannelAdapter(channel = "channel1", poller = @Poller(fixedRate = "5000"))
    Object method1() {
        ...
    }

    @InboundChannelAdapter(channel = "channel2", poller = @Poller(cron = "30 * 9-17 * * MON-FRI"))
    Object method2() {
        ...
    }
}
@Bean
fun messageSourceFlow() =
    integrationFlow( { GenericMessage<>(...) },
                    { poller { it.fixedRate(5000) } }) {
        ...
    }
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
    <int:poller fixed-rate="5000"/>
</int:inbound-channel-adapter>

<int:inbound-channel-adapter ref="source2" method="method2" channel="channel2">
    <int:poller cron="30 * 9-17 * * MON-FRI"/>
</int:channel-adapter>

如果没有提供轮询器,则必须在上下文中注册一个默认轮询器。有关更多详细信息,请参阅 Endpoint Namespace Support

If no poller is provided, then a single default poller must be registered within the context. See Endpoint Namespace Support for more detail.

轮询端点的默认触发器是一个具有 1 秒固定延迟期的 PeriodicTrigger 实例。

The default trigger for polling endpoint is a PeriodicTrigger instance with a 1 second fixed delay period.

Example 1. Important: Poller Configuration

所有“inbound-channel-adapter”类型都受“SourcePollingChannelAdapter”支持,这意味着它们包含一个轮询器配置,该配置根据轮询器中指定配置轮询“MessageSource”(调用生成成为“Message”有效负载的值的自定义方法)。以下示例显示了两个轮询器的配置:

All the inbound-channel-adapter types are backed by a SourcePollingChannelAdapter, which means they contain a poller configuration that polls the MessageSource (to invoke a custom method that produces the value that becomes a Message payload) based on the configuration specified in the Poller. The following example shows the configuration of two pollers:

<int:poller max-messages-per-poll="1" fixed-rate="1000"/>

<int:poller max-messages-per-poll="10" fixed-rate="1000"/>

在第一个配置中,轮询任务在每次轮询中调用一次,并且,在每次任务(轮询)中,都根据“max-messages-per-poll”属性值调用方法(导致消息生成)一次。在第二个配置中,轮询任务每轮询调用 10 次或直到返回“null”,因此每次轮询可能产生十条消息,同时每个轮询每间隔一秒发生一次。但是,如果配置看起来像以下示例,会发生什么情况:

In the first configuration, the polling task is invoked once per poll, and, during each task (poll), the method (which results in the production of the message) is invoked once, based on the max-messages-per-poll attribute value. In the second configuration, the polling task is invoked 10 times per poll or until it returns 'null', thus possibly producing ten messages per poll while each poll happens at one-second intervals. However, what happens if the configuration looks like the following example:

<int:poller fixed-rate="1000"/>

请注意,未指定“max-messages-per-poll”。正如我们稍后介绍的那样,“PollingConsumer”中相同的轮询器配置(例如“service-activator”、“filter”、“router”等)的“max-messages-per-poll”的默认值为“-1”,这意味着“不停执行轮询任务,除非轮询方法返回 null (可能是因为“QueueChannel”中不再有更多消息)”,然后休眠一秒钟。

Note that there is no max-messages-per-poll specified. As we cover later, the identical poller configuration in the PollingConsumer (for example, service-activator, filter, router, and others) would have a default value of -1 for max-messages-per-poll, which means “execute the polling task non-stop unless the polling method returns null (perhaps because there are no more messages in the QueueChannel)” and then sleep for one second.

但是,在“SourcePollingChannelAdapter”中,情况略有不同。“max-messages-per-poll”的默认值为“1”,除非您明确将其设置为负值(例如“-1”)。这确保轮询器可以对生命周期事件(例如启动和停止)做出反应,并防止如果“MessageSource”的自定义方法的实现有可能永远不返回 null 且碰巧是不可中断的,则轮询器可能会无限循环。

However, in the SourcePollingChannelAdapter, it is a bit different. The default value for max-messages-per-poll is 1, unless you explicitly set it to a negative value (such as -1). This makes sure that the poller can react to lifecycle events (such as start and stop) and prevents it from potentially spinning in an infinite loop if the implementation of the custom method of the MessageSource has a potential to never return null and happens to be non-interruptible.

但是,如果您确信您的方法可以返回 null,并且您需要轮询每个轮询中可用的所有源,则应明确将“max-messages-per-poll”设置为负值,如下示例所示:

However, if you are sure that your method can return null, and you need to poll for as many sources as available per each poll, you should explicitly set max-messages-per-poll to a negative value, as the following example shows:

<int:poller max-messages-per-poll="-1" fixed-rate="1000"/>

从 5.5 版开始,“max-messages-per-poll”的“0”值具有特殊含义 - 完全跳过“MessageSource.receive()”调用,这可以认为是对此入站信道适配器暂停,直到“maxMessagesPerPoll”在以后的某个时间被更改为非零值,例如通过控制总线。

Starting with version 5.5, a 0 value for max-messages-per-poll has a special meaning - skip the MessageSource.receive() call altogether, which may be considered as pausing for this inbound channel adapter until the maxMessagesPerPoll is changed to a non-zero value at a later time, e.g. via a Control Bus.

从 6.2 版本开始,fixed-delay`和 `fixed-rate`可以配置为 ISO 8601 Duration格式,例如 `PT10S、`P1D`等。此外,底层的 `PeriodicTrigger`的 `initial-interval`也会以类似于 `fixed-delay`和 `fixed-rate`的值格式显示。

Starting with version 6.2, the fixed-delay and fixed-rate can be configured in ISO 8601 Duration format, e.g. PT10S, P1D etc. In addition, an initial-interval of the underlying PeriodicTrigger is also exposed with similar value formats as fixed-delay and fixed-rate.

另请参见 Global Default Poller,了解更多信息。

Also see Global Default Poller for more information.

Configuring An Outbound Channel Adapter

“outbound-channel-adapter”元素(Java 配置的“@ServiceActivator”)也可以将“MessageChannel”连接到任何 POJO 消费者方法,该方法应使用发送到该信道消息的有效负载进行调用。以下示例显示了如何定义出站信道适配器:

An outbound-channel-adapter element (a @ServiceActivator for Java configuration) can also connect a MessageChannel to any POJO consumer method that should be invoked with the payload of messages sent to that channel. The following example shows how to define an outbound channel adapter:

  • Java DSL

  • Java

  • Kotlin DSL

  • XML

@Bean
public IntegrationFlow outboundChannelAdapterFlow(MyPojo myPojo) {
    return f -> f
             .handle(myPojo, "handle");
}
public class MyPojo {

    @ServiceActivator(channel = "channel1")
    void handle(Object payload) {
        ...
    }

}
@Bean
fun outboundChannelAdapterFlow(myPojo: MyPojo) =
    integrationFlow {
        handle(myPojo, "handle")
    }
<int:outbound-channel-adapter channel="channel1" ref="target" method="handle"/>

<beans:bean id="target" class="org.MyPojo"/>

如果要适配的信道是“PollableChannel”,则必须提供一个轮询器子元素(“@ServiceActivator”上的“@Poller”子注解),如下例所示:

If the channel being adapted is a PollableChannel, you must provide a poller sub-element (the @Poller sub-annotation on the @ServiceActivator), as the following example shows:

  • Java

  • XML

public class MyPojo {

    @ServiceActivator(channel = "channel1", poller = @Poller(fixedRate = "3000"))
    void handle(Object payload) {
        ...
    }

}
<int:outbound-channel-adapter channel="channel2" ref="target" method="handle">
    <int:poller fixed-rate="3000" />
</int:outbound-channel-adapter>

<beans:bean id="target" class="org.MyPojo"/>

如果您可以在其他“<outbound-channel-adapter>”定义中重复使用 POJO 消费者实现,则应使用“ref”属性。但是,如果消费者实现仅由“<outbound-channel-adapter>”的单个定义引用,则可以将其定义为内部 bean,如下例所示:

You should use a ref attribute if the POJO consumer implementation can be reused in other <outbound-channel-adapter> definitions. However, if the consumer implementation is referenced by only a single definition of the <outbound-channel-adapter>, you can define it as an inner bean, as the following example shows:

<int:outbound-channel-adapter channel="channel" method="handle">
    <beans:bean class="org.Foo"/>
</int:outbound-channel-adapter>

在同一个 <outbound-channel-adapter> 配置中同时使用 ref 属性和内部处理器定义是不允许的,因为它会产生歧义条件。这样的配置会导致抛出异常。

Using both the ref attribute and an inner handler definition in the same <outbound-channel-adapter> configuration is not allowed, as it creates an ambiguous condition. Such a configuration results in an exception being thrown.

可以在没有“channel”引用的情况下创建任何信道适配器,在这种情况下它将隐式创建“DirectChannel”实例。创建的信道的名称与“<inbound-channel-adapter>”或“<outbound-channel-adapter>”元素的“id”属性匹配。因此,如果未提供“channel”,则需要“id”。

Any channel adapter can be created without a channel reference, in which case it implicitly creates an instance of DirectChannel. The created channel’s name matches the id attribute of the <inbound-channel-adapter> or <outbound-channel-adapter> element. Therefore, if channel is not provided, id is required.

Channel Adapter Expressions and Scripts

与许多其他 Spring Integration 组件类似,“<inbound-channel-adapter>”和“<outbound-channel-adapter>”还支持 SpEL 表达式评估。要使用 SpEL,请在“expression”属性中提供表达式字符串,而不是为 bean 上的方法调用提供“ref”和“method”属性。当评估表达式时,它遵循与方法调用相同的约定,其中:对于“<inbound-channel-adapter>”的表达式,每当评估结果为非空值时都会生成一条消息,而对于“<outbound-channel-adapter>”的表达式,它必须等效于一个返回 void 的方法调用。

Like many other Spring Integration components, the <inbound-channel-adapter> and <outbound-channel-adapter> also provide support for SpEL expression evaluation. To use SpEL, provide the expression string in the 'expression' attribute instead of providing the 'ref' and 'method' attributes that are used for method-invocation on a bean. When an expression is evaluated, it follows the same contract as method-invocation where: the expression for an <inbound-channel-adapter> generates a message any time the evaluation result is a non-null value, while the expression for an <outbound-channel-adapter> must be the equivalent of a void-returning method invocation.

从 Spring Integration 3.0 开始,“<int:inbound-channel-adapter/>”还可以使用 SpEL “<expression/>”(或甚至带有“<script/>”)子元素进行配置,以便在比简单的“expression”属性所能实现的功能更复杂的情况下使用。如果您使用“location”属性将脚本作为“Resource”提供,还可以设置“refresh-check-delay”,这允许定期刷新资源。如果您希望在每次轮询时检查脚本,则需要将其设置与轮询器的触发器进行协调,如下例所示:

Starting with Spring Integration 3.0, an <int:inbound-channel-adapter/> can also be configured with a SpEL <expression/> (or even with a <script/>) sub-element, for when more sophistication is required than can be achieved with the simple 'expression' attribute. If you provide a script as a Resource by using the location attribute, you can also set refresh-check-delay, which allows the resource to be periodically refreshed. If you want the script to be checked on each poll, you would need to coordinate this setting with the poller’s trigger, as the following example shows:

<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
    <int:poller max-messages-per-poll="1" fixed-delay="5000"/>
    <script:script lang="ruby" location="Foo.rb" refresh-check-delay="5000"/>
</int:inbound-channel-adapter>

在使用 `<expression/>`子元素时,请参见 `ReloadableResourceBundleExpressionSource`上的 `cacheSeconds`属性。有关表达式的更多信息,请参见 Spring Expression Language (SpEL)。有关脚本的信息,请参见 Groovy supportScripting Support

See also the cacheSeconds property on the ReloadableResourceBundleExpressionSource when using the <expression/> sub-element. For more information regarding expressions, see Spring Expression Language (SpEL). For scripts, see Groovy support and Scripting Support.

<int:inbound-channel-adapter/> (SourcePollingChannelAdapter) 是一个端点,通过定期触发轮询一些底层 MessageSource 来启动消息流。由于在轮询时没有消息对象,表达式和脚本无法访问根 Message,因此大多数其他消息 SpEL 表达式中没有可用的有效负载或标头属性。脚本可以生成并返回一个带有标头和有效负载的完整 Message 对象或仅返回一个有效负载,它将被框架添加到带有基本标头的消息中。

The <int:inbound-channel-adapter/> (SourcePollingChannelAdapter) is an endpoint which starts a message flow by periodically triggering to poll some underlying MessageSource. Since, at the time of polling, there is no message object, expressions and scripts do not have access to a root Message, so there are no payload or headers properties that are available in most other messaging SpEL expressions. The script can generate and return a complete Message object with headers and payload or only a payload, which is added to a message with basic headers by the framework.