Channel Adapter

信道适配器是一个消息端点,能够连接单个发送者或接收者到消息信道。Spring Integration 提供了许多适配器来支持各种传输,例如 JMS、文件、HTTP、web 服务、邮件等等。本参考指南的后续章节将讨论每种适配器。然而,本章专注于简单但灵活的方法调用信道适配器支持。既有入站适配器也有出站适配器,每个适配器都可以使用核心命名空间中提供的 XML 元素进行配置。这些提供了扩展 Spring Integration 的简便方法,只要您拥有一个方法可以作为源或目标被调用。

Configuring An Inbound Channel Adapter

“inbound-channel-adapter”元素(Java 配置中的“SourcePollingChannelAdapter”)可以在 Spring 管理的对象上调用任何方法,并在将方法输出转换为“Message”后将非空返回值发送到“MessageChannel”。当适配器的订阅被激活时,轮询器会尝试从源接收消息。轮询器根据提供的配置使用“TaskScheduler”进行计划。要为单个信道适配器配置轮询间隔或 cron 表达式,您可以提供一个带有计划属性之一(例如“fixed-rate”或“cron”)的“poller”元素。以下示例定义了两个“inbound-channel-adapter”实例:

  • Java DSL

  • Java

  • Kotlin DSL

  • XML

@Bean
public IntegrationFlow source1() {
    return IntegrationFlow.from(() -> new GenericMessage<>(...),
                             e -> e.poller(p -> p.fixedRate(5000)))
                ...
                .get();
}

@Bean
public IntegrationFlow source2() {
    return IntegrationFlow.from(() -> new GenericMessage<>(...),
                             e -> e.poller(p -> p.cron("30 * 9-17 * * MON-FRI")))
                ...
                .get();
}
public class SourceService {

    @InboundChannelAdapter(channel = "channel1", poller = @Poller(fixedRate = "5000"))
    Object method1() {
        ...
    }

    @InboundChannelAdapter(channel = "channel2", poller = @Poller(cron = "30 * 9-17 * * MON-FRI"))
    Object method2() {
        ...
    }
}
@Bean
fun messageSourceFlow() =
    integrationFlow( { GenericMessage<>(...) },
                    { poller { it.fixedRate(5000) } }) {
        ...
    }
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
    <int:poller fixed-rate="5000"/>
</int:inbound-channel-adapter>

<int:inbound-channel-adapter ref="source2" method="method2" channel="channel2">
    <int:poller cron="30 * 9-17 * * MON-FRI"/>
</int:channel-adapter>

如果没有提供轮询器,则必须在上下文中注册一个默认轮询器。有关更多详细信息,请参阅 Endpoint Namespace Support

轮询端点的默认触发器是一个具有 1 秒固定延迟期的 PeriodicTrigger 实例。

Example 1. Important: Poller Configuration

所有“inbound-channel-adapter”类型都受“SourcePollingChannelAdapter”支持,这意味着它们包含一个轮询器配置,该配置根据轮询器中指定配置轮询“MessageSource”(调用生成成为“Message”有效负载的值的自定义方法)。以下示例显示了两个轮询器的配置:

<int:poller max-messages-per-poll="1" fixed-rate="1000"/>

<int:poller max-messages-per-poll="10" fixed-rate="1000"/>

在第一个配置中,轮询任务在每次轮询中调用一次,并且,在每次任务(轮询)中,都根据“max-messages-per-poll”属性值调用方法(导致消息生成)一次。在第二个配置中,轮询任务每轮询调用 10 次或直到返回“null”,因此每次轮询可能产生十条消息,同时每个轮询每间隔一秒发生一次。但是,如果配置看起来像以下示例,会发生什么情况:

<int:poller fixed-rate="1000"/>

请注意,未指定“max-messages-per-poll”。正如我们稍后介绍的那样,“PollingConsumer”中相同的轮询器配置(例如“service-activator”、“filter”、“router”等)的“max-messages-per-poll”的默认值为“-1”,这意味着“不停执行轮询任务,除非轮询方法返回 null (可能是因为“QueueChannel”中不再有更多消息)”,然后休眠一秒钟。 但是,在“SourcePollingChannelAdapter”中,情况略有不同。“max-messages-per-poll”的默认值为“1”,除非您明确将其设置为负值(例如“-1”)。这确保轮询器可以对生命周期事件(例如启动和停止)做出反应,并防止如果“MessageSource”的自定义方法的实现有可能永远不返回 null 且碰巧是不可中断的,则轮询器可能会无限循环。 但是,如果您确信您的方法可以返回 null,并且您需要轮询每个轮询中可用的所有源,则应明确将“max-messages-per-poll”设置为负值,如下示例所示:

<int:poller max-messages-per-poll="-1" fixed-rate="1000"/>

