Poller

此部分描述了轮询在 Spring Integration 中的工作原理。

This section describes how polling works in Spring Integration.

Polling Consumer

当消息端点(通道适配器)连接到通道并实例化时,它们会产生以下实例之一:

When Message Endpoints (Channel Adapters) are connected to channels and instantiated, they produce one of the following instances:

实际实现取决于这些端点连接到的信道的类型。连接到实现 link:https://docs.spring.io/spring/docs/current/javadoc-api/index.html?org/springframework/messaging/SubscribableChannel.html[org.springframework.messaging.SubscribableChannel`接口的信道的信道适配器生成`EventDrivenConsumer`实例。另一方面,连接到实现 link:https://docs.spring.io/spring/docs/current/javadoc-api/index.html?org/springframework/messaging/PollableChannel.html[`org.springframework.messaging.PollableChannel`接口(如`QueueChannel)的信道的信道适配器生成`PollingConsumer`实例。

The actual implementation depends on the type of channel to which these endpoints connect. A channel adapter connected to a channel that implements the org.springframework.messaging.SubscribableChannel interface produces an instance of EventDrivenConsumer. On the other hand, a channel adapter connected to a channel that implements the org.springframework.messaging.PollableChannel interface (such as a QueueChannel) produces an instance of PollingConsumer.

轮询消费者让 Spring Integration 组件主动轮询消息,而不是以事件驱动的形式处理消息。

Polling consumers let Spring Integration components actively poll for Messages rather than process messages in an event-driven manner.

它们代表了众多消息传递场景中的一个关键的横切关注点。在 Spring Integration 中,轮询使用者基于同名的模式,这在 Gregor Hohpe 和 Bobby Woolf 合著的 Enterprise Integration Patterns 中有所描述。您可以在 book’s website 上找到该模式的描述。

They represent a critical cross-cutting concern in many messaging scenarios. In Spring Integration, polling consumers are based on the pattern with the same name, which is described in the book Enterprise Integration Patterns, by Gregor Hohpe and Bobby Woolf. You can find a description of the pattern on the book’s website.

有关更多信息轮询使用者配置,请参阅 Message Endpoints

For more information polling consumer configuration, see Message Endpoints.

Pollable Message Source

Spring 集成提供了轮询使用者的第二个版本。当使用入站通道适配器时,这些适配器通常由 `SourcePollingChannelAdapter`包装。例如,当从远程 FTP 服务器位置检索消息时,FTP Inbound Channel Adapter中描述的适配器配置有轮询器以定期检索消息。因此,当使用轮询器配置组件时,生成的实例将为以下类型之一:

Spring Integration offers a second variation of the polling consumer pattern. When inbound channel adapters are used, these adapters are often wrapped by a SourcePollingChannelAdapter. For example, when retrieving messages from a remote FTP Server location, the adapter described in FTP Inbound Channel Adapter is configured with a poller to periodically retrieve messages. So, when components are configured with pollers, the resulting instances are of one of the following types:

这意味着轮询器同时用于入站和出站消息传递场景。以下是使用轮询器的部分用例:

This means that pollers are used in both inbound and outbound messaging scenarios. Here are some use cases in which pollers are used:

  • Polling certain external systems, such as FTP Servers, Databases, and Web Services

  • Polling internal (pollable) message channels

  • Polling internal services (such as repeatedly executing methods on a Java class)

AOP 通知类可以应用于轮询器,例如事务通知,以便启动一个事务。从 4.1 版本开始,将提供 PollSkipAdvice。轮询器使用触发器来确定下次轮询的时间。PollSkipAdvice 可以用来禁止(跳过)轮询,也许是因为有一些下游条件会阻止消息被处理。要使用此通知,您必须向它提供 PollSkipStrategy 的实现。从 4.2.5 版本开始,将提供 SimplePollSkipStrategy。要使用它,您可以将一个实例作为 bean 添加到应用程序上下文中,将其注入到 PollSkipAdvice 中,然后将它添加到轮询器的通知链中。要跳过轮询,请调用 skipPolls()。要恢复轮询,请调用 reset()。4.2 版本在此领域添加了更大的灵活性。请参阅 Conditional Pollers

