Messaging Gateways

网关隐藏了 Spring Integration 提供的消息传递 API。它让你的应用程序业务逻辑不知道 Spring Integration API。通过使用通用网关,你的代码仅与一个简单接口进行交互。

Enter the GatewayProxyFactoryBean

如前所述,最好不依赖 Spring Integration API,包括网关类。因此,Spring Integration 提供了 GatewayProxyFactoryBean,它为任何接口生成一个代理,并在内部调用下面显示的网关方法。然后,通过使用依赖关系注入,你可以向你的业务方法公开接口。

以下示例显示了可用来与 Spring Integration 交互的接口:

public interface Cafe {

    void placeOrder(Order order);

}

Gateway XML Namespace Support

也提供了命名空间支持。它允许你将一个接口配置为一项服务,如下例所示:

<int:gateway id="cafeService"
         service-interface="org.cafeteria.Cafe"
         default-request-channel="requestChannel"
         default-reply-timeout="10000"
         default-reply-channel="replyChannel"/>

在此配置定义下,cafeService 现在可注入到其他 Bean,并且在 Cafe 接口的代理实例上调用方法的代码无需知道 Spring Integration API。有关一个在 Cafe 示例中使用 gateway 元素的示例,请参阅 xref:samples.adoc#samples-impl[“Samples” 附件。

上述配置中的默认值将应用到网关接口上的所有方法。如果没有指定回复超时,则调用线程将等待回复 30 秒。请参见 Gateway Behavior When No response Arrives

可以针对各个方法覆盖默认值。请参见 Gateway Configuration with Annotations and XML

Setting the Default Reply Channel

通常,您无需指定 default-reply-channel,因为网关会自动创建一个临时的匿名回复通道,它将在其中监听回复。但是,某些情况下可能会提示您定义 default-reply-channel(或带有适配器网关(例如 HTTP、JMS 等)的 reply-channel)。

为了了解一些背景,我们简要讨论下网关的内部工作原理。网关创建一个临时的点对点回复通道。它是匿名的,且使用名为 replyChannel 的信息标头添加到其中。在提供一个显式 default-reply-channel(带有远程适配器网关的 reply-channel)时,您可以指向发布-订阅通道,之所以如此命名,是因为您可以向其中添加多个订阅者。在内部,Spring Integration 创建一个临时的 replyChannel 和明确定义的 default-reply-channel 之间的桥梁。

假设您希望回复不仅会发送到网关,还会发送到其他一些消费者。在这种情况下,您有两个需求:

  • 你可以订阅的命名通道

  • 该通道为发布-订阅-通道

网关使用的默认策略不能满足这些需求,因为添加到标头中的回复通道是匿名的,并且是点对点的。这意味着没有其他订阅者可以处理它,即便可以处理,该通道具有点对点行为,因此只有一位订阅者可以获取消息。通过定义一个 default-reply-channel,您可以指向您选择的通道。在这种情况下,这是一个 publish-subscribe-channel。网关从它到存储在标头中的临时匿名回复通道创建一个桥梁。

您可能还想明确提供一个回复通道,以通过拦截器(例如,wiretap)进行监视或审核。要配置通道拦截器,您需要一个命名通道。

从版本 5.4 开始,当网关方法返回类型为 void 时,如果未明确提供此类标头,框架会将 replyChannel 标头填充为 nullChannel bean 引用。这允许丢弃下游流的任何可能的回复,满足单向网关协议。

Gateway Configuration with Annotations and XML

请考虑以下示例,它通过添加 @Gateway 注释扩展了之前的 Cafe 接口示例:

public interface Cafe {

    @Gateway(requestChannel="orders")
    void placeOrder(Order order);

}

@Header 注释允许您添加解释为消息标头的值,如下例所示:

public interface FileWriter {

    @Gateway(requestChannel="filesOut")
    void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);

}

如果您更喜欢使用 XML 方法配置网关方法,则可以向网关配置添加 method 元素,如下例所示:

<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
      default-request-channel="inputC">
  <int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
  <int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
  <int:method name="echoUpperCase" request-channel="inputB"/>
  <int:method name="echoViaDefault"/>
</int:gateway>

