Delayer

迟延器是一个简单的端点,它允许根据一定的时间间隔迟延消息流。当一条消息被延迟时,原始发送器不会被阻塞。相反,使用 org.springframework.scheduling.TaskScheduler 的一个实例为延迟后的消息安排时间,以便在延迟过去后将它们发送到输出通道。这种做法具有可扩展性,即使对于较长的延迟,因为它不会导致大量发送者线程被阻塞。相反,在典型情况下,线程池用于实际执行释放消息。本节包含多个配置迟延器的示例。

Configuring a Delayer

<delayer> 元素用于延迟两个消息通道之间的消息流。与其他端点一样,您可以提供“input-channel”和“output-channel”属性,但迟延器还有“default-delay”和“expression”属性(以及“expression”元素)来确定每一则消息的延迟毫秒数。下面的示例将所有消息延迟三秒:

<int:delayer id="delayer" input-channel="input"
             default-delay="3000" output-channel="output"/>

如果您需要确定每一则消息的延迟,还可以通过使用“expression”属性来提供 SpEL 表达式,如下面的表达式所示:

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow flow() {
    return IntegrationFlow.from("input")
            .delay(d -> d
                    .messageGroupId("delayer.messageGroupId")
                    .defaultDelay(3_000L)
                    .delayExpression("headers['delay']"))
            .channel("output")
            .get();
}
@Bean
fun flow() =
    integrationFlow("input") {
        delay {
            messageGroupId("delayer.messageGroupId")
            defaultDelay(3000L)
            delayExpression("headers['delay']")
        }
        channel("output")
    }
@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
    DelayHandler handler = new DelayHandler("delayer.messageGroupId");
    handler.setDefaultDelay(3_000L);
    handler.setDelayExpressionString("headers['delay']");
    handler.setOutputChannelName("output");
    return handler;
}
<int:delayer id="delayer" input-channel="input" output-channel="output"
             default-delay="3000" expression="headers['delay']"/>

在前一个示例中,只有在给定某个入站消息时表达式计算结果为 null,三秒延迟才会适用。如果你只想对表达式计算结果有效的消息应用延迟,你可以将“默认延迟”用作“0”(默认值)。对于任何延迟为“0”(或更小)的消息,都会在调用线程立即发送该消息。

XML 解析器使用消息组 ID <beanName>.messageGroupId

延迟处理程序支持表示毫秒(任何 ObjecttoString() 方法产生一个可以解析为 Long 的值)的表达式的评估结果,以及表示绝对时间的 java.util.Date 实例。在第一种情况下,毫秒数是从当前时间开始算的(例如值 5000 将延迟消息至少五秒钟,从延迟器接收到消息开始计时)。使用 Date 实例时,消息不会释放,直到该 Date 对象表示的时间。一个等于非正延迟或过去某个时间的值不会有延迟。相反,它会直接发送到原始发送者的线程上的输出通道。如果表达式评估的结果不是 Date 且无法解析为 Long,则应用默认延迟(如果有 — 默认值是 0)。

表达式评估可能会出于各种原因抛出评估异常,包括无效的表达式或其他条件。默认情况下,此类异常将被忽略(尽管记录在 DEBUG 级别),并且延迟器会退回到默认延迟(如果有)。您可以通过设置 ignore-expression-failures 属性来修改此行为。默认情况下,此属性设置为 true,并且延迟器行为如前所述。但是,如果您希望不忽略表达式评估异常并将其抛给延迟器的调用者,请将 ignore-expression-failures 属性设置为 false

在前面的示例中,延迟表达式指定为 headers['delay']。这是 SpEL Indexer 访问 Map 元素的语法(MessageHeaders 实现 Map)。它调用: headers.get("delay")。对于简单的映射元素名称(不包含 '.'),您也可以使用 SpEL “dot accessor” 语法,其中前面显示的头表达式可以指定为 headers.delay。但是,如果缺少标题,则会产生不同的结果。在第一种情况下,表达式求值结果为 null。第二个结果类似于以下内容:

 org.springframework.expression.spel.SpelEvaluationException: EL1008E:(pos 8):
		   Field or property 'delay' cannot be found on object of type 'org.springframework.messaging.MessageHeaders'

因此,如果可能遗漏头部,而你想退回到默认延迟,通常使用索引器语法而不是点属性访问器语法更有效(且推荐),因为检测 null 比捕获异常更快。

延迟器委派给 Spring 的 TaskScheduler 抽象的实例。延迟器使用的默认调度程序是 Spring Integration 在启动时提供的 ThreadPoolTaskScheduler 实例。请参阅 Configuring the Task Scheduler。如果您想委派给不同的调度程序,您可以通过延迟器元素的 'scheduler' 属性提供引用,如下例所示:

<int:delayer id="delayer" input-channel="input" output-channel="output"
    expression="headers.delay"
    scheduler="exampleTaskScheduler"/>

<task:scheduler id="exampleTaskScheduler" pool-size="3"/>

