Messaging Gateways
网关隐藏了 Spring Integration 提供的消息传递 API。它让你的应用程序业务逻辑不知道 Spring Integration API。通过使用通用网关,你的代码仅与一个简单接口进行交互。
A gateway hides the messaging API provided by Spring Integration. It lets your application’s business logic be unaware of the Spring Integration API. By using a generic Gateway, your code interacts with only a simple interface.
Enter the GatewayProxyFactoryBean
如前所述,最好不依赖 Spring Integration API,包括网关类。因此,Spring Integration 提供了 GatewayProxyFactoryBean
,它为任何接口生成一个代理,并在内部调用下面显示的网关方法。然后,通过使用依赖关系注入,你可以向你的业务方法公开接口。
As mentioned earlier, it would be great to have no dependency on the Spring Integration API — including the gateway class.
For that reason, Spring Integration provides the GatewayProxyFactoryBean
, which generates a proxy for any interface and internally invokes the gateway methods shown below.
By using dependency injection, you can then expose the interface to your business methods.
以下示例显示了可用来与 Spring Integration 交互的接口:
The following example shows an interface that can be used to interact with Spring Integration:
public interface Cafe {
void placeOrder(Order order);
}
Gateway XML Namespace Support
也提供了命名空间支持。它允许你将一个接口配置为一项服务,如下例所示:
Namespace support is also provided. It lets you configure an interface as a service, as the following example shows:
<int:gateway id="cafeService"
service-interface="org.cafeteria.Cafe"
default-request-channel="requestChannel"
default-reply-timeout="10000"
default-reply-channel="replyChannel"/>
在此配置定义下,cafeService
现在可注入到其他 Bean,并且在 Cafe
接口的代理实例上调用方法的代码无需知道 Spring Integration API。有关一个在 Cafe 示例中使用 gateway
元素的示例,请参阅 xref:samples.adoc#samples-impl[“Samples” 附件。
With this configuration defined, the cafeService
can now be injected into other beans, and the code that invokes the methods on that proxied instance of the Cafe
interface has no awareness of the Spring Integration API.
See the “Samples” Appendix for an example that uses the gateway
element (in the Cafe demo).
上述配置中的默认值将应用到网关接口上的所有方法。如果没有指定回复超时,则调用线程将等待回复 30 秒。请参见 Gateway Behavior When No response Arrives。
The defaults in the preceding configuration are applied to all methods on the gateway interface. If a reply timeout is not specified, the calling thread waits for a reply for 30 seconds. See Gateway Behavior When No response Arrives.
可以针对各个方法覆盖默认值。请参见 Gateway Configuration with Annotations and XML。
The defaults can be overridden for individual methods. See Gateway Configuration with Annotations and XML.
Setting the Default Reply Channel
通常,您无需指定 default-reply-channel
,因为网关会自动创建一个临时的匿名回复通道,它将在其中监听回复。但是,某些情况下可能会提示您定义 default-reply-channel
(或带有适配器网关(例如 HTTP、JMS 等)的 reply-channel
)。
Typically, you need not specify the default-reply-channel
, since a Gateway auto-creates a temporary, anonymous reply channel, where it listens for the reply.
However, some cases may prompt you to define a default-reply-channel
(or reply-channel
with adapter gateways, such as HTTP, JMS, and others).
为了了解一些背景,我们简要讨论下网关的内部工作原理。网关创建一个临时的点对点回复通道。它是匿名的,且使用名为 replyChannel
的信息标头添加到其中。在提供一个显式 default-reply-channel
(带有远程适配器网关的 reply-channel
)时,您可以指向发布-订阅通道,之所以如此命名,是因为您可以向其中添加多个订阅者。在内部,Spring Integration 创建一个临时的 replyChannel
和明确定义的 default-reply-channel
之间的桥梁。
For some background, we briefly discuss some inner workings of the gateway.
A gateway creates a temporary point-to-point reply channel.
It is anonymous and is added to the message headers with the name, replyChannel
.
When providing an explicit default-reply-channel
(reply-channel
with remote adapter gateways), you can point to a publish-subscribe channel, which is so named because you can add more than one subscriber to it.
Internally, Spring Integration creates a bridge between the temporary replyChannel
and the explicitly defined default-reply-channel
.
假设您希望回复不仅会发送到网关,还会发送到其他一些消费者。在这种情况下,您有两个需求:
Suppose you want your reply to go not only to the gateway but also to some other consumer. In this case, you want two things:
-
A named channel to which you can subscribe
-
That channel to be a publish-subscribe-channel
网关使用的默认策略不能满足这些需求,因为添加到标头中的回复通道是匿名的,并且是点对点的。这意味着没有其他订阅者可以处理它,即便可以处理,该通道具有点对点行为,因此只有一位订阅者可以获取消息。通过定义一个 default-reply-channel
,您可以指向您选择的通道。在这种情况下,这是一个 publish-subscribe-channel
。网关从它到存储在标头中的临时匿名回复通道创建一个桥梁。
The default strategy used by the gateway does not satisfy those needs, because the reply channel added to the header is anonymous and point-to-point.
This means that no other subscriber can get a handle to it and, even if it could, the channel has point-to-point behavior such that only one subscriber would get the message.
By defining a default-reply-channel
you can point to a channel of your choosing.
In this case, that is a publish-subscribe-channel
.
The gateway creates a bridge from it to the temporary, anonymous reply channel that is stored in the header.
您可能还想明确提供一个回复通道,以通过拦截器(例如,wiretap)进行监视或审核。要配置通道拦截器,您需要一个命名通道。
You might also want to explicitly provide a reply channel for monitoring or auditing through an interceptor (for example, wiretap). To configure a channel interceptor, you need a named channel.
从版本 5.4 开始,当网关方法返回类型为 |
Starting with version 5.4, when gateway method return type is |
Gateway Configuration with Annotations and XML
请考虑以下示例,它通过添加 @Gateway
注释扩展了之前的 Cafe
接口示例:
Consider the following example, which expands on the previous Cafe
interface example by adding a @Gateway
annotation:
public interface Cafe {
@Gateway(requestChannel="orders")
void placeOrder(Order order);
}
@Header
注释允许您添加解释为消息标头的值,如下例所示:
The @Header
annotation lets you add values that are interpreted as message headers, as the following example shows:
public interface FileWriter {
@Gateway(requestChannel="filesOut")
void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);
}
如果您更喜欢使用 XML 方法配置网关方法,则可以向网关配置添加 method
元素,如下例所示:
If you prefer the XML approach to configuring gateway methods, you can add method
elements to the gateway configuration, as the following example shows:
<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB"/>
<int:method name="echoViaDefault"/>
</int:gateway>
您还可以使用 XML 为每个方法调用提供单独的标头。如果要设置的标头本质上是静态的,并且不想通过使用 @Header
注释将它们嵌入到网关的方法签名中,这可能很有用。例如,在贷款经纪人示例中,我们希望影响贷款报价的聚合方式,具体取决于发起哪种类型的请求(单报价或所有报价)。确定请求的类型通过评估调用的网关方法(虽然可以实现),但这将违反关注点分离范式(该方法是 Java 数据制品)。但是,在消息标头中表述您的意图(元信息)在消息传递体系结构中是自然的。以下示例展示了如何为两个方法中的每个方法添加不同的消息标头:
You can also use XML to provide individual headers for each method invocation.
This could be useful if the headers you want to set are static in nature, and you do not want to embed them in the gateway’s method signature by using @Header
annotations.
For example, in the loan broker example, we want to influence how aggregation of the loan quotes is done, based on what type of request was initiated (single quote or all quotes).
Determining the type of the request by evaluating which gateway method was invoked, although possible, would violate the separation of concerns paradigm (the method is a Java artifact).
However, expressing your intention (meta information) in message headers is natural in a messaging architecture.
The following example shows how to add a different message header for each of two methods:
<int:gateway id="loanBrokerGateway"
service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
<int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="BEST"/>
</int:method>
<int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="ALL"/>
</int:method>
</int:gateway>
在上一个示例中,根据网关的方法对 'RESPONSE_TYPE' 标头设置了不同的值。
In the preceding example a different value is set for the 'RESPONSE_TYPE' header, based on the gateway’s method.
例如,如果您在 <int:method/>
中将 requestChannel
和 @Gateway
注释中指定,注释值获胜。
If you specify, for example, the requestChannel
in <int:method/>
as well as in a @Gateway
annotation, the annotation value wins.
如果在 XML 中指定了无参数的网关,并且接口方法同时带有 |
If a no-argument gateway is specified in XML, and the interface method has both a |
Expressions and “Global” Headers
<header/>
元素支持 expression
作为 value
的替代方案。SpEL 表达式经过求值以确定标头的值。从版本 5.2 开始,评估上下文的 #root
对象是具有 getMethod()
和 getArgs()
访问器的 MethodArgsHolder
。例如,如果您希望对简单的方法名进行路由,则可以添加具有以下表达式的标头:method.name
。
The <header/>
element supports expression
as an alternative to value
.
The SpEL expression is evaluated to determine the value of the header.
Starting with version 5.2, the #root
object of the evaluation context is a MethodArgsHolder
with getMethod()
and getArgs()
accessors.
For example, if you wish to route on the simple method name, you might add a header with the following expression: method.name
.
|
The |
自版本 3.0 以来,可以定义 <default-header/>
元素以向网关生成的所有消息添加标头,无论调用的方法是什么。为方法定义的特定标头优先于默认标头。此处为方法定义的特定标头将覆盖服务接口中的任何 @Header
注释。但是,默认标头不会覆盖服务接口中的任何 @Header
注释。
Since version 3.0, <default-header/>
elements can be defined to add headers to all the messages produced by the gateway, regardless of the method invoked.
Specific headers defined for a method take precedence over default headers.
Specific headers defined for a method here override any @Header
annotations in the service interface.
However, default headers do NOT override any @Header
annotations in the service interface.
该网关现在还支持 default-payload-expression
,它适用于所有方法(除非另行规定)。
The gateway now also supports a default-payload-expression
, which is applied for all methods (unless overridden).
Mapping Method Arguments to a Message
使用前一节中的配置技术可以控制如何将方法参数映射到消息元素(有效负载和标头)。当不使用显式配置时,将使用特定约定执行映射。在某些情况下,这些约定无法确定哪个参数是有效负载,哪些参数应映射到标头。请考虑以下示例:
Using the configuration techniques in the previous section allows control of how method arguments are mapped to message elements (payload and headers). When no explicit configuration is used, certain conventions are used to perform the mapping. In some cases, these conventions cannot determine which argument is the payload and which should be mapped to headers. Consider the following example:
public String send1(Object thing1, Map thing2);
public String send2(Map thing1, Map thing2);
在第一种情况下,约定是将第一个参数映射到有效负载(只要它不是 Map
),而第二个参数的内容将变为标头。
In the first case, the convention is to map the first argument to the payload (as long as it is not a Map
) and the contents of the second argument become headers.
在第二种情况下(或参数 thing1
是 Map
的第一种情况),框架无法确定哪个参数应为有效负载。因此,映射将失败。这通常可以使用 payload-expression
、@Payload
注释或 @Headers
注释来解决。
In the second case (or the first when the argument for parameter thing1
is a Map
), the framework cannot determine which argument should be the payload.
Consequently, mapping fails.
This can generally be resolved using a payload-expression
, a @Payload
annotation, or a @Headers
annotation.
或者(每当约定失效时),您可以承担将方法调用映射到消息的全部责任。为此,实现 MethodArgsMessageMapper
,并使用 mapper
属性将其提供给 <gateway/>
。映射器映射 MethodArgsHolder
,它是一个封装 java.reflect.Method
实例和包含参数的 Object[]
的简单类。提供定制映射器时,网关上不允许出现 default-payload-expression
属性和 <default-header/>
元素。同样,任何 <method/>
上都不允许出现 payload-expression
属性和 <header/>
元素。
Alternatively (and whenever the conventions break down), you can take the entire responsibility for mapping the method calls to messages.
To do so, implement an MethodArgsMessageMapper
and provide it to the <gateway/>
by using the mapper
attribute.
The mapper maps a MethodArgsHolder
, which is a simple class that wraps the java.reflect.Method
instance and an Object[]
containing the arguments.
When providing a custom mapper, the default-payload-expression
attribute and <default-header/>
elements are not allowed on the gateway.
Similarly, the payload-expression
attribute and <header/>
elements are not allowed on any <method/>
elements.
Mapping Method Arguments
以下示例展示了如何将方法参数映射至消息,并展示了一些无效配置示例:
The following examples show how method arguments can be mapped to the message and shows some examples of invalid configuration:
public interface MyGateway {
void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);
void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);
void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);
void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added
void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);
@Payload("args[0] + args[1] + '!'")
void payloadAnnotationAtMethodLevel(String a, String b);
@Payload("@someBean.exclaim(args[0])")
void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);
void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);
void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); // 1
// invalid
void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);
// invalid
void twoPayloads(@Payload String s1, @Payload String s2);
// invalid
void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);
// invalid
void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);
}
1 | Note that, in this example, the SpEL variable, #this , refers to the argument — in this case, the value of s . |
XML 等价项看起来有点不同,因为没有 #this
方法参数的上下文。但是,表达式可以通过使用 MethodArgsHolder
根对象(有关更多信息,请参阅 xref:gateway.adoc#gateway-expressions[Expressions and “Global” 标题)的 args
属性来引用方法参数,如下例所示:
The XML equivalent looks a little different, since there is no #this
context for the method argument.
However, expressions can refer to method arguments by using the args
property for the MethodArgsHolder
root object (see Expressions and “Global” Headers for more information), as the following example shows:
<int:gateway id="myGateway" service-interface="org.something.MyGateway">
<int:method name="send1" payload-expression="args[0] + 'thing2'"/>
<int:method name="send2" payload-expression="@someBean.sum(args[0])"/>
<int:method name="send3" payload-expression="method"/>
<int:method name="send4">
<int:header name="thing1" expression="args[2].toUpperCase()"/>
</int:method>
</int:gateway>
@MessagingGateway
Annotation
从版本 4.0 开始,网关服务接口可以使用 @MessagingGateway
注释来标记,而无需定义 <gateway/>
xml 元素进行配置。以下两组示例对比了为同一个网关配置的两种方法:
Starting with version 4.0, gateway service interfaces can be marked with a @MessagingGateway
annotation instead of requiring the definition of a <gateway />
xml element for configuration.
The following pair of examples compares the two approaches for configuring the same gateway:
<int:gateway id="myGateway" service-interface="org.something.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB">
<int:header name="thing1" value="thing2"/>
</int:method>
<int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
defaultHeaders = @GatewayHeader(name = "calledMethod",
expression="#gatewayMethod.name"))
public interface TestGateway {
@Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
String echo(String payload);
@Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
String echoUpperCase(String payload);
String echoViaDefault(String payload);
}
与 XML 版本类似,当 Spring Integration 在组件扫描期间发现这些注解时,它会使用其消息传递基础设施创建 proxy
实现。要执行此扫描并将 BeanDefinition
注册到应用程序上下文中,请将 @IntegrationComponentScan
注解添加到 @Configuration
类。标准 @ComponentScan
基础设施不会处理接口。因此,我们引入了自定义 @IntegrationComponentScan
逻辑,以在接口上查找 @MessagingGateway
注解,并为它们注册 GatewayProxyFactoryBean
实例。另请参阅 Annotation Support。
Similarly to the XML version, when Spring Integration discovers these annotations during a component scan, it creates the proxy
implementation with its messaging infrastructure.
To perform this scan and register the BeanDefinition
in the application context, add the @IntegrationComponentScan
annotation to a @Configuration
class.
The standard @ComponentScan
infrastructure does not deal with interfaces.
Consequently, we introduced the custom @IntegrationComponentScan
logic to find the @MessagingGateway
annotation on the interfaces and register GatewayProxyFactoryBean
instances for them.
See also Annotation Support.
你可以使用 @Profile
注释来标记 @MessagingGateway
注释的服务接口,以在非活动状态中避免 bean 创建。
Along with the @MessagingGateway
annotation you can mark a service interface with the @Profile
annotation to avoid the bean creation, if such a profile is not active.
从版本 6.0 开始,带有 @MessagingGateway
的接口还可以使用 @Primary
注释来标记各自的配置逻辑,就像任何 Spring @Component
定义一样。
Starting with version 6.0, an interface with the @MessagingGateway
can also be marked with a @Primary
annotation for respective configuration logic as its possible with any Spring @Component
definition.
从版本 6.0 开始,@MessagingGateway
接口可以用于标准的 Spring @Import
配置。这可以用作 @IntegrationComponentScan
或手动 AnnotationGatewayProxyFactoryBean
bean 定义的替代方案。
Starting with version 6.0, @MessagingGateway
interfaces can be used in the standard Spring @Import
configuration.
This may be used as an alternative to the @IntegrationComponentScan
or manual AnnotationGatewayProxyFactoryBean
bean definitions.
从版本 6.0
开始,@MessagingGateway
会使用 @MessageEndpoint
作为元注释,并且 name()
属性本质上是 @Compnent.value()
的别名。这样,网关代理的 bean 名称生成策略就与扫描和导入组件的标准 Spring 注释配置保持一致。可以通过 AnnotationConfigUtils.CONFIGURATION_BEAN_NAME_GENERATOR
或作为 @IntegrationComponentScan.nameGenerator()
属性在全局范围内覆盖默认的 AnnotationBeanNameGenerator
。
The @MessagingGateway
is meta-annotated with a @MessageEndpoint
since version 6.0
and the name()
attribute is, essentially, aliased to the @Compnent.value()
.
This way the bean names generating strategy for gateway proxies is realigned with the standard Spring annotation configuration for scanned and imported components.
The default AnnotationBeanNameGenerator
can be overridden globally via an AnnotationConfigUtils.CONFIGURATION_BEAN_NAME_GENERATOR
or as a @IntegrationComponentScan.nameGenerator()
attribute.
如果您没有 XML 配置,则至少一个 |
If you have no XML configuration, the |
Invoking No-Argument Methods
在调用网关接口中没有任何参数的方法时,默认行为是接收来自 PollableChannel
的 Message
。
When invoking methods on a Gateway interface that do not have any arguments, the default behavior is to receive a Message
from a PollableChannel
.
然而,有时你想触发无参数方法,以便你可以与不需要用户提供的参数的下游其他组件交互,例如触发无参数 SQL 调用或存储过程。
Sometimes, however, you may want to trigger no-argument methods so that you can interact with other components downstream that do not require user-provided parameters, such as triggering no-argument SQL calls or stored procedures.
为了实现收发语义,你必须提供一个有效负载。要生成有效负载,接口中的方法参数不是必需的。你可以在方法元素的 XML 中使用 @Payload
注释或 payload-expression
属性。以下列表包括一些有效负载示例:
To achieve send-and-receive semantics, you must provide a payload.
To generate a payload, method parameters on the interface are not necessary.
You can either use the @Payload
annotation or the payload-expression
attribute in XML on the method
element.
The following list includes a few examples of what the payloads could be:
-
a literal string
-
#gatewayMethod.name
-
new java.util.Date()
-
@someBean.someMethod()'s return value
以下示例展示如何使用 @Payload
注释:
The following example shows how to use the @Payload
annotation:
public interface Cafe {
@Payload("new java.util.Date()")
List<Order> retrieveOpenOrders();
}
你还可以使用 @Gateway
注释。
You can also use the @Gateway
annotation.
public interface Cafe {
@Gateway(payloadExpression = "new java.util.Date()")
List<Order> retrieveOpenOrders();
}
如果同时存在这两个注释(并且提供了 |
If both annotations are present (and the |
如果一个方法没有参数也没有返回值,但是包含一个有效负载表达式,则将其视为仅发送操作。
If a method has no argument and no return value but does contain a payload expression, it is treated as a send-only operation.
Invoking default
Methods
网关代理的接口也可以有 default
方法,从版本 5.3 开始,框架会向代理注入 DefaultMethodInvokingMethodInterceptor
,以使用 java.lang.invoke.MethodHandle
方法代替代理来调用 default
方法。来自 JDK 的接口,如 java.util.function.Function
,仍然可以用于网关代理,但由于针对 JDK 类使用 MethodHandles.Lookup
实例化的内部 Java 安全原因,它们不能被调用 default
方法。这些方法还可以使用对方法的显式 @Gateway
注释代理(丢失它们的实现逻辑并同时恢复先前的网关代理行为),或者在 @MessagingGateway
注释或 <gateway>
XML 组件上使用 proxyDefaultMethods
。
An interface for gateway proxy may have default
methods as well and starting with version 5.3, the framework injects a DefaultMethodInvokingMethodInterceptor
into a proxy for calling default
methods using a java.lang.invoke.MethodHandle
approach instead of proxying.
The interfaces from JDK, such as java.util.function.Function
, still can be used for gateway proxy, but their default
methods cannot be called because of internal Java security reasons for a MethodHandles.Lookup
instantiation against JDK classes.
These methods also can be proxied (losing their implementation logic and, at the same time, restoring previous gateway proxy behavior) using an explicit @Gateway
annotation on the method, or proxyDefaultMethods
on the @MessagingGateway
annotation or <gateway>
XML component.
Error Handling
网关调用会产生错误。默认情况下,在网关方法调用时,任何下游发生的错误都将“原样”重新抛出。例如,考虑以下简单流程:
The gateway invocation can result in errors. By default, any error that occurs downstream is re-thrown “as is” upon the gateway’s method invocation. For example, consider the following simple flow:
gateway -> service-activator
如果服务激活器调用的服务抛出 MyException
(例如),框架会将其封装在 MessagingException
中,并将传递给服务激活器的消息附加到 failedMessage
属性中。因此,框架执行的任何日志记录都具有完整的故障上下文。默认情况下,当网关捕获异常时,会取消 MyException
的封装并将其抛给调用方。你可以在网关方法声明的 throws
子句中配置一个,以匹配原因链中的特定异常类型。例如,如果你想捕获一个带有所有下游错误原因消息传递信息的 MessagingException
,则网关方法应该类似于以下内容:
If the service invoked by the service activator throws a MyException
(for example), the framework wraps it in a MessagingException
and attaches the message passed to the service activator in the failedMessage
property.
Consequently, any logging performed by the framework has full the context of the failure.
By default, when the exception is caught by the gateway, the MyException
is unwrapped and thrown to the caller.
You can configure a throws
clause on the gateway method declaration to match the particular exception type in the cause chain.
For example, if you want to catch a whole MessagingException
with all the messaging information of the reason of downstream error, you should have a gateway method similar to the following:
public interface MyGateway {
void performProcess() throws MessagingException;
}
由于我们鼓励 POJO 编程,你可能不想让调用方接触消息传递基础架构。
Since we encourage POJO programming, you may not want to expose the caller to messaging infrastructure.
如果你的网关方法没有 throws
子句,网关会遍历原因树,查找不是 MessagingException
的 RuntimeException
。如果没有找到,框架会抛出 MessagingException
。如果前面讨论中的 MyException
是 SomeOtherException
的原因,并且你的方法 throws SomeOtherException
,网关会进一步取消封装并将其抛给调用方。
If your gateway method does not have a throws
clause, the gateway traverses the cause tree, looking for a RuntimeException
that is not a MessagingException
.
If none is found, the framework throws the MessagingException
.
If the MyException
in the preceding discussion has a cause of SomeOtherException
and your method throws SomeOtherException
, the gateway further unwraps that and throws it to the caller.
未用 service-interface
声明网关时,将使用内部框架接口 RequestReplyExchanger
。
When a gateway is declared with no service-interface
, an internal framework interface RequestReplyExchanger
is used.
请考虑以下示例:
Consider the following example:
public interface RequestReplyExchanger {
Message<?> exchange(Message<?> request) throws MessagingException;
}
5.0 版之前,该 exchange
方法没有 throws
子句,结果是异常未包装。如果你使用此接口并想要还原先前的未包装行为,请改用自定义的 service-interface
或自己访问 MessagingException
的 cause
。
Before version 5.0, this exchange
method did not have a throws
clause and, as a result, the exception was unwrapped.
If you use this interface and want to restore the previous unwrap behavior, use a custom service-interface
instead or access the cause
of the MessagingException
yourself.
但是,你可能希望记录错误而不是传播错误,或者可能希望将异常视为有效答复(通过将其映射到遵循某种“错误消息”契约且调用者理解的消息)。为了实现此目的,网关提供对专门用于错误的消息通道的支持,方法是包括对 error-channel
属性的支持。在以下示例中,一个“转换器”从 Exception
创建一个答复 Message
:
However, you may want to log the error rather than propagating it, or you may want to treat an exception as a valid reply (by mapping it to a message that conforms to some "error message" contract that the caller understands).
To accomplish this, the gateway provides support for a message channel dedicated to the errors by including support for the error-channel
attribute.
In the following example, a 'transformer' creates a reply Message
from the Exception
:
<int:gateway id="sampleGateway"
default-request-channel="gatewayChannel"
service-interface="foo.bar.SimpleGateway"
error-channel="exceptionTransformationChannel"/>
<int:transformer input-channel="exceptionTransformationChannel"
ref="exceptionTransformer" method="createErrorResponse"/>
exceptionTransformer
可以是一个简单的 POJO,它知道如何创建预期的错误响应对象。这将成为发送回调用者的有效内容。如果必要,你可以在这样的“错误流
”中执行更多精心的事情。它可能涉及路由器(包括 Spring Integration 的 ErrorMessageExceptionTypeRouter
)、过滤器等。但是,大多数情况下,一个简单的“转换器”就足够了。
The exceptionTransformer
could be a simple POJO that knows how to create the expected error response objects.
That becomes the payload that is sent back to the caller.
You could do many more elaborate things in such an “error flow”, if necessary.
It might involve routers (including Spring Integration’s ErrorMessageExceptionTypeRouter
), filters, and so on.
Most of the time, a simple 'transformer' should be sufficient, however.
或者,你可能只想记录异常(或异步地将其发送到某个地方)。如果你提供单向流,不会向调用者发送任何内容。如果你想要完全禁止异常,你可以提供对全局 nullChannel
的引用(基本上是 /dev/null
方法)。最后,正如上文所述,如果没有定义 error-channel
,则异常会像往常一样传播。
Alternatively, you might want to only log the exception (or send it somewhere asynchronously).
If you provide a one-way flow, nothing would be sent back to the caller.
If you want to completely suppress exceptions, you can provide a reference to the global nullChannel
(essentially a /dev/null
approach).
Finally, as mentioned above, if no error-channel
is defined, then the exceptions propagate as usual.
When you use the @MessagingGateway
annotation (see @MessagingGateway` Annotation
), you can use an `errorChannel
attribute.
从版本 5.0 开始,当你使用带有 void
返回类型(单向流)的网关方法时,error-channel
引用(如果提供)会填充到每个已发送消息的标准 errorChannel
头中。此功能允许基于标准 ExecutorChannel
配置(或 QueueChannel
)的下游异步流覆盖默认全局 errorChannel
异常发送行为。以前,你必须使用 @GatewayHeader
注解或 <header>
元素手动指定 errorChannel
头。对于带有异步流的 void
方法,会忽略 error-channel
属性。相反,错误消息会发送到默认 errorChannel
。
Starting with version 5.0, when you use a gateway method with a void
return type (one-way flow), the error-channel
reference (if provided) is populated in the standard errorChannel
header of each sent message.
This feature allows a downstream asynchronous flow, based on the standard ExecutorChannel
configuration (or a QueueChannel
), to override a default global errorChannel
exceptions sending behavior.
Previously you had to manually specify an errorChannel
header with the @GatewayHeader
annotation or the <header>
element.
The error-channel
property was ignored for void
methods with an asynchronous flow.
Instead, error messages were sent to the default errorChannel
.
通过简单的 POJI 网关公开消息传递系统能带来好处,但 “hiding” 底层消息传递系统在现实中需要付出代价,所以需要考虑某些因素。我们希望我们的 Java 方法能够尽快返回,而不是在调用方等待其返回(无论是 void、返回值还是抛出异常)时无限期地挂起。当将常规方法用作消息传递系统之前的代理时,我们必须考虑底层消息传递的潜在异步性质。这意味着由网关发起的某个消息可能被过滤器丢弃而永远无法到达负责生成回复的组件。某些服务激活器方法可能会导致异常,因此无法提供回复(因为我们不会生成空消息)。换句话说,多种情况可能导致永远不会有回复消息返回。这是消息传递系统中的自然现象。但是,考虑它对网关方法的影响。网关方法输入参数被合并到一条消息中并下发。回复消息将转换为网关方法的返回值。因此,你可能需要确保对于每个网关调用,始终都有回复消息。否则,如果将 reply-timeout
设置为负值,网关方法可能永远不会返回并无限期地挂起。处理这种情况的一种方法是使用异步网关(稍后在本节中进行了解释)。处理它的另一种方法是依赖于 reply-timeout
作为 30
秒的默认设置。这样,网关将不会挂起超过 reply-timeout
中指定的时间,并且如果该超时时间过去,则返回 'null'。最后,你可能需要考虑设置下游标志,例如服务激活器上的 'requires-reply' 或过滤器上的 'throw-exceptions-on-rejection'。本章的末尾部分将对这些选项进行更详细的讨论。
Exposing the messaging system through simple POJI Gateways provides benefits, but “hiding” the reality of the underlying messaging system does come at a price, so there are certain things you should consider.
We want our Java method to return as quickly as possible and not hang for an indefinite amount of time while the caller is waiting on it to return (whether void, a return value, or a thrown Exception).
When regular methods are used as a proxies in front of the messaging system, we have to take into account the potentially asynchronous nature of the underlying messaging.
This means that there might be a chance that a message that was initiated by a gateway could be dropped by a filter and never reach a component that is responsible for producing a reply.
Some service activator method might result in an exception, thus providing no reply (as we do not generate null messages).
In other words, multiple scenarios can cause a reply message to never come.
That is perfectly natural in messaging systems.
However, think about the implication on the gateway method.
The gateway’s method input arguments were incorporated into a message and sent downstream.
The reply message would be converted to a return value of the gateway’s method.
So you might want to ensure that, for each gateway call, there is always a reply message.
Otherwise, your gateway method might never return and hang indefinitely if reply-timeout
is set to negative value.
One way to handle this situation is by using an asynchronous gateway (explained later in this section).
Another way of handling it is to rely on a default reply-timeout
as a 30
seconds.
That way, the gateway does not hang any longer than the time specified by the reply-timeout
and returns 'null' if that timeout does elapse.
Finally, you might want to consider setting downstream flags, such as 'requires-reply', on a service-activator or 'throw-exceptions-on-rejection' on a filter.
These options are discussed in more detail in the final section of this chapter.
如果下游流返回一个 |
If the downstream flow returns an |
Gateway Timeouts
网关有两个超时属性:requestTimeout
和 replyTimeout
。请求超时仅在通道可以阻塞(例如,满的边界 QueueChannel
)时才适用。replyTimeout
值是指网关等待答复或返回 null
的时间长度。其默认为无穷大。
Gateways have two timeout properties: requestTimeout
and replyTimeout
.
The request timeout applies only if the channel can block (for example, a bounded QueueChannel
that is full).
The replyTimeout
value is how long the gateway waits for a reply or returns null
.
It defaults to infinity.
超时可以针对网关上的所有方法(defaultRequestTimeout
和 defaultReplyTimeout
)或针对 MessagingGateway
接口注解进行设置。各个方法可以在 <method/>
子元素中或在 @Gateway
注解上覆盖这些默认值。
The timeouts can be set as defaults for all methods on the gateway (defaultRequestTimeout
and defaultReplyTimeout
) or on the MessagingGateway
interface annotation.
Individual methods can override these defaults (in <method/>
child elements) or on the @Gateway
annotation.
从版本 5.0 开始,超时可以定义为表达式,如下例所示:
Starting with version 5.0, the timeouts can be defined as expressions, as the following example shows:
@Gateway(payloadExpression = "args[0]", requestChannel = "someChannel",
requestTimeoutExpression = "args[1]", replyTimeoutExpression = "args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);
评估上下文具有 BeanResolver
(使用 @someBean
引用其他 Bean),并且 args
数组属性来自 #root
对象可用。有关此根对象的更多信息,请参阅 xref:gateway.adoc#gateway-expressions[Expressions and “Global” 标题。使用 XML 配置时,超时属性可以是长值或 SpEL 表达式,如下例所示:
The evaluation context has a BeanResolver
(use @someBean
to reference other beans), and the args
array property from the #root
object is available.
See Expressions and “Global” Headers for more information about this root object.
When configuring with XML, the timeout attributes can be a long value or a SpEL expression, as the following example shows:
<method name="someMethod" request-channel="someRequestChannel"
payload-expression="args[0]"
request-timeout="1000"
reply-timeout="args[1]">
</method>
Asynchronous Gateway
作为一种模式,消息传递网关提供了一种很好的方法来隐藏特定于消息传递的代码,同时仍公开消息传递系统的所有功能。正如 described earlier,GatewayProxyFactoryBean
提供了一种便捷的方法,可以通过服务接口公开一个代理,使您能够基于 POJO 访问消息传递系统(基于您自己域、基元/字符串或其他对象中的对象)。但是,当通过返回值的简单 POJO 方法公开网关时,这意味着对于每个请求消息(在调用该方法时生成),都必须有一个响应消息(在该方法返回时生成)。由于消息传递系统本质上是异步的,因此您可能无法始终保证合约中的 “for each request, there will always be a reply”。Spring Integration 2.0 引入了对异步网关的支持,这提供了一种便捷的方法来启动流程,在该流程中您可能不知道是否预期收到回复或收到回复需要多长时间。
As a pattern, the messaging gateway offers a nice way to hide messaging-specific code while still exposing the full capabilities of the messaging system.
As described earlier, the GatewayProxyFactoryBean
provides a convenient way to expose a proxy over a service-interface giving you POJO-based access to a messaging system (based on objects in your own domain, primitives/Strings, or other objects).
However, when a gateway is exposed through simple POJO methods that return values, it implies that, for each request message (generated when the method is invoked), there must be a reply message (generated when the method has returned).
Since messaging systems are naturally asynchronous, you may not always be able to guarantee the contract where “for each request, there will always be a reply”. Spring Integration 2.0 introduced support for an asynchronous gateway, which offers a convenient way to initiate flows when you may not know if a reply is expected or how long it takes for replies to arrive.
为了处理这些类型的场景,Spring Integration 使用 java.util.concurrent.Future
实例来支持异步网关。
To handle these types of scenarios, Spring Integration uses java.util.concurrent.Future
instances to support an asynchronous gateway.
在 XML 配置中,没有任何更改,你仍然像定义常规网关一样定义异步网关,如下例所示:
From the XML configuration, nothing changes, and you still define asynchronous gateway the same way as you define a regular gateway, as the following example shows:
<int:gateway id="mathService"
service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
default-request-channel="requestChannel"/>
但是,网关接口(一个服务接口)有点不同,如下所示:
However, the gateway interface (a service interface) is a little different, as follows:
public interface MathServiceGateway {
Future<Integer> multiplyByTwo(int i);
}
如前例所示,网关方法的返回类型是 Future
。当 GatewayProxyFactoryBean
看到网关方法的返回类型是 Future
时,它会立即通过使用 AsyncTaskExecutor
切换到异步模式。这就是不同之处的全部。对这样的方法的调用总是立即返回一个 Future
实例。然后,你可以按照自己的步调与 Future
进行交互以获取结果、取消等等。而且,与任何其他使用 Future
实例的情况一样,调用 get()
可能会显示超时、执行异常等。以下示例展示了如何使用从异步网关返回的 Future
:
As the preceding example shows, the return type for the gateway method is a Future
.
When GatewayProxyFactoryBean
sees that the return type of the gateway method is a Future
, it immediately switches to the asynchronous mode by using an AsyncTaskExecutor
.
That is the extent of the differences.
The call to such a method always returns immediately with a Future
instance.
Then you can interact with the Future
at your own pace to get the result, cancel, and so on.
Also, as with any other use of Future
instances, calling get()
may reveal a timeout, an execution exception, and so on.
The following example shows how to use a Future
that returns from an asynchronous gateway:
MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult = result.get(1000, TimeUnit.SECONDS);
有关更详细的示例,请参阅 Spring 集成示例中的 async-gateway 示例。
For a more detailed example, see the async-gateway sample in the Spring Integration samples.
AsyncTaskExecutor
默认情况下,当为其返回类型是 Future
的任何网关方法提交内部 AsyncInvocationTask
实例时,GatewayProxyFactoryBean
使用 org.springframework.core.task.SimpleAsyncTaskExecutor
。但是,<gateway/>
元素配置中的 async-executor
属性允许你提供对 Spring 应用程序上下文中可用的 java.util.concurrent.Executor
任何实现的引用。
By default, the GatewayProxyFactoryBean
uses org.springframework.core.task.SimpleAsyncTaskExecutor
when submitting internal AsyncInvocationTask
instances for any gateway method whose return type is a Future
.
However, the async-executor
attribute in the <gateway/>
element’s configuration lets you provide a reference to any implementation of java.util.concurrent.Executor
available within the Spring application context.
(默认)SimpleAsyncTaskExecutor
支持 Future
和 CompletableFuture
返回类型。请参阅 xref:gateway.adoc#gw-completable-future[CompletableFuture
。尽管有一个默认执行器,但经常提供一个外部执行器很有用,以便您可以在日志中标识它的线程(当使用 XML 时,线程名称基于执行器的 Bean 名称),如下例所示:
The (default) SimpleAsyncTaskExecutor
supports both Future
and CompletableFuture
return types.
See CompletableFuture
.
Even though there is a default executor, it is often useful to provide an external one so that you can identify its threads in logs (when using XML, the thread name is based on the executor’s bean name), as the following example shows:
@Bean
public AsyncTaskExecutor exec() {
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
return simpleAsyncTaskExecutor;
}
@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
}
如果您希望返回不同的`Future`实现,则可以提供一个自定义执行器,也可以完全禁用执行器,然后在从下游流程返回的回复消息有效负载中返回`Future`。要禁用执行器,请在`GatewayProxyFactoryBean`中将其设置为`null`(通过使用`setAsyncTaskExecutor(null))。在使用 XML 配置网关时,使用`async-executor=""
。在使用`@MessagingGateway`注释进行配置时,请使用类似于以下内容的代码:
If you wish to return a different Future
implementation, you can provide a custom executor or disable the executor altogether and return the Future
in the reply message payload from the downstream flow.
To disable the executor, set it to null
in the GatewayProxyFactoryBean
(by using setAsyncTaskExecutor(null)
).
When configuring the gateway with XML, use async-executor=""
.
When configuring by using the @MessagingGateway
annotation, use code similar to the following:
@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
}
如果返回类型是某个具体的 Future
实现或配置的执行程序不支持的某个其他子接口,则流程在调用方的线程中运行,并且流程必须在回复消息有效负载中返回所需类型。
If the return type is a specific concrete Future
implementation or some other sub-interface that is not supported by the configured executor, the flow runs on the caller’s thread and the flow must return the required type in the reply message payload.
CompletableFuture
从版本 4.2 开始,网关方法现在可以返回`CompletableFuture<?>`。返回此类型时有两种操作模式:
Starting with version 4.2, gateway methods can now return CompletableFuture<?>
.
There are two modes of operation when returning this type:
-
When an async executor is provided and the return type is exactly
CompletableFuture
(not a subclass), the framework runs the task on the executor and immediately returns aCompletableFuture
to the caller.CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)
is used to create the future. -
When the async executor is explicitly set to
null
and the return type isCompletableFuture
or the return type is a subclass ofCompletableFuture
, the flow is invoked on the caller’s thread. In this scenario, the downstream flow is expected to return aCompletableFuture
of the appropriate type.
|
The |
Usage Scenarios
在以下场景中,调用者线程使用`CompletableFuture<Invoice>`立即返回,当下游流程回复网关(提供`Invoice`对象)时,该线程完成。
In the following scenario, the caller thread returns immediately with a CompletableFuture<Invoice>
, which is completed when the downstream flow replies to the gateway (with an Invoice
object).
CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="something.Service" default-request-channel="orders" />
在以下场景中,当下游流程将其提供为对网关回复的有效负载时,调用者线程使用`CompletableFuture<Invoice>`返回。发票准备就绪时,某个其他流程必须完成未来。
In the following scenario, the caller thread returns with a CompletableFuture<Invoice>
when the downstream flow provides it as the payload of the reply to the gateway.
Some other process must complete the future when the invoice is ready.
CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders"
async-executor="" />
在以下场景中,当下游流程将其提供为对网关回复的有效负载时,调用者线程使用`CompletableFuture<Invoice>`返回。发票准备就绪时,某个其他流程必须完成未来。如果启用了`DEBUG`日志记录,则会发出一个日志条目,表明该异步执行器不能用于此方案。
In the following scenario, the caller thread returns with a CompletableFuture<Invoice>
when the downstream flow provides it as the payload of the reply to the gateway.
Some other process must complete the future when the invoice is ready.
If DEBUG
logging is enabled, a log entry is emitted, indicating that the async executor cannot be used for this scenario.
MyCompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders" />
`CompletableFuture`实例可用于对回复执行额外的操作,如下例所示:
CompletableFuture
instances can be used to perform additional manipulation on the reply, as the following example shows:
CompletableFuture<String> process(String data);
...
CompletableFuture result = process("foo")
.thenApply(t -> t.toUpperCase());
...
String out = result.get(10, TimeUnit.SECONDS);
Reactor Mono
从版本 5.0 开始,GatewayProxyFactoryBean
允许使用 Mono<T>
返回类型,通过 Gateway 接口和 Project Reactor 方法使用。内部 AsyncInvocationTask
封装在 Mono.fromCallable()
中。
Starting with version 5.0, the GatewayProxyFactoryBean
allows the use of Project Reactor with gateway interface methods, using a Mono<T>
return type.
The internal AsyncInvocationTask
is wrapped in a Mono.fromCallable()
.
Mono`可用于稍后检索结果(类似于`Future<?>
),或者您可以在结果返回到网关时使用分派器使用`Consumer`对它进行使用。
A Mono
can be used to retrieve the result later (similar to a Future<?>
), or you can consume from it with the dispatcher by invoking your Consumer
when the result is returned to the gateway.
Mono
不会立即被框架刷新。因此,底层消息流程不会在网关方法返回前启动(如带有 Future<?>
Executor
任务一样)。当订阅 Mono
时,流程才开始。另一个选择是 Mono
(作为 “Composable”)可能属于 Reactor 流,此时 subscribe()
与整个 Flux
相关。以下示例演示如何使用 Project Reactor 创建一个网关:
The Mono
is not immediately flushed by the framework.
Consequently, the underlying message flow is not started before the gateway method returns (as it is with a Future<?>
Executor
task).
The flow starts when the Mono
is subscribed to.
Alternatively, the Mono
(being a “Composable”) might be a part of Reactor stream, when the subscribe()
is related to the entire Flux
.
The following example shows how to create a gateway with Project Reactor:
@MessagingGateway
public interface TestGateway {
@Gateway(requestChannel = "multiplyChannel")
Mono<Integer> multiply(Integer value);
}
@ServiceActivator(inputChannel = "multiplyChannel")
public Integer multiply(Integer value) {
return value * 2;
}
其中此类网关可用于处理`Flux`数据的某些服务中:
where such a gateway can be used in some service which deals with the Flux
of data:
@Autowired
TestGateway testGateway;
public void hadnleFlux() {
Flux.just("1", "2", "3", "4", "5")
.map(Integer::parseInt)
.flatMap(this.testGateway::multiply)
.collectList()
.subscribe(System.out::println);
}
使用 Project Reactor 的另一个示例是一个简单的回调场景,如下例所示:
Another example that uses Project Reactor is a simple callback scenario, as the following example shows:
Mono<Invoice> mono = service.process(myOrder);
mono.subscribe(invoice -> handleInvoice(invoice));
调用线程继续,流程完成后将调用`handleInvoice()`。
The calling thread continues, with handleInvoice()
being called when the flow completes.
有关详细信息,请 Kotlin Coroutines 查看。
Also see Kotlin Coroutines for more information.
Downstream Flows Returning an Asynchronous Type
如上文 AsyncTaskExecutor
部分中所述,如果您希望某个下游组件返回带有异步有效负载(Future
、Mono
和其他)的消息,您必须将异步执行器显式设置为 null
(或在使用 XML 配置时设置为 ""
)。然后在调用方线程上调用该流,稍后可以检索结果。
As mentioned in the AsyncTaskExecutor
section above, if you wish some downstream component to return a message with an async payload (Future
, Mono
, and others), you must explicitly set the async executor to null
(or ""
when using XML configuration).
The flow is then invoked on the caller thread and the result can be retrieved later.
Asynchronous void
Return Type
消息网关方法可以这样声明:
The messaging gateway method can be declared like this:
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "sendAsyncChannel")
@Async
void sendAsync(String payload);
}
但下游异常不会传播回调用者。为了确保下游流程调用的异步行为和异常传播到调用者,从版本 6.0 开始,框架提供了对`Future<Void>`和`Mono<Void>`返回类型的支持。用例类似于之前为普通`void`返回类型描述的发送和忘记行为,但不同之处在于,流程执行发生在异步情况下,返回的`Future`(或`Mono`)以`null`完成,或根据`send`操作结果异常完成。
But downstream exceptions are not going to be propagated back to the caller.
To ensure asynchronous behavior for downstream flow invocation and exception propagation to the caller, starting with version 6.0, the framework provides support for the Future<Void>
and Mono<Void>
return types.
The use-case is similar to send-and-forget behavior described before for plain void
return type, but with a difference that flow execution happens asynchronously and returned Future
(or Mono
) is complete with a null
or exceptionally according to the send
operation result.
如果 |
If the |
Gateway Behavior When No response Arrives
与 explained earlier 相同,该网关提供了一种通过 POJO 方法调用与消息传递系统进行交互的便捷方式。但是,通常预期始终返回的典型方法调用(即使出现异常)可能并不总是映射为一对一的消息交换(例如,回复消息可能无法到达,相当于方法未返回)。
As explained earlier, the gateway provides a convenient way of interacting with a messaging system through POJO method invocations. However, a typical method invocation, which is generally expected to always return (even with an Exception), might not always map one-to-one to message exchanges (for example, a reply message might not arrive — the equivalent to a method not returning).
本节的其余部分涵盖了各种场景以及如何使网关的行为更可预测。可以配置某些属性以使同步网关行为更可预测,但其中一些属性可能并不总是按预期工作。其中之一是`reply-timeout`(在方法级别或`default-reply-timeout`在网关级别)。我们检查`reply-timeout`属性,看看它可以在哪些场景中影响同步网关的行为,又有哪些场景中不能。我们检查单线程场景(下游所有组件都通过直接信道连接)和多线程场景(例如,在下游的某个地方,您可能有一个可轮询信道或执行器信道,它打破了单线程边界)。
The rest of this section covers various scenarios and how to make the gateway behave more predictably.
Certain attributes can be configured to make synchronous gateway behavior more predictable, but some of them might not always work as you might expect.
One of them is reply-timeout
(at the method level or default-reply-timeout
at the gateway level).
We examine the reply-timeout
attribute to see how it can and cannot influence the behavior of the synchronous gateway in various scenarios.
We examine a single-threaded scenario (all components downstream are connected through a direct channel) and multi-threaded scenarios (for example, somewhere downstream you may have a pollable or executor channel that breaks the single-thread boundary).
Long-running Process Downstream
- Sync Gateway, single-threaded
-
If a component downstream is still running (perhaps because of an infinite loop or a slow service), setting a
reply-timeout
has no effect, and the gateway method call does not return until the downstream service exits (by returning or throwing an exception). - Sync Gateway, multi-threaded
-
If a component downstream is still running (perhaps because of an infinite loop or a slow service) in a multi-threaded message flow, setting the
reply-timeout
has an effect by allowing gateway method invocation to return once the timeout has been reached, because theGatewayProxyFactoryBean
polls on the reply channel, waiting for a message until the timeout expires. However, if the timeout has been reached before the actual reply was produced, it could result in a 'null' return from the gateway method. You should understand that the reply message (if produced) is sent to a reply channel after the gateway method invocation might have returned, so you must be aware of that and design your flow with it in mind.
另请参阅`errorOnTimeout`属性,以便在发生超时时引发`MessageTimeoutException`,而不是返回`null`。
Also see the errorOnTimeout
property to throw a MessageTimeoutException
instead of returning null
, when a timeout occurs.
Downstream Component Returns 'null'
- Sync Gateway — single-threaded
-
If a component downstream returns 'null' and the
reply-timeout
has been configured to negative value, the gateway method call hangs indefinitely, unless therequires-reply
attribute has been set on the downstream component (for example, a service activator) that might return 'null'. In this case, an exception would be thrown and propagated to the gateway. - Sync Gateway — multi-threaded
-
The behavior is the same as the previous case.
Downstream Component Return Signature is 'void' While Gateway Method Signature Is Non-void
- Sync Gateway — single-threaded
-
If a component downstream returns 'void' and the
reply-timeout
has been configured to negative value, the gateway method call hangs indefinitely. - Sync Gateway — multi-threaded
-
The behavior is the same as the previous case.
Downstream Component Results in Runtime Exception
- Sync Gateway — single-threaded
-
If a component downstream throws a runtime exception, the exception is propagated through an error message back to the gateway and re-thrown.
- Sync Gateway — multi-threaded
-
The behavior is the same as the previous case.
您应该了解,默认情况下,reply-timeout
是无界的。因此,如果您将 reply-timeout
设置为负值,则您的网关方法调用可能会无限期地挂起。因此,为了确保您分析您的流程,并且即使这些场景中有一个最微小的可能性出现,您也应该将 reply-timeout
属性设置为“safe”的值。默认情况下,它是 30
秒。更好的是,您可以将下游组件的 requires-reply
属性设置为 true,以确保及时响应,就像一旦该下游组件在内部返回 null 时抛出异常所产生的一样。但是,您还应该意识到,某些场景(参见 the first one)中,reply-timeout
无济于事。这意味着分析您的消息流并决定何时使用同步网关而不是异步网关也很重要。正如 described earlier 所说,后一种情况是定义返回 Future
实例的网关方法的问题。然后,您保证会收到该返回值,并且可以更精细地控制调用的结果。此外,在处理路由器时,您应该记住,将 resolution-required
属性设置为 true 会导致路由器无法解析特定通道而抛出异常。同样,在处理过滤器时,您可以设置 throw-exception-on-rejection
属性。在这两种情况下,所生成流的行为就像它包含具有“requires-reply”属性的服务激活器。换句话说,它有助于确保网关方法调用的及时响应。
You should understand that, by default, reply-timeout
is unbounded.
Consequently, if you set the reply-timeout
to negative value, your gateway method invocation might hang indefinitely.
So, to make sure you analyze your flow and if there is even a remote possibility of one of these scenarios to occur, you should set the reply-timeout
attribute to a "'safe'" value.
It is 30
seconds by default.
Even better, you can set the requires-reply
attribute of the downstream component to 'true' to ensure a timely response, as produced by the throwing of an exception as soon as that downstream component returns null internally.
However, you should also realize that there are some scenarios (see the first one) where reply-timeout
does not help.
That means it is also important to analyze your message flow and decide when to use a synchronous gateway rather than an asynchronous gateway.
As described earlier, the latter case is a matter of defining gateway methods that return Future
instances.
Then you are guaranteed to receive that return value, and you have more granular control over the results of the invocation.
Also, when dealing with a router, you should remember that setting the resolution-required
attribute to 'true' results in an exception thrown by the router if it can not resolve a particular channel.
Likewise, when dealing with a Filter, you can set the throw-exception-on-rejection
attribute.
In both of these cases, the resulting flow behaves like it contain a service activator with the 'requires-reply' attribute.
In other words, it helps to ensure a timely response from the gateway method invocation.
您应该理解,当线程返回到网关时定时器将启动——即当流完成或一条消息移交给另一线程时启动。在该时刻,调用线程开始等待答复。如果流完全是同步的,则可立即收到答复。对于异步流,该线程最长等待此时间。
You should understand that the timer starts when the thread returns to the gateway — that is, when the flow completes or a message is handed off to another thread. At that time, the calling thread starts waiting for the reply. If the flow was completely synchronous, the reply is immediately available. For asynchronous flows, the thread waits for up to this time.
从版本 6.2 开始,errorOnTimeout
的 MethodInvocationGateway
的内部 MessagingGatewaySupport
扩展的属性公开在 @MessagingGateway
和 GatewayEndpointSpec
中。此选项具有与 Endpoint Summary 章节末尾解释的任何入站网关完全相同的含义。换句话说,将此选项设置为 true
,将导致在接收超时耗尽时从发送和接收网关操作中抛出 MessageTimeoutException
,而不是返回 null
。
Starting with version 6.2, the errorOnTimeout
property of the internal MethodInvocationGateway
extension of the MessagingGatewaySupport
is exposed on the @MessagingGateway
and GatewayEndpointSpec
.
This option has exactly the same meaning as for any inbound gateway explained in the end of Endpoint Summary chapter.
In other words, setting this option to true
, would lead to the MessageTimeoutException
being thrown from a send-and-receive gateway operation instead of returning null
when the receive timeout is exhausted.
有关通过 IntegrationFlow
定义网关的选项,请参阅 Java DSL 章节中的 IntegrationFlow
as Gateway。
See IntegrationFlow
as Gateway in the Java DSL chapter for options to define gateways through IntegrationFlow
.