Messaging Gateways

网关隐藏了 Spring Integration 提供的消息传递 API。它让你的应用程序业务逻辑不知道 Spring Integration API。通过使用通用网关,你的代码仅与一个简单接口进行交互。

A gateway hides the messaging API provided by Spring Integration. It lets your application’s business logic be unaware of the Spring Integration API. By using a generic Gateway, your code interacts with only a simple interface.

Enter the GatewayProxyFactoryBean

如前所述,最好不依赖 Spring Integration API,包括网关类。因此,Spring Integration 提供了 GatewayProxyFactoryBean,它为任何接口生成一个代理,并在内部调用下面显示的网关方法。然后,通过使用依赖关系注入,你可以向你的业务方法公开接口。

As mentioned earlier, it would be great to have no dependency on the Spring Integration API — including the gateway class. For that reason, Spring Integration provides the GatewayProxyFactoryBean, which generates a proxy for any interface and internally invokes the gateway methods shown below. By using dependency injection, you can then expose the interface to your business methods.

以下示例显示了可用来与 Spring Integration 交互的接口:

The following example shows an interface that can be used to interact with Spring Integration:

public interface Cafe {

    void placeOrder(Order order);

}

Gateway XML Namespace Support

也提供了命名空间支持。它允许你将一个接口配置为一项服务,如下例所示:

Namespace support is also provided. It lets you configure an interface as a service, as the following example shows:

<int:gateway id="cafeService"
         service-interface="org.cafeteria.Cafe"
         default-request-channel="requestChannel"
         default-reply-timeout="10000"
         default-reply-channel="replyChannel"/>

在此配置定义下,cafeService 现在可注入到其他 Bean,并且在 Cafe 接口的代理实例上调用方法的代码无需知道 Spring Integration API。有关一个在 Cafe 示例中使用 gateway 元素的示例,请参阅 xref:samples.adoc#samples-impl[“Samples” 附件。

With this configuration defined, the cafeService can now be injected into other beans, and the code that invokes the methods on that proxied instance of the Cafe interface has no awareness of the Spring Integration API. See the “Samples” Appendix for an example that uses the gateway element (in the Cafe demo).

上述配置中的默认值将应用到网关接口上的所有方法。如果没有指定回复超时,则调用线程将等待回复 30 秒。请参见 Gateway Behavior When No response Arrives

The defaults in the preceding configuration are applied to all methods on the gateway interface. If a reply timeout is not specified, the calling thread waits for a reply for 30 seconds. See Gateway Behavior When No response Arrives.

可以针对各个方法覆盖默认值。请参见 Gateway Configuration with Annotations and XML

The defaults can be overridden for individual methods. See Gateway Configuration with Annotations and XML.

Setting the Default Reply Channel

通常,您无需指定 default-reply-channel,因为网关会自动创建一个临时的匿名回复通道,它将在其中监听回复。但是,某些情况下可能会提示您定义 default-reply-channel(或带有适配器网关(例如 HTTP、JMS 等)的 reply-channel)。

Typically, you need not specify the default-reply-channel, since a Gateway auto-creates a temporary, anonymous reply channel, where it listens for the reply. However, some cases may prompt you to define a default-reply-channel (or reply-channel with adapter gateways, such as HTTP, JMS, and others).

为了了解一些背景,我们简要讨论下网关的内部工作原理。网关创建一个临时的点对点回复通道。它是匿名的,且使用名为 replyChannel 的信息标头添加到其中。在提供一个显式 default-reply-channel(带有远程适配器网关的 reply-channel)时,您可以指向发布-订阅通道,之所以如此命名,是因为您可以向其中添加多个订阅者。在内部,Spring Integration 创建一个临时的 replyChannel 和明确定义的 default-reply-channel 之间的桥梁。

For some background, we briefly discuss some inner workings of the gateway. A gateway creates a temporary point-to-point reply channel. It is anonymous and is added to the message headers with the name, replyChannel. When providing an explicit default-reply-channel (reply-channel with remote adapter gateways), you can point to a publish-subscribe channel, which is so named because you can add more than one subscriber to it. Internally, Spring Integration creates a bridge between the temporary replyChannel and the explicitly defined default-reply-channel.

假设您希望回复不仅会发送到网关,还会发送到其他一些消费者。在这种情况下,您有两个需求:

Suppose you want your reply to go not only to the gateway but also to some other consumer. In this case, you want two things:

  • A named channel to which you can subscribe

  • That channel to be a publish-subscribe-channel

网关使用的默认策略不能满足这些需求,因为添加到标头中的回复通道是匿名的,并且是点对点的。这意味着没有其他订阅者可以处理它,即便可以处理,该通道具有点对点行为,因此只有一位订阅者可以获取消息。通过定义一个 default-reply-channel,您可以指向您选择的通道。在这种情况下,这是一个 publish-subscribe-channel。网关从它到存储在标头中的临时匿名回复通道创建一个桥梁。

The default strategy used by the gateway does not satisfy those needs, because the reply channel added to the header is anonymous and point-to-point. This means that no other subscriber can get a handle to it and, even if it could, the channel has point-to-point behavior such that only one subscriber would get the message. By defining a default-reply-channel you can point to a channel of your choosing. In this case, that is a publish-subscribe-channel. The gateway creates a bridge from it to the temporary, anonymous reply channel that is stored in the header.

您可能还想明确提供一个回复通道,以通过拦截器(例如,wiretap)进行监视或审核。要配置通道拦截器,您需要一个命名通道。

You might also want to explicitly provide a reply channel for monitoring or auditing through an interceptor (for example, wiretap). To configure a channel interceptor, you need a named channel.

从版本 5.4 开始,当网关方法返回类型为 void 时,如果未明确提供此类标头,框架会将 replyChannel 标头填充为 nullChannel bean 引用。这允许丢弃下游流的任何可能的回复,满足单向网关协议。

Starting with version 5.4, when gateway method return type is void, the framework populates a replyChannel header as a nullChannel bean reference if such a header is not provided explicitly. This allows any possible reply from the downstream flow to be discarded, meeting the one-way gateway contract.

Gateway Configuration with Annotations and XML

请考虑以下示例,它通过添加 @Gateway 注释扩展了之前的 Cafe 接口示例:

Consider the following example, which expands on the previous Cafe interface example by adding a @Gateway annotation:

public interface Cafe {

    @Gateway(requestChannel="orders")
    void placeOrder(Order order);

}

@Header 注释允许您添加解释为消息标头的值,如下例所示:

The @Header annotation lets you add values that are interpreted as message headers, as the following example shows:

public interface FileWriter {

    @Gateway(requestChannel="filesOut")
    void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);

}

如果您更喜欢使用 XML 方法配置网关方法,则可以向网关配置添加 method 元素,如下例所示:

If you prefer the XML approach to configuring gateway methods, you can add method elements to the gateway configuration, as the following example shows:

<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
      default-request-channel="inputC">
  <int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
  <int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
  <int:method name="echoUpperCase" request-channel="inputB"/>
  <int:method name="echoViaDefault"/>
</int:gateway>