AOP advice classes can be applied to pollers, in an advice-chain, such as a transaction advice to start a transaction. Starting with version 4.1, a PollSkipAdvice is provided. Pollers use triggers to determine the time of the next poll. The PollSkipAdvice can be used to suppress (skip) a poll, perhaps because there is some downstream condition that would prevent the message being processed. To use this advice, you have to provide it with an implementation of a PollSkipStrategy. Starting with version 4.2.5, a SimplePollSkipStrategy is provided. To use it, you can add an instance as a bean to the application context, inject it into a PollSkipAdvice, and add that to the poller’s advice chain. To skip polling, call skipPolls(). To resume polling, call reset(). Version 4.2 added more flexibility in this area. See Conditional Pollers.

本篇的目的是仅概览轮询使用者,以及它们如何适合于消息通道(请参见 Message Channels)和通道适配器(请参见 Channel Adapter)的概念。有关常规消息传递端点及特别轮询使用者的详细信息,请参见 Message Endpoints

This chapter is meant to only give a high-level overview of polling consumers and how they fit into the concept of message channels (see Message Channels) and channel adapters (see Channel Adapter). For more information regarding messaging endpoints in general and polling consumers in particular, see Message Endpoints.

Deferred Acknowledgment Pollable Message Source

从版本 5.0.1 开始,某些模块提供了 MessageSource 实现,这些实现支持将确认推迟到下游流完成(或将消息移交到另一个线程)。这目前仅限于 AmqpMessageSourceKafkaMessageSource

Starting with version 5.0.1, certain modules provide MessageSource implementations that support deferring acknowledgment until the downstream flow completes (or hands off the message to another thread). This is currently limited to the AmqpMessageSource and the KafkaMessageSource.

使用这些消息来源,IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 标头(请参阅 xref:message.adoc#message-header-accessor[MessageHeaderAccessor API)添加到消息中。在与可轮询消息来源一起使用时,标头值是 AcknowledgmentCallback 的一个实例,如下所示:

With these message sources, the IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK header (see MessageHeaderAccessor API) is added to the message. When used with pollable message sources, the value of the header is an instance of AcknowledgmentCallback, as the following example shows:

@FunctionalInterface
public interface AcknowledgmentCallback {

    void acknowledge(Status status);

    boolean isAcknowledged();

    void noAutoAck();

    default boolean isAutoAck();

    enum Status {

        /**
         * Mark the message as accepted.
         */
        ACCEPT,

        /**
         * Mark the message as rejected.
         */
        REJECT,

        /**
         * Reject the message and requeue so that it will be redelivered.
         */
        REQUEUE

    }

}

并非所有消息源(例如,KafkaMessageSource)都支持 REJECT 状态。它的处理方式与 ACCEPT 相同。

Not all message sources (for example, a KafkaMessageSource) support the REJECT status. It is treated the same as ACCEPT.

应用程序可以随时确认消息,如下面的示例所示:

Applications can acknowledge a message at any time, as the following example shows:

Message<?> received = source.receive();

...

StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
        .acknowledge(Status.ACCEPT);

如果将 MessageSource 连接到 SourcePollingChannelAdapter,当轮询器线程在下游流完成之后返回适配器时,适配器将检查确认是否已得到确认,如果没有,则将其状态设置为 ACCEPT 它(或在流抛出异常时 REJECT)。状态值在 AcknowledgmentCallback.Status enumeration 中定义。

If the MessageSource is wired into a SourcePollingChannelAdapter, when the poller thread returns to the adapter after the downstream flow completes, the adapter checks whether the acknowledgment has already been acknowledged and, if not, sets its status to ACCEPT it (or REJECT if the flow throws an exception). The status values are defined in the AcknowledgmentCallback.Status enumeration.

Spring Integration 提供 MessageSourcePollingTemplate 以对 MessageSource 执行临时轮询。当 MessageHandler 回调返回(或抛出异常)时,这也负责在 AcknowledgmentCallback 上设置 ACCEPTREJECT。以下示例展示了如何使用 MessageSourcePollingTemplate 进行轮询:

Spring Integration provides MessageSourcePollingTemplate to perform ad-hoc polling of a MessageSource. This, too, takes care of setting ACCEPT or REJECT on the AcknowledgmentCallback when the MessageHandler callback returns (or throws an exception). The following example shows how to poll with the MessageSourcePollingTemplate:

MessageSourcePollingTemplate template =
    new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
    ...
});

在两种情况下(SourcePollingChannelAdapterMessageSourcePollingTemplate),可以通过在回调上调用 noAutoAck() 禁用自动确认/否定确认。如果你将消息移交给另一个线程并希望稍后确认,则可能这样做。并非所有实现都支持此功能(例如,Apache Kafka 不支持,因为偏移量提交必须在同一线程上执行)。

