Poller

此部分描述了轮询在 Spring Integration 中的工作原理。

Polling Consumer

当消息端点(通道适配器)连接到通道并实例化时,它们会产生以下实例之一:

实际实现取决于这些端点连接到的信道的类型。连接到实现 link:https://docs.spring.io/spring/docs/current/javadoc-api/index.html?org/springframework/messaging/SubscribableChannel.html[org.springframework.messaging.SubscribableChannel`接口的信道的信道适配器生成`EventDrivenConsumer`实例。另一方面,连接到实现 link:https://docs.spring.io/spring/docs/current/javadoc-api/index.html?org/springframework/messaging/PollableChannel.html[`org.springframework.messaging.PollableChannel`接口(如`QueueChannel)的信道的信道适配器生成`PollingConsumer`实例。

轮询消费者让 Spring Integration 组件主动轮询消息,而不是以事件驱动的形式处理消息。

它们代表了众多消息传递场景中的一个关键的横切关注点。在 Spring Integration 中,轮询使用者基于同名的模式,这在 Gregor Hohpe 和 Bobby Woolf 合著的 Enterprise Integration Patterns 中有所描述。您可以在 book’s website 上找到该模式的描述。

有关更多信息轮询使用者配置,请参阅 Message Endpoints

Pollable Message Source

Spring 集成提供了轮询使用者的第二个版本。当使用入站通道适配器时,这些适配器通常由 `SourcePollingChannelAdapter`包装。例如,当从远程 FTP 服务器位置检索消息时,FTP Inbound Channel Adapter中描述的适配器配置有轮询器以定期检索消息。因此,当使用轮询器配置组件时,生成的实例将为以下类型之一:

这意味着轮询器同时用于入站和出站消息传递场景。以下是使用轮询器的部分用例:

  • 轮询特定外部系统,例如 FTP 服务器、数据库和 Web 服务

  • 轮询内部(可轮询)消息通道

  • 轮询内部服务(例如重复执行 Java 类上的方法)

AOP 通知类可以应用于轮询器,例如事务通知,以便启动一个事务。从 4.1 版本开始,将提供 PollSkipAdvice。轮询器使用触发器来确定下次轮询的时间。PollSkipAdvice 可以用来禁止(跳过)轮询,也许是因为有一些下游条件会阻止消息被处理。要使用此通知,您必须向它提供 PollSkipStrategy 的实现。从 4.2.5 版本开始,将提供 SimplePollSkipStrategy。要使用它,您可以将一个实例作为 bean 添加到应用程序上下文中,将其注入到 PollSkipAdvice 中,然后将它添加到轮询器的通知链中。要跳过轮询,请调用 skipPolls()。要恢复轮询,请调用 reset()。4.2 版本在此领域添加了更大的灵活性。请参阅 Conditional Pollers

本篇的目的是仅概览轮询使用者,以及它们如何适合于消息通道(请参见 Message Channels)和通道适配器(请参见 Channel Adapter)的概念。有关常规消息传递端点及特别轮询使用者的详细信息,请参见 Message Endpoints

Deferred Acknowledgment Pollable Message Source

从版本 5.0.1 开始,某些模块提供了 MessageSource 实现,这些实现支持将确认推迟到下游流完成(或将消息移交到另一个线程)。这目前仅限于 AmqpMessageSourceKafkaMessageSource

使用这些消息来源,IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 标头(请参阅 xref:message.adoc#message-header-accessor[MessageHeaderAccessor API)添加到消息中。在与可轮询消息来源一起使用时,标头值是 AcknowledgmentCallback 的一个实例,如下所示:

@FunctionalInterface
public interface AcknowledgmentCallback {

    void acknowledge(Status status);

    boolean isAcknowledged();

    void noAutoAck();

    default boolean isAutoAck();

    enum Status {

        /**
         * Mark the message as accepted.
         */
        ACCEPT,

        /**
         * Mark the message as rejected.
         */
        REJECT,

        /**
         * Reject the message and requeue so that it will be redelivered.
         */
        REQUEUE

    }

}

并非所有消息源(例如,KafkaMessageSource)都支持 REJECT 状态。它的处理方式与 ACCEPT 相同。

应用程序可以随时确认消息,如下面的示例所示:

Message<?> received = source.receive();

...

StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
        .acknowledge(Status.ACCEPT);

如果将 MessageSource 连接到 SourcePollingChannelAdapter,当轮询器线程在下游流完成之后返回适配器时,适配器将检查确认是否已得到确认,如果没有,则将其状态设置为 ACCEPT 它(或在流抛出异常时 REJECT)。状态值在 AcknowledgmentCallback.Status enumeration 中定义。

Spring Integration 提供 MessageSourcePollingTemplate 以对 MessageSource 执行临时轮询。当 MessageHandler 回调返回(或抛出异常)时,这也负责在 AcknowledgmentCallback 上设置 ACCEPTREJECT。以下示例展示了如何使用 MessageSourcePollingTemplate 进行轮询:

MessageSourcePollingTemplate template =
    new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
    ...
});

在两种情况下(SourcePollingChannelAdapterMessageSourcePollingTemplate),可以通过在回调上调用 noAutoAck() 禁用自动确认/否定确认。如果你将消息移交给另一个线程并希望稍后确认,则可能这样做。并非所有实现都支持此功能(例如,Apache Kafka 不支持,因为偏移量提交必须在同一线程上执行)。