您还可以使用 XML 为每个方法调用提供单独的标头。如果要设置的标头本质上是静态的,并且不想通过使用 @Header 注释将它们嵌入到网关的方法签名中,这可能很有用。例如,在贷款经纪人示例中,我们希望影响贷款报价的聚合方式,具体取决于发起哪种类型的请求(单报价或所有报价)。确定请求的类型通过评估调用的网关方法(虽然可以实现),但这将违反关注点分离范式(该方法是 Java 数据制品)。但是,在消息标头中表述您的意图(元信息)在消息传递体系结构中是自然的。以下示例展示了如何为两个方法中的每个方法添加不同的消息标头:

<int:gateway id="loanBrokerGateway"
         service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
  <int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
    <int:header name="RESPONSE_TYPE" value="BEST"/>
  </int:method>
  <int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
    <int:header name="RESPONSE_TYPE" value="ALL"/>
  </int:method>
</int:gateway>

在上一个示例中,根据网关的方法对 'RESPONSE_TYPE' 标头设置了不同的值。

例如,如果您在 <int:method/> 中将 requestChannel@Gateway 注释中指定,注释值获胜。

如果在 XML 中指定了无参数的网关,并且接口方法同时带有 @Payload@Gateway 注释(在 <int:method/> 元素中带有 payloadExpressionpayload-expression),则 @Payload 值将被忽略。

Expressions and “Global” Headers

<header/> 元素支持 expression 作为 value 的替代方案。SpEL 表达式经过求值以确定标头的值。从版本 5.2 开始,评估上下文的 #root 对象是具有 getMethod()getArgs() 访问器的 MethodArgsHolder。例如,如果您希望对简单的方法名进行路由,则可以添加具有以下表达式的标头:method.name

java.reflect.Method 不可序列化。如果您稍后序列化消息,则带有 method 表达式的标头将丢失。因此,您可能希望在这些情况下使用 method.namemethod.toString()toString() 方法提供方法的 String 表示,包括参数和返回类型。

自版本 3.0 以来,可以定义 <default-header/> 元素以向网关生成的所有消息添加标头,无论调用的方法是什么。为方法定义的特定标头优先于默认标头。此处为方法定义的特定标头将覆盖服务接口中的任何 @Header 注释。但是,默认标头不会覆盖服务接口中的任何 @Header 注释。

该网关现在还支持 default-payload-expression,它适用于所有方法(除非另行规定)。

Mapping Method Arguments to a Message

使用前一节中的配置技术可以控制如何将方法参数映射到消息元素(有效负载和标头)。当不使用显式配置时,将使用特定约定执行映射。在某些情况下,这些约定无法确定哪个参数是有效负载,哪些参数应映射到标头。请考虑以下示例:

public String send1(Object thing1, Map thing2);

public String send2(Map thing1, Map thing2);

在第一种情况下,约定是将第一个参数映射到有效负载(只要它不是 Map),而第二个参数的内容将变为标头。

在第二种情况下(或参数 thing1Map 的第一种情况),框架无法确定哪个参数应为有效负载。因此,映射将失败。这通常可以使用 payload-expression@Payload 注释或 @Headers 注释来解决。

或者(每当约定失效时),您可以承担将方法调用映射到消息的全部责任。为此,实现 MethodArgsMessageMapper,并使用 mapper 属性将其提供给 <gateway/>。映射器映射 MethodArgsHolder,它是一个封装 java.reflect.Method 实例和包含参数的 Object[] 的简单类。提供定制映射器时,网关上不允许出现 default-payload-expression 属性和 <default-header/> 元素。同样,任何 <method/> 上都不允许出现 payload-expression 属性和 <header/> 元素。

Mapping Method Arguments

以下示例展示了如何将方法参数映射至消息,并展示了一些无效配置示例:

public interface MyGateway {

    void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);

    void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);

    void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);

    void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added

    void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);

    @Payload("args[0] + args[1] + '!'")
    void payloadAnnotationAtMethodLevel(String a, String b);

    @Payload("@someBean.exclaim(args[0])")
    void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);

    void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);

    void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); //  1

    // invalid
    void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);

    // invalid
    void twoPayloads(@Payload String s1, @Payload String s2);

    // invalid
    void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);

    // invalid
    void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);

}
1 请注意,在此示例中,SpEL 变量 #this 引用自变量,在本例中,即 s 的值。

XML 等价项看起来有点不同,因为没有 #this 方法参数的上下文。但是,表达式可以通过使用 MethodArgsHolder 根对象(有关更多信息,请参阅 xref:gateway.adoc#gateway-expressions[Expressions and “Global” 标题)的 args 属性来引用方法参数,如下例所示:

<int:gateway id="myGateway" service-interface="org.something.MyGateway">
  <int:method name="send1" payload-expression="args[0] + 'thing2'"/>
  <int:method name="send2" payload-expression="@someBean.sum(args[0])"/>
  <int:method name="send3" payload-expression="method"/>
  <int:method name="send4">
    <int:header name="thing1" expression="args[2].toUpperCase()"/>
  </int:method>
</int:gateway>

@MessagingGateway Annotation

从版本 4.0 开始,网关服务接口可以使用 @MessagingGateway 注释来标记,而无需定义 <gateway/> xml 元素进行配置。以下两组示例对比了为同一个网关配置的两种方法:

<int:gateway id="myGateway" service-interface="org.something.TestGateway"
      default-request-channel="inputC">
  <int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
  <int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
  <int:method name="echoUpperCase" request-channel="inputB">
    <int:header name="thing1" value="thing2"/>
  </int:method>
  <int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
		  defaultHeaders = @GatewayHeader(name = "calledMethod",
		                           expression="#gatewayMethod.name"))
public interface TestGateway {

   @Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
   String echo(String payload);

   @Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
   String echoUpperCase(String payload);

   String echoViaDefault(String payload);

}

与 XML 版本类似,当 Spring Integration 在组件扫描期间发现这些注解时,它会使用其消息传递基础设施创建 proxy 实现。要执行此扫描并将 BeanDefinition 注册到应用程序上下文中,请将 @IntegrationComponentScan 注解添加到 @Configuration 类。标准 @ComponentScan 基础设施不会处理接口。因此,我们引入了自定义 @IntegrationComponentScan 逻辑,以在接口上查找 @MessagingGateway 注解,并为它们注册 GatewayProxyFactoryBean 实例。另请参阅 Annotation Support

你可以使用 @Profile 注释来标记 @MessagingGateway 注释的服务接口,以在非活动状态中避免 bean 创建。

从版本 6.0 开始,带有 @MessagingGateway 的接口还可以使用 @Primary 注释来标记各自的配置逻辑,就像任何 Spring @Component 定义一样。

从版本 6.0 开始,@MessagingGateway 接口可以用于标准的 Spring @Import 配置。这可以用作 @IntegrationComponentScan 或手动 AnnotationGatewayProxyFactoryBean bean 定义的替代方案。

从版本 6.0 开始,@MessagingGateway 会使用 @MessageEndpoint 作为元注释,并且 name() 属性本质上是 @Compnent.value() 的别名。这样,网关代理的 bean 名称生成策略就与扫描和导入组件的标准 Spring 注释配置保持一致。可以通过 AnnotationConfigUtils.CONFIGURATION_BEAN_NAME_GENERATOR 或作为 @IntegrationComponentScan.nameGenerator() 属性在全局范围内覆盖默认的 AnnotationBeanNameGenerator

如果您没有 XML 配置,则至少一个 @Configuration 类需要 @EnableIntegration 注解。请参阅 Configuration and @EnableIntegration 以获取更多信息。

Invoking No-Argument Methods

在调用网关接口中没有任何参数的方法时,默认行为是接收来自 PollableChannelMessage

然而,有时你想触发无参数方法,以便你可以与不需要用户提供的参数的下游其他组件交互,例如触发无参数 SQL 调用或存储过程。

为了实现收发语义,你必须提供一个有效负载。要生成有效负载,接口中的方法参数不是必需的。你可以在方法元素的 XML 中使用 @Payload 注释或 payload-expression 属性。以下列表包括一些有效负载示例:

  • a literal string

  • #gatewayMethod.name

  • new java.util.Date()

  • @someBean.someMethod()'s return value

以下示例展示如何使用 @Payload 注释:

public interface Cafe {

    @Payload("new java.util.Date()")
    List<Order> retrieveOpenOrders();

}

你还可以使用 @Gateway 注释。

public interface Cafe {

    @Gateway(payloadExpression = "new java.util.Date()")
    List<Order> retrieveOpenOrders();

}

如果同时存在这两个注释(并且提供了 payloadExpression),则 @Gateway 胜出。

如果一个方法没有参数也没有返回值,但是包含一个有效负载表达式,则将其视为仅发送操作。

Invoking default Methods