从 5.5 版开始,“max-messages-per-poll”的“0”值具有特殊含义 - 完全跳过“MessageSource.receive()”调用,这可以认为是对此入站信道适配器暂停,直到“maxMessagesPerPoll”在以后的某个时间被更改为非零值,例如通过控制总线。 从 6.2 版本开始,fixed-delay`和 `fixed-rate`可以配置为 ISO 8601 Duration格式,例如 `PT10S、`P1D`等。此外,底层的 `PeriodicTrigger`的 `initial-interval`也会以类似于 `fixed-delay`和 `fixed-rate`的值格式显示。 另请参见 Global Default Poller,了解更多信息。

Configuring An Outbound Channel Adapter

“outbound-channel-adapter”元素(Java 配置的“@ServiceActivator”)也可以将“MessageChannel”连接到任何 POJO 消费者方法,该方法应使用发送到该信道消息的有效负载进行调用。以下示例显示了如何定义出站信道适配器:

  • Java DSL

  • Java

  • Kotlin DSL

  • XML

@Bean
public IntegrationFlow outboundChannelAdapterFlow(MyPojo myPojo) {
    return f -> f
             .handle(myPojo, "handle");
}
public class MyPojo {

    @ServiceActivator(channel = "channel1")
    void handle(Object payload) {
        ...
    }

}
@Bean
fun outboundChannelAdapterFlow(myPojo: MyPojo) =
    integrationFlow {
        handle(myPojo, "handle")
    }
<int:outbound-channel-adapter channel="channel1" ref="target" method="handle"/>

<beans:bean id="target" class="org.MyPojo"/>

如果要适配的信道是“PollableChannel”,则必须提供一个轮询器子元素(“@ServiceActivator”上的“@Poller”子注解),如下例所示:

  • Java

  • XML

public class MyPojo {

    @ServiceActivator(channel = "channel1", poller = @Poller(fixedRate = "3000"))
    void handle(Object payload) {
        ...
    }

}
<int:outbound-channel-adapter channel="channel2" ref="target" method="handle">
    <int:poller fixed-rate="3000" />
</int:outbound-channel-adapter>

<beans:bean id="target" class="org.MyPojo"/>

如果您可以在其他“<outbound-channel-adapter>”定义中重复使用 POJO 消费者实现,则应使用“ref”属性。但是,如果消费者实现仅由“<outbound-channel-adapter>”的单个定义引用,则可以将其定义为内部 bean,如下例所示:

<int:outbound-channel-adapter channel="channel" method="handle">
    <beans:bean class="org.Foo"/>
</int:outbound-channel-adapter>

在同一个 <outbound-channel-adapter> 配置中同时使用 ref 属性和内部处理器定义是不允许的,因为它会产生歧义条件。这样的配置会导致抛出异常。

可以在没有“channel”引用的情况下创建任何信道适配器,在这种情况下它将隐式创建“DirectChannel”实例。创建的信道的名称与“<inbound-channel-adapter>”或“<outbound-channel-adapter>”元素的“id”属性匹配。因此,如果未提供“channel”,则需要“id”。

Channel Adapter Expressions and Scripts

与许多其他 Spring Integration 组件类似,“<inbound-channel-adapter>”和“<outbound-channel-adapter>”还支持 SpEL 表达式评估。要使用 SpEL,请在“expression”属性中提供表达式字符串,而不是为 bean 上的方法调用提供“ref”和“method”属性。当评估表达式时,它遵循与方法调用相同的约定,其中:对于“<inbound-channel-adapter>”的表达式,每当评估结果为非空值时都会生成一条消息,而对于“<outbound-channel-adapter>”的表达式,它必须等效于一个返回 void 的方法调用。

从 Spring Integration 3.0 开始,“<int:inbound-channel-adapter/>”还可以使用 SpEL “<expression/>”(或甚至带有“<script/>”)子元素进行配置,以便在比简单的“expression”属性所能实现的功能更复杂的情况下使用。如果您使用“location”属性将脚本作为“Resource”提供,还可以设置“refresh-check-delay”,这允许定期刷新资源。如果您希望在每次轮询时检查脚本,则需要将其设置与轮询器的触发器进行协调,如下例所示:

<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
    <int:poller max-messages-per-poll="1" fixed-delay="5000"/>
    <script:script lang="ruby" location="Foo.rb" refresh-check-delay="5000"/>
</int:inbound-channel-adapter>

在使用 `<expression/>`子元素时,请参见 `ReloadableResourceBundleExpressionSource`上的 `cacheSeconds`属性。有关表达式的更多信息,请参见 Spring Expression Language (SpEL)。有关脚本的信息,请参见 Groovy supportScripting Support

<int:inbound-channel-adapter/> (SourcePollingChannelAdapter) 是一个端点,通过定期触发轮询一些底层 MessageSource 来启动消息流。由于在轮询时没有消息对象,表达式和脚本无法访问根 Message,因此大多数其他消息 SpEL 表达式中没有可用的有效负载或标头属性。脚本可以生成并返回一个带有标头和有效负载的完整 Message 对象或仅返回一个有效负载,它将被框架添加到带有基本标头的消息中。