Poller
此部分描述了轮询在 Spring Integration 中的工作原理。
Polling Consumer
当消息端点(通道适配器)连接到通道并实例化时,它们会产生以下实例之一:
实际实现取决于这些端点连接到的信道的类型。连接到实现 link:https://docs.spring.io/spring/docs/current/javadoc-api/index.html?org/springframework/messaging/SubscribableChannel.html[org.springframework.messaging.SubscribableChannel`接口的信道的信道适配器生成`EventDrivenConsumer`实例。另一方面,连接到实现 link:https://docs.spring.io/spring/docs/current/javadoc-api/index.html?org/springframework/messaging/PollableChannel.html[`org.springframework.messaging.PollableChannel`接口(如`QueueChannel
)的信道的信道适配器生成`PollingConsumer`实例。
轮询消费者让 Spring Integration 组件主动轮询消息,而不是以事件驱动的形式处理消息。
它们代表了众多消息传递场景中的一个关键的横切关注点。在 Spring Integration 中,轮询使用者基于同名的模式,这在 Gregor Hohpe 和 Bobby Woolf 合著的 Enterprise Integration Patterns 中有所描述。您可以在 book’s website 上找到该模式的描述。
有关更多信息轮询使用者配置,请参阅 Message Endpoints。
Pollable Message Source
Spring 集成提供了轮询使用者的第二个版本。当使用入站通道适配器时,这些适配器通常由 `SourcePollingChannelAdapter`包装。例如,当从远程 FTP 服务器位置检索消息时,FTP Inbound Channel Adapter中描述的适配器配置有轮询器以定期检索消息。因此,当使用轮询器配置组件时,生成的实例将为以下类型之一:
这意味着轮询器同时用于入站和出站消息传递场景。以下是使用轮询器的部分用例:
-
轮询特定外部系统,例如 FTP 服务器、数据库和 Web 服务
-
轮询内部(可轮询)消息通道
-
轮询内部服务(例如重复执行 Java 类上的方法)
AOP 通知类可以应用于轮询器,例如事务通知,以便启动一个事务。从 4.1 版本开始,将提供 |
本篇的目的是仅概览轮询使用者,以及它们如何适合于消息通道(请参见 Message Channels)和通道适配器(请参见 Channel Adapter)的概念。有关常规消息传递端点及特别轮询使用者的详细信息,请参见 Message Endpoints。
Deferred Acknowledgment Pollable Message Source
从版本 5.0.1 开始,某些模块提供了 MessageSource
实现,这些实现支持将确认推迟到下游流完成(或将消息移交到另一个线程)。这目前仅限于 AmqpMessageSource
和 KafkaMessageSource
。
使用这些消息来源,IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
标头(请参阅 xref:message.adoc#message-header-accessor[MessageHeaderAccessor
API)添加到消息中。在与可轮询消息来源一起使用时,标头值是 AcknowledgmentCallback
的一个实例,如下所示:
@FunctionalInterface
public interface AcknowledgmentCallback {
void acknowledge(Status status);
boolean isAcknowledged();
void noAutoAck();
default boolean isAutoAck();
enum Status {
/**
* Mark the message as accepted.
*/
ACCEPT,
/**
* Mark the message as rejected.
*/
REJECT,
/**
* Reject the message and requeue so that it will be redelivered.
*/
REQUEUE
}
}
并非所有消息源(例如,KafkaMessageSource
)都支持 REJECT
状态。它的处理方式与 ACCEPT
相同。
应用程序可以随时确认消息,如下面的示例所示:
Message<?> received = source.receive();
...
StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
.acknowledge(Status.ACCEPT);
如果将 MessageSource
连接到 SourcePollingChannelAdapter
,当轮询器线程在下游流完成之后返回适配器时,适配器将检查确认是否已得到确认,如果没有,则将其状态设置为 ACCEPT
它(或在流抛出异常时 REJECT
)。状态值在 AcknowledgmentCallback.Status
enumeration 中定义。
Spring Integration 提供 MessageSourcePollingTemplate
以对 MessageSource
执行临时轮询。当 MessageHandler
回调返回(或抛出异常)时,这也负责在 AcknowledgmentCallback
上设置 ACCEPT
或 REJECT
。以下示例展示了如何使用 MessageSourcePollingTemplate
进行轮询:
MessageSourcePollingTemplate template =
new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
...
});
在两种情况下(SourcePollingChannelAdapter
和 MessageSourcePollingTemplate
),可以通过在回调上调用 noAutoAck()
禁用自动确认/否定确认。如果你将消息移交给另一个线程并希望稍后确认,则可能这样做。并非所有实现都支持此功能(例如,Apache Kafka 不支持,因为偏移量提交必须在同一线程上执行)。
Conditional Pollers for Message Sources
本节介绍如何使用条件轮询器。
Background
advice-chain
中的轮询器上的 Advice
对象会建议整个轮询任务(既是消息检索,也是处理)。这些“around advice
”方法无法访问轮询的任何上下文——仅能访问轮询本身。这对于要求(例如,使任务具有事务性或由于某些外部条件而跳过轮询)非常适合,如前所述。如果我们希望根据轮询 receive
部分的结果采取某些操作,或者如果我们希望根据条件调整轮询器,该怎么办?对于这些实例,Spring Integration 提供了“Smart
”轮询。
“Smart” Polling
版本 5.3 引入了 ReceiveMessageAdvice
接口。在 advice-chain
中实现此接口的任何 Advice
对象仅应用于 receive()
操作 - MessageSource.receive()
和 PollableChannel.receive(timeout)
。因此,它们只能用于 SourcePollingChannelAdapter
或 PollingConsumer
。此类实现了以下方法:
-
beforeReceive(Object source)`此方法在 `Object.receive()
方法之前调用。它允许您检查和重新配置源。返回false
取消此轮询(类似于前面提到的PollSkipAdvice
)。 -
Message<?> afterReceive(Message<?> result, Object source)`此方法在 `receive()
方法之后调用。同样,您可以重新配置源或执行任何操作(可能取决于结果,如果源未创建消息,则可以是null
)。您甚至可以返回不同的消息
如果 Advice
改变了源,则不应使用 TaskExecutor
配置轮询器。如果 Advice
改变了源,则此类改变是线程不安全的,并且可能会导致意外结果,尤其对于高频率轮询器而言。如果你需要并发处理轮询结果,请考虑使用下游 ExecutorChannel
,而不是向轮询器添加一个执行器。
你应该了解 advice chain 在初始化期间是如何处理的。未实现 ReceiveMessageAdvice
的 Advice
对象将应用于整个轮询过程,并将在所有 ReceiveMessageAdvice
之前按照顺序首先调用。然后,将按顺序调用 ReceiveMessageAdvice
对象,围绕源 receive()
方法。例如,如果你有 Advice
对象 a, b, c, d
,其中 b
和 d
是 ReceiveMessageAdvice
,则对象将按以下顺序应用:a, c, b, d
。此外,如果源已经是 Proxy
,则会在调用任何现有的 Advice
对象之后调用 ReceiveMessageAdvice
。如果你希望更改顺序,则必须自己连接代理。
SimpleActiveIdleReceiveMessageAdvice
此建议是 ReceiveMessageAdvice
的简单实现。当与 DynamicPeriodicTrigger
结合使用时,它会根据上一次轮询是否产生消息来调整轮询频率。轮询器还必须引用同一个 DynamicPeriodicTrigger
。
SimpleActiveIdleReceiveMessageAdvice
根据 receive()
结果修改触发器。只有在轮询器线程上调用通知时,这才能正常工作。如果轮询器有 task-executor
,它不起作用。要使用此通知,当您希望在轮询的结果之后使用异步操作时,稍后完成异步处理,也许是使用 ExecutorChannel
。
CompoundTriggerAdvice
此建议允许根据轮询是否返回消息来选择两个触发器之一。考虑一个使用 CronTrigger
的轮询器。CronTrigger
实例是不可变的,因此构建后无法更改它们。考虑一个用例,我们希望使用 cron 表达式每小时触发一次轮询,但是如果没有收到消息,则每分钟轮询一次,并在检索到消息后恢复使用 cron 表达式。
建议(和轮询器)为此目的使用 CompoundTrigger
。触发器的 primary
触发器可以是 CronTrigger
。当建议检测到没有收到消息时,它会将次要触发器添加到 CompoundTrigger
。当调用 CompoundTrigger
实例的 nextExecutionTime
方法时,它将委派给次要触发器(如果存在)。否则,它将委派给主触发器。
轮询器必须还引用相同的 CompoundTrigger
。
以下示例展示了每小时 cron 表达式的配置,以及每分钟进行一次回退:
<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
<bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
<int:poller trigger="compoundTrigger">
<int:advice-chain>
<bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
<constructor-arg ref="compoundTrigger"/>
<constructor-arg ref="secondary"/>
</bean>
</int:advice-chain>
</int:poller>
</int:inbound-channel-adapter>
<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
<constructor-arg ref="primary" />
</bean>
<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
<constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>
<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
<constructor-arg value="60000" />
</bean>
CompoundTriggerAdvice
根据 receive()
结果修改触发器。只有在轮询器线程上调用通知时,这才能正常工作。如果轮询器有 task-executor
,它不起作用。要使用此通知,当您希望在轮询的结果之后使用异步操作时,稍后完成异步处理,也许是使用 ExecutorChannel
。
MessageSource-only Advices
某些建议可能仅适用于 MessageSource.receive()
,而对于 `PollableChannel`来说毫无意义。为此,仍然存在 `MessageSourceMutator`接口(`ReceiveMessageAdvice`的扩展)。有关更多信息,请参阅 Inbound Channel Adapters: Polling Multiple Servers and Directories。