您还可以使用 XML 为每个方法调用提供单独的标头。如果要设置的标头本质上是静态的,并且不想通过使用 @Header 注释将它们嵌入到网关的方法签名中,这可能很有用。例如,在贷款经纪人示例中,我们希望影响贷款报价的聚合方式,具体取决于发起哪种类型的请求(单报价或所有报价)。确定请求的类型通过评估调用的网关方法(虽然可以实现),但这将违反关注点分离范式(该方法是 Java 数据制品)。但是,在消息标头中表述您的意图(元信息)在消息传递体系结构中是自然的。以下示例展示了如何为两个方法中的每个方法添加不同的消息标头:

You can also use XML to provide individual headers for each method invocation. This could be useful if the headers you want to set are static in nature, and you do not want to embed them in the gateway’s method signature by using @Header annotations. For example, in the loan broker example, we want to influence how aggregation of the loan quotes is done, based on what type of request was initiated (single quote or all quotes). Determining the type of the request by evaluating which gateway method was invoked, although possible, would violate the separation of concerns paradigm (the method is a Java artifact). However, expressing your intention (meta information) in message headers is natural in a messaging architecture. The following example shows how to add a different message header for each of two methods:

<int:gateway id="loanBrokerGateway"
         service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
  <int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
    <int:header name="RESPONSE_TYPE" value="BEST"/>
  </int:method>
  <int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
    <int:header name="RESPONSE_TYPE" value="ALL"/>
  </int:method>
</int:gateway>

在上一个示例中,根据网关的方法对 'RESPONSE_TYPE' 标头设置了不同的值。

In the preceding example a different value is set for the 'RESPONSE_TYPE' header, based on the gateway’s method.

例如,如果您在 <int:method/> 中将 requestChannel@Gateway 注释中指定,注释值获胜。

If you specify, for example, the requestChannel in <int:method/> as well as in a @Gateway annotation, the annotation value wins.

如果在 XML 中指定了无参数的网关,并且接口方法同时带有 @Payload@Gateway 注释(在 <int:method/> 元素中带有 payloadExpressionpayload-expression),则 @Payload 值将被忽略。

If a no-argument gateway is specified in XML, and the interface method has both a @Payload and @Gateway annotation (with a payloadExpression or a payload-expression in an <int:method/> element), the @Payload value is ignored.

Expressions and “Global” Headers

<header/> 元素支持 expression 作为 value 的替代方案。SpEL 表达式经过求值以确定标头的值。从版本 5.2 开始,评估上下文的 #root 对象是具有 getMethod()getArgs() 访问器的 MethodArgsHolder。例如,如果您希望对简单的方法名进行路由,则可以添加具有以下表达式的标头:method.name

The <header/> element supports expression as an alternative to value. The SpEL expression is evaluated to determine the value of the header. Starting with version 5.2, the #root object of the evaluation context is a MethodArgsHolder with getMethod() and getArgs() accessors. For example, if you wish to route on the simple method name, you might add a header with the following expression: method.name.

java.reflect.Method 不可序列化。如果您稍后序列化消息,则带有 method 表达式的标头将丢失。因此,您可能希望在这些情况下使用 method.namemethod.toString()toString() 方法提供方法的 String 表示,包括参数和返回类型。

The java.reflect.Method is not serializable. A header with an expression of method is lost if you later serialize the message. Consequently, you may wish to use method.name or method.toString() in those cases. The toString() method provides a String representation of the method, including parameter and return types.

自版本 3.0 以来,可以定义 <default-header/> 元素以向网关生成的所有消息添加标头,无论调用的方法是什么。为方法定义的特定标头优先于默认标头。此处为方法定义的特定标头将覆盖服务接口中的任何 @Header 注释。但是,默认标头不会覆盖服务接口中的任何 @Header 注释。

Since version 3.0, <default-header/> elements can be defined to add headers to all the messages produced by the gateway, regardless of the method invoked. Specific headers defined for a method take precedence over default headers. Specific headers defined for a method here override any @Header annotations in the service interface. However, default headers do NOT override any @Header annotations in the service interface.

该网关现在还支持 default-payload-expression,它适用于所有方法(除非另行规定)。

The gateway now also supports a default-payload-expression, which is applied for all methods (unless overridden).

Mapping Method Arguments to a Message

使用前一节中的配置技术可以控制如何将方法参数映射到消息元素(有效负载和标头)。当不使用显式配置时,将使用特定约定执行映射。在某些情况下,这些约定无法确定哪个参数是有效负载,哪些参数应映射到标头。请考虑以下示例:

Using the configuration techniques in the previous section allows control of how method arguments are mapped to message elements (payload and headers). When no explicit configuration is used, certain conventions are used to perform the mapping. In some cases, these conventions cannot determine which argument is the payload and which should be mapped to headers. Consider the following example:

public String send1(Object thing1, Map thing2);

public String send2(Map thing1, Map thing2);

在第一种情况下,约定是将第一个参数映射到有效负载(只要它不是 Map),而第二个参数的内容将变为标头。

In the first case, the convention is to map the first argument to the payload (as long as it is not a Map) and the contents of the second argument become headers.

在第二种情况下(或参数 thing1Map 的第一种情况),框架无法确定哪个参数应为有效负载。因此,映射将失败。这通常可以使用 payload-expression@Payload 注释或 @Headers 注释来解决。

In the second case (or the first when the argument for parameter thing1 is a Map), the framework cannot determine which argument should be the payload. Consequently, mapping fails. This can generally be resolved using a payload-expression, a @Payload annotation, or a @Headers annotation.

或者(每当约定失效时),您可以承担将方法调用映射到消息的全部责任。为此,实现 MethodArgsMessageMapper,并使用 mapper 属性将其提供给 <gateway/>。映射器映射 MethodArgsHolder,它是一个封装 java.reflect.Method 实例和包含参数的 Object[] 的简单类。提供定制映射器时,网关上不允许出现 default-payload-expression 属性和 <default-header/> 元素。同样,任何 <method/> 上都不允许出现 payload-expression 属性和 <header/> 元素。

Alternatively (and whenever the conventions break down), you can take the entire responsibility for mapping the method calls to messages. To do so, implement an MethodArgsMessageMapper and provide it to the <gateway/> by using the mapper attribute. The mapper maps a MethodArgsHolder, which is a simple class that wraps the java.reflect.Method instance and an Object[] containing the arguments. When providing a custom mapper, the default-payload-expression attribute and <default-header/> elements are not allowed on the gateway. Similarly, the payload-expression attribute and <header/> elements are not allowed on any <method/> elements.

Mapping Method Arguments

以下示例展示了如何将方法参数映射至消息,并展示了一些无效配置示例:

The following examples show how method arguments can be mapped to the message and shows some examples of invalid configuration:

public interface MyGateway {

    void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);

    void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);

    void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);

    void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added

    void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);

    @Payload("args[0] + args[1] + '!'")
    void payloadAnnotationAtMethodLevel(String a, String b);

    @Payload("@someBean.exclaim(args[0])")
    void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);

    void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);

    void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); //  1

    // invalid
    void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);

    // invalid
    void twoPayloads(@Payload String s1, @Payload String s2);

    // invalid
    void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);

    // invalid
    void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);

}
1 Note that, in this example, the SpEL variable, #this, refers to the argument — in this case, the value of s.

XML 等价项看起来有点不同,因为没有 #this 方法参数的上下文。但是,表达式可以通过使用 MethodArgsHolder 根对象(有关更多信息,请参阅 xref:gateway.adoc#gateway-expressions[Expressions and “Global” 标题)的 args 属性来引用方法参数,如下例所示:

The XML equivalent looks a little different, since there is no #this context for the method argument. However, expressions can refer to method arguments by using the args property for the MethodArgsHolder root object (see Expressions and “Global” Headers for more information), as the following example shows:

<int:gateway id="myGateway" service-interface="org.something.MyGateway">
  <int:method name="send1" payload-expression="args[0] + 'thing2'"/>
  <int:method name="send2" payload-expression="@someBean.sum(args[0])"/>
  <int:method name="send3" payload-expression="method"/>
  <int:method name="send4">
    <int:header name="thing1" expression="args[2].toUpperCase()"/>
  </int:method>
</int:gateway>

@MessagingGateway Annotation

从版本 4.0 开始,网关服务接口可以使用 @MessagingGateway 注释来标记,而无需定义 <gateway/> xml 元素进行配置。以下两组示例对比了为同一个网关配置的两种方法:

Starting with version 4.0, gateway service interfaces can be marked with a @MessagingGateway annotation instead of requiring the definition of a <gateway /> xml element for configuration. The following pair of examples compares the two approaches for configuring the same gateway:

<int:gateway id="myGateway" service-interface="org.something.TestGateway"
      default-request-channel="inputC">
  <int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
  <int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
  <int:method name="echoUpperCase" request-channel="inputB">
    <int:header name="thing1" value="thing2"/>
  </int:method>
  <int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
		  defaultHeaders = @GatewayHeader(name = "calledMethod",
		                           expression="#gatewayMethod.name"))
public interface TestGateway {

   @Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
   String echo(String payload);

   @Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
   String echoUpperCase(String payload);

   String echoViaDefault(String payload);

}

与 XML 版本类似,当 Spring Integration 在组件扫描期间发现这些注解时,它会使用其消息传递基础设施创建 proxy 实现。要执行此扫描并将 BeanDefinition 注册到应用程序上下文中,请将 @IntegrationComponentScan 注解添加到 @Configuration 类。标准 @ComponentScan 基础设施不会处理接口。因此,我们引入了自定义 @IntegrationComponentScan 逻辑,以在接口上查找 @MessagingGateway 注解,并为它们注册 GatewayProxyFactoryBean 实例。另请参阅 Annotation Support

Similarly to the XML version, when Spring Integration discovers these annotations during a component scan, it creates the proxy implementation with its messaging infrastructure. To perform this scan and register the BeanDefinition in the application context, add the @IntegrationComponentScan annotation to a @Configuration class. The standard @ComponentScan infrastructure does not deal with interfaces. Consequently, we introduced the custom @IntegrationComponentScan logic to find the @MessagingGateway annotation on the interfaces and register GatewayProxyFactoryBean instances for them. See also Annotation Support.

你可以使用 @Profile 注释来标记 @MessagingGateway 注释的服务接口,以在非活动状态中避免 bean 创建。

Along with the @MessagingGateway annotation you can mark a service interface with the @Profile annotation to avoid the bean creation, if such a profile is not active.

从版本 6.0 开始,带有 @MessagingGateway 的接口还可以使用 @Primary 注释来标记各自的配置逻辑,就像任何 Spring @Component 定义一样。

Starting with version 6.0, an interface with the @MessagingGateway can also be marked with a @Primary annotation for respective configuration logic as its possible with any Spring @Component definition.

从版本 6.0 开始,@MessagingGateway 接口可以用于标准的 Spring @Import 配置。这可以用作 @IntegrationComponentScan 或手动 AnnotationGatewayProxyFactoryBean bean 定义的替代方案。

Starting with version 6.0, @MessagingGateway interfaces can be used in the standard Spring @Import configuration. This may be used as an alternative to the @IntegrationComponentScan or manual AnnotationGatewayProxyFactoryBean bean definitions.

从版本 6.0 开始,@MessagingGateway 会使用 @MessageEndpoint 作为元注释,并且 name() 属性本质上是 @Compnent.value() 的别名。这样,网关代理的 bean 名称生成策略就与扫描和导入组件的标准 Spring 注释配置保持一致。可以通过 AnnotationConfigUtils.CONFIGURATION_BEAN_NAME_GENERATOR 或作为 @IntegrationComponentScan.nameGenerator() 属性在全局范围内覆盖默认的 AnnotationBeanNameGenerator

The @MessagingGateway is meta-annotated with a @MessageEndpoint since version 6.0 and the name() attribute is, essentially, aliased to the @Compnent.value(). This way the bean names generating strategy for gateway proxies is realigned with the standard Spring annotation configuration for scanned and imported components. The default AnnotationBeanNameGenerator can be overridden globally via an AnnotationConfigUtils.CONFIGURATION_BEAN_NAME_GENERATOR or as a @IntegrationComponentScan.nameGenerator() attribute.

如果您没有 XML 配置,则至少一个 @Configuration 类需要 @EnableIntegration 注解。请参阅 Configuration and @EnableIntegration 以获取更多信息。

If you have no XML configuration, the @EnableIntegration annotation is required on at least one @Configuration class. See Configuration and @EnableIntegration for more information.

Invoking No-Argument Methods

在调用网关接口中没有任何参数的方法时,默认行为是接收来自 PollableChannelMessage

When invoking methods on a Gateway interface that do not have any arguments, the default behavior is to receive a Message from a PollableChannel.

然而,有时你想触发无参数方法,以便你可以与不需要用户提供的参数的下游其他组件交互,例如触发无参数 SQL 调用或存储过程。

Sometimes, however, you may want to trigger no-argument methods so that you can interact with other components downstream that do not require user-provided parameters, such as triggering no-argument SQL calls or stored procedures.

为了实现收发语义,你必须提供一个有效负载。要生成有效负载,接口中的方法参数不是必需的。你可以在方法元素的 XML 中使用 @Payload 注释或 payload-expression 属性。以下列表包括一些有效负载示例:

To achieve send-and-receive semantics, you must provide a payload. To generate a payload, method parameters on the interface are not necessary. You can either use the @Payload annotation or the payload-expression attribute in XML on the method element. The following list includes a few examples of what the payloads could be:

  • a literal string

  • #gatewayMethod.name

  • new java.util.Date()

  • @someBean.someMethod()'s return value

以下示例展示如何使用 @Payload 注释:

The following example shows how to use the @Payload annotation:

public interface Cafe {

    @Payload("new java.util.Date()")
    List<Order> retrieveOpenOrders();

}

你还可以使用 @Gateway 注释。

You can also use the @Gateway annotation.

public interface Cafe {

    @Gateway(payloadExpression = "new java.util.Date()")
    List<Order> retrieveOpenOrders();

}

如果同时存在这两个注释(并且提供了 payloadExpression),则 @Gateway 胜出。

If both annotations are present (and the payloadExpression is provided), @Gateway wins.

如果一个方法没有参数也没有返回值,但是包含一个有效负载表达式,则将其视为仅发送操作。

If a method has no argument and no return value but does contain a payload expression, it is treated as a send-only operation.

Invoking default Methods

网关代理的接口也可以有 default 方法,从版本 5.3 开始,框架会向代理注入 DefaultMethodInvokingMethodInterceptor,以使用 java.lang.invoke.MethodHandle 方法代替代理来调用 default 方法。来自 JDK 的接口,如 java.util.function.Function,仍然可以用于网关代理,但由于针对 JDK 类使用 MethodHandles.Lookup 实例化的内部 Java 安全原因,它们不能被调用 default 方法。这些方法还可以使用对方法的显式 @Gateway 注释代理(丢失它们的实现逻辑并同时恢复先前的网关代理行为),或者在 @MessagingGateway 注释或 <gateway> XML 组件上使用 proxyDefaultMethods

