Message Publishing
(面向方面的编程)AOP 消息发布功能允许您在方法调用过程中作为副产品构建和发送消息。例如,假设您有一个组件,每当这个组件的状态发生更改时,您都希望通过一条消息收到通知。发送此类通知最简单的方法是将消息发送到一个专用通道,但是您将如何将更改对象状态的方法调用连接到消息发送过程,以及通知消息的结构如何?AOP 消息发布功能通过配置驱动的办法处理这些责任。
The (Aspect-oriented Programming) AOP message publishing feature lets you construct and send a message as a by-product of a method invocation. For example, imagine you have a component and, every time the state of this component changes, you want to be notified by a message. The easiest way to send such notifications is to send a message to a dedicated channel, but how would you connect the method invocation that changes the state of the object to a message sending process, and how should the notification message be structured? The AOP message publishing feature handles these responsibilities with a configuration-driven approach.
Message Publishing Configuration
Spring Integration 提供两种方法:XML 配置和注释驱动(Java)配置。
Spring Integration provides two approaches: XML configuration and annotation-driven (Java) configuration.
Annotation-driven Configuration with the @Publisher
Annotation
注释驱动的方法可让您通过 @Publisher
注释注释任何方法以指定“通道”属性。从版本 5.1 开始,要启用此功能,您必须在某些 @Configuration
类上使用 @EnablePublisher
注释。请参阅 Configuration and @EnableIntegration
了解更多信息。该消息是通过方法调用的返回值构建的,然后发送到由“通道”属性指定的通道。要进一步管理消息结构,您还可以结合使用 @Payload
和 @Header
注释。
The annotation-driven approach lets you annotate any method with the @Publisher
annotation to specify a 'channel' attribute.
Starting with version 5.1, to switch this functionality on, you must use the @EnablePublisher
annotation on some @Configuration
class.
See Configuration and @EnableIntegration
for more information.
The message is constructed from the return value of the method invocation and sent to the channel specified by the 'channel' attribute.
To further manage message structure, you can also use a combination of both @Payload
and @Header
annotations.
在内部,Spring Integration 的此消息发布功能同时使用 Spring AOP 定义 PublisherAnnotationAdvisor
和 Spring 表达式语言 (SpEL),为您提供的灵活性以及对它发布的 Message
结构的控制相当大。
Internally, this message publishing feature of Spring Integration uses both Spring AOP by defining PublisherAnnotationAdvisor
and the Spring Expression Language (SpEL), giving you considerable flexibility and control over the structure of the Message
it publishes.
PublisherAnnotationAdvisor
定义并绑定以下变量:
The PublisherAnnotationAdvisor
defines and binds the following variables:
-
#return
: Binds to a return value, letting you reference it or its attributes (for example,#return.something
, where 'something' is an attribute of the object bound to#return
) -
#exception
: Binds to an exception if one is thrown by the method invocation -
#args
: Binds to method arguments so that you can extract individual arguments by name (for example,#args.fname
)
请考虑以下示例:
Consider the following example:
@Publisher
public String defaultPayload(String fname, String lname) {
return fname + " " + lname;
}
在上一个示例中,使用以下结构构建消息:
In the preceding example, the message is constructed with the following structure:
-
The message payload is the return type and value of the method. This is the default.
-
A newly constructed message is sent to a default publisher channel that is configured with an annotation post processor (covered later in this section).
以下示例与上一个示例相同,不同之处在于它没有使用默认的发布通道:
The following example is the same as the preceding example, except that it does not use a default publishing channel:
@Publisher(channel="testChannel")
public String defaultPayload(String fname, @Header("last") String lname) {
return fname + " " + lname;
}
我们不是使用默认发布通道,而是通过设置 @Publisher
注释的 channel
属性指定发布通道。我们还添加了一个 @Header
注释,这将导致名为“last”的消息头具有与 lname
方法参数相同的。该头添加到新构建的消息中。
Instead of using a default publishing channel, we specify the publishing channel by setting the 'channel' attribute of the @Publisher
annotation.
We also add a @Header
annotation, which results in the message header named 'last' having the same value as the 'lname' method parameter.
That header is added to the newly constructed message.
以下示例与上一个示例差不多:
The following example is almost identical to the preceding example:
@Publisher(channel="testChannel")
@Payload
public String defaultPayloadButExplicitAnnotation(String fname, @Header String lname) {
return fname + " " + lname;
}
唯一的区别是我们在方法中使用 @Payload
注释来明确指定方法的返回值应该用作消息的有效内容。
The only difference is that we use a @Payload
annotation on the method to explicitly specify that the return value of the method should be used as the payload of the message.
以下示例通过在 @Payload
注释中使用 Spring 表达式语言来扩展以前的配置,从而进一步指示框架如何构建消息:
The following example expands on the previous configuration by using the Spring Expression Language in the @Payload
annotation to further instruct the framework about how the message should be constructed:
@Publisher(channel="testChannel")
@Payload("#return + #args.lname")
public String setName(String fname, String lname, @Header("x") int num) {
return fname + " " + lname;
}
在上一个示例中,消息是方法调用返回值和 lname
输入参数的连接。名为 x
的消息头通过 num
输入参数确定其值。该头添加到新构建的消息中。
In the preceding example, the message is a concatenation of the return value of the method invocation and the 'lname' input argument. The Message header named 'x' has its value determined by the 'num' input argument. That header is added to the newly constructed message.
@Publisher(channel="testChannel")
public String argumentAsPayload(@Payload String fname, @Header String lname) {
return fname + " " + lname;
}
在上一个示例中,你看到 @Payload
注释的另一种用法。在此,我们注释一个方法参数,该方法参数成为新构建消息的有效载荷。
In the preceding example, you see another usage of the @Payload
annotation.
Here, we annotate a method argument that becomes the payload of the newly constructed message.
与 Spring 中大多数其他注释驱动功能一样,你需要注册一个后处理器(PublisherAnnotationBeanPostProcessor
)。以下示例演示如何执行此操作:
As with most other annotation-driven features in Spring, you need to register a post-processor (PublisherAnnotationBeanPostProcessor
).
The following example shows how to do so:
<bean class="org.springframework.integration.aop.PublisherAnnotationBeanPostProcessor"/>
为了进行更简洁的配置,你可以改用命名空间支持,如下面的示例所示:
For a more concise configuration, you can instead use namespace support, as the following example shows:
<int:annotation-config>
<int:enable-publisher default-publisher-channel="defaultChannel"/>
</int:annotation-config>
对于 Java 配置,你必须使用 @EnablePublisher
注释,如下面的示例所示:
For Java configuration, you must use the @EnablePublisher
annotation, as the following example shows:
@Configuration
@EnableIntegration
@EnablePublisher("defaultChannel")
public class IntegrationConfiguration {
...
}
从 5.1.3 版本开始,<int:enable-publisher>
组件以及 @EnablePublisher
注释具有用于调整 ProxyFactory
配置的 proxy-target-class
和 order
属性。
Starting with version 5.1.3, the <int:enable-publisher>
component, as well as the @EnablePublisher
annotation have the proxy-target-class
and order
attributes for tuning the ProxyFactory
configuration.
与其他 Spring 注释(@Component
、@Scheduled
等等)类似,你还可以将 @Publisher
用作元注释。这意味着你可以定义自己的注释,这些注释以与 @Publisher
本身相同的方式处理。以下示例演示如何执行此操作:
Similar to other Spring annotations (@Component
, @Scheduled
, and so on), you can also use @Publisher
as a meta-annotation.
This means that you can define your own annotations that are treated in the same way as the @Publisher
itself.
The following example shows how to do so:
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Publisher(channel="auditChannel")
public @interface Audit {
...
}
在上一个示例中,我们定义了 @Audit
注释,它本身用 @Publisher
进行了注释。另外请注意,你可以在元注释上定义一个 channel
属性,以封装消息在此注释内发送的位置。现在,你可以使用 @Audit
注释对任何方法进行注释,如下面的示例所示:
In the preceding example, we define the @Audit
annotation, which is itself annotated with @Publisher
.
Also note that you can define a channel
attribute on the meta-annotation to encapsulate where messages are sent inside of this annotation.
Now you can annotate any method with the @Audit
annotation, as the following example shows:
@Audit
public String test() {
return "Hello";
}
在上一个示例中,每次调用 test()
方法都会导致使用其返回值创建有效负载的消息。每条消息都会发送到名为 auditChannel
的通道。此技术的一个好处是,你可以避免在多个注释中重复使用相同的通道名称。你还可以提供自己的潜在特定于领域的注释和框架提供的注释之间的一级间接。
In the preceding example, every invocation of the test()
method results in a message with a payload created from its return value.
Each message is sent to the channel named auditChannel
.
One of the benefits of this technique is that you can avoid the duplication of the same channel name across multiple annotations.
You also can provide a level of indirection between your own, potentially domain-specific, annotations and those provided by the framework.
你还可以注释类,这使你可以在该类的每个 public 方法上应用此注释的属性,如下面的示例所示:
You can also annotate the class, which lets you apply the properties of this annotation on every public method of that class, as the following example shows:
@Audit
static class BankingOperationsImpl implements BankingOperations {
public String debit(String amount) {
. . .
}
public String credit(String amount) {
. . .
}
}
XML-based Approach with the <publishing-interceptor>
element
基于 XML 的方法使你可以将相同基于 AOP 的消息发布功能配置为 MessagePublishingInterceptor
的基于命名空间的配置。它当然比注释驱动的做法有一些优势,因为它使你可以使用 AOP 切入点表达式,从而可能同时拦截多个方法或拦截和发布你没有源代码的方法。
The XML-based approach lets you configure the same AOP-based message publishing functionality as a namespace-based configuration of a MessagePublishingInterceptor
.
It certainly has some benefits over the annotation-driven approach, since it lets you use AOP pointcut expressions, thus possibly intercepting multiple methods at once or intercepting and publishing methods to which you do not have the source code.
要使用 XML 配置消息发布,只需要执行以下两件事:
To configure message publishing with XML, you need only do the following two things:
-
Provide configuration for
MessagePublishingInterceptor
by using the<publishing-interceptor>
XML element. -
Provide AOP configuration to apply the
MessagePublishingInterceptor
to managed objects.
以下示例展示如何配置 publishing-interceptor
元素:
The following example shows how to configure a publishing-interceptor
element:
<aop:config>
<aop:advisor advice-ref="interceptor" pointcut="bean(testBean)" />
</aop:config>
<publishing-interceptor id="interceptor" default-channel="defaultChannel">
<method pattern="echo" payload="'Echoing: ' + #return" channel="echoChannel">
<header name="things" value="something"/>
</method>
<method pattern="repl*" payload="'Echoing: ' + #return" channel="echoChannel">
<header name="things" expression="'something'.toUpperCase()"/>
</method>
<method pattern="echoDef*" payload="#return"/>
</publishing-interceptor>
<publishing-interceptor>
配置看上去与基于注释的方法相当类似,并且它还利用了 Spring 表达式语言的强大功能。
The <publishing-interceptor>
configuration looks rather similar to the annotation-based approach, and it also uses the power of the Spring Expression Language.
在前面的示例中,testBean
的 echo
方法执行会呈现一个具有以下结构的 Message
:
In the preceding example, the execution of the echo
method of a testBean
renders a Message
with the following structure:
-
The
Message
payload is of typeString
with the following content:Echoing: [value]
, wherevalue
is the value returned by an executed method. -
The
Message
has a header with a name ofthings
and a value ofsomething
. -
The
Message
is sent toechoChannel
.
第二个方法与第一个方法非常相似。此处,从 'repl' 开始的每个方法都会呈现一个具有以下结构的 Message
:
The second method is very similar to the first.
Here, every method that begins with 'repl' renders a Message
with the following structure:
-
The
Message
payload is the same as in the preceding sample. -
The
Message
has a header namedthings
whose value is the result of the SpEL expression ’something'.toUpperCase()`. -
The
Message
is sent toechoChannel
.
第二个方法映射以 echoDef
开始的任何方法的执行,会生成一个具有以下结构的 Message
:
The second method, mapping the execution of any method that begins with echoDef
, produces a Message
with the following structure:
-
The
Message
payload is the value returned by an executed method. -
Since the
channel
attribute is not provided, theMessage
is sent to thedefaultChannel
defined by thepublisher
.
对于简单的映射规则,您可以依赖于 publisher
默认值,如下面的示例所示:
For simple mapping rules you can rely on the publisher
defaults, as the following example shows:
<publishing-interceptor id="anotherInterceptor"/>
前面的示例将表达式匹配的每个方法的返回值映射到有效负载,并将其发送到 default-channel
。如果您未指定 defaultChannel
(如前面的示例未指定),那么消息将会发送到全局 nullChannel
(相当于 /dev/null
)。
The preceding example maps the return value of every method that matches the pointcut expression to a payload and is sent to a default-channel
.
If you do not specify the defaultChannel
(as the preceding example does not do), the messages are sent to the global nullChannel
(the equivalent of /dev/null
).
Asynchronous Publishing
发布发生在与组件执行相同的线程中。因此,默认情况下,它是同步的。这意味着整个消息流程必须等到发布程序的流程完成。但是,开发人员通常需要完全相反:使用此消息发布功能来发起异步流程。例如,您可以托管接收远程请求的服务(HTTP、WS 等)。您可能希望在此流程中内部发送此请求,但可能需要一段时间。但是,您可能还希望立即回复用户。因此,您可以使用“output-channel”或“replyChannel”标头将简单的确认式回复发送回调用方,而不是将入站请求发送到输出通道(传统方式)进行处理,而使用消息发布功能来发起复杂流程。
Publishing occurs in the same thread as your component’s execution. So, by default, it is synchronous. This means that the entire message flow has to wait until the publisher’s flow completes. However, developers often want the complete opposite: to use this message-publishing feature to initiate asynchronous flows. For example, you might host a service (HTTP, WS, and so on) which receives a remote request. You may want to send this request internally into a process that might take a while. However, you may also want to reply to the user right away. So, instead of sending inbound requests for processing to the output channel (the conventional way), you can use 'output-channel' or a 'replyChannel' header to send a simple acknowledgment-like reply back to the caller while using the message-publisher feature to initiate a complex flow.
以下示例中的服务接收复杂的有效负载(需要进一步发送以进行处理),但它还需要通过简单的确认向调用方回复:
The service in the following example receives a complex payload (which needs to be sent further for processing), but it also needs to reply to the caller with a simple acknowledgment:
public String echo(Object complexPayload) {
return "ACK";
}
因此,我们使用消息发布功能替换将复杂流程连接到输出通道。我们对其进行配置以使用服务方法的输入参数(在前面的示例中展示)创建一个新消息,并将其发送到 localProcessChannel
。为了确保此流程是异步的,我们需要做的就是将其发送到任何类型的异步通道(在下一个示例中为 ExecutorChannel
)。以下示例展示如何使用异步 publishing-interceptor
:
So, instead of hooking up the complex flow to the output channel, we use the message-publishing feature instead.
We configure it to create a new message, by using the input argument of the service method (shown in the preceding example), and send that to the 'localProcessChannel'.
To make sure this flow is asynchronous, all we need to do is send it to any type of asynchronous channel (ExecutorChannel
in the next example).
The following example shows how to an asynchronous publishing-interceptor
:
<int:service-activator input-channel="inputChannel" output-channel="outputChannel" ref="sampleservice"/>
<bean id="sampleService" class="test.SampleService"/>
<aop:config>
<aop:advisor advice-ref="interceptor" pointcut="bean(sampleService)" />
</aop:config>
<int:publishing-interceptor id="interceptor" >
<int:method pattern="echo" payload="#args[0]" channel="localProcessChannel">
<int:header name="sample_header" expression="'some sample value'"/>
</int:method>
</int:publishing-interceptor>
<int:channel id="localProcessChannel">
<int:dispatcher task-executor="executor"/>
</int:channel>
<task:executor id="executor" pool-size="5"/>
另一种处理这种情况的方法是使用窃听。请参阅 Wire Tap。
Another way of handling this type of scenario is with a wire-tap. See Wire Tap.
Producing and Publishing Messages Based on a Scheduled Trigger
在前面的部分中,我们了解了消息发布功能,它构建并发布消息作为方法调用的副产品。但是,在这些情况下,您仍然负责调用方法。Spring Integration 2.0 添加了对调度消息生产者和发布者的支持,并在“inbound-channel-adapter”元素上添加了新的 expression
属性。您可以基于多个触发器安排,其中任何一个都可以配置在“poller”元素上。目前,我们支持 cron
、fixed-rate
、fixed-delay
以及由您实现并由 trigger
属性值引用的任何自定义触发器。
In the preceding sections, we looked at the message-publishing feature, which constructs and publishes messages as by-products of method invocations.
However, in those cases, you are still responsible for invoking the method.
Spring Integration 2.0 added support for scheduled message producers and publishers with the new expression
attribute on the 'inbound-channel-adapter' element.
You can schedule based on several triggers, any one of which can be configured on the 'poller' element.
Currently, we support cron
, fixed-rate
, fixed-delay
and any custom trigger implemented by you and referenced by the 'trigger' attribute value.
如前所述,对调度生产者和发布者的支持是通过 <inbound-channel-adapter>
XML 元素提供的。请考虑以下示例:
As mentioned earlier, support for scheduled producers and publishers is provided via the <inbound-channel-adapter>
XML element.
Consider the following example:
<int:inbound-channel-adapter id="fixedDelayProducer"
expression="'fixedDelayTest'"
channel="fixedDelayChannel">
<int:poller fixed-delay="1000"/>
</int:inbound-channel-adapter>
前面的示例创建了一个入站通道适配器,它构造了一个 Message
,其有效负载是 expression
属性中定义的表达式的结果。每当 fixed-delay
属性指定的时间间隔发生时,都会创建和发送此类消息。
The preceding example creates an inbound channel adapter that constructs a Message
, with its payload being the result of the expression defined in the expression
attribute.
Such messages are created and sent every time the delay specified by the fixed-delay
attribute occurs.
以下示例类似于前面的示例,只是它使用了 fixed-rate
属性:
The following example is similar to the preceding example, except that it uses the fixed-rate
attribute:
<int:inbound-channel-adapter id="fixedRateProducer"
expression="'fixedRateTest'"
channel="fixedRateChannel">
<int:poller fixed-rate="1000"/>
</int:inbound-channel-adapter>
fixed-rate
属性允许您以固定速率发送消息(从每个任务的开始时间测量)。
The fixed-rate
attribute lets you send messages at a fixed rate (measuring from the start time of each task).
以下示例展示如何对 cron
属性中指定的值应用 Cron 触发器:
The following example shows how you can apply a Cron trigger with a value specified in the cron
attribute:
<int:inbound-channel-adapter id="cronProducer"
expression="'cronTest'"
channel="cronChannel">
<int:poller cron="7 6 5 4 3 ?"/>
</int:inbound-channel-adapter>
以下示例展示如何将附加标头插入消息中:
The following example shows how to insert additional headers into the message:
<int:inbound-channel-adapter id="headerExpressionsProducer"
expression="'headerExpressionsTest'"
channel="headerExpressionsChannel"
auto-startup="false">
<int:poller fixed-delay="5000"/>
<int:header name="foo" expression="6 * 7"/>
<int:header name="bar" value="x"/>
</int:inbound-channel-adapter>
附加的消息标头可以采用标量值或求值 Spring 表达式的结果。
The additional message headers can take scalar values or the results of evaluating Spring expressions.
如果您需要实现您自己的自定义触发器,您可以使用 trigger
属性来提供对实现 org.springframework.scheduling.Trigger
接口的任何 Spring 配置 bean 的引用。下面的示例展示了如何这么做:
If you need to implement your own custom trigger, you can use the trigger
attribute to provide a reference to any spring configured bean that implements the org.springframework.scheduling.Trigger
interface.
The following example shows how to do so:
<int:inbound-channel-adapter id="triggerRefProducer"
expression="'triggerRefTest'" channel="triggerRefChannel">
<int:poller trigger="customTrigger"/>
</int:inbound-channel-adapter>
<beans:bean id="customTrigger" class="o.s.scheduling.support.PeriodicTrigger">
<beans:constructor-arg value="9999"/>
</beans:bean>