Poller
此部分描述了轮询在 Spring Integration 中的工作原理。
This section describes how polling works in Spring Integration.
Polling Consumer
当消息端点(通道适配器)连接到通道并实例化时,它们会产生以下实例之一:
When Message Endpoints (Channel Adapters) are connected to channels and instantiated, they produce one of the following instances:
实际实现取决于这些端点连接到的信道的类型。连接到实现 link:https://docs.spring.io/spring/docs/current/javadoc-api/index.html?org/springframework/messaging/SubscribableChannel.html[org.springframework.messaging.SubscribableChannel`接口的信道的信道适配器生成`EventDrivenConsumer`实例。另一方面,连接到实现 link:https://docs.spring.io/spring/docs/current/javadoc-api/index.html?org/springframework/messaging/PollableChannel.html[`org.springframework.messaging.PollableChannel`接口(如`QueueChannel
)的信道的信道适配器生成`PollingConsumer`实例。
The actual implementation depends on the type of channel to which these endpoints connect.
A channel adapter connected to a channel that implements the org.springframework.messaging.SubscribableChannel
interface produces an instance of EventDrivenConsumer
.
On the other hand, a channel adapter connected to a channel that implements the org.springframework.messaging.PollableChannel
interface (such as a QueueChannel
) produces an instance of PollingConsumer
.
轮询消费者让 Spring Integration 组件主动轮询消息,而不是以事件驱动的形式处理消息。
Polling consumers let Spring Integration components actively poll for Messages rather than process messages in an event-driven manner.
它们代表了众多消息传递场景中的一个关键的横切关注点。在 Spring Integration 中,轮询使用者基于同名的模式,这在 Gregor Hohpe 和 Bobby Woolf 合著的 Enterprise Integration Patterns 中有所描述。您可以在 book’s website 上找到该模式的描述。
They represent a critical cross-cutting concern in many messaging scenarios. In Spring Integration, polling consumers are based on the pattern with the same name, which is described in the book Enterprise Integration Patterns, by Gregor Hohpe and Bobby Woolf. You can find a description of the pattern on the book’s website.
有关更多信息轮询使用者配置,请参阅 Message Endpoints。
For more information polling consumer configuration, see Message Endpoints.
Pollable Message Source
Spring 集成提供了轮询使用者的第二个版本。当使用入站通道适配器时,这些适配器通常由 `SourcePollingChannelAdapter`包装。例如,当从远程 FTP 服务器位置检索消息时,FTP Inbound Channel Adapter中描述的适配器配置有轮询器以定期检索消息。因此,当使用轮询器配置组件时,生成的实例将为以下类型之一:
Spring Integration offers a second variation of the polling consumer pattern.
When inbound channel adapters are used, these adapters are often wrapped by a SourcePollingChannelAdapter
.
For example, when retrieving messages from a remote FTP Server location, the adapter described in FTP Inbound Channel Adapter is configured with a poller to periodically retrieve messages.
So, when components are configured with pollers, the resulting instances are of one of the following types:
这意味着轮询器同时用于入站和出站消息传递场景。以下是使用轮询器的部分用例:
This means that pollers are used in both inbound and outbound messaging scenarios. Here are some use cases in which pollers are used:
-
Polling certain external systems, such as FTP Servers, Databases, and Web Services
-
Polling internal (pollable) message channels
-
Polling internal services (such as repeatedly executing methods on a Java class)
AOP 通知类可以应用于轮询器,例如事务通知,以便启动一个事务。从 4.1 版本开始,将提供 |
AOP advice classes can be applied to pollers, in an |
本篇的目的是仅概览轮询使用者,以及它们如何适合于消息通道(请参见 Message Channels)和通道适配器(请参见 Channel Adapter)的概念。有关常规消息传递端点及特别轮询使用者的详细信息,请参见 Message Endpoints。
This chapter is meant to only give a high-level overview of polling consumers and how they fit into the concept of message channels (see Message Channels) and channel adapters (see Channel Adapter). For more information regarding messaging endpoints in general and polling consumers in particular, see Message Endpoints.
Deferred Acknowledgment Pollable Message Source
从版本 5.0.1 开始,某些模块提供了 MessageSource
实现,这些实现支持将确认推迟到下游流完成(或将消息移交到另一个线程)。这目前仅限于 AmqpMessageSource
和 KafkaMessageSource
。
Starting with version 5.0.1, certain modules provide MessageSource
implementations that support deferring acknowledgment until the downstream flow completes (or hands off the message to another thread).
This is currently limited to the AmqpMessageSource
and the KafkaMessageSource
.
使用这些消息来源,IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
标头(请参阅 xref:message.adoc#message-header-accessor[MessageHeaderAccessor
API)添加到消息中。在与可轮询消息来源一起使用时,标头值是 AcknowledgmentCallback
的一个实例,如下所示:
With these message sources, the IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
header (see MessageHeaderAccessor
API) is added to the message.
When used with pollable message sources, the value of the header is an instance of AcknowledgmentCallback
, as the following example shows:
@FunctionalInterface
public interface AcknowledgmentCallback {
void acknowledge(Status status);
boolean isAcknowledged();
void noAutoAck();
default boolean isAutoAck();
enum Status {
/**
* Mark the message as accepted.
*/
ACCEPT,
/**
* Mark the message as rejected.
*/
REJECT,
/**
* Reject the message and requeue so that it will be redelivered.
*/
REQUEUE
}
}
并非所有消息源(例如,KafkaMessageSource
)都支持 REJECT
状态。它的处理方式与 ACCEPT
相同。
Not all message sources (for example, a KafkaMessageSource
) support the REJECT
status.
It is treated the same as ACCEPT
.
应用程序可以随时确认消息,如下面的示例所示:
Applications can acknowledge a message at any time, as the following example shows:
Message<?> received = source.receive();
...
StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
.acknowledge(Status.ACCEPT);
如果将 MessageSource
连接到 SourcePollingChannelAdapter
,当轮询器线程在下游流完成之后返回适配器时,适配器将检查确认是否已得到确认,如果没有,则将其状态设置为 ACCEPT
它(或在流抛出异常时 REJECT
)。状态值在 AcknowledgmentCallback.Status
enumeration 中定义。
If the MessageSource
is wired into a SourcePollingChannelAdapter
, when the poller thread returns to the adapter after the downstream flow completes, the adapter checks whether the acknowledgment has already been acknowledged and, if not, sets its status to ACCEPT
it (or REJECT
if the flow throws an exception).
The status values are defined in the AcknowledgmentCallback.Status
enumeration.
Spring Integration 提供 MessageSourcePollingTemplate
以对 MessageSource
执行临时轮询。当 MessageHandler
回调返回(或抛出异常)时,这也负责在 AcknowledgmentCallback
上设置 ACCEPT
或 REJECT
。以下示例展示了如何使用 MessageSourcePollingTemplate
进行轮询:
Spring Integration provides MessageSourcePollingTemplate
to perform ad-hoc polling of a MessageSource
.
This, too, takes care of setting ACCEPT
or REJECT
on the AcknowledgmentCallback
when the MessageHandler
callback returns (or throws an exception).
The following example shows how to poll with the MessageSourcePollingTemplate
:
MessageSourcePollingTemplate template =
new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
...
});
在两种情况下(SourcePollingChannelAdapter
和 MessageSourcePollingTemplate
),可以通过在回调上调用 noAutoAck()
禁用自动确认/否定确认。如果你将消息移交给另一个线程并希望稍后确认,则可能这样做。并非所有实现都支持此功能(例如,Apache Kafka 不支持,因为偏移量提交必须在同一线程上执行)。
In both cases (SourcePollingChannelAdapter
and MessageSourcePollingTemplate
), you can disable auto ack/nack by calling noAutoAck()
on the callback.
You might do this if you hand off the message to another thread and wish to acknowledge later.
Not all implementations support this (for example, Apache Kafka does not, because the offset commit has to be performed on the same thread).
Conditional Pollers for Message Sources
本节介绍如何使用条件轮询器。
This section covers how to use conditional pollers.
Background
advice-chain
中的轮询器上的 Advice
对象会建议整个轮询任务(既是消息检索,也是处理)。这些“around advice
”方法无法访问轮询的任何上下文——仅能访问轮询本身。这对于要求(例如,使任务具有事务性或由于某些外部条件而跳过轮询)非常适合,如前所述。如果我们希望根据轮询 receive
部分的结果采取某些操作,或者如果我们希望根据条件调整轮询器,该怎么办?对于这些实例,Spring Integration 提供了“Smart
”轮询。
Advice
objects, in an advice-chain
on a poller, advise the whole polling task (both message retrieval and processing).
These “around advice” methods do not have access to any context for the poll — only the poll itself.
This is fine for requirements such as making a task transactional or skipping a poll due to some external condition, as discussed earlier.
What if we wish to take some action depending on the result of the receive
part of the poll or if we want to adjust the poller depending on conditions? For those instances, Spring Integration offers “Smart” Polling.
“Smart” Polling
版本 5.3 引入了 ReceiveMessageAdvice
接口。在 advice-chain
中实现此接口的任何 Advice
对象仅应用于 receive()
操作 - MessageSource.receive()
和 PollableChannel.receive(timeout)
。因此,它们只能用于 SourcePollingChannelAdapter
或 PollingConsumer
。此类实现了以下方法:
Version 5.3 introduced the ReceiveMessageAdvice
interface.
Any Advice
objects in the advice-chain
that implement this interface are applied only to the receive()
operation - MessageSource.receive()
and PollableChannel.receive(timeout)
.
Therefore, they can be applied only for the SourcePollingChannelAdapter
or PollingConsumer
.
Such classes implement the following methods:
-
beforeReceive(Object source)
This method is called before theObject.receive()
method. It lets you examine and reconfigure the source. Returningfalse
cancels this poll (similar to thePollSkipAdvice
mentioned earlier). -
Message<?> afterReceive(Message<?> result, Object source)
This method is called after thereceive()
method. Again, you can reconfigure the source or take any action (perhaps depending on the result, which can benull
if there was no message created by the source). You can even return a different message
如果 Advice
改变了源,则不应使用 TaskExecutor
配置轮询器。如果 Advice
改变了源,则此类改变是线程不安全的,并且可能会导致意外结果,尤其对于高频率轮询器而言。如果你需要并发处理轮询结果,请考虑使用下游 ExecutorChannel
,而不是向轮询器添加一个执行器。
If an Advice
mutates the source, you should not configure the poller with a TaskExecutor
.
If an Advice
mutates the source, such mutations are not thread safe and could cause unexpected results, especially with high frequency pollers.
If you need to process poll results concurrently, consider using a downstream ExecutorChannel
instead of adding an executor to the poller.
你应该了解 advice chain 在初始化期间是如何处理的。未实现 ReceiveMessageAdvice
的 Advice
对象将应用于整个轮询过程,并将在所有 ReceiveMessageAdvice
之前按照顺序首先调用。然后,将按顺序调用 ReceiveMessageAdvice
对象,围绕源 receive()
方法。例如,如果你有 Advice
对象 a, b, c, d
,其中 b
和 d
是 ReceiveMessageAdvice
,则对象将按以下顺序应用:a, c, b, d
。此外,如果源已经是 Proxy
,则会在调用任何现有的 Advice
对象之后调用 ReceiveMessageAdvice
。如果你希望更改顺序,则必须自己连接代理。
You should understand how the advice chain is processed during initialization.
Advice
objects that do not implement ReceiveMessageAdvice
are applied to the whole poll process and are all invoked first, in order, before any ReceiveMessageAdvice
.
Then ReceiveMessageAdvice
objects are invoked in order around the source receive()
method.
If you have, for example, Advice
objects a, b, c, d
, where b
and d
are ReceiveMessageAdvice
, the objects are applied in the following order: a, c, b, d
.
Also, if a source is already a Proxy
, the ReceiveMessageAdvice
is invoked after any existing Advice
objects.
If you wish to change the order, you must wire up the proxy yourself.
SimpleActiveIdleReceiveMessageAdvice
此建议是 ReceiveMessageAdvice
的简单实现。当与 DynamicPeriodicTrigger
结合使用时,它会根据上一次轮询是否产生消息来调整轮询频率。轮询器还必须引用同一个 DynamicPeriodicTrigger
。
This advice is a simple implementation of ReceiveMessageAdvice
.
When used in conjunction with a DynamicPeriodicTrigger
, it adjusts the polling frequency, depending on whether the previous poll resulted in a message or not.
The poller must also have a reference to the same DynamicPeriodicTrigger
.
SimpleActiveIdleReceiveMessageAdvice
根据 receive()
结果修改触发器。只有在轮询器线程上调用通知时,这才能正常工作。如果轮询器有 task-executor
,它不起作用。要使用此通知,当您希望在轮询的结果之后使用异步操作时,稍后完成异步处理,也许是使用 ExecutorChannel
。
SimpleActiveIdleReceiveMessageAdvice
modifies the trigger based on the receive()
result.
This works only if the advice is called on the poller thread.
It does not work if the poller has a task-executor
.
To use this advice where you wish to use async operations after the result of a poll, do the async handoff later, perhaps by using an ExecutorChannel
.
CompoundTriggerAdvice
此建议允许根据轮询是否返回消息来选择两个触发器之一。考虑一个使用 CronTrigger
的轮询器。CronTrigger
实例是不可变的,因此构建后无法更改它们。考虑一个用例,我们希望使用 cron 表达式每小时触发一次轮询,但是如果没有收到消息,则每分钟轮询一次,并在检索到消息后恢复使用 cron 表达式。
This advice allows the selection of one of two triggers based on whether a poll returns a message or not.
Consider a poller that uses a CronTrigger
.
CronTrigger
instances are immutable, so they cannot be altered once constructed.
Consider a use case where we want to use a cron expression to trigger a poll once each hour but, if no message is received, poll once per minute and, when a message is retrieved, revert to using the cron expression.
建议(和轮询器)为此目的使用 CompoundTrigger
。触发器的 primary
触发器可以是 CronTrigger
。当建议检测到没有收到消息时,它会将次要触发器添加到 CompoundTrigger
。当调用 CompoundTrigger
实例的 nextExecutionTime
方法时,它将委派给次要触发器(如果存在)。否则,它将委派给主触发器。
The advice (and poller) use a CompoundTrigger
for this purpose.
The trigger’s primary
trigger can be a CronTrigger
.
When the advice detects that no message is received, it adds the secondary trigger to the CompoundTrigger
.
When the CompoundTrigger
instance’s nextExecutionTime
method is invoked, it delegates to the secondary trigger, if present.
Otherwise, it delegates to the primary trigger.
轮询器必须还引用相同的 CompoundTrigger
。
The poller must also have a reference to the same CompoundTrigger
.
以下示例展示了每小时 cron 表达式的配置,以及每分钟进行一次回退:
The following example shows the configuration for the hourly cron expression with a fallback to every minute:
<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
<bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
<int:poller trigger="compoundTrigger">
<int:advice-chain>
<bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
<constructor-arg ref="compoundTrigger"/>
<constructor-arg ref="secondary"/>
</bean>
</int:advice-chain>
</int:poller>
</int:inbound-channel-adapter>
<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
<constructor-arg ref="primary" />
</bean>
<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
<constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>
<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
<constructor-arg value="60000" />
</bean>
CompoundTriggerAdvice
根据 receive()
结果修改触发器。只有在轮询器线程上调用通知时,这才能正常工作。如果轮询器有 task-executor
,它不起作用。要使用此通知,当您希望在轮询的结果之后使用异步操作时,稍后完成异步处理,也许是使用 ExecutorChannel
。
CompoundTriggerAdvice
modifies the trigger based on the receive()
result.
This works only if the advice is called on the poller thread.
It does not work if the poller has a task-executor
.
To use this advice where you wish to use async operations after the result of a poll, do the async handoff later, perhaps by using an ExecutorChannel
.
MessageSource-only Advices
某些建议可能仅适用于 MessageSource.receive()
,而对于 `PollableChannel`来说毫无意义。为此,仍然存在 `MessageSourceMutator`接口(`ReceiveMessageAdvice`的扩展)。有关更多信息,请参阅 Inbound Channel Adapters: Polling Multiple Servers and Directories。
Some advices might be applied only for the MessageSource.receive()
and they don’t make sense for PollableChannel
.
For this purpose a MessageSourceMutator
interface (an extension of the ReceiveMessageAdvice
) is still present.
See Inbound Channel Adapters: Polling Multiple Servers and Directories for more information.