An interface for gateway proxy may have default methods as well and starting with version 5.3, the framework injects a DefaultMethodInvokingMethodInterceptor into a proxy for calling default methods using a java.lang.invoke.MethodHandle approach instead of proxying. The interfaces from JDK, such as java.util.function.Function, still can be used for gateway proxy, but their default methods cannot be called because of internal Java security reasons for a MethodHandles.Lookup instantiation against JDK classes. These methods also can be proxied (losing their implementation logic and, at the same time, restoring previous gateway proxy behavior) using an explicit @Gateway annotation on the method, or proxyDefaultMethods on the @MessagingGateway annotation or <gateway> XML component.

Error Handling

网关调用会产生错误。默认情况下,在网关方法调用时,任何下游发生的错误都将“原样”重新抛出。例如,考虑以下简单流程:

The gateway invocation can result in errors. By default, any error that occurs downstream is re-thrown “as is” upon the gateway’s method invocation. For example, consider the following simple flow:

gateway -> service-activator

如果服务激活器调用的服务抛出 MyException(例如),框架会将其封装在 MessagingException 中,并将传递给服务激活器的消息附加到 failedMessage 属性中。因此,框架执行的任何日志记录都具有完整的故障上下文。默认情况下,当网关捕获异常时,会取消 MyException 的封装并将其抛给调用方。你可以在网关方法声明的 throws 子句中配置一个,以匹配原因链中的特定异常类型。例如,如果你想捕获一个带有所有下游错误原因消息传递信息的 MessagingException,则网关方法应该类似于以下内容:

If the service invoked by the service activator throws a MyException (for example), the framework wraps it in a MessagingException and attaches the message passed to the service activator in the failedMessage property. Consequently, any logging performed by the framework has full the context of the failure. By default, when the exception is caught by the gateway, the MyException is unwrapped and thrown to the caller. You can configure a throws clause on the gateway method declaration to match the particular exception type in the cause chain. For example, if you want to catch a whole MessagingException with all the messaging information of the reason of downstream error, you should have a gateway method similar to the following:

public interface MyGateway {

    void performProcess() throws MessagingException;

}

由于我们鼓励 POJO 编程,你可能不想让调用方接触消息传递基础架构。

Since we encourage POJO programming, you may not want to expose the caller to messaging infrastructure.

如果你的网关方法没有 throws 子句,网关会遍历原因树,查找不是 MessagingExceptionRuntimeException。如果没有找到,框架会抛出 MessagingException。如果前面讨论中的 MyExceptionSomeOtherException 的原因,并且你的方法 throws SomeOtherException,网关会进一步取消封装并将其抛给调用方。

If your gateway method does not have a throws clause, the gateway traverses the cause tree, looking for a RuntimeException that is not a MessagingException. If none is found, the framework throws the MessagingException. If the MyException in the preceding discussion has a cause of SomeOtherException and your method throws SomeOtherException, the gateway further unwraps that and throws it to the caller.

未用 service-interface 声明网关时,将使用内部框架接口 RequestReplyExchanger

When a gateway is declared with no service-interface, an internal framework interface RequestReplyExchanger is used.

请考虑以下示例:

Consider the following example:

public interface RequestReplyExchanger {

	Message<?> exchange(Message<?> request) throws MessagingException;

}

5.0 版之前,该 exchange 方法没有 throws 子句,结果是异常未包装。如果你使用此接口并想要还原先前的未包装行为,请改用自定义的 service-interface 或自己访问 MessagingExceptioncause

Before version 5.0, this exchange method did not have a throws clause and, as a result, the exception was unwrapped. If you use this interface and want to restore the previous unwrap behavior, use a custom service-interface instead or access the cause of the MessagingException yourself.

但是,你可能希望记录错误而不是传播错误,或者可能希望将异常视为有效答复(通过将其映射到遵循某种“错误消息”契约且调用者理解的消息)。为了实现此目的,网关提供对专门用于错误的消息通道的支持,方法是包括对 error-channel 属性的支持。在以下示例中,一个“转换器”从 Exception 创建一个答复 Message

However, you may want to log the error rather than propagating it, or you may want to treat an exception as a valid reply (by mapping it to a message that conforms to some "error message" contract that the caller understands). To accomplish this, the gateway provides support for a message channel dedicated to the errors by including support for the error-channel attribute. In the following example, a 'transformer' creates a reply Message from the Exception:

<int:gateway id="sampleGateway"
    default-request-channel="gatewayChannel"
    service-interface="foo.bar.SimpleGateway"
    error-channel="exceptionTransformationChannel"/>

<int:transformer input-channel="exceptionTransformationChannel"
        ref="exceptionTransformer" method="createErrorResponse"/>

exceptionTransformer 可以是一个简单的 POJO,它知道如何创建预期的错误响应对象。这将成为发送回调用者的有效内容。如果必要,你可以在这样的“错误流”中执行更多精心的事情。它可能涉及路由器(包括 Spring Integration 的 ErrorMessageExceptionTypeRouter)、过滤器等。但是,大多数情况下,一个简单的“转换器”就足够了。

The exceptionTransformer could be a simple POJO that knows how to create the expected error response objects. That becomes the payload that is sent back to the caller. You could do many more elaborate things in such an “error flow”, if necessary. It might involve routers (including Spring Integration’s ErrorMessageExceptionTypeRouter), filters, and so on. Most of the time, a simple 'transformer' should be sufficient, however.

或者,你可能只想记录异常(或异步地将其发送到某个地方)。如果你提供单向流,不会向调用者发送任何内容。如果你想要完全禁止异常,你可以提供对全局 nullChannel 的引用(基本上是 /dev/null 方法)。最后,正如上文所述,如果没有定义 error-channel,则异常会像往常一样传播。

Alternatively, you might want to only log the exception (or send it somewhere asynchronously). If you provide a one-way flow, nothing would be sent back to the caller. If you want to completely suppress exceptions, you can provide a reference to the global nullChannel (essentially a /dev/null approach). Finally, as mentioned above, if no error-channel is defined, then the exceptions propagate as usual.