如果配置了外部 ThreadPoolTaskScheduler,则可以对此属性设置 waitForTasksToCompleteOnShutdown = true。它允许在应用程序关闭时执行状态中的“延迟”任务(释放消息)成功完成。在 Spring Integration 2.2 之前,此属性可用于 <delayer> 元素,因为 DelayHandler 可以创建自己的后台计划程序。自 2.2 以来,延迟器需要外部计划程序实例,且删除了 waitForTasksToCompleteOnShutdown。你应使用计划程序自己的配置。

ThreadPoolTaskScheduler 有一个 errorHandler 属性,可以注入一些 org.springframework.util.ErrorHandler 的实现。此处理程序允许从发送延迟消息的计划任务的线程处理一个 Exception。默认情况下,它使用一个 org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler,您可以在日志中看到一个堆栈跟踪。您可能希望考虑使用一个 org.springframework.integration.channel.MessagePublishingErrorHandler,它将一个 ErrorMessage 从失败消息的标头或默认 error-channel 中发送到一个 error-channel 中。此错误处理在交易回滚之后执行(如果存在的话)。请参阅 Release Failures

Delayer and a Message Store

“DelayHandler”持久化延迟消息到提供的“MessageStore”中的消息组。(“groupId”基于“<delayer>`元素的必需“id”属性。另请参阅“DelayHandler.setMessageGroupId(String)”。)在“DelayHandler”将消息发送到“output-channel”之前,计划任务从“MessageStore”中移除延迟消息。如果提供的“MessageStore”是持久的(例如“JdbcMessageStore”),它提供了在应用程序关闭时不会丢失消息的能力。应用程序启动后,“DelayHandler”从“MessageStore”中的消息组读取消息,并根据消息的原始到达时间重新安排其延迟(如果延迟是数字的)。对于延迟头部是“Date”的消息,重新安排时将使用该“Date”。如果延迟消息的“MessageStore”中保留的时间超过了其“delay”,它将在启动后立即发送。“messageGroupId”是必需的,而且不能依赖于可以生成的“DelayHandler”bean 名称。这样,在应用程序重新启动后,“DelayHandler”可能会获取一个新生成的 bean 名称。因此,由于他们的组不再受应用程序管理,因此可能丢失重新安排延迟消息。

“<delayer>`可以通过两个互斥元素中的任一时间丰富:<transactional>`和<advice-chain>`。这些 AOP 建议的“List”应用于代理内部的“DelayHandler.ReleaseMessageHandler”,它负责在计划任务的“Thread”上在延迟后释放消息。例如,当下游消息流抛出异常,并且“ReleaseMessageHandler”的事务回滚时,它可能会被使用。在此情况下,延迟消息仍然保留在持久的“MessageStore”中。你可以在`<advice-chain>`中使用任何自定义的“org.aopalliance.aop.Advice”实现。“<transactional>`元素定义了一个仅具有事务性建议的简单建议链。以下示例展示了`<delayer>`中的`<advice-chain>`:

<int:delayer id="delayer" input-channel="input" output-channel="output"
    expression="headers.delay"
    message-store="jdbcMessageStore">
    <int:advice-chain>
        <beans:ref bean="customAdviceBean"/>
        <tx:advice>
            <tx:attributes>
                <tx:method name="*" read-only="true"/>
            </tx:attributes>
        </tx:advice>
    </int:advice-chain>
</int:delayer>

“DelayHandler”可以用受管理的操作(“getDelayedMessageCount”和“reschedulePersistedMessages”)导出为 JMX “MBean”,它允许在运行时重新安排延迟持久化消息——例如,如果“TaskScheduler”之前已停止。这些操作可以通过“Control Bus”命令调用,如下面的示例所示:

Message<String> delayerReschedulingMessage =
    MessageBuilder.withPayload("@'delayer.handler'.reschedulePersistedMessages()").build();
controlBusChannel.send(delayerReschedulingMessage);

有关消息存储、JMX 和控制总线的信息,请参阅 System Management

从 5.3.7 版开始,如果在消息存储到“MessageStore”中时某个事务处于活动状态,则会在“TransactionSynchronization.afterCommit()”回调中安排发布任务。这是必要的,以防止竞争条件,其中计划的发布可能在事务提交之前运行,并且没有找到消息。在此情况下,消息将在延迟后或在事务提交后(以较晚者为准)发布。

Release Failures

从 5.0.8 版开始,迟延器上有两个新属性:

  • maxAttempts (default 5)

  • retryDelay (default 1 second)

在释放消息时,如果下游流失败,则将在“retryDelay”后尝试释放。如果达到“maxAttempts”,则将丢弃消息(除非发布是事务性的,在这种情况下,消息将保留在存储区中,但不再计划发布,直到应用程序重新启动或调用“reschedulePersistedMessages()`方法,如上所述)。

此外,你可以配置一个“delayedMessageErrorChannel”;当发布失败时,将“ErrorMessage”发送到该通道,其中异常作为有效载荷,并具有“originalMessage”属性。ErrorMessage`包含一个标头`IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT,其中包含当前计数。

如果错误流消耗错误消息并正常退出,则不进行进一步的操作;如果发布是事务性的,则将提交事务并从存储区删除消息。如果错误流抛出异常,则将根据上述内容重新尝试发布,直到达到“maxAttempts”。