Delayer

迟延器是一个简单的端点,它允许根据一定的时间间隔迟延消息流。当一条消息被延迟时,原始发送器不会被阻塞。相反,使用 org.springframework.scheduling.TaskScheduler 的一个实例为延迟后的消息安排时间,以便在延迟过去后将它们发送到输出通道。这种做法具有可扩展性,即使对于较长的延迟,因为它不会导致大量发送者线程被阻塞。相反,在典型情况下,线程池用于实际执行释放消息。本节包含多个配置迟延器的示例。

A delayer is a simple endpoint that lets a message flow be delayed by a certain interval. When a message is delayed, the original sender does not block. Instead, the delayed messages are scheduled with an instance of org.springframework.scheduling.TaskScheduler to be sent to the output channel after the delay has passed. This approach is scalable even for rather long delays, since it does not result in a large number of blocked sender threads. On the contrary, in the typical case, a thread pool is used for the actual execution of releasing the messages. This section contains several examples of configuring a delayer.

Configuring a Delayer

<delayer> 元素用于延迟两个消息通道之间的消息流。与其他端点一样,您可以提供“input-channel”和“output-channel”属性,但迟延器还有“default-delay”和“expression”属性(以及“expression”元素)来确定每一则消息的延迟毫秒数。下面的示例将所有消息延迟三秒:

The <delayer> element is used to delay the message flow between two message channels. As with the other endpoints, you can provide the 'input-channel' and 'output-channel' attributes, but the delayer also has 'default-delay' and 'expression' attributes (and the 'expression' element) that determine the number of milliseconds by which each message should be delayed. The following example delays all messages by three seconds:

<int:delayer id="delayer" input-channel="input"
             default-delay="3000" output-channel="output"/>

如果您需要确定每一则消息的延迟,还可以通过使用“expression”属性来提供 SpEL 表达式,如下面的表达式所示:

If you need to determine the delay for each message, you can also provide the SpEL expression by using the 'expression' attribute, as the following expression shows:

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow flow() {
    return IntegrationFlow.from("input")
            .delay(d -> d
                    .messageGroupId("delayer.messageGroupId")
                    .defaultDelay(3_000L)
                    .delayExpression("headers['delay']"))
            .channel("output")
            .get();
}
@Bean
fun flow() =
    integrationFlow("input") {
        delay {
            messageGroupId("delayer.messageGroupId")
            defaultDelay(3000L)
            delayExpression("headers['delay']")
        }
        channel("output")
    }
@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
    DelayHandler handler = new DelayHandler("delayer.messageGroupId");
    handler.setDefaultDelay(3_000L);
    handler.setDelayExpressionString("headers['delay']");
    handler.setOutputChannelName("output");
    return handler;
}
<int:delayer id="delayer" input-channel="input" output-channel="output"
             default-delay="3000" expression="headers['delay']"/>

在前一个示例中,只有在给定某个入站消息时表达式计算结果为 null,三秒延迟才会适用。如果你只想对表达式计算结果有效的消息应用延迟,你可以将“默认延迟”用作“0”(默认值)。对于任何延迟为“0”(或更小)的消息,都会在调用线程立即发送该消息。

In the preceding example, the three-second delay applies only when the expression evaluates to null for a given inbound message. If you want to apply a delay only to messages that have a valid result of the expression evaluation, you can use a 'default-delay' of 0 (the default). For any message that has a delay of 0 (or less), the message is sent immediately, on the calling thread.

XML 解析器使用消息组 ID <beanName>.messageGroupId

The XML parser uses a message group ID of <beanName>.messageGroupId.

延迟处理程序支持表示毫秒(任何 ObjecttoString() 方法产生一个可以解析为 Long 的值)的表达式的评估结果,以及表示绝对时间的 java.util.Date 实例。在第一种情况下,毫秒数是从当前时间开始算的(例如值 5000 将延迟消息至少五秒钟,从延迟器接收到消息开始计时)。使用 Date 实例时,消息不会释放,直到该 Date 对象表示的时间。一个等于非正延迟或过去某个时间的值不会有延迟。相反,它会直接发送到原始发送者的线程上的输出通道。如果表达式评估的结果不是 Date 且无法解析为 Long,则应用默认延迟(如果有 — 默认值是 0)。