In both cases (SourcePollingChannelAdapter and MessageSourcePollingTemplate), you can disable auto ack/nack by calling noAutoAck() on the callback. You might do this if you hand off the message to another thread and wish to acknowledge later. Not all implementations support this (for example, Apache Kafka does not, because the offset commit has to be performed on the same thread).

Conditional Pollers for Message Sources

本节介绍如何使用条件轮询器。

This section covers how to use conditional pollers.

Background

advice-chain 中的轮询器上的 Advice 对象会建议整个轮询任务(既是消息检索,也是处理)。这些“around advice”方法无法访问轮询的任何上下文——仅能访问轮询本身。这对于要求(例如,使任务具有事务性或由于某些外部条件而跳过轮询)非常适合,如前所述。如果我们希望根据轮询 receive 部分的结果采取某些操作,或者如果我们希望根据条件调整轮询器,该怎么办?对于这些实例,Spring Integration 提供了“Smart”轮询。

Advice objects, in an advice-chain on a poller, advise the whole polling task (both message retrieval and processing). These “around advice” methods do not have access to any context for the poll — only the poll itself. This is fine for requirements such as making a task transactional or skipping a poll due to some external condition, as discussed earlier. What if we wish to take some action depending on the result of the receive part of the poll or if we want to adjust the poller depending on conditions? For those instances, Spring Integration offers “Smart” Polling.

“Smart” Polling

版本 5.3 引入了 ReceiveMessageAdvice 接口。在 advice-chain 中实现此接口的任何 Advice 对象仅应用于 receive() 操作 - MessageSource.receive()PollableChannel.receive(timeout)。因此,它们只能用于 SourcePollingChannelAdapterPollingConsumer。此类实现了以下方法:

Version 5.3 introduced the ReceiveMessageAdvice interface. Any Advice objects in the advice-chain that implement this interface are applied only to the receive() operation - MessageSource.receive() and PollableChannel.receive(timeout). Therefore, they can be applied only for the SourcePollingChannelAdapter or PollingConsumer. Such classes implement the following methods:

  • beforeReceive(Object source) This method is called before the Object.receive() method. It lets you examine and reconfigure the source. Returning false cancels this poll (similar to the PollSkipAdvice mentioned earlier).

  • Message<?> afterReceive(Message<?> result, Object source) This method is called after the receive() method. Again, you can reconfigure the source or take any action (perhaps depending on the result, which can be null if there was no message created by the source). You can even return a different message

Example 1. Thread safety

如果 Advice 改变了源,则不应使用 TaskExecutor 配置轮询器。如果 Advice 改变了源,则此类改变是线程不安全的,并且可能会导致意外结果,尤其对于高频率轮询器而言。如果你需要并发处理轮询结果,请考虑使用下游 ExecutorChannel,而不是向轮询器添加一个执行器。

If an Advice mutates the source, you should not configure the poller with a TaskExecutor. If an Advice mutates the source, such mutations are not thread safe and could cause unexpected results, especially with high frequency pollers. If you need to process poll results concurrently, consider using a downstream ExecutorChannel instead of adding an executor to the poller.

Example 2. Advice Chain Ordering

你应该了解 advice chain 在初始化期间是如何处理的。未实现 ReceiveMessageAdviceAdvice 对象将应用于整个轮询过程,并将在所有 ReceiveMessageAdvice 之前按照顺序首先调用。然后,将按顺序调用 ReceiveMessageAdvice 对象,围绕源 receive() 方法。例如,如果你有 Advice 对象 a, b, c, d,其中 bdReceiveMessageAdvice,则对象将按以下顺序应用:a, c, b, d。此外,如果源已经是 Proxy,则会在调用任何现有的 Advice 对象之后调用 ReceiveMessageAdvice。如果你希望更改顺序,则必须自己连接代理。

You should understand how the advice chain is processed during initialization. Advice objects that do not implement ReceiveMessageAdvice are applied to the whole poll process and are all invoked first, in order, before any ReceiveMessageAdvice. Then ReceiveMessageAdvice objects are invoked in order around the source receive() method. If you have, for example, Advice objects a, b, c, d, where b and d are ReceiveMessageAdvice, the objects are applied in the following order: a, c, b, d. Also, if a source is already a Proxy, the ReceiveMessageAdvice is invoked after any existing Advice objects. If you wish to change the order, you must wire up the proxy yourself.

SimpleActiveIdleReceiveMessageAdvice

此建议是 ReceiveMessageAdvice 的简单实现。当与 DynamicPeriodicTrigger 结合使用时,它会根据上一次轮询是否产生消息来调整轮询频率。轮询器还必须引用同一个 DynamicPeriodicTrigger