当您使用 @MessagingGateway 注释时(请参见 @MessagingGateway` Annotation), you can use an `errorChannel 属性。

When you use the @MessagingGateway annotation (see @MessagingGateway` Annotation), you can use an `errorChannel attribute.

从版本 5.0 开始,当你使用带有 void 返回类型(单向流)的网关方法时,error-channel 引用(如果提供)会填充到每个已发送消息的标准 errorChannel 头中。此功能允许基于标准 ExecutorChannel 配置(或 QueueChannel)的下游异步流覆盖默认全局 errorChannel 异常发送行为。以前,你必须使用 @GatewayHeader 注解或 <header> 元素手动指定 errorChannel 头。对于带有异步流的 void 方法,会忽略 error-channel 属性。相反,错误消息会发送到默认 errorChannel

Starting with version 5.0, when you use a gateway method with a void return type (one-way flow), the error-channel reference (if provided) is populated in the standard errorChannel header of each sent message. This feature allows a downstream asynchronous flow, based on the standard ExecutorChannel configuration (or a QueueChannel), to override a default global errorChannel exceptions sending behavior. Previously you had to manually specify an errorChannel header with the @GatewayHeader annotation or the <header> element. The error-channel property was ignored for void methods with an asynchronous flow. Instead, error messages were sent to the default errorChannel.

通过简单的 POJI 网关公开消息传递系统能带来好处,但 “hiding” 底层消息传递系统在现实中需要付出代价,所以需要考虑某些因素。我们希望我们的 Java 方法能够尽快返回,而不是在调用方等待其返回(无论是 void、返回值还是抛出异常)时无限期地挂起。当将常规方法用作消息传递系统之前的代理时,我们必须考虑底层消息传递的潜在异步性质。这意味着由网关发起的某个消息可能被过滤器丢弃而永远无法到达负责生成回复的组件。某些服务激活器方法可能会导致异常,因此无法提供回复(因为我们不会生成空消息)。换句话说,多种情况可能导致永远不会有回复消息返回。这是消息传递系统中的自然现象。但是,考虑它对网关方法的影响。网关方法输入参数被合并到一条消息中并下发。回复消息将转换为网关方法的返回值。因此,你可能需要确保对于每个网关调用,始终都有回复消息。否则,如果将 reply-timeout 设置为负值,网关方法可能永远不会返回并无限期地挂起。处理这种情况的一种方法是使用异步网关(稍后在本节中进行了解释)。处理它的另一种方法是依赖于 reply-timeout 作为 30 秒的默认设置。这样,网关将不会挂起超过 reply-timeout 中指定的时间,并且如果该超时时间过去,则返回 'null'。最后,你可能需要考虑设置下游标志,例如服务激活器上的 'requires-reply' 或过滤器上的 'throw-exceptions-on-rejection'。本章的末尾部分将对这些选项进行更详细的讨论。

Exposing the messaging system through simple POJI Gateways provides benefits, but “hiding” the reality of the underlying messaging system does come at a price, so there are certain things you should consider. We want our Java method to return as quickly as possible and not hang for an indefinite amount of time while the caller is waiting on it to return (whether void, a return value, or a thrown Exception). When regular methods are used as a proxies in front of the messaging system, we have to take into account the potentially asynchronous nature of the underlying messaging. This means that there might be a chance that a message that was initiated by a gateway could be dropped by a filter and never reach a component that is responsible for producing a reply. Some service activator method might result in an exception, thus providing no reply (as we do not generate null messages). In other words, multiple scenarios can cause a reply message to never come. That is perfectly natural in messaging systems. However, think about the implication on the gateway method. The gateway’s method input arguments were incorporated into a message and sent downstream. The reply message would be converted to a return value of the gateway’s method. So you might want to ensure that, for each gateway call, there is always a reply message. Otherwise, your gateway method might never return and hang indefinitely if reply-timeout is set to negative value. One way to handle this situation is by using an asynchronous gateway (explained later in this section). Another way of handling it is to rely on a default reply-timeout as a 30 seconds. That way, the gateway does not hang any longer than the time specified by the reply-timeout and returns 'null' if that timeout does elapse. Finally, you might want to consider setting downstream flags, such as 'requires-reply', on a service-activator or 'throw-exceptions-on-rejection' on a filter. These options are discussed in more detail in the final section of this chapter.

如果下游流返回一个 ErrorMessage,则其 payload (一个 Throwable)被视为常规下游错误。如果配置了 error-channel,则将其发送到错误流。否则,将负载抛出给网关的调用方。类似地,如果 error-channel 上的错误流返回 ErrorMessage,则其负载将抛出给调用方。对于具有 Throwable 负载的任何消息,也适用相同的原则。这在异步情况下很有用,因为您需要将 Exception 直接传播给调用方。为此,您可以返回 Exception(作为某些服务的 reply)或抛出它。通常,即使在异步流中,框架也会处理将下游流抛出的异常传播回网关。 TCP Client-Server Multiplex 示例演示了将异常返回给调用方的两种技术。它通过使用具有 group-timeoutaggregator(请参阅 Aggregator and Group Timeout)模拟对等待线程的套接字 IO 错误,并在放弃流上进行 MessagingTimeoutException 应答。

If the downstream flow returns an ErrorMessage, its payload (a Throwable) is treated as a regular downstream error. If there is an error-channel configured, it is sent to the error flow. Otherwise, the payload is thrown to the caller of the gateway. Similarly, if the error flow on the error-channel returns an ErrorMessage, its payload is thrown to the caller. The same applies to any message with a Throwable payload. This can be useful in asynchronous situations when you need to propagate an Exception directly to the caller. To do so, you can either return an Exception (as the reply from some service) or throw it. Generally, even with an asynchronous flow, the framework takes care of propagating an exception thrown by the downstream flow back to the gateway. The TCP Client-Server Multiplex sample demonstrates both techniques to return the exception to the caller. It emulates a socket IO error to the waiting thread by using an aggregator with group-timeout (see Aggregator and Group Timeout) and a MessagingTimeoutException reply on the discard flow.

Gateway Timeouts

网关有两个超时属性:requestTimeoutreplyTimeout。请求超时仅在通道可以阻塞(例如,满的边界 QueueChannel)时才适用。replyTimeout 值是指网关等待答复或返回 null 的时间长度。其默认为无穷大。

Gateways have two timeout properties: requestTimeout and replyTimeout. The request timeout applies only if the channel can block (for example, a bounded QueueChannel that is full). The replyTimeout value is how long the gateway waits for a reply or returns null. It defaults to infinity.

超时可以针对网关上的所有方法(defaultRequestTimeoutdefaultReplyTimeout)或针对 MessagingGateway 接口注解进行设置。各个方法可以在 <method/> 子元素中或在 @Gateway 注解上覆盖这些默认值。

The timeouts can be set as defaults for all methods on the gateway (defaultRequestTimeout and defaultReplyTimeout) or on the MessagingGateway interface annotation. Individual methods can override these defaults (in <method/> child elements) or on the @Gateway annotation.

从版本 5.0 开始,超时可以定义为表达式,如下例所示:

Starting with version 5.0, the timeouts can be defined as expressions, as the following example shows:

@Gateway(payloadExpression = "args[0]", requestChannel = "someChannel",
        requestTimeoutExpression = "args[1]", replyTimeoutExpression = "args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);

评估上下文具有 BeanResolver(使用 @someBean 引用其他 Bean),并且 args 数组属性来自 #root 对象可用。有关此根对象的更多信息,请参阅 xref:gateway.adoc#gateway-expressions[Expressions and “Global” 标题。使用 XML 配置时,超时属性可以是长值或 SpEL 表达式,如下例所示:

The evaluation context has a BeanResolver (use @someBean to reference other beans), and the args array property from the #root object is available. See Expressions and “Global” Headers for more information about this root object. When configuring with XML, the timeout attributes can be a long value or a SpEL expression, as the following example shows:

<method name="someMethod" request-channel="someRequestChannel"
                      payload-expression="args[0]"
                      request-timeout="1000"
                      reply-timeout="args[1]">
</method>

Asynchronous Gateway

作为一种模式,消息传递网关提供了一种很好的方法来隐藏特定于消息传递的代码,同时仍公开消息传递系统的所有功能。正如 described earlierGatewayProxyFactoryBean 提供了一种便捷的方法,可以通过服务接口公开一个代理,使您能够基于 POJO 访问消息传递系统(基于您自己域、基元/字符串或其他对象中的对象)。但是,当通过返回值的简单 POJO 方法公开网关时,这意味着对于每个请求消息(在调用该方法时生成),都必须有一个响应消息(在该方法返回时生成)。由于消息传递系统本质上是异步的,因此您可能无法始终保证合约中的 “for each request, there will always be a reply”。Spring Integration 2.0 引入了对异步网关的支持,这提供了一种便捷的方法来启动流程,在该流程中您可能不知道是否预期收到回复或收到回复需要多长时间。

As a pattern, the messaging gateway offers a nice way to hide messaging-specific code while still exposing the full capabilities of the messaging system. As described earlier, the GatewayProxyFactoryBean provides a convenient way to expose a proxy over a service-interface giving you POJO-based access to a messaging system (based on objects in your own domain, primitives/Strings, or other objects). However, when a gateway is exposed through simple POJO methods that return values, it implies that, for each request message (generated when the method is invoked), there must be a reply message (generated when the method has returned). Since messaging systems are naturally asynchronous, you may not always be able to guarantee the contract where “for each request, there will always be a reply”. Spring Integration 2.0 introduced support for an asynchronous gateway, which offers a convenient way to initiate flows when you may not know if a reply is expected or how long it takes for replies to arrive.

为了处理这些类型的场景,Spring Integration 使用 java.util.concurrent.Future 实例来支持异步网关。

To handle these types of scenarios, Spring Integration uses java.util.concurrent.Future instances to support an asynchronous gateway.

在 XML 配置中,没有任何更改,你仍然像定义常规网关一样定义异步网关,如下例所示:

From the XML configuration, nothing changes, and you still define asynchronous gateway the same way as you define a regular gateway, as the following example shows:

<int:gateway id="mathService"
     service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
     default-request-channel="requestChannel"/>

但是,网关接口(一个服务接口)有点不同,如下所示:

However, the gateway interface (a service interface) is a little different, as follows:

public interface MathServiceGateway {

  Future<Integer> multiplyByTwo(int i);

}

如前例所示,网关方法的返回类型是 Future。当 GatewayProxyFactoryBean 看到网关方法的返回类型是 Future 时,它会立即通过使用 AsyncTaskExecutor 切换到异步模式。这就是不同之处的全部。对这样的方法的调用总是立即返回一个 Future 实例。然后,你可以按照自己的步调与 Future 进行交互以获取结果、取消等等。而且,与任何其他使用 Future 实例的情况一样,调用 get() 可能会显示超时、执行异常等。以下示例展示了如何使用从异步网关返回的 Future

As the preceding example shows, the return type for the gateway method is a Future. When GatewayProxyFactoryBean sees that the return type of the gateway method is a Future, it immediately switches to the asynchronous mode by using an AsyncTaskExecutor. That is the extent of the differences. The call to such a method always returns immediately with a Future instance. Then you can interact with the Future at your own pace to get the result, cancel, and so on. Also, as with any other use of Future instances, calling get() may reveal a timeout, an execution exception, and so on. The following example shows how to use a Future that returns from an asynchronous gateway:

MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult =  result.get(1000, TimeUnit.SECONDS);

有关更详细的示例,请参阅 Spring 集成示例中的 async-gateway 示例。

For a more detailed example, see the async-gateway sample in the Spring Integration samples.

AsyncTaskExecutor

默认情况下,当为其返回类型是 Future 的任何网关方法提交内部 AsyncInvocationTask 实例时,GatewayProxyFactoryBean 使用 org.springframework.core.task.SimpleAsyncTaskExecutor。但是,<gateway/> 元素配置中的 async-executor 属性允许你提供对 Spring 应用程序上下文中可用的 java.util.concurrent.Executor 任何实现的引用。

By default, the GatewayProxyFactoryBean uses org.springframework.core.task.SimpleAsyncTaskExecutor when submitting internal AsyncInvocationTask instances for any gateway method whose return type is a Future. However, the async-executor attribute in the <gateway/> element’s configuration lets you provide a reference to any implementation of java.util.concurrent.Executor available within the Spring application context.

(默认)SimpleAsyncTaskExecutor 支持 FutureCompletableFuture 返回类型。请参阅 xref:gateway.adoc#gw-completable-future[CompletableFuture。尽管有一个默认执行器,但经常提供一个外部执行器很有用,以便您可以在日志中标识它的线程(当使用 XML 时,线程名称基于执行器的 Bean 名称),如下例所示:

The (default) SimpleAsyncTaskExecutor supports both Future and CompletableFuture return types. See CompletableFuture. Even though there is a default executor, it is often useful to provide an external one so that you can identify its threads in logs (when using XML, the thread name is based on the executor’s bean name), as the following example shows:

@Bean
public AsyncTaskExecutor exec() {
    SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
    simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
    return simpleAsyncTaskExecutor;
}

@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {

    @Gateway(requestChannel = "gatewayChannel")
    Future<?> doAsync(String foo);

}

如果您希望返回不同的`Future`实现,则可以提供一个自定义执行器,也可以完全禁用执行器,然后在从下游流程返回的回复消息有效负载中返回`Future`。要禁用执行器,请在`GatewayProxyFactoryBean`中将其设置为`null`(通过使用`setAsyncTaskExecutor(null))。在使用 XML 配置网关时,使用`async-executor=""。在使用`@MessagingGateway`注释进行配置时,请使用类似于以下内容的代码:

If you wish to return a different Future implementation, you can provide a custom executor or disable the executor altogether and return the Future in the reply message payload from the downstream flow. To disable the executor, set it to null in the GatewayProxyFactoryBean (by using setAsyncTaskExecutor(null)). When configuring the gateway with XML, use async-executor="". When configuring by using the @MessagingGateway annotation, use code similar to the following:

@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {

    @Gateway(requestChannel = "gatewayChannel")
    Future<?> doAsync(String foo);

}

如果返回类型是某个具体的 Future 实现或配置的执行程序不支持的某个其他子接口,则流程在调用方的线程中运行,并且流程必须在回复消息有效负载中返回所需类型。

If the return type is a specific concrete Future implementation or some other sub-interface that is not supported by the configured executor, the flow runs on the caller’s thread and the flow must return the required type in the reply message payload.

CompletableFuture

从版本 4.2 开始,网关方法现在可以返回`CompletableFuture<?>`。返回此类型时有两种操作模式:

Starting with version 4.2, gateway methods can now return CompletableFuture<?>. There are two modes of operation when returning this type:

  • When an async executor is provided and the return type is exactly CompletableFuture (not a subclass), the framework runs the task on the executor and immediately returns a CompletableFuture to the caller. CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor) is used to create the future.

  • When the async executor is explicitly set to null and the return type is CompletableFuture or the return type is a subclass of CompletableFuture, the flow is invoked on the caller’s thread. In this scenario, the downstream flow is expected to return a CompletableFuture of the appropriate type.

org.springframework.util.concurrent.ListenableFuture 从 Spring 框架 6.0 开始已被弃用。现在建议迁移到 CompletableFuture,它提供类似的处理功能。

The org.springframework.util.concurrent.ListenableFuture has been deprecated starting with Spring Framework 6.0. It is recommended now to migrate to the CompletableFuture which provides similar processing functionality.

Usage Scenarios

在以下场景中,调用者线程使用`CompletableFuture<Invoice>`立即返回,当下游流程回复网关(提供`Invoice`对象)时,该线程完成。

In the following scenario, the caller thread returns immediately with a CompletableFuture<Invoice>, which is completed when the downstream flow replies to the gateway (with an Invoice object).

CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="something.Service" default-request-channel="orders" />

在以下场景中,当下游流程将其提供为对网关回复的有效负载时,调用者线程使用`CompletableFuture<Invoice>`返回。发票准备就绪时,某个其他流程必须完成未来。

In the following scenario, the caller thread returns with a CompletableFuture<Invoice> when the downstream flow provides it as the payload of the reply to the gateway. Some other process must complete the future when the invoice is ready.

CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders"
    async-executor="" />

在以下场景中,当下游流程将其提供为对网关回复的有效负载时,调用者线程使用`CompletableFuture<Invoice>`返回。发票准备就绪时,某个其他流程必须完成未来。如果启用了`DEBUG`日志记录,则会发出一个日志条目,表明该异步执行器不能用于此方案。

In the following scenario, the caller thread returns with a CompletableFuture<Invoice> when the downstream flow provides it as the payload of the reply to the gateway. Some other process must complete the future when the invoice is ready. If DEBUG logging is enabled, a log entry is emitted, indicating that the async executor cannot be used for this scenario.

MyCompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders" />

`CompletableFuture`实例可用于对回复执行额外的操作,如下例所示:

CompletableFuture instances can be used to perform additional manipulation on the reply, as the following example shows:

CompletableFuture<String> process(String data);

...

CompletableFuture result = process("foo")
    .thenApply(t -> t.toUpperCase());

...

String out = result.get(10, TimeUnit.SECONDS);

Reactor Mono

从版本 5.0 开始,GatewayProxyFactoryBean 允许使用 Mono<T> 返回类型,通过 Gateway 接口和 Project Reactor 方法使用。内部 AsyncInvocationTask 封装在 Mono.fromCallable() 中。

Starting with version 5.0, the GatewayProxyFactoryBean allows the use of Project Reactor with gateway interface methods, using a Mono<T> return type. The internal AsyncInvocationTask is wrapped in a Mono.fromCallable().

Mono`可用于稍后检索结果(类似于`Future<?>),或者您可以在结果返回到网关时使用分派器使用`Consumer`对它进行使用。

A Mono can be used to retrieve the result later (similar to a Future<?>), or you can consume from it with the dispatcher by invoking your Consumer when the result is returned to the gateway.

Mono 不会立即被框架刷新。因此,底层消息流程不会在网关方法返回前启动(如带有 Future<?> Executor 任务一样)。当订阅 Mono 时,流程才开始。另一个选择是 Mono(作为 “Composable”)可能属于 Reactor 流,此时 subscribe() 与整个 Flux 相关。以下示例演示如何使用 Project Reactor 创建一个网关:

The Mono is not immediately flushed by the framework. Consequently, the underlying message flow is not started before the gateway method returns (as it is with a Future<?> Executor task). The flow starts when the Mono is subscribed to. Alternatively, the Mono (being a “Composable”) might be a part of Reactor stream, when the subscribe() is related to the entire Flux. The following example shows how to create a gateway with Project Reactor:

@MessagingGateway
public interface TestGateway {

    @Gateway(requestChannel = "multiplyChannel")
    Mono<Integer> multiply(Integer value);

}

@ServiceActivator(inputChannel = "multiplyChannel")
public Integer multiply(Integer value) {
    return value * 2;
}

其中此类网关可用于处理`Flux`数据的某些服务中:

where such a gateway can be used in some service which deals with the Flux of data:

@Autowired
TestGateway testGateway;

public void hadnleFlux() {
    Flux.just("1", "2", "3", "4", "5")
            .map(Integer::parseInt)
            .flatMap(this.testGateway::multiply)
            .collectList()
            .subscribe(System.out::println);
}

使用 Project Reactor 的另一个示例是一个简单的回调场景,如下例所示:

Another example that uses Project Reactor is a simple callback scenario, as the following example shows:

Mono<Invoice> mono = service.process(myOrder);

mono.subscribe(invoice -> handleInvoice(invoice));

调用线程继续,流程完成后将调用`handleInvoice()`。

The calling thread continues, with handleInvoice() being called when the flow completes.

有关详细信息,请 Kotlin Coroutines 查看。

Also see Kotlin Coroutines for more information.

Downstream Flows Returning an Asynchronous Type

如上文 AsyncTaskExecutor 部分中所述,如果您希望某个下游组件返回带有异步有效负载(FutureMono 和其他)的消息,您必须将异步执行器显式设置为 null(或在使用 XML 配置时设置为 "")。然后在调用方线程上调用该流,稍后可以检索结果。

As mentioned in the AsyncTaskExecutor section above, if you wish some downstream component to return a message with an async payload (Future, Mono, and others), you must explicitly set the async executor to null (or "" when using XML configuration). The flow is then invoked on the caller thread and the result can be retrieved later.

Asynchronous void Return Type

消息网关方法可以这样声明:

The messaging gateway method can be declared like this:

@MessagingGateway
public interface MyGateway {

    @Gateway(requestChannel = "sendAsyncChannel")
    @Async
    void sendAsync(String payload);

}

但下游异常不会传播回调用者。为了确保下游流程调用的异步行为和异常传播到调用者,从版本 6.0 开始,框架提供了对`Future<Void>`和`Mono<Void>`返回类型的支持。用例类似于之前为普通`void`返回类型描述的发送和忘记行为,但不同之处在于,流程执行发生在异步情况下,返回的`Future`(或`Mono`)以`null`完成,或根据`send`操作结果异常完成。

But downstream exceptions are not going to be propagated back to the caller. To ensure asynchronous behavior for downstream flow invocation and exception propagation to the caller, starting with version 6.0, the framework provides support for the Future<Void> and Mono<Void> return types. The use-case is similar to send-and-forget behavior described before for plain void return type, but with a difference that flow execution happens asynchronously and returned Future (or Mono) is complete with a null or exceptionally according to the send operation result.

如果 Future<Void> 是准确的下游流程回复,则网关的 asyncExecutor 选项必须设置为 null(对于 @MessagingGateway 配置为 AnnotationConstants.NULL),send 部分在生产程序线程上执行。一个回复取决于下游流程配置。这样,目标应用程序可以正确地生成 Future<Void> 回复。Mono 用例已经超出了框架线程控制,因此将 asyncExecutor 设置为 null 没有意义。Mono<Void> 作为请求-回复网关操作的结果必须配置为网关方法的 Mono<?> 返回类型。

If the Future<Void> is exact downstream flow reply, then an asyncExecutor option of the gateway must be set to null (AnnotationConstants.NULL for a @MessagingGateway configuration) and the send part is performed on a producer thread. The reply one depends on the downstream flow configuration. This way it is up target application to produce a Future<Void> reply correctly. The Mono use-case is already out of the framework threading control, so setting asyncExecutor to null won’t make sense. There Mono<Void> as a result of the request-reply gateway operation must be configured as a Mono<?> return type of the gateway method.

Gateway Behavior When No response Arrives

explained earlier 相同,该网关提供了一种通过 POJO 方法调用与消息传递系统进行交互的便捷方式。但是,通常预期始终返回的典型方法调用(即使出现异常)可能并不总是映射为一对一的消息交换(例如,回复消息可能无法到达,相当于方法未返回)。

As explained earlier, the gateway provides a convenient way of interacting with a messaging system through POJO method invocations. However, a typical method invocation, which is generally expected to always return (even with an Exception), might not always map one-to-one to message exchanges (for example, a reply message might not arrive — the equivalent to a method not returning).

本节的其余部分涵盖了各种场景以及如何使网关的行为更可预测。可以配置某些属性以使同步网关行为更可预测,但其中一些属性可能并不总是按预期工作。其中之一是`reply-timeout`(在方法级别或`default-reply-timeout`在网关级别)。我们检查`reply-timeout`属性,看看它可以在哪些场景中影响同步网关的行为,又有哪些场景中不能。我们检查单线程场景(下游所有组件都通过直接信道连接)和多线程场景(例如,在下游的某个地方,您可能有一个可轮询信道或执行器信道,它打破了单线程边界)。

The rest of this section covers various scenarios and how to make the gateway behave more predictably. Certain attributes can be configured to make synchronous gateway behavior more predictable, but some of them might not always work as you might expect. One of them is reply-timeout (at the method level or default-reply-timeout at the gateway level). We examine the reply-timeout attribute to see how it can and cannot influence the behavior of the synchronous gateway in various scenarios. We examine a single-threaded scenario (all components downstream are connected through a direct channel) and multi-threaded scenarios (for example, somewhere downstream you may have a pollable or executor channel that breaks the single-thread boundary).

Long-running Process Downstream

Sync Gateway, single-threaded

If a component downstream is still running (perhaps because of an infinite loop or a slow service), setting a reply-timeout has no effect, and the gateway method call does not return until the downstream service exits (by returning or throwing an exception).

Sync Gateway, multi-threaded

If a component downstream is still running (perhaps because of an infinite loop or a slow service) in a multi-threaded message flow, setting the reply-timeout has an effect by allowing gateway method invocation to return once the timeout has been reached, because the GatewayProxyFactoryBean polls on the reply channel, waiting for a message until the timeout expires. However, if the timeout has been reached before the actual reply was produced, it could result in a 'null' return from the gateway method. You should understand that the reply message (if produced) is sent to a reply channel after the gateway method invocation might have returned, so you must be aware of that and design your flow with it in mind.

另请参阅`errorOnTimeout`属性,以便在发生超时时引发`MessageTimeoutException`,而不是返回`null`。

Also see the errorOnTimeout property to throw a MessageTimeoutException instead of returning null, when a timeout occurs.

Downstream Component Returns 'null'

Sync Gateway — single-threaded

If a component downstream returns 'null' and the reply-timeout has been configured to negative value, the gateway method call hangs indefinitely, unless the requires-reply attribute has been set on the downstream component (for example, a service activator) that might return 'null'. In this case, an exception would be thrown and propagated to the gateway.

Sync Gateway — multi-threaded

The behavior is the same as the previous case.

Downstream Component Return Signature is 'void' While Gateway Method Signature Is Non-void

Sync Gateway — single-threaded

If a component downstream returns 'void' and the reply-timeout has been configured to negative value, the gateway method call hangs indefinitely.

Sync Gateway — multi-threaded

The behavior is the same as the previous case.

Downstream Component Results in Runtime Exception

Sync Gateway — single-threaded

If a component downstream throws a runtime exception, the exception is propagated through an error message back to the gateway and re-thrown.

Sync Gateway — multi-threaded

The behavior is the same as the previous case.

您应该了解,默认情况下,reply-timeout 是无界的。因此,如果您将 reply-timeout 设置为负值,则您的网关方法调用可能会无限期地挂起。因此,为了确保您分析您的流程,并且即使这些场景中有一个最微小的可能性出现,您也应该将 reply-timeout 属性设置为“safe”的值。默认情况下,它是 30 秒。更好的是,您可以将下游组件的 requires-reply 属性设置为 true,以确保及时响应,就像一旦该下游组件在内部返回 null 时抛出异常所产生的一样。但是,您还应该意识到,某些场景(参见 the first one)中,reply-timeout 无济于事。这意味着分析您的消息流并决定何时使用同步网关而不是异步网关也很重要。正如 described earlier 所说,后一种情况是定义返回 Future 实例的网关方法的问题。然后,您保证会收到该返回值,并且可以更精细地控制调用的结果。此外,在处理路由器时,您应该记住,将 resolution-required 属性设置为 true 会导致路由器无法解析特定通道而抛出异常。同样,在处理过滤器时,您可以设置 throw-exception-on-rejection 属性。在这两种情况下,所生成流的行为就像它包含具有“requires-reply”属性的服务激活器。换句话说,它有助于确保网关方法调用的及时响应。

You should understand that, by default, reply-timeout is unbounded. Consequently, if you set the reply-timeout to negative value, your gateway method invocation might hang indefinitely. So, to make sure you analyze your flow and if there is even a remote possibility of one of these scenarios to occur, you should set the reply-timeout attribute to a "'safe'" value. It is 30 seconds by default. Even better, you can set the requires-reply attribute of the downstream component to 'true' to ensure a timely response, as produced by the throwing of an exception as soon as that downstream component returns null internally. However, you should also realize that there are some scenarios (see the first one) where reply-timeout does not help. That means it is also important to analyze your message flow and decide when to use a synchronous gateway rather than an asynchronous gateway. As described earlier, the latter case is a matter of defining gateway methods that return Future instances. Then you are guaranteed to receive that return value, and you have more granular control over the results of the invocation. Also, when dealing with a router, you should remember that setting the resolution-required attribute to 'true' results in an exception thrown by the router if it can not resolve a particular channel. Likewise, when dealing with a Filter, you can set the throw-exception-on-rejection attribute. In both of these cases, the resulting flow behaves like it contain a service activator with the 'requires-reply' attribute. In other words, it helps to ensure a timely response from the gateway method invocation.

您应该理解,当线程返回到网关时定时器将启动——即当流完成或一条消息移交给另一线程时启动。在该时刻,调用线程开始等待答复。如果流完全是同步的,则可立即收到答复。对于异步流,该线程最长等待此时间。

You should understand that the timer starts when the thread returns to the gateway — that is, when the flow completes or a message is handed off to another thread. At that time, the calling thread starts waiting for the reply. If the flow was completely synchronous, the reply is immediately available. For asynchronous flows, the thread waits for up to this time.

从版本 6.2 开始,errorOnTimeoutMethodInvocationGateway 的内部 MessagingGatewaySupport 扩展的属性公开在 @MessagingGatewayGatewayEndpointSpec 中。此选项具有与 Endpoint Summary 章节末尾解释的任何入站网关完全相同的含义。换句话说,将此选项设置为 true,将导致在接收超时耗尽时从发送和接收网关操作中抛出 MessageTimeoutException,而不是返回 null

Starting with version 6.2, the errorOnTimeout property of the internal MethodInvocationGateway extension of the MessagingGatewaySupport is exposed on the @MessagingGateway and GatewayEndpointSpec. This option has exactly the same meaning as for any inbound gateway explained in the end of Endpoint Summary chapter. In other words, setting this option to true, would lead to the MessageTimeoutException being thrown from a send-and-receive gateway operation instead of returning null when the receive timeout is exhausted.

有关通过 IntegrationFlow 定义网关的选项,请参阅 Java DSL 章节中的 IntegrationFlow as Gateway

See IntegrationFlow as Gateway in the Java DSL chapter for options to define gateways through IntegrationFlow.