Message Endpoints
SubscribableChannel Event-driven Consumer Polling Consumer advice-chain TaskExecutor Support Payload Type Conversion Content Type Conversion Asynchronous Polling Endpoint Inner Beans :description: 本文介绍了 Spring Integration 框架中消息端点的基本原理。
核心概念
-
端点负责将各种消息组件连接到通道。
-
消费者端点用于接收消息,而激活器端点用于发送消息。
-
消费者可以是事件驱动的(基于订阅)或轮询驱动的。
轮询消费者
-
由 PollingConsumer 实现,具有可配置的轮询间隔和触发器。
-
可配置其他选项,如每轮询周期接收的最大消息数和接收超时。
-
可以使用 TaskExecutor 来并行执行轮询线程。
事件驱动的消费者
-
由 EventDrivenConsumer 实现,用于从可订阅通道接收消息。
-
更加简单,因为不需要管理轮询线程。
端点配置
-
使用基于名称空间的 Spring Integration DSL XML 或 Java 配置来配置端点。
-
使用 input-channel 属性来指定输入通道,output-channel 属性(对于某些激活器)来指定输出通道。
-
轮询消费者可以使用 poller 子元素来配置轮询行为。
-
可使用 advice-chain 和 transaction 子元素来配置高级功能,如 AOP 建议和事务支持。
其他主题
-
端点可以生成答复消息。
-
集成转换服务允许配置类型转换器。
-
可以配置任务执行器来启用轮询端的并行执行。
-
Spring Integration 使用 CompositeMessageConverter 根据消息头中指定的 contentType 执行内容类型转换。
本篇的第一部分介绍了一些背景理论,并揭示了驱动 Spring Integration 的各种消息组件的底层 API。如果您真的想了解幕后发生的事情,则这些信息会很有帮助。但是,如果您想继续使用各种元素的基于命名空间的简化配置,则可以跳到 Endpoint Namespace Support 以获取有关当前内容的信息。
如概述中所述,消息终结点负责将各种消息组件连接到通道。在接下来的几章中,我们将介绍许多不同的组件,它们使用消息。其中一些还可以发送答复消息。发送消息非常简单。如 Message Channels中前面所示,您可以将消息发送到消息通道。但是,接收则有些复杂。主要原因是消费者有两种类型: polling consumers和 event-driven consumers。
在这两者中,事件驱动消费者要简单得多。无需管理和安排单独的轮询线程,它们本质上是带有回调方法的侦听器。连接到 Spring Integration 的可订阅消息通道时,此简单选项运行良好。但是,连接到缓冲轮询消息通道时,某些组件必须调度和管理轮询线程。Spring Integration 提供了两种不同的端点实现来容纳这两种类型的消费者。因此,消费者本身只需要实现回调接口。需要轮询时,端点充当消费者实例的容器。其好处类似于使用容器来托管消息驱动 Bean,但由于这些消费者是在 ApplicationContext
中运行的由 Spring 管理的对象,因此它更类似于 Spring 自身的 MessageListener
容器。
Message Handler
Spring Integration 的 MessageHandler
接口由框架中的许多组件实现。换句话说,这不是公共 API 的一部分,您通常不会直接实现 MessageHandler
。尽管如此,它仍会被消息消费者用于实际处理已消费的消息,因此了解此策略接口有助于理解消费者的整体角色。该接口定义如下:
public interface MessageHandler {
void handleMessage(Message<?> message);
}
尽管该接口很简单,但它为后续章节中介绍的大多数组件(路由器、转换器、拆分器、聚合器、服务激活器等)提供了基础。这些组件使用它们处理的消息执行非常不同的功能,但是实际接收消息的要求是相同的,在轮询和事件驱动行为之间的选择也是相同的。Spring Integration 提供了两种端点实现,它们托管这些基于回调的处理程序,并允许它们连接到消息通道。
Event-driven Consumer
由于它是两者中较简单的一个,我们首先介绍基于事件的消费者端点。你可能还记得,SubscribableChannel
接口提供了一个 subscribe()
方法,该方法接受一个 MessageHandler
参数(如 xref:channel/interfaces.adoc#channel-interfaces-subscribablechannel[SubscribableChannel
所示)。以下列表显示了 subscribe
方法的定义:
subscribableChannel.subscribe(messageHandler);
由于订阅通道的处理程序不必主动轮询该通道,因此这是一个事件驱动的消费者,Spring Integration 提供的实现接受一个 SubscribableChannel
和一个 MessageHandler
,如下例所示:
SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);
EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);
Polling Consumer
Spring Integration 还提供了一个 PollingConsumer
,除了通道必须实现 PollableChannel
外,它的实例化方式与 SubscribableChannel
相同,如下例所示:
PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);
PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
有关轮询使用者的更多信息,请参阅 Channel Adapter 和 Channel Adapter。 |
轮询消费者还有许多其他配置选项。以下示例展示了如何设置触发器:
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(30)));
PeriodicTrigger
通常使用简单的间隔(Duration
)定义,但也支持 initialDelay
属性和布尔值 fixedRate
属性(默认值为 false
,即没有固定延迟)。以下示例设置了这两个属性:
PeriodicTrigger trigger = new PeriodicTrigger(Duration.ofSeconds(1));
trigger.setInitialDelay(Duration.ofSeconds(5));
trigger.setFixedRate(true);
前一个示例中的三个设置的结果是一个触发器,该触发器等待五秒,然后每秒触发一次。
CronTrigger`需要一个有效的cron表达式。有关详细信息,请参阅 Javadoc。以下示例设置新的`CronTrigger
:
CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");
在前面的示例中定义的触发器的结果是一个触发器,该触发器在周一到周五每十秒触发一次。
轮询端点的默认触发器是一个具有 1 秒固定延迟期的 |
除了触发器之外,您还可以指定另外两个与轮询相关的配置属性:maxMessagesPerPoll
和 receiveTimeout
。以下示例展示了如何设置这两个属性:
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);
maxMessagesPerPoll
属性指定在给定的轮询操作中接收的最大消息数。这意味着轮询器会继续调用 receive()
,而不等待,直到返回 null
或达到最大值。例如,如果轮询器的间隔触发器为十秒,maxMessagesPerPoll
设置为 25
,并且它轮询其队列中有 100 条消息的通道,则所有 100 条消息都可以在 40 秒内检索到。它抓取 25 条消息,等待十秒钟,再抓取下一条 25 条消息,依此类推。如果 maxMessagesPerPoll
配置为负值,则在单个轮询周期内调用 MessageSource.receive()
,直到它返回 null
。从 5.5 版本开始,0
值具有特殊含义 - 完全跳过 MessageSource.receive()
调用,这可能被视为暂停此轮询端点,直到 maxMessagesPerPoll
稍后被更改为非零值,例如通过 Control Bus。
receiveTimeout
属性指定如果在调用接收操作时没有消息可用,轮询器应等待的时间。例如,考虑两个表面上看起来相似但实际上有很大不同的选项:第一个具有 5 秒的间隔触发器和 50 毫秒的接收超时,而第二个具有 50 毫秒的间隔触发器和 5 秒的接收超时。第一个选项可能会比接受时间晚最多 4950 毫秒收到一条消息(如果在其中一个轮询调用返回后立即收到该消息)。另一方面,第二个配置在 50 毫秒内不会错过任何消息。不同之处在于第二个选项需要一个线程进行等待。但是,结果是它可以对到达的消息做出更快速的响应。这种称为“长轮询
”的技术可用于模拟轮询源上的事件驱动行为。
可以像下面示例中所示,将轮询使用者委托给 Spring TaskExecutor
:
PollingConsumer consumer = new PollingConsumer(channel, handler);
TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);
此外,PollingConsumer
有一个名为 adviceChain
的属性。此属性使您能够为 AOP 建议指定 List
,以处理包括事务在内的其他横切关注点。这些建议应用于 doPoll()
方法。有关更多详细信息,请参见 Endpoint Namespace Support 下 AOP 建议链和事务支持的部分。另请参阅 @Poller
注释 Javadoc 和相应的 Messaging Annotations Support 部分。Java DSL 还提供了 .poller()
端点配置选项及其各自的 Pollers
工厂。
较早的示例显示了依赖查找。但是,请记住这些使用者最常配置为 Spring Bean 定义。事实上,Spring Integration 还提供了一个名为 ConsumerEndpointFactoryBean
的 FactoryBean
,它基于通道的类型创建适当的使用者类型。此外,Spring Integration 具有完全的 XML 名称空间支持,可以进一步隐藏这些详细信息。基于名称空间的配置以该指南为特色,因为它引入了每个组件类型。
许多 |
Endpoint Namespace Support
在整个参考手册中,您都可以找到端点元素的特定配置示例,例如路由器、转换器、服务激活器等。其中大多数支持 input-channel
属性,许多支持 output-channel
属性。经过解析后,这些端点元素会生成 PollingConsumer
或 EventDrivenConsumer
的实例,具体取决于引用的 input-channel
的类型:PollableChannel
或 SubscribableChannel
。当通道可轮询时,轮询行为基于端点元素的 poller
子元素及其属性。
以下列出 poller
所有可用的配置选项:
<int:poller cron="" 1
default="false" 2
error-channel="" 3
fixed-delay="" 4
fixed-rate="" 5
initial-delay="" 6
id="" 7
max-messages-per-poll="" 8
receive-timeout="" 9
ref="" 10
task-executor="" 11
time-unit="MILLISECONDS" 12
trigger=""> 13
<int:advice-chain /> 14
<int:transactional /> 15
</int:poller>
1 | 提供使用 Cron 表达式配置轮询器的功能。底层实现使用 org.springframework.scheduling.support.CronTrigger 。如果设置此属性,则不需要指定以下属性: fixed-delay , trigger , fixed-rate 和 ref 。 |
2 | 通过将此属性设置为 true ,您可以准确定义一个全局默认轮询器。如果在应用程序上下文中定义了多个默认轮询器,则会引发异常。连接到 PollableChannel (PollingConsumer ) 的任何端点或任何没有显式配置轮询器的 SourcePollingChannelAdapter ,都将使用全局默认轮询器。它默认为 false 。可选。 |
3 | 如果此轮询器的调用失败,则标识错误消息发送到的通道。要完全禁止异常,您可以提供对 nullChannel 的引用。可选。 |
4 | 固定延迟触发器在内部使用 PeriodicTrigger 。该数值以 time-unit 为单位,也可以为持续时间格式(从 6.2 版开始),例如 PT10S , P1D 。如果设置此属性,则不需要指定以下属性: fixed-rate , trigger , cron 和 ref 。 |
5 | 固定速率触发器在内部使用 PeriodicTrigger 。该数值以 time-unit 为单位,也可以为持续时间格式(从 6.2 版开始),例如 PT10S , P1D 。如果设置此属性,则不需要指定以下属性: fixed-delay , trigger , cron 和 ref 。 |
6 | 内部 PeriodicTrigger 的初始延迟(从 6.2 版开始)。该数值以 time-unit 为单位,也可以为持续时间格式,例如 PT10S , P1D 。 |
7 | 引用轮询器的底层 bean 定义的 ID,其类型为 org.springframework.integration.scheduling.PollerMetadata 。id 属性是顶级轮询器元素必需的,除非它是默认轮询器( default="true" )。 |
8 | 有关更多信息,请参阅 Configuring An Inbound Channel Adapter 。如果未指定,则默认值取决于上下文。如果您使用 PollingConsumer ,此属性默认为 -1 。但是,如果您使用 SourcePollingChannelAdapter , max-messages-per-poll 属性的默认值为 1 。可选。 |
9 | 值设置在底层类 PollerMetadata 上。如果未指定,则默认为 1000(毫秒)。可选。 |
10 | 对另一个顶级轮询器的 Bean 引用。ref 属性不能出现在顶级 poller 元素上。但是,如果设置此属性,则不需要指定以下属性: fixed-rate , trigger , cron 和 fixed-delay 。 |
11 | 提供引用自定义任务执行器 的功能。有关更多信息,请参阅 TaskExecutor Support 。可选。 |
12 | 此属性指定底层 org.springframework.scheduling.support.PeriodicTrigger 上的 java.util.concurrent.TimeUnit 枚举值。因此,此属性只能与 fixed-delay 或 fixed-rate 属性结合使用。如果与 cron 或 trigger 引用属性结合使用,则会导致失败。PeriodicTrigger 最小受支持粒度是毫秒。因此,唯一可用的选项是毫秒和秒。如果未提供此值,则将任何 fixed-delay 或 fixed-rate 值解释为毫秒。基本上,此枚举为基于秒的间隔触发器值提供了便利。对于每小时、每天和每月的设置,我们建议改用 cron 触发器。 |
13 | 对实现 org.springframework.scheduling.Trigger 接口的任何 Spring 配置 bean 的引用。但是,如果设置此属性,则不需要指定以下属性: fixed-delay , fixed-rate , cron 和 ref 。可选。 |
14 | 允许指定额外的 AOP 建议来处理其他横切关注点。有关更多信息,请参阅 Transactions 。可选。 |
15 | 轮询器可以使事务化。有关更多信息,请参阅 AOP Advice chains 。可选。 |
Examples
可以按如下方式配置一个基于 1 秒间隔的简单基于间隔的轮询器:
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller fixed-rate="1000"/>
</int:transformer>
作为使用 fixed-rate
属性的替代,您还可以使用 fixed-delay
属性。
对于基于 Cron 表达式的轮询器,请改用 cron
属性,如下例所示:
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>
如果输入通道是 PollableChannel
,则需要轮询器配置。具体而言,如前所述,trigger
是 PollingConsumer
类的必需属性。因此,如果您为轮询使用者端点的配置省略 poller
子元素,可能会抛出异常。如果您尝试在连接到不可轮询通道的元素上配置轮询器,也可能会抛出异常。
还可以创建顶级轮询器,在这种情况下只需要一个 ref
属性,如下例所示:
<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller ref="weekdayPoller"/>
</int:transformer>
|
Global Default Poller
为了进一步简化配置,您可以定义一个全局默认轮询器。在 XML DSL 中,单个顶级轮询器组件可能将 default
属性设置为 true
。对于 Java 配置,在这种情况下必须声明一个名称为 PollerMetadata.DEFAULT_POLLER
的 PollerMetadata
Bean。在这种情况下,任何为其输入通道使用 PollableChannel
的端点(在相同的 ApplicationContext
中定义且没有明确配置的 poller
)都使用该默认值。以下示例显示了这样的轮询器和使用它的转换器:
-
Java DSL
-
Java
-
Kotlin DSL
-
XML
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
return IntegrationFlow.from(MessageChannels.queue("pollable"))
.transform(transformer) // No 'poller' attribute because there is a default global poller
.channel("output")
.get();
}
@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
@Bean
public QueueChannel pollable() {
return new QueueChannel();
}
// No 'poller' attribute because there is a default global poller
@Transformer(inputChannel = "pollable", outputChannel = "output")
public Object transform(Object payload) {
...
}
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller() =
PollerMetadata()
.also {
it.maxMessagesPerPoll = 5
it.trigger = PeriodicTrigger(3000)
}
@Bean
fun convertFlow() =
integrationFlow(MessageChannels.queue("pollable")) {
transform(transformer) // No 'poller' attribute because there is a default global poller
channel("output")
}
<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-delay="3000"/>
<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output"/>
Transaction Support
Spring Integration 还为轮询器提供事务支持,以便每个接收和转发操作都可以作为工作原子单元来执行。要为轮询器配置事务,请添加 <transactional/>
子元素。以下示例显示可用的属性:
<int:poller fixed-delay="1000">
<int:transactional transaction-manager="txManager"
propagation="REQUIRED"
isolation="REPEATABLE_READ"
timeout="10000"
read-only="false"/>
</int:poller>
有关详细信息,请参见 Poller Transaction Support。
AOP Advice chains
由于 Spring 事务支持依赖于代理机制,其中 TransactionInterceptor
(AOP 建议)处理由轮询器发起的消息流的事务行为,因此有时必须提供额外的建议来处理与轮询器关联的其他横切行为。为此,poller
定义了一个 advice-chain
元素,该元素允许您在实现 MethodInterceptor
接口的类中添加更多建议。以下示例演示了如何为 poller
定义 advice-chain
:
<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
method="good" output-channel="output">
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<int:advice-chain>
<ref bean="adviceA" />
<beans:bean class="org.something.SampleAdvice" />
<ref bean="txAdvice" />
</int:advice-chain>
</int:poller>
</int:service-activator>
有关如何实现 MethodInterceptor
接口的详细信息,请参见 AOP sections of the Spring Framework Reference Guide。还可对没有任何事务配置的轮询器应用建议链,让您增强轮询器启动的消息流的行为。
在使用通知链时,无法指定 <transactional/>
子元素。相反,声明一个 <tx:advice/>
bean 并将其添加到 <advice-chain/>
中。有关完整的配置详细信息,请参阅 Poller Transaction Support。
TaskExecutor Support
轮询线程可以通过 Spring 的 TaskExecutor
抽象的任何实例执行。这为端点或一组端点启用并发。从 Spring 3.0 开始,核心 Spring 框架有一个 task
名称空间,其 <executor/>
元素支持创建简单的线程池执行程序。该元素接受通用并发设置(如 pool-size 和 queue-capacity)的属性。配置线程池执行程序可以在端点在负载下的执行方式方面产生重大差异。每个端点都提供这些设置,因为端点的性能是需要考虑的主要因素之一(另一个主要因素是端点订阅的通道上的预期流量)。要为使用 XML 名称空间支持配置的轮询端点启用并发,请在其 <poller/>
元素上提供 task-executor
引用,然后提供以下示例中所示的一个或多个属性:
<int:poller task-executor="pool" fixed-rate="1000"/>
<task:executor id="pool"
pool-size="5-25"
queue-capacity="20"
keep-alive="120"/>
如果您未提供任务执行器,则在调用方的线程中调用消费者的处理程序。请注意,调用方通常是默认的`TaskScheduler`(参见Configuring the Task Scheduler)。您还应记住,`task-executor`属性可以通过指定bean名称来提供对Spring的`TaskExecutor`接口的任何实现的引用。前面显示的`executor`元素提供了方便。
如前所述 background section for polling consumers,您还可以将轮询使用者配置为模拟事件驱动的行为。通过触发器中的较长接收超时和较短时间间隔,您可以确保即使在轮询的消息源中也能及时地响应到达的消息。请注意,这仅适用于带有带超时的阻塞等待调用的源。例如,文件轮询器不进行阻塞。每个 receive()
调用立即返回,并且包含新文件或不包含新文件。因此,即使轮询器包含一个较长的 receive-timeout
,该值也永远不会在这样的场景中使用。另一方面,在使用 Spring Integration 自身的基于队列的通道时,超时值确实有机会参与。以下示例展示了轮询使用者如何几乎瞬间接收消息:
<int:service-activator input-channel="someQueueChannel"
output-channel="output">
<int:poller receive-timeout="30000" fixed-rate="10"/>
</int:service-activator>
使用此方法不会造成太多开销,因为在内部,它只不过是一个计时等待线程,与(例如)无休止的 while 循环相比,它所需要的 CPU 资源使用量少得多。
Changing Polling Rate at Runtime
当使用“fixed-delay
”或“fixed-rate
”属性配置轮询器时,默认实现使用“PeriodicTrigger
”实例。“PeriodicTrigger
”是核心 Spring Framework 的一部分。它只接受时间间隔作为构造函数参数。因此,无法在运行时更改它。
不过,您可以定义自己的“org.springframework.scheduling.Trigger
”接口实现。您甚至可以使用 “PeriodicTrigger
”作为起点。然后,您可以为时间间隔(周期)添加一个 setter,或者您甚至可以在触发器本身内嵌入您自己的节流逻辑。“period
”属性与每次调用“nextExecutionTime
”一起使用,以安排下一次轮询。要在轮询器中使用此自定义触发器,请在应用程序上下文中声明自定义触发器的 bean 定义,并通过使用引用自定义触发器 bean 实例的“trigger
”属性将依赖项注入到轮询器配置中。现在,您可以获取对触发器 bean 的引用,并在轮询之间更改轮询时间间隔。
要了解示例,请参阅 Spring Integration Samples项目。它包含一个名为`dynamic-poller`的示例,它使用自定义触发器并演示了在运行时更改轮询间隔的能力。
该示例提供了一个实现 org.springframework.scheduling.Trigger
接口的自定义触发器。该示例的触发器基于Spring的 PeriodicTrigger
实现。但是,自定义触发器的字段不是最终的,并且属性具有显式的getter和setter,使您可以在运行时动态更改轮询周期。
但请注意,由于 Trigger 方法为 |
Payload Type Conversion
在整个参考手册中,您还可以看到接受消息或任何任意“Object
”作为输入参数的各种端点的特定配置和实现示例。如果为“Object
”,则此类参数将映射到消息有效负载或有效负载或头的部分(当使用 Spring Expression Language 时)。但是,端点方法的输入参数类型有时与有效负载或其部分的类型不匹配。在此场景中,我们需要执行类型转换。Spring Integration 提供了一种方便的方法,可以在转换服务 bean“integrationConversionService
”的自身实例内注册类型转换器(使用 Spring“ConversionService
”)。只要使用 Spring Integration 基础结构定义第一个转换器,就会自动创建该 bean。要注册转换器,您可以实现“org.springframework.core.convert.converter.Converter
”、“org.springframework.core.convert.converter.GenericConverter
”或“org.springframework.core.convert.converter.ConverterFactory
”。
“Converter
”实现是最简单的,从一种类型转换为另一种类型。为了更复杂,例如转换为类层次结构,您可以实现“GenericConverter
”和可能的“ConditionalConverter
”。这些让您可以完全访问“from
”和“to
”类型描述符,从而实现复杂转换。例如,如果您有一个名为“Something
”的抽象类作为转换目标(参数类型、通道数据类型等),则有两个名为“Thing1
”和“Thing
”的具体实现,您希望根据输入类型将其中一个或另一个转换为目标,则“GenericConverter
”将非常适合。有关详细信息,请参阅这些接口的 Javadoc:
实现转换器后,您可以将其注册到方便的名称空间支持中,如下面的示例所示:
<int:converter ref="sampleConverter"/>
<bean id="sampleConverter" class="foo.bar.TestConverter"/>
或者,您可以使用内部 bean,如下面的示例所示:
<int:converter>
<bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>
从 Spring Integration 4.0 开始,您可以使用注释创建前面的配置,如下面的示例所示:
@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {
public Number convert(Boolean source) {
return source ? 1 : 0;
}
}
或者,您可以使用“@Configuration
”注释,如下面的示例所示:
@Configuration
@EnableIntegration
public class ContextConfiguration {
@Bean
@IntegrationConverter
public SerializingConverter serializingConverter() {
return new SerializingConverter();
}
}
在配置应用程序上下文时,Spring框架允许您添加`conversionService`bean(参见 Configuring a ConversionService一章)。在需要时,会使用此服务在bean创建和配置期间执行适当的转换。
相比之下,“integrationConversionService
”用于运行时转换。这些用途截然不同。在连接 bean 构造函数参数和属性时打算使用的转换器可能会产生意想不到的结果,如果在运行时用于 Spring Integration 对数据类型通道、有效负载类型转换器等中的消息进行表达式求值。
但是,如果您确实希望将 Spring“conversionService
”用作 Spring Integration 的“integrationConversionService
”,您可以在应用程序上下文中配置一个别名,如下面的示例所示:
<alias name="conversionService" alias="integrationConversionService"/>
在这种情况下,“conversionService
”提供的转换器可用于 Spring Integration 运行时转换。
Content Type Conversion
从 5.0 版本开始,默认情况下,方法调用机制基于“org.springframework.messaging.handler.invocation.InvocableHandlerMethod
”基础结构。它的“HandlerMethodArgumentResolver
”实现(如“PayloadArgumentResolver
”和“MessageMethodArgumentResolver
”)可以使用“MessageConverter
”抽象将传入“payload
”转换为目标方法参数类型。转换可以基于“contentType
”消息头。为此,Spring Integration 提供了“ConfigurableCompositeMessageConverter
”,它委派给注册的转换器列表调用,直到其中一个返回非空结果。默认情况下,此转换器提供(按严格顺序):
-
MappingJackson2MessageConverter
如果 classpath 上存在 Jackson 处理器
有关其目的和适当“contentType
”转换值的详细信息,请参见前一个列表中链接的 Javadoc。“ConfigurableCompositeMessageConverter
”被使用,因为可以向它提供任何其他“MessageConverter
”实现,包括或排除前面提到的默认转换器。它还可以作为应用程序上下文中的适当 bean 注册,从而覆盖默认转换器,如下面的示例所示:
@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
List<MessageConverter> converters =
Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
new JavaSerializationMessageConverter());
return new ConfigurableCompositeMessageConverter(converters);
}
这两个新转换器在默认值之前注册在合成中。您也可以不使用“ConfigurableCompositeMessageConverter
”,而是通过注册带有名称“integrationArgumentResolverMessageConverter
”的 bean(通过设置“IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME
”属性)来提供您自己的“MessageConverter
”。
在使用 SpEL 方法调用时,基于 |
Asynchronous Polling
如果您希望轮询是异步的,则轮询器可以根据需要指定指向任何“TaskExecutor
”bean 的现有实例的“task-executor
”属性(Spring 3.0 通过“task
”名称空间提供方便的名称空间配置)。但是,在使用“TaskExecutor
”配置轮询器时,您必须理解某些事项。
问题在于有两个配置,轮询器和“TaskExecutor
”。它们必须彼此协调。否则,您最终可能会创建一个人为的内存泄漏。
考虑以下配置:
<int:channel id="publishChannel">
<int:queue />
</int:channel>
<int:service-activator input-channel="publishChannel" ref="myService">
<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>
<task:executor id="taskExecutor" pool-size="20" />
前面的配置展示了一个配置错误。
默认情况下,任务执行器有一个无界任务队列。轮询器会持续计划新任务,即使所有线程都已被阻塞,等待新消息到达或超时到期。假设有 20 个线程以 5 秒的超时时间执行任务,那么它们每秒会执行 4 个任务。然而,新任务每秒被计划 20 次,所以任务执行器中的内部队列每秒增长 16 个(在进程空闲时),因此我们出现了内存泄漏。
处理此问题的一种方法是设置任务执行器的 queue-capacity
属性。即使为 0 也是一个合理的值。您还可以通过指定如何处理无法排队的消息来管理它,方法是设置任务执行器的 rejection-policy
属性(例如,为 DISCARD
)。换句话说,在配置 TaskExecutor
时,您必须了解某些详细信息。有关该主题的更多详细信息,请参见 Spring 参考手册中的 “Task Execution and Scheduling”。
Endpoint Inner Beans
许多端点都是复合 bean。这包括所有使用者和所有轮询的入站通道适配器。使用者(轮询或事件驱动的)委托给 MessageHandler
。轮询的适配器通过委托给 MessageSource
获得消息。通常,获取委托 bean 的引用很有用,也许是为了在运行时或用于测试更改配置。这些 bean 可以从具有众所周知的名称的 ApplicationContext
中获取。MessageHandler
实例使用类似于 someConsumer.handler
的 bean ID 在应用程序上下文中注册(其中“consumer”是端点 id
属性的值)。MessageSource
实例使用类似于 somePolledAdapter.source
的 bean ID 注册,其中“somePolledAdapter”是适配器的 ID。
前面的内容仅适用于框架组件本身。相反,您可以使用内部 bean 定义,如下面的示例所示:
<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
output-channel = "outChannel" method="foo">
<beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>
该 Bean 被视为声明的任何内部 Bean,且不会在应用上下文里注册。如果您想以某种其他方式访问此 Bean,请在顶层使用 id
声明它,并改为使用 ref
属性。有关详细信息,请参见 Spring Documentation。