网关代理的接口也可以有 default 方法,从版本 5.3 开始,框架会向代理注入 DefaultMethodInvokingMethodInterceptor,以使用 java.lang.invoke.MethodHandle 方法代替代理来调用 default 方法。来自 JDK 的接口,如 java.util.function.Function,仍然可以用于网关代理,但由于针对 JDK 类使用 MethodHandles.Lookup 实例化的内部 Java 安全原因,它们不能被调用 default 方法。这些方法还可以使用对方法的显式 @Gateway 注释代理(丢失它们的实现逻辑并同时恢复先前的网关代理行为),或者在 @MessagingGateway 注释或 <gateway> XML 组件上使用 proxyDefaultMethods

Error Handling

网关调用会产生错误。默认情况下,在网关方法调用时,任何下游发生的错误都将“原样”重新抛出。例如,考虑以下简单流程:

gateway -> service-activator

如果服务激活器调用的服务抛出 MyException(例如),框架会将其封装在 MessagingException 中,并将传递给服务激活器的消息附加到 failedMessage 属性中。因此,框架执行的任何日志记录都具有完整的故障上下文。默认情况下,当网关捕获异常时,会取消 MyException 的封装并将其抛给调用方。你可以在网关方法声明的 throws 子句中配置一个,以匹配原因链中的特定异常类型。例如,如果你想捕获一个带有所有下游错误原因消息传递信息的 MessagingException,则网关方法应该类似于以下内容:

public interface MyGateway {

    void performProcess() throws MessagingException;

}

由于我们鼓励 POJO 编程,你可能不想让调用方接触消息传递基础架构。

如果你的网关方法没有 throws 子句,网关会遍历原因树,查找不是 MessagingExceptionRuntimeException。如果没有找到,框架会抛出 MessagingException。如果前面讨论中的 MyExceptionSomeOtherException 的原因,并且你的方法 throws SomeOtherException,网关会进一步取消封装并将其抛给调用方。

未用 service-interface 声明网关时,将使用内部框架接口 RequestReplyExchanger

请考虑以下示例:

public interface RequestReplyExchanger {

	Message<?> exchange(Message<?> request) throws MessagingException;

}

5.0 版之前,该 exchange 方法没有 throws 子句,结果是异常未包装。如果你使用此接口并想要还原先前的未包装行为,请改用自定义的 service-interface 或自己访问 MessagingExceptioncause

但是,你可能希望记录错误而不是传播错误,或者可能希望将异常视为有效答复(通过将其映射到遵循某种“错误消息”契约且调用者理解的消息)。为了实现此目的,网关提供对专门用于错误的消息通道的支持,方法是包括对 error-channel 属性的支持。在以下示例中,一个“转换器”从 Exception 创建一个答复 Message

<int:gateway id="sampleGateway"
    default-request-channel="gatewayChannel"
    service-interface="foo.bar.SimpleGateway"
    error-channel="exceptionTransformationChannel"/>

<int:transformer input-channel="exceptionTransformationChannel"
        ref="exceptionTransformer" method="createErrorResponse"/>

exceptionTransformer 可以是一个简单的 POJO,它知道如何创建预期的错误响应对象。这将成为发送回调用者的有效内容。如果必要,你可以在这样的“错误流”中执行更多精心的事情。它可能涉及路由器(包括 Spring Integration 的 ErrorMessageExceptionTypeRouter)、过滤器等。但是,大多数情况下,一个简单的“转换器”就足够了。

或者,你可能只想记录异常(或异步地将其发送到某个地方)。如果你提供单向流,不会向调用者发送任何内容。如果你想要完全禁止异常,你可以提供对全局 nullChannel 的引用(基本上是 /dev/null 方法)。最后,正如上文所述,如果没有定义 error-channel,则异常会像往常一样传播。

