Service Activator

Service Activator 是将任何 Spring 管理的对象连接到输入通道的端点类型,以便它可以发挥 service 的作用。如果 service 产生输出,则也可以将其连接到输出通道。或者,输出生成服务可以位于处理管道或消息流的末端,在这种情况下,可以使用入站消息的 replyChannel 标头。如果没有定义输出通道,则这是默认行为。与此处描述的大多数配置选项一样,相同的行为实际上适用于大多数其他组件。 Service Activator 本质上是一个通用端点,用于使用输入消息(有效负载和标头)调用某个对象上的方法。其内部逻辑基于 MessageHandlerMessageHandler 可以是针对特定用例的任何可能的实现,例如 DefaultMessageSplitterAggregatingMessageHandlerSftpMessageHandlerJpaOutboundGateway 等。因此,此参考手册中提到的任何出站网关和出站通道适配器都应视为此服务激活器端点的特定扩展;它们最终都会调用某个对象的方法。

Configuring Service Activator

使用 Java 和注释配置时,只需使用 @ServiceActivator 注释标记相应服务方法即可,当消息从输入通道消费时,框架将调用该方法:

public class SomeService {

    @ServiceActivator(inputChannel = "exampleChannel")
    public void exampleHandler(SomeData payload) {
        ...
    }

}

Annotation Support 中查看更多信息。

对于 Java、Groovy 或 Kotlin DSL,IntegrationFlow.handle() 运算符表示 Service Activator:

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

@Bean
public IntegrationFlow someFlow() {
    return IntegrationFlow
             .from("exampleChannel")
             .handle(someService, "exampleHandler")
             .get();
}
@Bean
fun someFlow() =
    integrationFlow("exampleChannel") {
        handle(someService, "exampleHandler")
    }
@Bean
someFlow() {
    integrationFlow 'exampleChannel',
            {
                handle someService, 'exampleHandler'
            }
}

请参阅各个章节中有关 DSL 的更多信息:

若要在使用 XML 配置时创建 Service Activator,请使用带有 input-channelref 属性的 service-activator 元素,如下面的示例所示:

<int:service-activator input-channel="exampleChannel" ref="exampleHandler"/>

前面的配置选择 exampleHandler 中符合下列消息要求的所有方法:

  • annotated with @ServiceActivator

  • is public

  • 如果 requiresReply == true 未返回 void,则提示

通过其 payload 类型为运行时的调用目标方法选择,如果目标类存在这种方法,则作为后备选择 Message<?> 类型。

从 5.0 版开始,一个服务方法可以通过 {@org.springframework.integration.annotation.Default} 标记为所有不匹配情况的后备。在使用 { content-type conversion}(当目标方法在转化后被调用)时,这可能很有用。

要委派给任何对象的明确定义的方法,您可以添加 method 属性,如下面的示例所示:

<int:service-activator input-channel="exampleChannel" ref="somePojo" method="someMethod"/>

在这两种情况下,当服务方法返回非空值时,端点尝试将回复消息发送到合适的回复通道。要确定回复通道,它首先检查是否在端点配置中提供了 output-channel,如下面的示例所示:

<int:service-activator input-channel="exampleChannel" output-channel="replyChannel"
                       ref="somePojo" method="someMethod"/>

如果该方法返回一个结果,并且未定义 output-channel,那么框架会检查请求消息的 replyChannel 标头值。如果该值可用,那么它会检查其类型。如果它是一个 MessageChannel,则回复消息将发送到该通道。如果它是一个 String,那么端点尝试将通道名称解析到一个通道实例。如果无法解析该通道,则会抛出 DestinationResolutionException。如果可以解析,则消息将发送到那里。如果请求消息没有 replyChannel 标头,并且 reply 对象是一个 Message,则会查阅它的 replyChannel 标头查找目标目的地。这是 Spring Integration 中请求-回复消息使用的技术,它也是返回地址模式的一个示例。

如果你的方法返回一个结果,且你想丢弃该结果并结束流程,你应该配置 output-channel 来发送到一个 NullChannel。为了方便起见,该框架使用 nullChannel 这个名称注册了一个。有关更多信息,请参见 Special Channels

服务激活器是不必生成回复消息的那些组件之一。如果您的方法返回 null 或者具有 void 返回类型,那么服务激活器会在不发出任何信号的情况下在方法调用后退出。此行为可由 AbstractReplyProducingMessageHandler.requiresReply 选项控制,在使用 XML 命名空间进行配置时,它还公开为 requires-reply。如果将该标志设为 true,并且该方法返回 null,则会抛出 ReplyRequiredException

服务方法中的参数既可以是一个消息,也可以是一个任意类型。如果是后者,则认为它是一个消息负载,其从消息中提取并注入到服务方法中。我们通常建议这种方法,因为它遵循和促进在使用 Spring Integration 时使用 POJO 模型。如 Annotation Support 所述,参数也可以有 @Header@Headers 注释。

服务方法不需要有任何参数,这意味着你可以实现事件样式的服务激活器(在其中你所关心的只是服务方法的调用),而不用担心消息的内容。将其视为一个 null JMS 消息。对于此实现的一个示例用例是简单地计算或监视沉积在输入通道上的消息。