The delay handler supports expression evaluation results that represent an interval in milliseconds (any Object whose toString() method produces a value that can be parsed into a Long) as well as java.util.Date instances representing an absolute time. In the first case, the milliseconds are counted from the current time (for example a value of 5000 would delay the message for at least five seconds from the time it is received by the delayer). With a Date instance, the message is not released until the time represented by that Date object. A value that equates to a non-positive delay or a Date in the past results in no delay. Instead, it is sent directly to the output channel on the original sender’s thread. If the expression evaluation result is not a Date and can not be parsed as a Long, the default delay (if any — the default is 0) is applied.

表达式评估可能会出于各种原因抛出评估异常,包括无效的表达式或其他条件。默认情况下,此类异常将被忽略(尽管记录在 DEBUG 级别),并且延迟器会退回到默认延迟(如果有)。您可以通过设置 ignore-expression-failures 属性来修改此行为。默认情况下,此属性设置为 true,并且延迟器行为如前所述。但是,如果您希望不忽略表达式评估异常并将其抛给延迟器的调用者,请将 ignore-expression-failures 属性设置为 false

The expression evaluation may throw an evaluation exception for various reasons, including an invalid expression or other conditions. By default, such exceptions are ignored (though logged at the DEBUG level) and the delayer falls back to the default delay (if any). You can modify this behavior by setting the ignore-expression-failures attribute. By default, this attribute is set to true and the delayer behavior is as described earlier. However, if you wish to not ignore expression evaluation exceptions and throw them to the delayer’s caller, set the ignore-expression-failures attribute to false.

在前面的示例中,延迟表达式指定为 headers['delay']。这是 SpEL Indexer 访问 Map 元素的语法(MessageHeaders 实现 Map)。它调用: headers.get("delay")。对于简单的映射元素名称(不包含 '.'),您也可以使用 SpEL “dot accessor” 语法,其中前面显示的头表达式可以指定为 headers.delay。但是,如果缺少标题,则会产生不同的结果。在第一种情况下,表达式求值结果为 null。第二个结果类似于以下内容:

In the preceding example, the delay expression is specified as headers['delay']. This is the SpEL Indexer syntax to access a Map element (MessageHeaders implements Map). It invokes: headers.get("delay"). For simple map element names (that do not contain '.') you can also use the SpEL “dot accessor” syntax, where the header expression shown earlier can be specified as headers.delay. However, different results are achieved if the header is missing. In the first case, the expression evaluates to null. The second results in something similar to the following:

 org.springframework.expression.spel.SpelEvaluationException: EL1008E:(pos 8):
		   Field or property 'delay' cannot be found on object of type 'org.springframework.messaging.MessageHeaders'

因此,如果可能遗漏头部,而你想退回到默认延迟,通常使用索引器语法而不是点属性访问器语法更有效(且推荐),因为检测 null 比捕获异常更快。

Consequently, if there is a possibility of the header being omitted and you want to fall back to the default delay, it is generally more efficient (and recommended) using the indexer syntax instead of dot property accessor syntax, because detecting the null is faster than catching an exception.

延迟器委派给 Spring 的 TaskScheduler 抽象的实例。延迟器使用的默认调度程序是 Spring Integration 在启动时提供的 ThreadPoolTaskScheduler 实例。请参阅 Configuring the Task Scheduler。如果您想委派给不同的调度程序,您可以通过延迟器元素的 'scheduler' 属性提供引用,如下例所示:

The delayer delegates to an instance of Spring’s TaskScheduler abstraction. The default scheduler used by the delayer is the ThreadPoolTaskScheduler instance provided by Spring Integration on startup. See Configuring the Task Scheduler. If you want to delegate to a different scheduler, you can provide a reference through the delayer element’s 'scheduler' attribute, as the following example shows:

<int:delayer id="delayer" input-channel="input" output-channel="output"
    expression="headers.delay"
    scheduler="exampleTaskScheduler"/>

<task:scheduler id="exampleTaskScheduler" pool-size="3"/>

如果配置了外部 ThreadPoolTaskScheduler,则可以对此属性设置 waitForTasksToCompleteOnShutdown = true。它允许在应用程序关闭时执行状态中的“延迟”任务(释放消息)成功完成。在 Spring Integration 2.2 之前,此属性可用于 <delayer> 元素,因为 DelayHandler 可以创建自己的后台计划程序。自 2.2 以来,延迟器需要外部计划程序实例,且删除了 waitForTasksToCompleteOnShutdown。你应使用计划程序自己的配置。