当您使用 @MessagingGateway 注释时(请参见 @MessagingGateway` Annotation), you can use an `errorChannel 属性。

从版本 5.0 开始,当你使用带有 void 返回类型(单向流)的网关方法时,error-channel 引用(如果提供)会填充到每个已发送消息的标准 errorChannel 头中。此功能允许基于标准 ExecutorChannel 配置(或 QueueChannel)的下游异步流覆盖默认全局 errorChannel 异常发送行为。以前,你必须使用 @GatewayHeader 注解或 <header> 元素手动指定 errorChannel 头。对于带有异步流的 void 方法,会忽略 error-channel 属性。相反,错误消息会发送到默认 errorChannel

通过简单的 POJI 网关公开消息传递系统能带来好处,但 “hiding” 底层消息传递系统在现实中需要付出代价,所以需要考虑某些因素。我们希望我们的 Java 方法能够尽快返回,而不是在调用方等待其返回(无论是 void、返回值还是抛出异常)时无限期地挂起。当将常规方法用作消息传递系统之前的代理时,我们必须考虑底层消息传递的潜在异步性质。这意味着由网关发起的某个消息可能被过滤器丢弃而永远无法到达负责生成回复的组件。某些服务激活器方法可能会导致异常,因此无法提供回复(因为我们不会生成空消息)。换句话说,多种情况可能导致永远不会有回复消息返回。这是消息传递系统中的自然现象。但是,考虑它对网关方法的影响。网关方法输入参数被合并到一条消息中并下发。回复消息将转换为网关方法的返回值。因此,你可能需要确保对于每个网关调用,始终都有回复消息。否则,如果将 reply-timeout 设置为负值,网关方法可能永远不会返回并无限期地挂起。处理这种情况的一种方法是使用异步网关(稍后在本节中进行了解释)。处理它的另一种方法是依赖于 reply-timeout 作为 30 秒的默认设置。这样,网关将不会挂起超过 reply-timeout 中指定的时间,并且如果该超时时间过去,则返回 'null'。最后,你可能需要考虑设置下游标志,例如服务激活器上的 'requires-reply' 或过滤器上的 'throw-exceptions-on-rejection'。本章的末尾部分将对这些选项进行更详细的讨论。

如果下游流返回一个 ErrorMessage,则其 payload (一个 Throwable)被视为常规下游错误。如果配置了 error-channel,则将其发送到错误流。否则,将负载抛出给网关的调用方。类似地,如果 error-channel 上的错误流返回 ErrorMessage,则其负载将抛出给调用方。对于具有 Throwable 负载的任何消息,也适用相同的原则。这在异步情况下很有用,因为您需要将 Exception 直接传播给调用方。为此,您可以返回 Exception(作为某些服务的 reply)或抛出它。通常,即使在异步流中,框架也会处理将下游流抛出的异常传播回网关。 TCP Client-Server Multiplex 示例演示了将异常返回给调用方的两种技术。它通过使用具有 group-timeoutaggregator(请参阅 Aggregator and Group Timeout)模拟对等待线程的套接字 IO 错误,并在放弃流上进行 MessagingTimeoutException 应答。

Gateway Timeouts

网关有两个超时属性:requestTimeoutreplyTimeout。请求超时仅在通道可以阻塞(例如,满的边界 QueueChannel)时才适用。replyTimeout 值是指网关等待答复或返回 null 的时间长度。其默认为无穷大。

超时可以针对网关上的所有方法(defaultRequestTimeoutdefaultReplyTimeout)或针对 MessagingGateway 接口注解进行设置。各个方法可以在 <method/> 子元素中或在 @Gateway 注解上覆盖这些默认值。

从版本 5.0 开始,超时可以定义为表达式,如下例所示:

@Gateway(payloadExpression = "args[0]", requestChannel = "someChannel",
        requestTimeoutExpression = "args[1]", replyTimeoutExpression = "args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);

评估上下文具有 BeanResolver(使用 @someBean 引用其他 Bean),并且 args 数组属性来自 #root 对象可用。有关此根对象的更多信息,请参阅 xref:gateway.adoc#gateway-expressions[Expressions and “Global” 标题。使用 XML 配置时,超时属性可以是长值或 SpEL 表达式,如下例所示:

<method name="someMethod" request-channel="someRequestChannel"
                      payload-expression="args[0]"
                      request-timeout="1000"
                      reply-timeout="args[1]">
</method>

Asynchronous Gateway

作为一种模式,消息传递网关提供了一种很好的方法来隐藏特定于消息传递的代码,同时仍公开消息传递系统的所有功能。正如 described earlierGatewayProxyFactoryBean 提供了一种便捷的方法,可以通过服务接口公开一个代理,使您能够基于 POJO 访问消息传递系统(基于您自己域、基元/字符串或其他对象中的对象)。但是,当通过返回值的简单 POJO 方法公开网关时,这意味着对于每个请求消息(在调用该方法时生成),都必须有一个响应消息(在该方法返回时生成)。由于消息传递系统本质上是异步的,因此您可能无法始终保证合约中的 “for each request, there will always be a reply”。Spring Integration 2.0 引入了对异步网关的支持,这提供了一种便捷的方法来启动流程,在该流程中您可能不知道是否预期收到回复或收到回复需要多长时间。

为了处理这些类型的场景,Spring Integration 使用 java.util.concurrent.Future 实例来支持异步网关。

在 XML 配置中,没有任何更改,你仍然像定义常规网关一样定义异步网关,如下例所示:

<int:gateway id="mathService"
     service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
     default-request-channel="requestChannel"/>

但是,网关接口(一个服务接口)有点不同,如下所示:

public interface MathServiceGateway {

  Future<Integer> multiplyByTwo(int i);

}

如前例所示,网关方法的返回类型是 Future。当 GatewayProxyFactoryBean 看到网关方法的返回类型是 Future 时,它会立即通过使用 AsyncTaskExecutor 切换到异步模式。这就是不同之处的全部。对这样的方法的调用总是立即返回一个 Future 实例。然后,你可以按照自己的步调与 Future 进行交互以获取结果、取消等等。而且,与任何其他使用 Future 实例的情况一样,调用 get() 可能会显示超时、执行异常等。以下示例展示了如何使用从异步网关返回的 Future

MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult =  result.get(1000, TimeUnit.SECONDS);

有关更详细的示例,请参阅 Spring 集成示例中的 async-gateway 示例。

AsyncTaskExecutor

默认情况下,当为其返回类型是 Future 的任何网关方法提交内部 AsyncInvocationTask 实例时,GatewayProxyFactoryBean 使用 org.springframework.core.task.SimpleAsyncTaskExecutor。但是,<gateway/> 元素配置中的 async-executor 属性允许你提供对 Spring 应用程序上下文中可用的 java.util.concurrent.Executor 任何实现的引用。

(默认)SimpleAsyncTaskExecutor 支持 FutureCompletableFuture 返回类型。请参阅 xref:gateway.adoc#gw-completable-future[CompletableFuture。尽管有一个默认执行器,但经常提供一个外部执行器很有用,以便您可以在日志中标识它的线程(当使用 XML 时,线程名称基于执行器的 Bean 名称),如下例所示:

@Bean
public AsyncTaskExecutor exec() {
    SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
    simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
    return simpleAsyncTaskExecutor;
}

@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {

    @Gateway(requestChannel = "gatewayChannel")
    Future<?> doAsync(String foo);

}

如果您希望返回不同的`Future`实现,则可以提供一个自定义执行器,也可以完全禁用执行器,然后在从下游流程返回的回复消息有效负载中返回`Future`。要禁用执行器,请在`GatewayProxyFactoryBean`中将其设置为`null`(通过使用`setAsyncTaskExecutor(null))。在使用 XML 配置网关时,使用`async-executor=""。在使用`@MessagingGateway`注释进行配置时,请使用类似于以下内容的代码:

@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {

    @Gateway(requestChannel = "gatewayChannel")
    Future<?> doAsync(String foo);

}

如果返回类型是某个具体的 Future 实现或配置的执行程序不支持的某个其他子接口,则流程在调用方的线程中运行,并且流程必须在回复消息有效负载中返回所需类型。

CompletableFuture

从版本 4.2 开始,网关方法现在可以返回`CompletableFuture<?>`。返回此类型时有两种操作模式:

  • 如果提供了异步执行程序并且返回类型恰好为 CompletableFuture(不是子类),则框架会在执行程序上运行此任务并立即向调用方返回一个 CompletableFutureCompletableFuture.supplyAsync(Supplier&lt;U&gt; supplier, Executor executor) 用于创建 future。

  • 如果将异步执行程序明确设置为 null 且返回类型为 CompletableFuture 或返回类型为 CompletableFuture 的子类,则会在调用方的线程上调用该流程。在此场景中,下游流程必须返回相应类型的 CompletableFuture

org.springframework.util.concurrent.ListenableFuture 从 Spring 框架 6.0 开始已被弃用。现在建议迁移到 CompletableFuture,它提供类似的处理功能。

Usage Scenarios

在以下场景中,调用者线程使用`CompletableFuture<Invoice>`立即返回,当下游流程回复网关(提供`Invoice`对象)时,该线程完成。

CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="something.Service" default-request-channel="orders" />

在以下场景中,当下游流程将其提供为对网关回复的有效负载时,调用者线程使用`CompletableFuture<Invoice>`返回。发票准备就绪时,某个其他流程必须完成未来。

CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders"
    async-executor="" />

在以下场景中,当下游流程将其提供为对网关回复的有效负载时,调用者线程使用`CompletableFuture<Invoice>`返回。发票准备就绪时,某个其他流程必须完成未来。如果启用了`DEBUG`日志记录,则会发出一个日志条目,表明该异步执行器不能用于此方案。

MyCompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders" />

`CompletableFuture`实例可用于对回复执行额外的操作,如下例所示:

CompletableFuture<String> process(String data);

...

CompletableFuture result = process("foo")
    .thenApply(t -> t.toUpperCase());

...

String out = result.get(10, TimeUnit.SECONDS);

Reactor Mono

从版本 5.0 开始,GatewayProxyFactoryBean 允许使用 Mono<T> 返回类型,通过 Gateway 接口和 Project Reactor 方法使用。内部 AsyncInvocationTask 封装在 Mono.fromCallable() 中。

Mono`可用于稍后检索结果(类似于`Future<?>),或者您可以在结果返回到网关时使用分派器使用`Consumer`对它进行使用。

Mono 不会立即被框架刷新。因此,底层消息流程不会在网关方法返回前启动(如带有 Future<?> Executor 任务一样)。当订阅 Mono 时,流程才开始。另一个选择是 Mono(作为 “Composable”)可能属于 Reactor 流,此时 subscribe() 与整个 Flux 相关。以下示例演示如何使用 Project Reactor 创建一个网关:

@MessagingGateway
public interface TestGateway {

    @Gateway(requestChannel = "multiplyChannel")
    Mono<Integer> multiply(Integer value);

}

@ServiceActivator(inputChannel = "multiplyChannel")
public Integer multiply(Integer value) {
    return value * 2;
}

其中此类网关可用于处理`Flux`数据的某些服务中:

@Autowired
TestGateway testGateway;

public void hadnleFlux() {
    Flux.just("1", "2", "3", "4", "5")
            .map(Integer::parseInt)
            .flatMap(this.testGateway::multiply)
            .collectList()
            .subscribe(System.out::println);
}

使用 Project Reactor 的另一个示例是一个简单的回调场景,如下例所示:

Mono<Invoice> mono = service.process(myOrder);

mono.subscribe(invoice -> handleInvoice(invoice));

调用线程继续,流程完成后将调用`handleInvoice()`。

有关详细信息,请 Kotlin Coroutines 查看。

Downstream Flows Returning an Asynchronous Type

如上文 AsyncTaskExecutor 部分中所述,如果您希望某个下游组件返回带有异步有效负载(FutureMono 和其他)的消息,您必须将异步执行器显式设置为 null(或在使用 XML 配置时设置为 "")。然后在调用方线程上调用该流,稍后可以检索结果。

Asynchronous void Return Type

消息网关方法可以这样声明:

@MessagingGateway
public interface MyGateway {

    @Gateway(requestChannel = "sendAsyncChannel")
    @Async
    void sendAsync(String payload);

}

但下游异常不会传播回调用者。为了确保下游流程调用的异步行为和异常传播到调用者,从版本 6.0 开始,框架提供了对`Future<Void>`和`Mono<Void>`返回类型的支持。用例类似于之前为普通`void`返回类型描述的发送和忘记行为,但不同之处在于,流程执行发生在异步情况下,返回的`Future`(或`Mono`)以`null`完成,或根据`send`操作结果异常完成。

如果 Future<Void> 是准确的下游流程回复,则网关的 asyncExecutor 选项必须设置为 null(对于 @MessagingGateway 配置为 AnnotationConstants.NULL),send 部分在生产程序线程上执行。一个回复取决于下游流程配置。这样,目标应用程序可以正确地生成 Future<Void> 回复。Mono 用例已经超出了框架线程控制,因此将 asyncExecutor 设置为 null 没有意义。Mono<Void> 作为请求-回复网关操作的结果必须配置为网关方法的 Mono<?> 返回类型。

Gateway Behavior When No response Arrives

explained earlier 相同,该网关提供了一种通过 POJO 方法调用与消息传递系统进行交互的便捷方式。但是,通常预期始终返回的典型方法调用(即使出现异常)可能并不总是映射为一对一的消息交换(例如,回复消息可能无法到达,相当于方法未返回)。

本节的其余部分涵盖了各种场景以及如何使网关的行为更可预测。可以配置某些属性以使同步网关行为更可预测,但其中一些属性可能并不总是按预期工作。其中之一是`reply-timeout`(在方法级别或`default-reply-timeout`在网关级别)。我们检查`reply-timeout`属性,看看它可以在哪些场景中影响同步网关的行为,又有哪些场景中不能。我们检查单线程场景(下游所有组件都通过直接信道连接)和多线程场景(例如,在下游的某个地方,您可能有一个可轮询信道或执行器信道,它打破了单线程边界)。

Long-running Process Downstream

Sync Gateway, single-threaded

If a component downstream is still running (perhaps because of an infinite loop or a slow service), setting a reply-timeout has no effect, and the gateway method call does not return until the downstream service exits (by returning or throwing an exception).

Sync Gateway, multi-threaded

If a component downstream is still running (perhaps because of an infinite loop or a slow service) in a multi-threaded message flow, setting the reply-timeout has an effect by allowing gateway method invocation to return once the timeout has been reached, because the GatewayProxyFactoryBean polls on the reply channel, waiting for a message until the timeout expires. However, if the timeout has been reached before the actual reply was produced, it could result in a 'null' return from the gateway method. You should understand that the reply message (if produced) is sent to a reply channel after the gateway method invocation might have returned, so you must be aware of that and design your flow with it in mind.

另请参阅`errorOnTimeout`属性,以便在发生超时时引发`MessageTimeoutException`,而不是返回`null`。

Downstream Component Returns 'null'

Sync Gateway — single-threaded

If a component downstream returns 'null' and the reply-timeout has been configured to negative value, the gateway method call hangs indefinitely, unless the requires-reply attribute has been set on the downstream component (for example, a service activator) that might return 'null'. In this case, an exception would be thrown and propagated to the gateway.

Sync Gateway — multi-threaded

The behavior is the same as the previous case.

Downstream Component Return Signature is 'void' While Gateway Method Signature Is Non-void

Sync Gateway — single-threaded

If a component downstream returns 'void' and the reply-timeout has been configured to negative value, the gateway method call hangs indefinitely.

Sync Gateway — multi-threaded

The behavior is the same as the previous case.

Downstream Component Results in Runtime Exception

Sync Gateway — single-threaded

If a component downstream throws a runtime exception, the exception is propagated through an error message back to the gateway and re-thrown.

Sync Gateway — multi-threaded

The behavior is the same as the previous case.

您应该了解,默认情况下,reply-timeout 是无界的。因此,如果您将 reply-timeout 设置为负值,则您的网关方法调用可能会无限期地挂起。因此,为了确保您分析您的流程,并且即使这些场景中有一个最微小的可能性出现,您也应该将 reply-timeout 属性设置为“safe”的值。默认情况下,它是 30 秒。更好的是,您可以将下游组件的 requires-reply 属性设置为 true,以确保及时响应,就像一旦该下游组件在内部返回 null 时抛出异常所产生的一样。但是,您还应该意识到,某些场景(参见 the first one)中,reply-timeout 无济于事。这意味着分析您的消息流并决定何时使用同步网关而不是异步网关也很重要。正如 described earlier 所说,后一种情况是定义返回 Future 实例的网关方法的问题。然后,您保证会收到该返回值,并且可以更精细地控制调用的结果。此外,在处理路由器时,您应该记住,将 resolution-required 属性设置为 true 会导致路由器无法解析特定通道而抛出异常。同样,在处理过滤器时,您可以设置 throw-exception-on-rejection 属性。在这两种情况下,所生成流的行为就像它包含具有“requires-reply”属性的服务激活器。换句话说,它有助于确保网关方法调用的及时响应。

您应该理解,当线程返回到网关时定时器将启动——即当流完成或一条消息移交给另一线程时启动。在该时刻,调用线程开始等待答复。如果流完全是同步的,则可立即收到答复。对于异步流,该线程最长等待此时间。

从版本 6.2 开始,errorOnTimeoutMethodInvocationGateway 的内部 MessagingGatewaySupport 扩展的属性公开在 @MessagingGatewayGatewayEndpointSpec 中。此选项具有与 Endpoint Summary 章节末尾解释的任何入站网关完全相同的含义。换句话说,将此选项设置为 true,将导致在接收超时耗尽时从发送和接收网关操作中抛出 MessageTimeoutException,而不是返回 null

有关通过 IntegrationFlow 定义网关的选项,请参阅 Java DSL 章节中的 IntegrationFlow as Gateway