从版本 4.1 开始,框架正确地将消息属性(payloadheaders)转换为 Java 8 Optional POJO 方法参数,如下面的示例所示:

public class MyBean {
    public String computeValue(Optional<String> payload,
               @Header(value="foo", required=false) String foo1,
               @Header(value="foo") Optional<String> foo2) {
        if (payload.isPresent()) {
            String value = payload.get();
            ...
        }
        else {
           ...
       }
    }

}

如果自定义服务激活器处理程序实现可以在其他 <service-activator> 定义中重复使用,我们通常建议使用 ref 属性。但是,如果自定义服务激活器处理程序实现仅在 <service-activator> 的单个定义中使用,您可以提供一个内部 Bean 定义,如下面的示例所示:

<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
            output-channel = "outChannel" method="someMethod">
    <beans:bean class="org.something.ExampleServiceActivator"/>
</int:service-activator>

在同一 <service-activator> 配置中使用 ref 属性和内部处理程序定义是不可行的,因为它会创建模棱两可的情况并导致抛出异常。

如果 ref 属性引用扩展 AbstractMessageProducingHandler 的 bean(例如框架自身提供的处理程序),则通过将输出通道直接注入处理程序来优化配置。在这种情况下,每个 ref 都必须是单独的 bean 实例(或 prototype 范围的 bean)或使用内部 <bean/> 配置类型。如果你不小心从多个 bean 引用同一个消息处理程序,你会得到一个配置异常。

Service Activators and the Spring Expression Language (SpEL)

自 Spring Integration 2.0 以来,服务激活器还可以受益于 SpEL

例如,您可以调用任何 bean 方法,而不指向 ref 属性中的 bean 或将其包含为内部 bean 定义,如下所示:

<int:service-activator input-channel="in" output-channel="out"
	expression="@accountService.processAccount(payload, headers.accountId)"/>

	<bean id="accountService" class="thing1.thing2.Account"/>

在前面的配置中,我们没有使用 ref 或内部 bean 注入 accountService,而是使用 SpEL 的 @beanId 符号调用采用与消息负载兼容的类型的某个方法。我们还传递了一个标头值。任何有效的 SpEL 表达式都可以针对消息中的任何内容求值。对于简单场景,如果所有逻辑都可以封存在这种表达式中,那么您的服务激活器不必引用 bean,如下面的示例所示:

<int:service-activator input-channel="in" output-channel="out" expression="payload * 2"/>

在前面的配置中,我们的服务逻辑是将负载值乘以 2。SpEL 让我们相对轻松地处理它。

有关配置服务激活器的更多信息,请参见 Java DSL 一章中的 Service Activators and the .handle() method

Asynchronous Service Activator

服务激活器由调用线程调用。如果输入通道是一个 SubscribableChannel,则这是一个上游线程,或者对于 PollableChannel 则是一个轮询线程。如果服务返回一个 CompletableFuture<?>,则默认操作是将该 CompletableFuture<?> 作为发送到输出(或回复)通道的消息的负载发送。从版本 4.3 开始,您现在可以将 async 属性设为 true(在使用 Java 配置时使用 setAsync(true))。如果在将 async 属性设为 true 时,服务返回一个 CompletableFuture<?>,则调用线程立即释放,并且在完成将来所需的线程(从服务内)上发送回复消息。这对于使用 PollableChannel 的长期运行服务尤其有利,因为轮询线程被释放以在框架内执行其他服务。

如果服务用 Exception 完成将来,则会正常发生错误处理。如果存在,则会将 ErrorMessage 发送到 errorChannel 消息标头。否则,会将 ErrorMessage 发送到默认的 errorChannel(如果可用)。

从版本 6.1 开始,如果 AbstractMessageProducingHandler 的输出通道配置到 ReactiveStreamsSubscribableChannel,则默认情况下打开异步模式。如果处理程序结果不是反应性类型或 CompletableFuture<?>,则尽管有输出通道类型,仍然会发生常规回复生成流程。

有关更多信息,还可以参见 Reactive Streams Support

Service Activator and Method Return Type

服务方法可以返回成为回复消息有效负载的任何类型。在此情况下,将创建一个新的 Message<?> 对象,并复制请求消息中的所有标头。当交互基于 POJO 方法调用时,这与大多数 Spring Integration MessageHandler 实现的工作方式相同。

方法还可以返回一个完整的 Message<?> 对象。但是,请记住,与 transformers 不同的是,对于一个服务激活器,这个消息将通过复制请求消息中的标头进行修改(如果没有在返回的消息中存在标头)。因此,如果你的方法参数是一个 Message<?>,而且你在服务方法中复制了一些(但不是全部)现有标头,它们将重新出现在回复消息中。删除回复消息中的标头并不是服务激活器的职责,并且遵循松散耦合原则,最好在集成流中添加一个 HeaderFilter。或者,可以使用 Transformer 而不是服务激活器,但是,在这种情况下,在返回一个完整的 Message<?> 时,该方法将完全负责消息,包括复制请求消息标头(如果需要)。你必须确保保留重要的框架标头(例如,replyChannelerrorChannel),如果存在的话。