Conditional Pollers for Message Sources

本节介绍如何使用条件轮询器。

Background

advice-chain 中的轮询器上的 Advice 对象会建议整个轮询任务(既是消息检索,也是处理)。这些“around advice”方法无法访问轮询的任何上下文——仅能访问轮询本身。这对于要求(例如,使任务具有事务性或由于某些外部条件而跳过轮询)非常适合,如前所述。如果我们希望根据轮询 receive 部分的结果采取某些操作,或者如果我们希望根据条件调整轮询器,该怎么办?对于这些实例,Spring Integration 提供了“Smart”轮询。

“Smart” Polling

版本 5.3 引入了 ReceiveMessageAdvice 接口。在 advice-chain 中实现此接口的任何 Advice 对象仅应用于 receive() 操作 - MessageSource.receive()PollableChannel.receive(timeout)。因此,它们只能用于 SourcePollingChannelAdapterPollingConsumer。此类实现了以下方法:

  • beforeReceive(Object source)`此方法在 `Object.receive() 方法之前调用。它允许您检查和重新配置源。返回 false 取消此轮询(类似于前面提到的 PollSkipAdvice )。

  • Message&lt;?&gt; afterReceive(Message&lt;?&gt; result, Object source)`此方法在 `receive() 方法之后调用。同样,您可以重新配置源或执行任何操作(可能取决于结果,如果源未创建消息,则可以是 null )。您甚至可以返回不同的消息

Example 1. Thread safety

如果 Advice 改变了源,则不应使用 TaskExecutor 配置轮询器。如果 Advice 改变了源,则此类改变是线程不安全的,并且可能会导致意外结果,尤其对于高频率轮询器而言。如果你需要并发处理轮询结果,请考虑使用下游 ExecutorChannel,而不是向轮询器添加一个执行器。

Example 2. Advice Chain Ordering

你应该了解 advice chain 在初始化期间是如何处理的。未实现 ReceiveMessageAdviceAdvice 对象将应用于整个轮询过程,并将在所有 ReceiveMessageAdvice 之前按照顺序首先调用。然后,将按顺序调用 ReceiveMessageAdvice 对象,围绕源 receive() 方法。例如,如果你有 Advice 对象 a, b, c, d,其中 bdReceiveMessageAdvice,则对象将按以下顺序应用:a, c, b, d。此外,如果源已经是 Proxy,则会在调用任何现有的 Advice 对象之后调用 ReceiveMessageAdvice。如果你希望更改顺序,则必须自己连接代理。

SimpleActiveIdleReceiveMessageAdvice

此建议是 ReceiveMessageAdvice 的简单实现。当与 DynamicPeriodicTrigger 结合使用时,它会根据上一次轮询是否产生消息来调整轮询频率。轮询器还必须引用同一个 DynamicPeriodicTrigger

Example 3. Important: Async Handoff

SimpleActiveIdleReceiveMessageAdvice 根据 receive() 结果修改触发器。只有在轮询器线程上调用通知时,这才能正常工作。如果轮询器有 task-executor,它不起作用。要使用此通知,当您希望在轮询的结果之后使用异步操作时,稍后完成异步处理,也许是使用 ExecutorChannel

CompoundTriggerAdvice

此建议允许根据轮询是否返回消息来选择两个触发器之一。考虑一个使用 CronTrigger 的轮询器。CronTrigger 实例是不可变的,因此构建后无法更改它们。考虑一个用例,我们希望使用 cron 表达式每小时触发一次轮询,但是如果没有收到消息,则每分钟轮询一次,并在检索到消息后恢复使用 cron 表达式。

建议(和轮询器)为此目的使用 CompoundTrigger。触发器的 primary 触发器可以是 CronTrigger。当建议检测到没有收到消息时,它会将次要触发器添加到 CompoundTrigger。当调用 CompoundTrigger 实例的 nextExecutionTime 方法时,它将委派给次要触发器(如果存在)。否则,它将委派给主触发器。

轮询器必须还引用相同的 CompoundTrigger

以下示例展示了每小时 cron 表达式的配置,以及每分钟进行一次回退:

<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
    <bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
    <int:poller trigger="compoundTrigger">
        <int:advice-chain>
            <bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
                <constructor-arg ref="compoundTrigger"/>
                <constructor-arg ref="secondary"/>
            </bean>
        </int:advice-chain>
    </int:poller>
</int:inbound-channel-adapter>

<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
    <constructor-arg ref="primary" />
</bean>

<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
    <constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>

<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
    <constructor-arg value="60000" />
</bean>
Example 4. Important: Async Handoff

CompoundTriggerAdvice 根据 receive() 结果修改触发器。只有在轮询器线程上调用通知时,这才能正常工作。如果轮询器有 task-executor,它不起作用。要使用此通知,当您希望在轮询的结果之后使用异步操作时,稍后完成异步处理,也许是使用 ExecutorChannel

MessageSource-only Advices

某些建议可能仅适用于 MessageSource.receive(),而对于 `PollableChannel`来说毫无意义。为此,仍然存在 `MessageSourceMutator`接口(`ReceiveMessageAdvice`的扩展)。有关更多信息,请参阅 Inbound Channel Adapters: Polling Multiple Servers and Directories