Message Publishing
(面向方面的编程)AOP 消息发布功能允许您在方法调用过程中作为副产品构建和发送消息。例如,假设您有一个组件,每当这个组件的状态发生更改时,您都希望通过一条消息收到通知。发送此类通知最简单的方法是将消息发送到一个专用通道,但是您将如何将更改对象状态的方法调用连接到消息发送过程,以及通知消息的结构如何?AOP 消息发布功能通过配置驱动的办法处理这些责任。
Message Publishing Configuration
Spring Integration 提供两种方法:XML 配置和注释驱动(Java)配置。
Annotation-driven Configuration with the @Publisher
Annotation
注释驱动的方法可让您通过 @Publisher
注释注释任何方法以指定“通道”属性。从版本 5.1 开始,要启用此功能,您必须在某些 @Configuration
类上使用 @EnablePublisher
注释。请参阅 Configuration and @EnableIntegration
了解更多信息。该消息是通过方法调用的返回值构建的,然后发送到由“通道”属性指定的通道。要进一步管理消息结构,您还可以结合使用 @Payload
和 @Header
注释。
在内部,Spring Integration 的此消息发布功能同时使用 Spring AOP 定义 PublisherAnnotationAdvisor
和 Spring 表达式语言 (SpEL),为您提供的灵活性以及对它发布的 Message
结构的控制相当大。
PublisherAnnotationAdvisor
定义并绑定以下变量:
-
#return
:绑定到一个返回值,让你可以引用它或它的属性(例如,#return.something
,其中“something”是绑定到#return
的对象的属性) -
#exception
:如果方法调用抛出一个异常,则绑定到该异常 -
#args
:绑定到方法参数,以便你可以按名称提取各个参数(例如,#args.fname
)
请考虑以下示例:
@Publisher
public String defaultPayload(String fname, String lname) {
return fname + " " + lname;
}
在上一个示例中,使用以下结构构建消息:
-
消息有效负载是方法的返回类型和值。这是默认值。
-
将发送一条新构造的消息到一个配置了注释后处理程序(在本节后面部分有说明)的默认发布者通道。
以下示例与上一个示例相同,不同之处在于它没有使用默认的发布通道:
@Publisher(channel="testChannel")
public String defaultPayload(String fname, @Header("last") String lname) {
return fname + " " + lname;
}
我们不是使用默认发布通道,而是通过设置 @Publisher
注释的 channel
属性指定发布通道。我们还添加了一个 @Header
注释,这将导致名为“last”的消息头具有与 lname
方法参数相同的。该头添加到新构建的消息中。
以下示例与上一个示例差不多:
@Publisher(channel="testChannel")
@Payload
public String defaultPayloadButExplicitAnnotation(String fname, @Header String lname) {
return fname + " " + lname;
}
唯一的区别是我们在方法中使用 @Payload
注释来明确指定方法的返回值应该用作消息的有效内容。
以下示例通过在 @Payload
注释中使用 Spring 表达式语言来扩展以前的配置,从而进一步指示框架如何构建消息:
@Publisher(channel="testChannel")
@Payload("#return + #args.lname")
public String setName(String fname, String lname, @Header("x") int num) {
return fname + " " + lname;
}
在上一个示例中,消息是方法调用返回值和 lname
输入参数的连接。名为 x
的消息头通过 num
输入参数确定其值。该头添加到新构建的消息中。
@Publisher(channel="testChannel")
public String argumentAsPayload(@Payload String fname, @Header String lname) {
return fname + " " + lname;
}
在上一个示例中,你看到 @Payload
注释的另一种用法。在此,我们注释一个方法参数,该方法参数成为新构建消息的有效载荷。
与 Spring 中大多数其他注释驱动功能一样,你需要注册一个后处理器(PublisherAnnotationBeanPostProcessor
)。以下示例演示如何执行此操作:
<bean class="org.springframework.integration.aop.PublisherAnnotationBeanPostProcessor"/>
为了进行更简洁的配置,你可以改用命名空间支持,如下面的示例所示:
<int:annotation-config>
<int:enable-publisher default-publisher-channel="defaultChannel"/>
</int:annotation-config>
对于 Java 配置,你必须使用 @EnablePublisher
注释,如下面的示例所示:
@Configuration
@EnableIntegration
@EnablePublisher("defaultChannel")
public class IntegrationConfiguration {
...
}
从 5.1.3 版本开始,<int:enable-publisher>
组件以及 @EnablePublisher
注释具有用于调整 ProxyFactory
配置的 proxy-target-class
和 order
属性。
与其他 Spring 注释(@Component
、@Scheduled
等等)类似,你还可以将 @Publisher
用作元注释。这意味着你可以定义自己的注释,这些注释以与 @Publisher
本身相同的方式处理。以下示例演示如何执行此操作:
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Publisher(channel="auditChannel")
public @interface Audit {
...
}
在上一个示例中,我们定义了 @Audit
注释,它本身用 @Publisher
进行了注释。另外请注意,你可以在元注释上定义一个 channel
属性,以封装消息在此注释内发送的位置。现在,你可以使用 @Audit
注释对任何方法进行注释,如下面的示例所示:
@Audit
public String test() {
return "Hello";
}
在上一个示例中,每次调用 test()
方法都会导致使用其返回值创建有效负载的消息。每条消息都会发送到名为 auditChannel
的通道。此技术的一个好处是,你可以避免在多个注释中重复使用相同的通道名称。你还可以提供自己的潜在特定于领域的注释和框架提供的注释之间的一级间接。
你还可以注释类,这使你可以在该类的每个 public 方法上应用此注释的属性,如下面的示例所示:
@Audit
static class BankingOperationsImpl implements BankingOperations {
public String debit(String amount) {
. . .
}
public String credit(String amount) {
. . .
}
}
XML-based Approach with the <publishing-interceptor>
element
基于 XML 的方法使你可以将相同基于 AOP 的消息发布功能配置为 MessagePublishingInterceptor
的基于命名空间的配置。它当然比注释驱动的做法有一些优势,因为它使你可以使用 AOP 切入点表达式,从而可能同时拦截多个方法或拦截和发布你没有源代码的方法。
要使用 XML 配置消息发布,只需要执行以下两件事:
-
使用
<publishing-interceptor>
XML 元素提供MessagePublishingInterceptor
的配置。 -
提供 AOP 配置,以便将
MessagePublishingInterceptor
应用于受管理对象。
以下示例展示如何配置 publishing-interceptor
元素:
<aop:config>
<aop:advisor advice-ref="interceptor" pointcut="bean(testBean)" />
</aop:config>
<publishing-interceptor id="interceptor" default-channel="defaultChannel">
<method pattern="echo" payload="'Echoing: ' + #return" channel="echoChannel">
<header name="things" value="something"/>
</method>
<method pattern="repl*" payload="'Echoing: ' + #return" channel="echoChannel">
<header name="things" expression="'something'.toUpperCase()"/>
</method>
<method pattern="echoDef*" payload="#return"/>
</publishing-interceptor>
<publishing-interceptor>
配置看上去与基于注释的方法相当类似,并且它还利用了 Spring 表达式语言的强大功能。
在前面的示例中,testBean
的 echo
方法执行会呈现一个具有以下结构的 Message
:
-
Message
有效负载类型为String
,内容如下:Echoing: [value]
,其中value
是执行的方法返回的值。 -
Message
有一个名称为things
和一个值为something
的头。 -
Message
被发送到echoChannel
。
第二个方法与第一个方法非常相似。此处,从 'repl' 开始的每个方法都会呈现一个具有以下结构的 Message
:
-
Message
有效负载与前面示例中相同。 -
Message
有一个名为things
的头,其值为 SpEL 表达式 ’something'.toUpperCase()' 的结果。 -
Message
被发送到echoChannel
。
第二个方法映射以 echoDef
开始的任何方法的执行,会生成一个具有以下结构的 Message
:
-
Message
有效负载是由执行的方法返回的值。 -
由于没有提供
channel
属性,所以Message
被发送到由publisher
定义的defaultChannel
。
对于简单的映射规则,您可以依赖于 publisher
默认值,如下面的示例所示:
<publishing-interceptor id="anotherInterceptor"/>
前面的示例将表达式匹配的每个方法的返回值映射到有效负载,并将其发送到 default-channel
。如果您未指定 defaultChannel
(如前面的示例未指定),那么消息将会发送到全局 nullChannel
(相当于 /dev/null
)。
Asynchronous Publishing
发布发生在与组件执行相同的线程中。因此,默认情况下,它是同步的。这意味着整个消息流程必须等到发布程序的流程完成。但是,开发人员通常需要完全相反:使用此消息发布功能来发起异步流程。例如,您可以托管接收远程请求的服务(HTTP、WS 等)。您可能希望在此流程中内部发送此请求,但可能需要一段时间。但是,您可能还希望立即回复用户。因此,您可以使用“output-channel”或“replyChannel”标头将简单的确认式回复发送回调用方,而不是将入站请求发送到输出通道(传统方式)进行处理,而使用消息发布功能来发起复杂流程。
以下示例中的服务接收复杂的有效负载(需要进一步发送以进行处理),但它还需要通过简单的确认向调用方回复:
public String echo(Object complexPayload) {
return "ACK";
}
因此,我们使用消息发布功能替换将复杂流程连接到输出通道。我们对其进行配置以使用服务方法的输入参数(在前面的示例中展示)创建一个新消息,并将其发送到 localProcessChannel
。为了确保此流程是异步的,我们需要做的就是将其发送到任何类型的异步通道(在下一个示例中为 ExecutorChannel
)。以下示例展示如何使用异步 publishing-interceptor
:
<int:service-activator input-channel="inputChannel" output-channel="outputChannel" ref="sampleservice"/>
<bean id="sampleService" class="test.SampleService"/>
<aop:config>
<aop:advisor advice-ref="interceptor" pointcut="bean(sampleService)" />
</aop:config>
<int:publishing-interceptor id="interceptor" >
<int:method pattern="echo" payload="#args[0]" channel="localProcessChannel">
<int:header name="sample_header" expression="'some sample value'"/>
</int:method>
</int:publishing-interceptor>
<int:channel id="localProcessChannel">
<int:dispatcher task-executor="executor"/>
</int:channel>
<task:executor id="executor" pool-size="5"/>
另一种处理这种情况的方法是使用窃听。请参阅 Wire Tap。
Producing and Publishing Messages Based on a Scheduled Trigger
在前面的部分中,我们了解了消息发布功能,它构建并发布消息作为方法调用的副产品。但是,在这些情况下,您仍然负责调用方法。Spring Integration 2.0 添加了对调度消息生产者和发布者的支持,并在“inbound-channel-adapter”元素上添加了新的 expression
属性。您可以基于多个触发器安排,其中任何一个都可以配置在“poller”元素上。目前,我们支持 cron
、fixed-rate
、fixed-delay
以及由您实现并由 trigger
属性值引用的任何自定义触发器。
如前所述,对调度生产者和发布者的支持是通过 <inbound-channel-adapter>
XML 元素提供的。请考虑以下示例:
<int:inbound-channel-adapter id="fixedDelayProducer"
expression="'fixedDelayTest'"
channel="fixedDelayChannel">
<int:poller fixed-delay="1000"/>
</int:inbound-channel-adapter>
前面的示例创建了一个入站通道适配器,它构造了一个 Message
,其有效负载是 expression
属性中定义的表达式的结果。每当 fixed-delay
属性指定的时间间隔发生时,都会创建和发送此类消息。
以下示例类似于前面的示例,只是它使用了 fixed-rate
属性:
<int:inbound-channel-adapter id="fixedRateProducer"
expression="'fixedRateTest'"
channel="fixedRateChannel">
<int:poller fixed-rate="1000"/>
</int:inbound-channel-adapter>
fixed-rate
属性允许您以固定速率发送消息(从每个任务的开始时间测量)。
以下示例展示如何对 cron
属性中指定的值应用 Cron 触发器:
<int:inbound-channel-adapter id="cronProducer"
expression="'cronTest'"
channel="cronChannel">
<int:poller cron="7 6 5 4 3 ?"/>
</int:inbound-channel-adapter>
以下示例展示如何将附加标头插入消息中:
<int:inbound-channel-adapter id="headerExpressionsProducer"
expression="'headerExpressionsTest'"
channel="headerExpressionsChannel"
auto-startup="false">
<int:poller fixed-delay="5000"/>
<int:header name="foo" expression="6 * 7"/>
<int:header name="bar" value="x"/>
</int:inbound-channel-adapter>
附加的消息标头可以采用标量值或求值 Spring 表达式的结果。
如果您需要实现您自己的自定义触发器,您可以使用 trigger
属性来提供对实现 org.springframework.scheduling.Trigger
接口的任何 Spring 配置 bean 的引用。下面的示例展示了如何这么做:
<int:inbound-channel-adapter id="triggerRefProducer"
expression="'triggerRefTest'" channel="triggerRefChannel">
<int:poller trigger="customTrigger"/>
</int:inbound-channel-adapter>
<beans:bean id="customTrigger" class="o.s.scheduling.support.PeriodicTrigger">
<beans:constructor-arg value="9999"/>
</beans:bean>