Delayer
迟延器是一个简单的端点,它允许根据一定的时间间隔迟延消息流。当一条消息被延迟时,原始发送器不会被阻塞。相反,使用 org.springframework.scheduling.TaskScheduler
的一个实例为延迟后的消息安排时间,以便在延迟过去后将它们发送到输出通道。这种做法具有可扩展性,即使对于较长的延迟,因为它不会导致大量发送者线程被阻塞。相反,在典型情况下,线程池用于实际执行释放消息。本节包含多个配置迟延器的示例。
A delayer is a simple endpoint that lets a message flow be delayed by a certain interval.
When a message is delayed, the original sender does not block.
Instead, the delayed messages are scheduled with an instance of org.springframework.scheduling.TaskScheduler
to be sent to the output channel after the delay has passed.
This approach is scalable even for rather long delays, since it does not result in a large number of blocked sender threads.
On the contrary, in the typical case, a thread pool is used for the actual execution of releasing the messages.
This section contains several examples of configuring a delayer.
Configuring a Delayer
<delayer>
元素用于延迟两个消息通道之间的消息流。与其他端点一样,您可以提供“input-channel”和“output-channel”属性,但迟延器还有“default-delay”和“expression”属性(以及“expression”元素)来确定每一则消息的延迟毫秒数。下面的示例将所有消息延迟三秒:
The <delayer>
element is used to delay the message flow between two message channels.
As with the other endpoints, you can provide the 'input-channel' and 'output-channel' attributes, but the delayer also has 'default-delay' and 'expression' attributes (and the 'expression' element) that determine the number of milliseconds by which each message should be delayed.
The following example delays all messages by three seconds:
<int:delayer id="delayer" input-channel="input"
default-delay="3000" output-channel="output"/>
如果您需要确定每一则消息的延迟,还可以通过使用“expression”属性来提供 SpEL 表达式,如下面的表达式所示:
If you need to determine the delay for each message, you can also provide the SpEL expression by using the 'expression' attribute, as the following expression shows:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from("input")
.delay(d -> d
.messageGroupId("delayer.messageGroupId")
.defaultDelay(3_000L)
.delayExpression("headers['delay']"))
.channel("output")
.get();
}
@Bean
fun flow() =
integrationFlow("input") {
delay {
messageGroupId("delayer.messageGroupId")
defaultDelay(3000L)
delayExpression("headers['delay']")
}
channel("output")
}
@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
DelayHandler handler = new DelayHandler("delayer.messageGroupId");
handler.setDefaultDelay(3_000L);
handler.setDelayExpressionString("headers['delay']");
handler.setOutputChannelName("output");
return handler;
}
<int:delayer id="delayer" input-channel="input" output-channel="output"
default-delay="3000" expression="headers['delay']"/>
在前一个示例中,只有在给定某个入站消息时表达式计算结果为 null,三秒延迟才会适用。如果你只想对表达式计算结果有效的消息应用延迟,你可以将“默认延迟”用作“0”(默认值)。对于任何延迟为“0”(或更小)的消息,都会在调用线程立即发送该消息。
In the preceding example, the three-second delay applies only when the expression evaluates to null for a given inbound message.
If you want to apply a delay only to messages that have a valid result of the expression evaluation, you can use a 'default-delay' of 0
(the default).
For any message that has a delay of 0
(or less), the message is sent immediately, on the calling thread.
XML 解析器使用消息组 ID |
The XML parser uses a message group ID of |
延迟处理程序支持表示毫秒(任何 |
The delay handler supports expression evaluation results that represent an interval in milliseconds (any |
表达式评估可能会出于各种原因抛出评估异常,包括无效的表达式或其他条件。默认情况下,此类异常将被忽略(尽管记录在 DEBUG 级别),并且延迟器会退回到默认延迟(如果有)。您可以通过设置 ignore-expression-failures
属性来修改此行为。默认情况下,此属性设置为 true
,并且延迟器行为如前所述。但是,如果您希望不忽略表达式评估异常并将其抛给延迟器的调用者,请将 ignore-expression-failures
属性设置为 false
。
The expression evaluation may throw an evaluation exception for various reasons, including an invalid expression or other conditions.
By default, such exceptions are ignored (though logged at the DEBUG level) and the delayer falls back to the default delay (if any).
You can modify this behavior by setting the ignore-expression-failures
attribute.
By default, this attribute is set to true
and the delayer behavior is as described earlier.
However, if you wish to not ignore expression evaluation exceptions and throw them to the delayer’s caller, set the ignore-expression-failures
attribute to false
.
在前面的示例中,延迟表达式指定为 In the preceding example, the delay expression is specified as
因此,如果可能遗漏头部,而你想退回到默认延迟,通常使用索引器语法而不是点属性访问器语法更有效(且推荐),因为检测 null 比捕获异常更快。 Consequently, if there is a possibility of the header being omitted and you want to fall back to the default delay, it is generally more efficient (and recommended) using the indexer syntax instead of dot property accessor syntax, because detecting the null is faster than catching an exception. |
延迟器委派给 Spring 的 TaskScheduler
抽象的实例。延迟器使用的默认调度程序是 Spring Integration 在启动时提供的 ThreadPoolTaskScheduler
实例。请参阅 Configuring the Task Scheduler。如果您想委派给不同的调度程序,您可以通过延迟器元素的 'scheduler' 属性提供引用,如下例所示:
The delayer delegates to an instance of Spring’s TaskScheduler
abstraction.
The default scheduler used by the delayer is the ThreadPoolTaskScheduler
instance provided by Spring Integration on startup.
See Configuring the Task Scheduler.
If you want to delegate to a different scheduler, you can provide a reference through the delayer element’s 'scheduler' attribute, as the following example shows:
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
scheduler="exampleTaskScheduler"/>
<task:scheduler id="exampleTaskScheduler" pool-size="3"/>
如果配置了外部 |
If you configure an external |
|
|
Delayer and a Message Store
“DelayHandler”持久化延迟消息到提供的“MessageStore”中的消息组。(“groupId”基于“<delayer>`元素的必需“id”属性。另请参阅“DelayHandler.setMessageGroupId(String)”。)在“DelayHandler”将消息发送到“output-channel”之前,计划任务从“MessageStore”中移除延迟消息。如果提供的“MessageStore”是持久的(例如“JdbcMessageStore”),它提供了在应用程序关闭时不会丢失消息的能力。应用程序启动后,“DelayHandler”从“MessageStore”中的消息组读取消息,并根据消息的原始到达时间重新安排其延迟(如果延迟是数字的)。对于延迟头部是“Date”的消息,重新安排时将使用该“Date”。如果延迟消息的“MessageStore”中保留的时间超过了其“delay”,它将在启动后立即发送。“messageGroupId”是必需的,而且不能依赖于可以生成的“DelayHandler”bean 名称。这样,在应用程序重新启动后,“DelayHandler”可能会获取一个新生成的 bean 名称。因此,由于他们的组不再受应用程序管理,因此可能丢失重新安排延迟消息。
The DelayHandler
persists delayed messages into the message group in the provided MessageStore
.
(The 'groupId' is based on the required 'id' attribute of the <delayer>
element.
See also DelayHandler.setMessageGroupId(String)
.)
A delayed message is removed from the MessageStore
by the scheduled task immediately before the DelayHandler
sends the message to the output-channel
.
If the provided MessageStore
is persistent (such as JdbcMessageStore
), it provides the ability to not lose messages on the application shutdown.
After application startup, the DelayHandler
reads messages from its message group in the MessageStore
and reschedules them with a delay based on the original arrival time of the message (if the delay is numeric).
For messages where the delay header was a Date
, that Date
is used when rescheduling.
If a delayed message remains in the MessageStore
more than its 'delay', it is sent immediately after startup.
The messageGroupId
is required and cannot rely on a DelayHandler
bean name which can be generated.
That way, after application restart, a DelayHandler
may get a new generated bean name.
Therefore, delayed messages might be lost from rescheduling since their group is not managed by the application anymore.
“<delayer>`可以通过两个互斥元素中的任一时间丰富:<transactional>`和
<advice-chain>`。这些 AOP 建议的“List”应用于代理内部的“DelayHandler.ReleaseMessageHandler”,它负责在计划任务的“Thread”上在延迟后释放消息。例如,当下游消息流抛出异常,并且“ReleaseMessageHandler”的事务回滚时,它可能会被使用。在此情况下,延迟消息仍然保留在持久的“MessageStore”中。你可以在`<advice-chain>`中使用任何自定义的“org.aopalliance.aop.Advice”实现。“<transactional>`元素定义了一个仅具有事务性建议的简单建议链。以下示例展示了`<delayer>`中的`<advice-chain>`:
The <delayer>
can be enriched with either of two mutually exclusive elements: <transactional>
and <advice-chain>
.
The List
of these AOP advices is applied to the proxied internal DelayHandler.ReleaseMessageHandler
, which has the responsibility to release the message, after the delay, on a Thread
of the scheduled task.
It might be used, for example, when the downstream message flow throws an exception and the transaction of the ReleaseMessageHandler
is rolled back.
In this case, the delayed message remains in the persistent MessageStore
.
You can use any custom org.aopalliance.aop.Advice
implementation within the <advice-chain>
.
The <transactional>
element defines a simple advice chain that has only the transactional advice.
The following example shows an advice-chain
within a <delayer>
:
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
message-store="jdbcMessageStore">
<int:advice-chain>
<beans:ref bean="customAdviceBean"/>
<tx:advice>
<tx:attributes>
<tx:method name="*" read-only="true"/>
</tx:attributes>
</tx:advice>
</int:advice-chain>
</int:delayer>
“DelayHandler”可以用受管理的操作(“getDelayedMessageCount”和“reschedulePersistedMessages”)导出为 JMX “MBean”,它允许在运行时重新安排延迟持久化消息——例如,如果“TaskScheduler”之前已停止。这些操作可以通过“Control Bus”命令调用,如下面的示例所示:
The DelayHandler
can be exported as a JMX MBean
with managed operations (getDelayedMessageCount
and reschedulePersistedMessages
), which allows the rescheduling of delayed persisted messages at runtime — for example, if the TaskScheduler
has previously been stopped.
These operations can be invoked through a Control Bus
command, as the following example shows:
Message<String> delayerReschedulingMessage =
MessageBuilder.withPayload("@'delayer.handler'.reschedulePersistedMessages()").build();
controlBusChannel.send(delayerReschedulingMessage);
有关消息存储、JMX 和控制总线的信息,请参阅 System Management。 |
For more information regarding the message store, JMX, and the control bus, see System Management. |
从 5.3.7 版开始,如果在消息存储到“MessageStore”中时某个事务处于活动状态,则会在“TransactionSynchronization.afterCommit()”回调中安排发布任务。这是必要的,以防止竞争条件,其中计划的发布可能在事务提交之前运行,并且没有找到消息。在此情况下,消息将在延迟后或在事务提交后(以较晚者为准)发布。
Starting with version 5.3.7, if a transaction is active when a message is stored into a MessageStore
, the release task is scheduled in a TransactionSynchronization.afterCommit()
callback.
This is necessary to prevent a race condition, where the scheduled release could run before the transaction has committed, and the message is not found.
In this case, the message will be released after the delay, or after the transaction commits, whichever is later.
Release Failures
从 5.0.8 版开始,迟延器上有两个新属性:
Starting with version 5.0.8, there are two new properties on the delayer:
-
maxAttempts
(default 5) -
retryDelay
(default 1 second)
在释放消息时,如果下游流失败,则将在“retryDelay”后尝试释放。如果达到“maxAttempts”,则将丢弃消息(除非发布是事务性的,在这种情况下,消息将保留在存储区中,但不再计划发布,直到应用程序重新启动或调用“reschedulePersistedMessages()`方法,如上所述)。
When a message is released, if the downstream flow fails, the release will be attempted after the retryDelay
.
If the maxAttempts
is reached, the message is discarded (unless the release is transactional, in which case the message will remain in the store, but will no longer be scheduled for release, until the application is restarted, or the reschedulePersistedMessages()
method is invoked, as discussed above).
此外,你可以配置一个“delayedMessageErrorChannel”;当发布失败时,将“ErrorMessage”发送到该通道,其中异常作为有效载荷,并具有“originalMessage”属性。ErrorMessage`包含一个标头`IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT
,其中包含当前计数。
In addition, you can configure a delayedMessageErrorChannel
; when a release fails, an ErrorMessage
is sent to that channel with the exception as the payload and has the originalMessage
property.
The ErrorMessage
contains a header IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT
containing the current count.
如果错误流消耗错误消息并正常退出,则不进行进一步的操作;如果发布是事务性的,则将提交事务并从存储区删除消息。如果错误流抛出异常,则将根据上述内容重新尝试发布,直到达到“maxAttempts”。
If the error flow consumes the error message and exits normally, no further action is taken; if the release is transactional, the transaction will commit and the message deleted from the store.
If the error flow throws an exception, the release will be retried up to maxAttempts
as discussed above.