If you configure an external ThreadPoolTaskScheduler, you can set waitForTasksToCompleteOnShutdown = true on this property. It allows successful completion of 'delay' tasks that are already in the execution state (releasing the message) when the application is shutdown. Before Spring Integration 2.2, this property was available on the <delayer> element, because DelayHandler could create its own scheduler on the background. Since 2.2, the delayer requires an external scheduler instance and waitForTasksToCompleteOnShutdown was deleted. You should use the scheduler’s own configuration.

ThreadPoolTaskScheduler 有一个 errorHandler 属性,可以注入一些 org.springframework.util.ErrorHandler 的实现。此处理程序允许从发送延迟消息的计划任务的线程处理一个 Exception。默认情况下,它使用一个 org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler,您可以在日志中看到一个堆栈跟踪。您可能希望考虑使用一个 org.springframework.integration.channel.MessagePublishingErrorHandler,它将一个 ErrorMessage 从失败消息的标头或默认 error-channel 中发送到一个 error-channel 中。此错误处理在交易回滚之后执行(如果存在的话)。请参阅 Release Failures

ThreadPoolTaskScheduler has a property errorHandler, which can be injected with some implementation of org.springframework.util.ErrorHandler. This handler allows processing an Exception from the thread of the scheduled task sending the delayed message. By default, it uses an org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler, and you can see a stack trace in the logs. You might want to consider using an org.springframework.integration.channel.MessagePublishingErrorHandler, which sends an ErrorMessage into an error-channel, either from the failed message’s header or into the default error-channel. This error handling is performed after a transaction rolls back (if present). See Release Failures.

Delayer and a Message Store

