Message Endpoints
SubscribableChannel Event-driven Consumer Polling Consumer advice-chain TaskExecutor Support Payload Type Conversion Content Type Conversion Asynchronous Polling Endpoint Inner Beans :description: 本文介绍了 Spring Integration 框架中消息端点的基本原理。
核心概念
-
端点负责将各种消息组件连接到通道。
-
消费者端点用于接收消息,而激活器端点用于发送消息。
-
消费者可以是事件驱动的(基于订阅)或轮询驱动的。
轮询消费者
-
由 PollingConsumer 实现,具有可配置的轮询间隔和触发器。
-
可配置其他选项,如每轮询周期接收的最大消息数和接收超时。
-
可以使用 TaskExecutor 来并行执行轮询线程。
事件驱动的消费者
-
由 EventDrivenConsumer 实现,用于从可订阅通道接收消息。
-
更加简单,因为不需要管理轮询线程。
端点配置
-
使用基于名称空间的 Spring Integration DSL XML 或 Java 配置来配置端点。
-
使用 input-channel 属性来指定输入通道,output-channel 属性(对于某些激活器)来指定输出通道。
-
轮询消费者可以使用 poller 子元素来配置轮询行为。
-
可使用 advice-chain 和 transaction 子元素来配置高级功能,如 AOP 建议和事务支持。
其他主题
-
端点可以生成答复消息。
-
集成转换服务允许配置类型转换器。
-
可以配置任务执行器来启用轮询端的并行执行。
-
Spring Integration 使用 CompositeMessageConverter 根据消息头中指定的 contentType 执行内容类型转换。
本篇的第一部分介绍了一些背景理论,并揭示了驱动 Spring Integration 的各种消息组件的底层 API。如果您真的想了解幕后发生的事情,则这些信息会很有帮助。但是,如果您想继续使用各种元素的基于命名空间的简化配置,则可以跳到 Endpoint Namespace Support 以获取有关当前内容的信息。
The first part of this chapter covers some background theory and reveals quite a bit about the underlying API that drives Spring Integration’s various messaging components. This information can be helpful if you want to really understand what goes on behind the scenes. However, if you want to get up and running with the simplified namespace-based configuration of the various elements, feel free to skip ahead to Endpoint Namespace Support for now.
如概述中所述,消息终结点负责将各种消息组件连接到通道。在接下来的几章中,我们将介绍许多不同的组件,它们使用消息。其中一些还可以发送答复消息。发送消息非常简单。如 Message Channels中前面所示,您可以将消息发送到消息通道。但是,接收则有些复杂。主要原因是消费者有两种类型: polling consumers和 event-driven consumers。
As mentioned in the overview, message endpoints are responsible for connecting the various messaging components to channels. Over the next several chapters, we cover a number of different components that consume messages. Some of these are also capable of sending reply messages. Sending messages is quite straightforward. As shown earlier in Message Channels, you can send a message to a message channel. However, receiving is a bit more complicated. The main reason is that there are two types of consumers: polling consumers and event-driven consumers.
在这两者中,事件驱动消费者要简单得多。无需管理和安排单独的轮询线程,它们本质上是带有回调方法的侦听器。连接到 Spring Integration 的可订阅消息通道时,此简单选项运行良好。但是,连接到缓冲轮询消息通道时,某些组件必须调度和管理轮询线程。Spring Integration 提供了两种不同的端点实现来容纳这两种类型的消费者。因此,消费者本身只需要实现回调接口。需要轮询时,端点充当消费者实例的容器。其好处类似于使用容器来托管消息驱动 Bean,但由于这些消费者是在 ApplicationContext
中运行的由 Spring 管理的对象,因此它更类似于 Spring 自身的 MessageListener
容器。
Of the two, event-driven consumers are much simpler.
Without any need to manage and schedule a separate poller thread, they are essentially listeners with a callback method.
When connecting to one of Spring Integration’s subscribable message channels, this simple option works great.
However, when connecting to a buffering, pollable message channel, some component has to schedule and manage the polling threads.
Spring Integration provides two different endpoint implementations to accommodate these two types of consumers.
Therefore, the consumers themselves need only implement the callback interface.
When polling is required, the endpoint acts as a container for the consumer instance.
The benefit is similar to that of using a container for hosting message-driven beans, but, since these consumers are Spring-managed objects running within an ApplicationContext
, it more closely resembles Spring’s own MessageListener
containers.
Message Handler
Spring Integration 的 MessageHandler
接口由框架中的许多组件实现。换句话说,这不是公共 API 的一部分,您通常不会直接实现 MessageHandler
。尽管如此,它仍会被消息消费者用于实际处理已消费的消息,因此了解此策略接口有助于理解消费者的整体角色。该接口定义如下:
Spring Integration’s MessageHandler
interface is implemented by many of the components within the framework.
In other words, this is not part of the public API, and you would not typically implement MessageHandler
directly.
Nevertheless, it is used by a message consumer for actually handling the consumed messages, so being aware of this strategy interface does help in terms of understanding the overall role of a consumer.
The interface is defined as follows:
public interface MessageHandler {
void handleMessage(Message<?> message);
}
尽管该接口很简单,但它为后续章节中介绍的大多数组件(路由器、转换器、拆分器、聚合器、服务激活器等)提供了基础。这些组件使用它们处理的消息执行非常不同的功能,但是实际接收消息的要求是相同的,在轮询和事件驱动行为之间的选择也是相同的。Spring Integration 提供了两种端点实现,它们托管这些基于回调的处理程序,并允许它们连接到消息通道。
Despite its simplicity, this interface provides the foundation for most of the components (routers, transformers, splitters, aggregators, service activators, and others) covered in the following chapters. Those components each perform very different functionality with the messages they handle, but the requirements for actually receiving a message are the same, and the choice between polling and event-driven behavior is also the same. Spring Integration provides two endpoint implementations that host these callback-based handlers and let them be connected to message channels.
Event-driven Consumer
由于它是两者中较简单的一个,我们首先介绍基于事件的消费者端点。你可能还记得,SubscribableChannel
接口提供了一个 subscribe()
方法,该方法接受一个 MessageHandler
参数(如 xref:channel/interfaces.adoc#channel-interfaces-subscribablechannel[SubscribableChannel
所示)。以下列表显示了 subscribe
方法的定义:
Because it is the simpler of the two, we cover the event-driven consumer endpoint first.
You may recall that the SubscribableChannel
interface provides a subscribe()
method and that the method accepts a MessageHandler
parameter (as shown in SubscribableChannel
).
The following listing shows the definition of the subscribe
method:
subscribableChannel.subscribe(messageHandler);
由于订阅通道的处理程序不必主动轮询该通道,因此这是一个事件驱动的消费者,Spring Integration 提供的实现接受一个 SubscribableChannel
和一个 MessageHandler
,如下例所示:
Since a handler that is subscribed to a channel does not have to actively poll that channel, this is an event-driven consumer, and the implementation provided by Spring Integration accepts a SubscribableChannel
and a MessageHandler
, as the following example shows:
SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);
EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);
Polling Consumer
Spring Integration 还提供了一个 PollingConsumer
,除了通道必须实现 PollableChannel
外,它的实例化方式与 SubscribableChannel
相同,如下例所示:
Spring Integration also provides a PollingConsumer
, and it can be instantiated in the same way except that the channel must implement PollableChannel
, as the following example shows:
PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);
PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
有关轮询使用者的更多信息,请参阅 Channel Adapter 和 Channel Adapter。 |
For more information regarding polling consumers, see Channel Adapter and Channel Adapter. |
轮询消费者还有许多其他配置选项。以下示例展示了如何设置触发器:
There are many other configuration options for the polling consumer. The following example shows how to set the trigger:
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(30)));
PeriodicTrigger
通常使用简单的间隔(Duration
)定义,但也支持 initialDelay
属性和布尔值 fixedRate
属性(默认值为 false
,即没有固定延迟)。以下示例设置了这两个属性:
The PeriodicTrigger
is typically defined with a simple interval (Duration
) but also supports an initialDelay
property and a boolean fixedRate
property (the default is false
— that is, no fixed delay).
The following example sets both properties:
PeriodicTrigger trigger = new PeriodicTrigger(Duration.ofSeconds(1));
trigger.setInitialDelay(Duration.ofSeconds(5));
trigger.setFixedRate(true);
前一个示例中的三个设置的结果是一个触发器,该触发器等待五秒,然后每秒触发一次。
The result of the three settings in the preceding example is a trigger that waits five seconds and then triggers every second.
CronTrigger`需要一个有效的cron表达式。有关详细信息,请参阅 Javadoc。以下示例设置新的`CronTrigger
:
The CronTrigger
requires a valid cron expression.
See the Javadoc for details.
The following example sets a new CronTrigger
:
CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");
在前面的示例中定义的触发器的结果是一个触发器,该触发器在周一到周五每十秒触发一次。
The result of the trigger defined in the previous example is a trigger that triggers every ten seconds, Monday through Friday.
轮询端点的默认触发器是一个具有 1 秒固定延迟期的 |
The default trigger for polling endpoint is a |
除了触发器之外,您还可以指定另外两个与轮询相关的配置属性:maxMessagesPerPoll
和 receiveTimeout
。以下示例展示了如何设置这两个属性:
In addition to the trigger, you can specify two other polling-related configuration properties: maxMessagesPerPoll
and receiveTimeout
.
The following example shows how to set these two properties:
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);
maxMessagesPerPoll
属性指定在给定的轮询操作中接收的最大消息数。这意味着轮询器会继续调用 receive()
,而不等待,直到返回 null
或达到最大值。例如,如果轮询器的间隔触发器为十秒,maxMessagesPerPoll
设置为 25
,并且它轮询其队列中有 100 条消息的通道,则所有 100 条消息都可以在 40 秒内检索到。它抓取 25 条消息,等待十秒钟,再抓取下一条 25 条消息,依此类推。如果 maxMessagesPerPoll
配置为负值,则在单个轮询周期内调用 MessageSource.receive()
,直到它返回 null
。从 5.5 版本开始,0
值具有特殊含义 - 完全跳过 MessageSource.receive()
调用,这可能被视为暂停此轮询端点,直到 maxMessagesPerPoll
稍后被更改为非零值,例如通过 Control Bus。
The maxMessagesPerPoll
property specifies the maximum number of messages to receive within a given poll operation.
This means that the poller continues calling receive()
without waiting, until either null
is returned or the maximum value is reached.
For example, if a poller has a ten-second interval trigger and a maxMessagesPerPoll
setting of 25
, and it is polling a channel that has 100 messages in its queue, all 100 messages can be retrieved within 40 seconds.
It grabs 25, waits ten seconds, grabs the next 25, and so on.
If maxMessagesPerPoll
is configured with a negative value, then MessageSource.receive()
is called within a single polling cycle until it returns null
.
Starting with version 5.5, a 0
value has a special meaning - skip the MessageSource.receive()
call altogether, which may be considered as pausing for this polling endpoint until the maxMessagesPerPoll
is changed to a n non-zero value at a later time, e.g. via a Control Bus.
receiveTimeout
属性指定如果在调用接收操作时没有消息可用,轮询器应等待的时间。例如,考虑两个表面上看起来相似但实际上有很大不同的选项:第一个具有 5 秒的间隔触发器和 50 毫秒的接收超时,而第二个具有 50 毫秒的间隔触发器和 5 秒的接收超时。第一个选项可能会比接受时间晚最多 4950 毫秒收到一条消息(如果在其中一个轮询调用返回后立即收到该消息)。另一方面,第二个配置在 50 毫秒内不会错过任何消息。不同之处在于第二个选项需要一个线程进行等待。但是,结果是它可以对到达的消息做出更快速的响应。这种称为“长轮询
”的技术可用于模拟轮询源上的事件驱动行为。
The receiveTimeout
property specifies the amount of time the poller should wait if no messages are available when it invokes the receive operation.
For example, consider two options that seem similar on the surface but are actually quite different: The first has an interval trigger of 5 seconds and a receive timeout of 50 milliseconds, while the second has an interval trigger of 50 milliseconds and a receive timeout of 5 seconds.
The first one may receive a message up to 4950 milliseconds later than it accepted on the channel (if that message arrived immediately after one of its poll calls returned).
On the other hand, the second configuration never misses a message by more than 50 milliseconds.
The difference is that the second option requires a thread to wait.
However, as a result, it can respond much more quickly to arriving messages.
This technique, known as “long polling”, can be used to emulate event-driven behavior on a polled source.
可以像下面示例中所示,将轮询使用者委托给 Spring TaskExecutor
:
A polling consumer can also delegate to a Spring TaskExecutor
, as the following example shows:
PollingConsumer consumer = new PollingConsumer(channel, handler);
TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);
此外,PollingConsumer
有一个名为 adviceChain
的属性。此属性使您能够为 AOP 建议指定 List
,以处理包括事务在内的其他横切关注点。这些建议应用于 doPoll()
方法。有关更多详细信息,请参见 Endpoint Namespace Support 下 AOP 建议链和事务支持的部分。另请参阅 @Poller
注释 Javadoc 和相应的 Messaging Annotations Support 部分。Java DSL 还提供了 .poller()
端点配置选项及其各自的 Pollers
工厂。
Furthermore, a PollingConsumer
has a property called adviceChain
.
This property lets you specify a List
of AOP advices for handling additional cross-cutting concerns including transactions.
These advices are applied around the doPoll()
method.
For more in-depth information, see the sections on AOP advice chains and transaction support under Endpoint Namespace Support.
See also a @Poller
annotation Javadocs and respective Messaging Annotations Support section.
The Java DSL also provides a .poller()
endpoint configuration option with its respective Pollers
factory.
较早的示例显示了依赖查找。但是,请记住这些使用者最常配置为 Spring Bean 定义。事实上,Spring Integration 还提供了一个名为 ConsumerEndpointFactoryBean
的 FactoryBean
,它基于通道的类型创建适当的使用者类型。此外,Spring Integration 具有完全的 XML 名称空间支持,可以进一步隐藏这些详细信息。基于名称空间的配置以该指南为特色,因为它引入了每个组件类型。
The earlier examples show dependency lookups.
However, keep in mind that these consumers are most often configured as Spring bean definitions.
In fact, Spring Integration also provides a FactoryBean
called ConsumerEndpointFactoryBean
that creates the appropriate consumer type based on the type of channel.
Also, Spring Integration has full XML namespace support to even further hide those details.
The namespace-based configuration is in this guide featured as each component type is introduced.
许多 |
Many of the |
Endpoint Namespace Support
在整个参考手册中,您都可以找到端点元素的特定配置示例,例如路由器、转换器、服务激活器等。其中大多数支持 input-channel
属性,许多支持 output-channel
属性。经过解析后,这些端点元素会生成 PollingConsumer
或 EventDrivenConsumer
的实例,具体取决于引用的 input-channel
的类型:PollableChannel
或 SubscribableChannel
。当通道可轮询时,轮询行为基于端点元素的 poller
子元素及其属性。
Throughout this reference manual, you can find specific configuration examples for endpoint elements, such as router, transformer, service-activator, and so on.
Most of these support an input-channel
attribute and many support an output-channel
attribute.
After being parsed, these endpoint elements produce an instance of either the PollingConsumer
or the EventDrivenConsumer
, depending on the type of the input-channel
that is referenced: PollableChannel
or SubscribableChannel
, respectively.
When the channel is pollable, the polling behavior is based on the endpoint element’s poller
sub-element and its attributes.
以下列出 poller
所有可用的配置选项:
The following lists all available configuration options for a poller
:
<int:poller cron="" 1
default="false" 2
error-channel="" 3
fixed-delay="" 4
fixed-rate="" 5
initial-delay="" 6
id="" 7
max-messages-per-poll="" 8
receive-timeout="" 9
ref="" 10
task-executor="" 11
time-unit="MILLISECONDS" 12
trigger=""> 13
<int:advice-chain /> 14
<int:transactional /> 15
</int:poller>
1 | Provides the ability to configure pollers by using Cron expressions.
The underlying implementation uses an org.springframework.scheduling.support.CronTrigger .
If this attribute is set, none of the following attributes must be specified: fixed-delay , trigger , fixed-rate , and ref . |
2 | By setting this attribute to true , you can define exactly one global default poller.
An exception is raised if more than one default poller is defined in the application context.
Any endpoints connected to a PollableChannel (PollingConsumer ) or any SourcePollingChannelAdapter that does not have an explicitly configured poller then uses the global default poller.
It defaults to false .
Optional. |
3 | Identifies the channel to which error messages are sent if a failure occurs in this poller’s invocation.
To completely suppress exceptions, you can provide a reference to the nullChannel .
Optional. |
4 | The fixed delay trigger uses a PeriodicTrigger under the covers.
The numeric value is in time-unit or can be as a duration format (starting with version 6.2), e.g. PT10S , P1D .
If this attribute is set, none of the following attributes must be specified: fixed-rate , trigger , cron , and ref . |
5 | The fixed rate trigger uses a PeriodicTrigger under the covers.
The numeric value is in time-unit or can be as a duration format (starting with version 6.2), e.g. PT10S , P1D .
If this attribute is set, none of the following attributes must be specified: fixed-delay , trigger , cron , and ref . |
6 | The initial delay for a PeriodicTrigger under the covers(starting with version 6.2).
The numeric value is in time-unit or can be as a duration format, e.g. PT10S , P1D . |
7 | The ID referring to the poller’s underlying bean-definition, which is of type org.springframework.integration.scheduling.PollerMetadata .
The id attribute is required for a top-level poller element, unless it is the default poller (default="true" ). |
8 | See Configuring An Inbound Channel Adapter for more information.
If not specified, the default value depends on the context.
If you use a PollingConsumer , this attribute defaults to -1 .
However, if you use a SourcePollingChannelAdapter , the max-messages-per-poll attribute defaults to 1 .
Optional. |
9 | Value is set on the underlying class PollerMetadata .
If not specified, it defaults to 1000 (milliseconds).
Optional. |
10 | Bean reference to another top-level poller.
The ref attribute must not be present on the top-level poller element.
However, if this attribute is set, none of the following attributes must be specified: fixed-rate , trigger , cron , and fixed-delay . |
11 | Provides the ability to reference a custom task executor. See TaskExecutor Support for further information. Optional. |
12 | This attribute specifies the java.util.concurrent.TimeUnit enum value on the underlying org.springframework.scheduling.support.PeriodicTrigger .
Therefore, this attribute can be used only in combination with the fixed-delay or fixed-rate attributes.
If combined with either cron or a trigger reference attribute, it causes a failure.
The minimal supported granularity for a PeriodicTrigger is milliseconds.
Therefore, the only available options are milliseconds and seconds.
If this value is not provided, any fixed-delay or fixed-rate value is interpreted as milliseconds.
Basically, this enum provides a convenience for seconds-based interval trigger values.
For hourly, daily, and monthly settings, we recommend using a cron trigger instead. |
13 | Reference to any Spring-configured bean that implements the org.springframework.scheduling.Trigger interface.
However, if this attribute is set, none of the following attributes must be specified: fixed-delay , fixed-rate , cron , and ref .
Optional. |
14 | Allows specifying extra AOP advices to handle additional cross-cutting concerns. See Transactions for further information. Optional. |
15 | Pollers can be made transactional. See AOP Advice chains for further information. Optional. |
Examples
可以按如下方式配置一个基于 1 秒间隔的简单基于间隔的轮询器:
A simple interval-based poller with a 1-second interval can be configured as follows:
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller fixed-rate="1000"/>
</int:transformer>
作为使用 fixed-rate
属性的替代,您还可以使用 fixed-delay
属性。
As an alternative to using the fixed-rate
attribute, you can also use the fixed-delay
attribute.
对于基于 Cron 表达式的轮询器,请改用 cron
属性,如下例所示:
For a poller based on a Cron expression, use the cron
attribute instead, as the following example shows:
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>
如果输入通道是 PollableChannel
,则需要轮询器配置。具体而言,如前所述,trigger
是 PollingConsumer
类的必需属性。因此,如果您为轮询使用者端点的配置省略 poller
子元素,可能会抛出异常。如果您尝试在连接到不可轮询通道的元素上配置轮询器,也可能会抛出异常。
If the input channel is a PollableChannel
, the poller configuration is required.
Specifically, as mentioned earlier, the trigger
is a required property of the PollingConsumer
class.
Therefore, if you omit the poller
sub-element for a polling consumer endpoint’s configuration, an exception may be thrown.
The exception may also be thrown if you attempt to configure a poller on the element that is connected to a non-pollable channel.
还可以创建顶级轮询器,在这种情况下只需要一个 ref
属性,如下例所示:
It is also possible to create top-level pollers, in which case only a ref
attribute is required, as the following example shows:
<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller ref="weekdayPoller"/>
</int:transformer>
|
The |
Global Default Poller
为了进一步简化配置,您可以定义一个全局默认轮询器。在 XML DSL 中,单个顶级轮询器组件可能将 default
属性设置为 true
。对于 Java 配置,在这种情况下必须声明一个名称为 PollerMetadata.DEFAULT_POLLER
的 PollerMetadata
Bean。在这种情况下,任何为其输入通道使用 PollableChannel
的端点(在相同的 ApplicationContext
中定义且没有明确配置的 poller
)都使用该默认值。以下示例显示了这样的轮询器和使用它的转换器:
To simplify the configuration even further, you can define a global default poller.
A single top-level poller component in XML DSL may have the default
attribute set to true
.
For Java configuration a PollerMetadata
bean with the PollerMetadata.DEFAULT_POLLER
name must be declared in this case.
In that case, any endpoint with a PollableChannel
for its input channel, that is defined within the same ApplicationContext
, and has no explicitly configured poller
uses that default.
The following example shows such a poller and a transformer that uses it:
-
Java DSL
-
Java
-
Kotlin DSL
-
XML
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
return IntegrationFlow.from(MessageChannels.queue("pollable"))
.transform(transformer) // No 'poller' attribute because there is a default global poller
.channel("output")
.get();
}
@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
@Bean
public QueueChannel pollable() {
return new QueueChannel();
}
// No 'poller' attribute because there is a default global poller
@Transformer(inputChannel = "pollable", outputChannel = "output")
public Object transform(Object payload) {
...
}
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller() =
PollerMetadata()
.also {
it.maxMessagesPerPoll = 5
it.trigger = PeriodicTrigger(3000)
}
@Bean
fun convertFlow() =
integrationFlow(MessageChannels.queue("pollable")) {
transform(transformer) // No 'poller' attribute because there is a default global poller
channel("output")
}
<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-delay="3000"/>
<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output"/>
Transaction Support
Spring Integration 还为轮询器提供事务支持,以便每个接收和转发操作都可以作为工作原子单元来执行。要为轮询器配置事务,请添加 <transactional/>
子元素。以下示例显示可用的属性:
Spring Integration also provides transaction support for the pollers so that each receive-and-forward operation can be performed as an atomic unit of work.
To configure transactions for a poller, add the <transactional/>
sub-element.
The following example shows the available attributes:
<int:poller fixed-delay="1000">
<int:transactional transaction-manager="txManager"
propagation="REQUIRED"
isolation="REPEATABLE_READ"
timeout="10000"
read-only="false"/>
</int:poller>
有关详细信息,请参见 Poller Transaction Support。
For more information, see Poller Transaction Support.
AOP Advice chains
由于 Spring 事务支持依赖于代理机制,其中 TransactionInterceptor
(AOP 建议)处理由轮询器发起的消息流的事务行为,因此有时必须提供额外的建议来处理与轮询器关联的其他横切行为。为此,poller
定义了一个 advice-chain
元素,该元素允许您在实现 MethodInterceptor
接口的类中添加更多建议。以下示例演示了如何为 poller
定义 advice-chain
:
Since Spring transaction support depends on the proxy mechanism with TransactionInterceptor
(AOP Advice) handling transactional behavior of the message flow initiated by the poller, you must sometimes provide extra advices to handle other cross cutting behavior associated with the poller.
For that, the poller
defines an advice-chain
element that lets you add more advices in a class that implements the MethodInterceptor
interface.
The following example shows how to define an advice-chain
for a poller
:
<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
method="good" output-channel="output">
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<int:advice-chain>
<ref bean="adviceA" />
<beans:bean class="org.something.SampleAdvice" />
<ref bean="txAdvice" />
</int:advice-chain>
</int:poller>
</int:service-activator>
有关如何实现 MethodInterceptor
接口的详细信息,请参见 AOP sections of the Spring Framework Reference Guide。还可对没有任何事务配置的轮询器应用建议链,让您增强轮询器启动的消息流的行为。
For more information on how to implement the MethodInterceptor
interface, see the AOP sections of the Spring Framework Reference Guide.
An advice chain can also be applied on a poller that does not have any transaction configuration, letting you enhance the behavior of the message flow initiated by the poller.
在使用通知链时,无法指定 <transactional/>
子元素。相反,声明一个 <tx:advice/>
bean 并将其添加到 <advice-chain/>
中。有关完整的配置详细信息,请参阅 Poller Transaction Support。
When using an advice chain, the <transactional/>
child element cannot be specified.
Instead, declare a <tx:advice/>
bean and add it to the <advice-chain/>
.
See Poller Transaction Support for complete configuration details.
TaskExecutor Support
轮询线程可以通过 Spring 的 TaskExecutor
抽象的任何实例执行。这为端点或一组端点启用并发。从 Spring 3.0 开始,核心 Spring 框架有一个 task
名称空间,其 <executor/>
元素支持创建简单的线程池执行程序。该元素接受通用并发设置(如 pool-size 和 queue-capacity)的属性。配置线程池执行程序可以在端点在负载下的执行方式方面产生重大差异。每个端点都提供这些设置,因为端点的性能是需要考虑的主要因素之一(另一个主要因素是端点订阅的通道上的预期流量)。要为使用 XML 名称空间支持配置的轮询端点启用并发,请在其 <poller/>
元素上提供 task-executor
引用,然后提供以下示例中所示的一个或多个属性:
The polling threads may be executed by any instance of Spring’s TaskExecutor
abstraction.
This enables concurrency for an endpoint or group of endpoints.
As of Spring 3.0, the core Spring Framework has a task
namespace, and its <executor/>
element supports the creation of a simple thread pool executor.
That element accepts attributes for common concurrency settings, such as pool-size and queue-capacity.
Configuring a thread-pooling executor can make a substantial difference in how the endpoint performs under load.
These settings are available for each endpoint, since the performance of an endpoint is one of the major factors to consider (the other major factor being the expected volume on the channel to which the endpoint subscribes).
To enable concurrency for a polling endpoint that is configured with the XML namespace support, provide the task-executor
reference on its <poller/>
element and then provide one or more of the properties shown in the following example:
<int:poller task-executor="pool" fixed-rate="1000"/>
<task:executor id="pool"
pool-size="5-25"
queue-capacity="20"
keep-alive="120"/>
如果您未提供任务执行器,则在调用方的线程中调用消费者的处理程序。请注意,调用方通常是默认的`TaskScheduler`(参见Configuring the Task Scheduler)。您还应记住,`task-executor`属性可以通过指定bean名称来提供对Spring的`TaskExecutor`接口的任何实现的引用。前面显示的`executor`元素提供了方便。
If you do not provide a task-executor, the consumer’s handler is invoked in the caller’s thread.
Note that the caller is usually the default TaskScheduler
(see Configuring the Task Scheduler).
You should also keep in mind that the task-executor
attribute can provide a reference to any implementation of Spring’s TaskExecutor
interface by specifying the bean name.
The executor
element shown earlier is provided for convenience.
如前所述 background section for polling consumers,您还可以将轮询使用者配置为模拟事件驱动的行为。通过触发器中的较长接收超时和较短时间间隔,您可以确保即使在轮询的消息源中也能及时地响应到达的消息。请注意,这仅适用于带有带超时的阻塞等待调用的源。例如,文件轮询器不进行阻塞。每个 receive()
调用立即返回,并且包含新文件或不包含新文件。因此,即使轮询器包含一个较长的 receive-timeout
,该值也永远不会在这样的场景中使用。另一方面,在使用 Spring Integration 自身的基于队列的通道时,超时值确实有机会参与。以下示例展示了轮询使用者如何几乎瞬间接收消息:
As mentioned earlier in the background section for polling consumers, you can also configure a polling consumer in such a way as to emulate event-driven behavior.
With a long receive timeout and a short interval in the trigger, you can ensure a very timely reaction to arriving messages even on a polled message source.
Note that this applies only to sources that have a blocking wait call with a timeout.
For example, the file poller does not block.
Each receive()
call returns immediately and either contains new files or not.
Therefore, even if a poller contains a long receive-timeout
, that value would never be used in such a scenario.
On the other hand, when using Spring Integration’s own queue-based channels, the timeout value does have a chance to participate.
The following example shows how a polling consumer can receive messages nearly instantaneously:
<int:service-activator input-channel="someQueueChannel"
output-channel="output">
<int:poller receive-timeout="30000" fixed-rate="10"/>
</int:service-activator>
使用此方法不会造成太多开销,因为在内部,它只不过是一个计时等待线程,与(例如)无休止的 while 循环相比,它所需要的 CPU 资源使用量少得多。
Using this approach does not carry much overhead, since, internally, it is nothing more then a timed-wait thread, which does not require nearly as much CPU resource usage as (for example) a thrashing, infinite while loop.
Changing Polling Rate at Runtime
当使用“fixed-delay
”或“fixed-rate
”属性配置轮询器时,默认实现使用“PeriodicTrigger
”实例。“PeriodicTrigger
”是核心 Spring Framework 的一部分。它只接受时间间隔作为构造函数参数。因此,无法在运行时更改它。
When configuring a poller with a fixed-delay
or a fixed-rate
attribute, the default implementation uses a PeriodicTrigger
instance.
The PeriodicTrigger
is part of the core Spring Framework.
It accepts the interval only as a constructor argument.
Therefore, it cannot be changed at runtime.
不过,您可以定义自己的“org.springframework.scheduling.Trigger
”接口实现。您甚至可以使用 “PeriodicTrigger
”作为起点。然后,您可以为时间间隔(周期)添加一个 setter,或者您甚至可以在触发器本身内嵌入您自己的节流逻辑。“period
”属性与每次调用“nextExecutionTime
”一起使用,以安排下一次轮询。要在轮询器中使用此自定义触发器,请在应用程序上下文中声明自定义触发器的 bean 定义,并通过使用引用自定义触发器 bean 实例的“trigger
”属性将依赖项注入到轮询器配置中。现在,您可以获取对触发器 bean 的引用,并在轮询之间更改轮询时间间隔。
However, you can define your own implementation of the org.springframework.scheduling.Trigger
interface.
You could even use the PeriodicTrigger
as a starting point.
Then you can add a setter for the interval (period), or you can even embed your own throttling logic within the trigger itself.
The period
property is used with each call to nextExecutionTime
to schedule the next poll.
To use this custom trigger within pollers, declare the bean definition of the custom trigger in your application context and inject the dependency into your poller configuration by using the trigger
attribute, which references the custom trigger bean instance.
You can now obtain a reference to the trigger bean and change the polling interval between polls.
要了解示例,请参阅 Spring Integration Samples项目。它包含一个名为`dynamic-poller`的示例,它使用自定义触发器并演示了在运行时更改轮询间隔的能力。
For an example, see the Spring Integration Samples project.
It contains a sample called dynamic-poller
, which uses a custom trigger and demonstrates the ability to change the polling interval at runtime.
该示例提供了一个实现 org.springframework.scheduling.Trigger
接口的自定义触发器。该示例的触发器基于Spring的 PeriodicTrigger
实现。但是,自定义触发器的字段不是最终的,并且属性具有显式的getter和setter,使您可以在运行时动态更改轮询周期。
The sample provides a custom trigger that implements the org.springframework.scheduling.Trigger
interface.
The sample’s trigger is based on Spring’s PeriodicTrigger
implementation.
However, the fields of the custom trigger are not final, and the properties have explicit getters and setters, letting you dynamically change the polling period at runtime.
但请注意,由于 Trigger 方法为 |
It is important to note, though, that because the Trigger method is |
Payload Type Conversion
在整个参考手册中,您还可以看到接受消息或任何任意“Object
”作为输入参数的各种端点的特定配置和实现示例。如果为“Object
”,则此类参数将映射到消息有效负载或有效负载或头的部分(当使用 Spring Expression Language 时)。但是,端点方法的输入参数类型有时与有效负载或其部分的类型不匹配。在此场景中,我们需要执行类型转换。Spring Integration 提供了一种方便的方法,可以在转换服务 bean“integrationConversionService
”的自身实例内注册类型转换器(使用 Spring“ConversionService
”)。只要使用 Spring Integration 基础结构定义第一个转换器,就会自动创建该 bean。要注册转换器,您可以实现“org.springframework.core.convert.converter.Converter
”、“org.springframework.core.convert.converter.GenericConverter
”或“org.springframework.core.convert.converter.ConverterFactory
”。
Throughout this reference manual, you can also see specific configuration and implementation examples of various endpoints that accept a message or any arbitrary Object
as an input parameter.
In the case of an Object
, such a parameter is mapped to a message payload or part of the payload or header (when using the Spring Expression Language).
However, the type of input parameter of the endpoint method sometimes does not match the type of the payload or its part.
In this scenario, we need to perform type conversion.
Spring Integration provides a convenient way for registering type converters (by using the Spring ConversionService
) within its own instance of a conversion service bean named integrationConversionService
.
That bean is automatically created as soon as the first converter is defined by using the Spring Integration infrastructure.
To register a converter, you can implement org.springframework.core.convert.converter.Converter
, org.springframework.core.convert.converter.GenericConverter
, or org.springframework.core.convert.converter.ConverterFactory
.
“Converter
”实现是最简单的,从一种类型转换为另一种类型。为了更复杂,例如转换为类层次结构,您可以实现“GenericConverter
”和可能的“ConditionalConverter
”。这些让您可以完全访问“from
”和“to
”类型描述符,从而实现复杂转换。例如,如果您有一个名为“Something
”的抽象类作为转换目标(参数类型、通道数据类型等),则有两个名为“Thing1
”和“Thing
”的具体实现,您希望根据输入类型将其中一个或另一个转换为目标,则“GenericConverter
”将非常适合。有关详细信息,请参阅这些接口的 Javadoc:
The Converter
implementation is the simplest and converts from a single type to another.
For more sophistication, such as converting to a class hierarchy, you can implement a GenericConverter
and possibly a ConditionalConverter
.
These give you complete access to the from
and to
type descriptors, enabling complex conversions.
For example, if you have an abstract class called Something
that is the target of your conversion (parameter type, channel data type, and so on), you have two concrete implementations called Thing1
and Thing
, and you wish to convert to one or the other based on the input type, the GenericConverter
would be a good fit.
For more information, see the Javadoc for these interfaces:
实现转换器后,您可以将其注册到方便的名称空间支持中,如下面的示例所示:
When you have implemented your converter, you can register it with convenient namespace support, as the following example shows:
<int:converter ref="sampleConverter"/>
<bean id="sampleConverter" class="foo.bar.TestConverter"/>
或者,您可以使用内部 bean,如下面的示例所示:
Alternately, you can use an inner bean, as the following example shows:
<int:converter>
<bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>
从 Spring Integration 4.0 开始,您可以使用注释创建前面的配置,如下面的示例所示:
Starting with Spring Integration 4.0, you can use annotations to create the preceding configuration, as the following example shows:
@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {
public Number convert(Boolean source) {
return source ? 1 : 0;
}
}
或者,您可以使用“@Configuration
”注释,如下面的示例所示:
Alternately, you can use the @Configuration
annotation, as the following example shows:
@Configuration
@EnableIntegration
public class ContextConfiguration {
@Bean
@IntegrationConverter
public SerializingConverter serializingConverter() {
return new SerializingConverter();
}
}
在配置应用程序上下文时,Spring框架允许您添加`conversionService`bean(参见 Configuring a ConversionService一章)。在需要时,会使用此服务在bean创建和配置期间执行适当的转换。
When configuring an application context, the Spring Framework lets you add a conversionService
bean (see Configuring a ConversionService chapter).
This service is used, when needed, to perform appropriate conversions during bean creation and configuration.
相比之下,“integrationConversionService
”用于运行时转换。这些用途截然不同。在连接 bean 构造函数参数和属性时打算使用的转换器可能会产生意想不到的结果,如果在运行时用于 Spring Integration 对数据类型通道、有效负载类型转换器等中的消息进行表达式求值。
In contrast, the integrationConversionService
is used for runtime conversions.
These uses are quite different.
Converters that are intended for use when wiring bean constructor arguments and properties may produce unintended results if used at runtime for Spring Integration expression evaluation against messages within data type channels, payload type transformers, and so on.
但是,如果您确实希望将 Spring“conversionService
”用作 Spring Integration 的“integrationConversionService
”,您可以在应用程序上下文中配置一个别名,如下面的示例所示:
However, if you do want to use the Spring conversionService
as the Spring Integration integrationConversionService
, you can configure an alias in the application context, as the following example shows:
<alias name="conversionService" alias="integrationConversionService"/>
在这种情况下,“conversionService
”提供的转换器可用于 Spring Integration 运行时转换。
In this case, the converters provided by the conversionService
are available for Spring Integration runtime conversion.
Content Type Conversion
从 5.0 版本开始,默认情况下,方法调用机制基于“org.springframework.messaging.handler.invocation.InvocableHandlerMethod
”基础结构。它的“HandlerMethodArgumentResolver
”实现(如“PayloadArgumentResolver
”和“MessageMethodArgumentResolver
”)可以使用“MessageConverter
”抽象将传入“payload
”转换为目标方法参数类型。转换可以基于“contentType
”消息头。为此,Spring Integration 提供了“ConfigurableCompositeMessageConverter
”,它委派给注册的转换器列表调用,直到其中一个返回非空结果。默认情况下,此转换器提供(按严格顺序):
Starting with version 5.0, by default, the method invocation mechanism is based on the org.springframework.messaging.handler.invocation.InvocableHandlerMethod
infrastructure.
Its HandlerMethodArgumentResolver
implementations (such as PayloadArgumentResolver
and MessageMethodArgumentResolver
) can use the MessageConverter
abstraction to convert an incoming payload
to the target method argument type.
The conversion can be based on the contentType
message header.
For this purpose, Spring Integration provides the ConfigurableCompositeMessageConverter
, which delegates to a list of registered converters to be invoked until one of them returns a non-null result.
By default, this converter provides (in strict order):
-
MappingJackson2MessageConverter
if the Jackson processor is present on the classpath
有关其目的和适当“contentType
”转换值的详细信息,请参见前一个列表中链接的 Javadoc。“ConfigurableCompositeMessageConverter
”被使用,因为可以向它提供任何其他“MessageConverter
”实现,包括或排除前面提到的默认转换器。它还可以作为应用程序上下文中的适当 bean 注册,从而覆盖默认转换器,如下面的示例所示:
See the Javadoc (linked in the preceding list) for more information about their purpose and appropriate contentType
values for conversion.
The ConfigurableCompositeMessageConverter
is used because it can be supplied with any other MessageConverter
implementations, including or excluding the previously mentioned default converters.
It can also be registered as an appropriate bean in the application context, overriding the default converter, as the following example shows:
@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
List<MessageConverter> converters =
Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
new JavaSerializationMessageConverter());
return new ConfigurableCompositeMessageConverter(converters);
}
这两个新转换器在默认值之前注册在合成中。您也可以不使用“ConfigurableCompositeMessageConverter
”,而是通过注册带有名称“integrationArgumentResolverMessageConverter
”的 bean(通过设置“IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME
”属性)来提供您自己的“MessageConverter
”。
Those two new converters are registered in the composite before the defaults.
You can also not use a ConfigurableCompositeMessageConverter
but provide your own MessageConverter
by registering a bean with the name, integrationArgumentResolverMessageConverter
(by setting the IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME
property).
在使用 SpEL 方法调用时,基于 |
The |
Asynchronous Polling
如果您希望轮询是异步的,则轮询器可以根据需要指定指向任何“TaskExecutor
”bean 的现有实例的“task-executor
”属性(Spring 3.0 通过“task
”名称空间提供方便的名称空间配置)。但是,在使用“TaskExecutor
”配置轮询器时,您必须理解某些事项。
If you want the polling to be asynchronous, a poller can optionally specify a task-executor
attribute that points to an existing instance of any TaskExecutor
bean (Spring 3.0 provides a convenient namespace configuration through the task
namespace).
However, there are certain things you must understand when configuring a poller with a TaskExecutor
.
问题在于有两个配置,轮询器和“TaskExecutor
”。它们必须彼此协调。否则,您最终可能会创建一个人为的内存泄漏。
The problem is that there are two configurations in place, the poller and the TaskExecutor
.
They must be in tune with each other.
Otherwise, you might end up creating an artificial memory leak.
考虑以下配置:
Consider the following configuration:
<int:channel id="publishChannel">
<int:queue />
</int:channel>
<int:service-activator input-channel="publishChannel" ref="myService">
<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>
<task:executor id="taskExecutor" pool-size="20" />
前面的配置展示了一个配置错误。
The preceding configuration demonstrates an out-of-tune configuration.
默认情况下,任务执行器有一个无界任务队列。轮询器会持续计划新任务,即使所有线程都已被阻塞,等待新消息到达或超时到期。假设有 20 个线程以 5 秒的超时时间执行任务,那么它们每秒会执行 4 个任务。然而,新任务每秒被计划 20 次,所以任务执行器中的内部队列每秒增长 16 个(在进程空闲时),因此我们出现了内存泄漏。
By default, the task executor has an unbounded task queue. The poller keeps scheduling new tasks even though all the threads are blocked, waiting for either a new message to arrive or the timeout to expire. Given that there are 20 threads executing tasks with a five-second timeout, they are executed at a rate of 4 per second. However, new tasks are being scheduled at a rate of 20 per second, so the internal queue in the task executor grows at a rate of 16 per second (while the process is idle), so we have a memory leak.
处理此问题的一种方法是设置任务执行器的 queue-capacity
属性。即使为 0 也是一个合理的值。您还可以通过指定如何处理无法排队的消息来管理它,方法是设置任务执行器的 rejection-policy
属性(例如,为 DISCARD
)。换句话说,在配置 TaskExecutor
时,您必须了解某些详细信息。有关该主题的更多详细信息,请参见 Spring 参考手册中的 “Task Execution and Scheduling”。
One of the ways to handle this is to set the queue-capacity
attribute of the task executor.
Even 0 is a reasonable value.
You can also manage it by specifying what to do with messages that can not be queued by setting the rejection-policy
attribute of the Task Executor (for example, to DISCARD
).
In other words, there are certain details you must understand when configuring TaskExecutor
.
See “Task Execution and Scheduling” in the Spring reference manual for more detail on the subject.
Endpoint Inner Beans
许多端点都是复合 bean。这包括所有使用者和所有轮询的入站通道适配器。使用者(轮询或事件驱动的)委托给 MessageHandler
。轮询的适配器通过委托给 MessageSource
获得消息。通常,获取委托 bean 的引用很有用,也许是为了在运行时或用于测试更改配置。这些 bean 可以从具有众所周知的名称的 ApplicationContext
中获取。MessageHandler
实例使用类似于 someConsumer.handler
的 bean ID 在应用程序上下文中注册(其中“consumer”是端点 id
属性的值)。MessageSource
实例使用类似于 somePolledAdapter.source
的 bean ID 注册,其中“somePolledAdapter”是适配器的 ID。
Many endpoints are composite beans.
This includes all consumers and all polled inbound channel adapters.
Consumers (polled or event-driven) delegate to a MessageHandler
.
Polled adapters obtain messages by delegating to a MessageSource
.
Often, it is useful to obtain a reference to the delegate bean, perhaps to change configuration at runtime or for testing.
These beans can be obtained from the ApplicationContext
with well known names.
MessageHandler
instances are registered with the application context with bean IDs similar to someConsumer.handler
(where 'consumer' is the value of the endpoint’s id
attribute).
MessageSource
instances are registered with bean IDs similar to somePolledAdapter.source
, where 'somePolledAdapter' is the ID of the adapter.
前面的内容仅适用于框架组件本身。相反,您可以使用内部 bean 定义,如下面的示例所示:
The preceding only applies to the framework component itself. You can instead use an inner bean definition, as the following example shows:
<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
output-channel = "outChannel" method="foo">
<beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>
该 Bean 被视为声明的任何内部 Bean,且不会在应用上下文里注册。如果您想以某种其他方式访问此 Bean,请在顶层使用 id
声明它,并改为使用 ref
属性。有关详细信息,请参见 Spring Documentation。
The bean is treated like any inner bean declared and is not registered with the application context.
If you wish to access this bean in some other manner, declare it at the top level with an id
and use the ref
attribute instead.
See the Spring Documentation for more information.