This advice is a simple implementation of ReceiveMessageAdvice. When used in conjunction with a DynamicPeriodicTrigger, it adjusts the polling frequency, depending on whether the previous poll resulted in a message or not. The poller must also have a reference to the same DynamicPeriodicTrigger.

Example 3. Important: Async Handoff

SimpleActiveIdleReceiveMessageAdvice 根据 receive() 结果修改触发器。只有在轮询器线程上调用通知时,这才能正常工作。如果轮询器有 task-executor,它不起作用。要使用此通知,当您希望在轮询的结果之后使用异步操作时,稍后完成异步处理,也许是使用 ExecutorChannel

Example 4. Important: Async Handoff

SimpleActiveIdleReceiveMessageAdvice modifies the trigger based on the receive() result. This works only if the advice is called on the poller thread. It does not work if the poller has a task-executor. To use this advice where you wish to use async operations after the result of a poll, do the async handoff later, perhaps by using an ExecutorChannel.

CompoundTriggerAdvice

此建议允许根据轮询是否返回消息来选择两个触发器之一。考虑一个使用 CronTrigger 的轮询器。CronTrigger 实例是不可变的,因此构建后无法更改它们。考虑一个用例,我们希望使用 cron 表达式每小时触发一次轮询,但是如果没有收到消息,则每分钟轮询一次,并在检索到消息后恢复使用 cron 表达式。

This advice allows the selection of one of two triggers based on whether a poll returns a message or not. Consider a poller that uses a CronTrigger. CronTrigger instances are immutable, so they cannot be altered once constructed. Consider a use case where we want to use a cron expression to trigger a poll once each hour but, if no message is received, poll once per minute and, when a message is retrieved, revert to using the cron expression.

建议(和轮询器)为此目的使用 CompoundTrigger。触发器的 primary 触发器可以是 CronTrigger。当建议检测到没有收到消息时,它会将次要触发器添加到 CompoundTrigger。当调用 CompoundTrigger 实例的 nextExecutionTime 方法时,它将委派给次要触发器(如果存在)。否则,它将委派给主触发器。

The advice (and poller) use a CompoundTrigger for this purpose. The trigger’s primary trigger can be a CronTrigger. When the advice detects that no message is received, it adds the secondary trigger to the CompoundTrigger. When the CompoundTrigger instance’s nextExecutionTime method is invoked, it delegates to the secondary trigger, if present. Otherwise, it delegates to the primary trigger.

轮询器必须还引用相同的 CompoundTrigger

The poller must also have a reference to the same CompoundTrigger.

以下示例展示了每小时 cron 表达式的配置,以及每分钟进行一次回退:

The following example shows the configuration for the hourly cron expression with a fallback to every minute:

<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
    <bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
    <int:poller trigger="compoundTrigger">
        <int:advice-chain>
            <bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
                <constructor-arg ref="compoundTrigger"/>
                <constructor-arg ref="secondary"/>
            </bean>
        </int:advice-chain>
    </int:poller>
</int:inbound-channel-adapter>

<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
    <constructor-arg ref="primary" />
</bean>

<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
    <constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>

<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
    <constructor-arg value="60000" />
</bean>
Example 5. Important: Async Handoff

CompoundTriggerAdvice 根据 receive() 结果修改触发器。只有在轮询器线程上调用通知时,这才能正常工作。如果轮询器有 task-executor,它不起作用。要使用此通知,当您希望在轮询的结果之后使用异步操作时,稍后完成异步处理,也许是使用 ExecutorChannel

Example 6. Important: Async Handoff

CompoundTriggerAdvice modifies the trigger based on the receive() result. This works only if the advice is called on the poller thread. It does not work if the poller has a task-executor. To use this advice where you wish to use async operations after the result of a poll, do the async handoff later, perhaps by using an ExecutorChannel.

MessageSource-only Advices

某些建议可能仅适用于 MessageSource.receive(),而对于 `PollableChannel`来说毫无意义。为此,仍然存在 `MessageSourceMutator`接口(`ReceiveMessageAdvice`的扩展)。有关更多信息,请参阅 Inbound Channel Adapters: Polling Multiple Servers and Directories

Some advices might be applied only for the MessageSource.receive() and they don’t make sense for PollableChannel. For this purpose a MessageSourceMutator interface (an extension of the ReceiveMessageAdvice) is still present. See Inbound Channel Adapters: Polling Multiple Servers and Directories for more information.