“DelayHandler”持久化延迟消息到提供的“MessageStore”中的消息组。(“groupId”基于“<delayer>`元素的必需“id”属性。另请参阅“DelayHandler.setMessageGroupId(String)”。)在“DelayHandler”将消息发送到“output-channel”之前,计划任务从“MessageStore”中移除延迟消息。如果提供的“MessageStore”是持久的(例如“JdbcMessageStore”),它提供了在应用程序关闭时不会丢失消息的能力。应用程序启动后,“DelayHandler”从“MessageStore”中的消息组读取消息,并根据消息的原始到达时间重新安排其延迟(如果延迟是数字的)。对于延迟头部是“Date”的消息,重新安排时将使用该“Date”。如果延迟消息的“MessageStore”中保留的时间超过了其“delay”,它将在启动后立即发送。“messageGroupId”是必需的,而且不能依赖于可以生成的“DelayHandler”bean 名称。这样,在应用程序重新启动后,“DelayHandler”可能会获取一个新生成的 bean 名称。因此,由于他们的组不再受应用程序管理,因此可能丢失重新安排延迟消息。

The DelayHandler persists delayed messages into the message group in the provided MessageStore. (The 'groupId' is based on the required 'id' attribute of the <delayer> element. See also DelayHandler.setMessageGroupId(String).) A delayed message is removed from the MessageStore by the scheduled task immediately before the DelayHandler sends the message to the output-channel. If the provided MessageStore is persistent (such as JdbcMessageStore), it provides the ability to not lose messages on the application shutdown. After application startup, the DelayHandler reads messages from its message group in the MessageStore and reschedules them with a delay based on the original arrival time of the message (if the delay is numeric). For messages where the delay header was a Date, that Date is used when rescheduling. If a delayed message remains in the MessageStore more than its 'delay', it is sent immediately after startup. The messageGroupId is required and cannot rely on a DelayHandler bean name which can be generated. That way, after application restart, a DelayHandler may get a new generated bean name. Therefore, delayed messages might be lost from rescheduling since their group is not managed by the application anymore.

“<delayer>`可以通过两个互斥元素中的任一时间丰富:<transactional>`和<advice-chain>`。这些 AOP 建议的“List”应用于代理内部的“DelayHandler.ReleaseMessageHandler”,它负责在计划任务的“Thread”上在延迟后释放消息。例如,当下游消息流抛出异常,并且“ReleaseMessageHandler”的事务回滚时,它可能会被使用。在此情况下,延迟消息仍然保留在持久的“MessageStore”中。你可以在`<advice-chain>`中使用任何自定义的“org.aopalliance.aop.Advice”实现。“<transactional>`元素定义了一个仅具有事务性建议的简单建议链。以下示例展示了`<delayer>`中的`<advice-chain>`:

The <delayer> can be enriched with either of two mutually exclusive elements: <transactional> and <advice-chain>. The List of these AOP advices is applied to the proxied internal DelayHandler.ReleaseMessageHandler, which has the responsibility to release the message, after the delay, on a Thread of the scheduled task. It might be used, for example, when the downstream message flow throws an exception and the transaction of the ReleaseMessageHandler is rolled back. In this case, the delayed message remains in the persistent MessageStore. You can use any custom org.aopalliance.aop.Advice implementation within the <advice-chain>. The <transactional> element defines a simple advice chain that has only the transactional advice. The following example shows an advice-chain within a <delayer>:

<int:delayer id="delayer" input-channel="input" output-channel="output"
    expression="headers.delay"
    message-store="jdbcMessageStore">
    <int:advice-chain>
        <beans:ref bean="customAdviceBean"/>
        <tx:advice>
            <tx:attributes>
                <tx:method name="*" read-only="true"/>
            </tx:attributes>
        </tx:advice>
    </int:advice-chain>
</int:delayer>

“DelayHandler”可以用受管理的操作(“getDelayedMessageCount”和“reschedulePersistedMessages”)导出为 JMX “MBean”,它允许在运行时重新安排延迟持久化消息——例如,如果“TaskScheduler”之前已停止。这些操作可以通过“Control Bus”命令调用,如下面的示例所示:

The DelayHandler can be exported as a JMX MBean with managed operations (getDelayedMessageCount and reschedulePersistedMessages), which allows the rescheduling of delayed persisted messages at runtime — for example, if the TaskScheduler has previously been stopped. These operations can be invoked through a Control Bus command, as the following example shows:

Message<String> delayerReschedulingMessage =
    MessageBuilder.withPayload("@'delayer.handler'.reschedulePersistedMessages()").build();
controlBusChannel.send(delayerReschedulingMessage);

有关消息存储、JMX 和控制总线的信息,请参阅 System Management

For more information regarding the message store, JMX, and the control bus, see System Management.

从 5.3.7 版开始,如果在消息存储到“MessageStore”中时某个事务处于活动状态,则会在“TransactionSynchronization.afterCommit()”回调中安排发布任务。这是必要的,以防止竞争条件,其中计划的发布可能在事务提交之前运行,并且没有找到消息。在此情况下,消息将在延迟后或在事务提交后(以较晚者为准)发布。

Starting with version 5.3.7, if a transaction is active when a message is stored into a MessageStore, the release task is scheduled in a TransactionSynchronization.afterCommit() callback. This is necessary to prevent a race condition, where the scheduled release could run before the transaction has committed, and the message is not found. In this case, the message will be released after the delay, or after the transaction commits, whichever is later.

Release Failures

从 5.0.8 版开始,迟延器上有两个新属性:

Starting with version 5.0.8, there are two new properties on the delayer:

  • maxAttempts (default 5)

  • retryDelay (default 1 second)

在释放消息时,如果下游流失败,则将在“retryDelay”后尝试释放。如果达到“maxAttempts”,则将丢弃消息(除非发布是事务性的,在这种情况下,消息将保留在存储区中,但不再计划发布,直到应用程序重新启动或调用“reschedulePersistedMessages()`方法,如上所述)。

When a message is released, if the downstream flow fails, the release will be attempted after the retryDelay. If the maxAttempts is reached, the message is discarded (unless the release is transactional, in which case the message will remain in the store, but will no longer be scheduled for release, until the application is restarted, or the reschedulePersistedMessages() method is invoked, as discussed above).

此外,你可以配置一个“delayedMessageErrorChannel”;当发布失败时,将“ErrorMessage”发送到该通道,其中异常作为有效载荷,并具有“originalMessage”属性。ErrorMessage`包含一个标头`IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT,其中包含当前计数。

In addition, you can configure a delayedMessageErrorChannel; when a release fails, an ErrorMessage is sent to that channel with the exception as the payload and has the originalMessage property. The ErrorMessage contains a header IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT containing the current count.

如果错误流消耗错误消息并正常退出,则不进行进一步的操作;如果发布是事务性的,则将提交事务并从存储区删除消息。如果错误流抛出异常,则将根据上述内容重新尝试发布,直到达到“maxAttempts”。

If the error flow consumes the error message and exits normally, no further action is taken; if the release is transactional, the transaction will commit and the message deleted from the store. If the error flow throws an exception, the release will be retried up to maxAttempts as discussed above.