Transaction Support

本章涵盖 Spring Integration 对事务的支持。它涵盖以下主题:

Understanding Transactions in Message flows

Spring Integration 暴露了几个挂钩来满足消息流的事务性需求。要更好地理解这些挂钩以及如何从中受益,我们首先必须重新访问可以用来启动消息流的六种机制,并了解如何在每个机制中满足这些流的事务性需求。

以下六种机制启动消息流(本手册中提供了每种机制的详细信息):

  • 网关代理:一个基本消息传递网关。

  • 消息通道:与 MessageChannel 方法的直接交互(例如 channel.send(message))。

  • 消息发布器:将 Spring bean 上的方法调用作为副产品启动消息流程的方法。

  • 入站通道适配器与网关:根据将第三方系统与 Spring Integration 消息传递系统相连接启动消息流程的方法(例如 [JmsMessage] → Jms Inbound Adapter[SI Message] → SI Channel)。

  • 调度程序:根据预配置的调度程序分发的计划事件启动消息流程的方法。

  • 轮询器:类似于调度器,这是根据预配置的轮询器分发的计划或基于间隔的事件启动消息流程的方法。

我们可以将这六种机制分成两大类:

  • 由用户进程启动的消息流程:此类别的示例场景可能是调用网关方法或将 Message 明确发送到 MessageChannel。换句话说,这些消息流程依赖于启动第三方进程(如你编写的某些代码)。

  • 由守护进程启动的消息流程:此类别的示例场景包括轮询器轮询消息队列以使用轮询的消息启动新的消息流程,或调度程序通过创建新消息和在预定义时间启动消息流程来计划该进程。

显然,Gateway 代理、MessageChannel.send(…​)MessagePublisher 都属于第一类,入站适配器和网关、计划程序和轮询器属于第二类。

那么,您如何在每种类别的各种场景中解决事务性需求,Spring Integration 是否需要针对特定场景明确提供事务相关的内容?还是您可以使用 Spring 的事务支持?

Spring 本身提供了对事务管理的一流支持。因此我们的目标不是提供新的内容,而是使用 Spring 来受益于其对事务的现有支持。换句话说,作为一个框架,我们必须向 Spring 的事务管理功能公开挂钩。但是,由于 Spring Integration 配置基于 Spring 配置,因此我们不必总是公开这些挂钩,因为 Spring 已经公开它们了。毕竟,每个 Spring Integration 组件都是 Spring Bean。

有了这个目标,我们可以再次考虑这两种方案:由用户进程启动的消息流和由守护进程启动的消息流。

由用户进程启动并配置在 Spring 应用程序上下文中,消息流会受到此类进程的通常事务性配置的影响。因此,Spring Integration 不需要明确配置它们来支持事务。事务可以通过 Spring 的标准事务支持来启动并应该这样做。Spring Integration 消息流自然会遵守组件的事务性语义,因为它本身是由 Spring 配置的。例如,网关或服务激活器方法可以用 @Transactional 进行注释,或者可以在 XML 配置中定义一个 TransactionInterceptor,其中带有指向应该具有事务性的特定方法的切入点表达式。最重要的是,您可以在这些场景中完全控制事务配置和边界。

但是,对于由守护进程启动的消息流来说,情况稍微有些不同。尽管是由开发人员配置,但这些流不会直接涉及要启动的人或其他进程。这些是由触发进程(守护进程)根据进程配置启动的基于触发的流。例如,我们可以让计划程序在每个星期五晚上启动消息流。我们还可以配置一个每秒启动一次消息流的触发器,以此类推。因此,我们需要一种方法让这些基于触发器的进程知道我们打算使结果消息流具有事务性,以便可以在启动新的消息流时创建事务上下文。换句话说,我们需要公开一些事务配置,但仅限于委派给 Spring 已提供的交易支持(正如我们在其他场景中所做的那样)。

Poller Transaction Support

Spring Integration 提供了对轮询器的交易支持。轮询器是一种特殊类型的组件,因为在轮询器任务内,我们可以对本身具有事务性的资源调用 receive(),从而将 receive() 调用包括在事务的边界内,这使其在任务失败时可以回滚。如果我们为通道添加相同的支持,添加的事务将影响从 send() 调用开始的所有下游组件。这为事务划分提供了相当广泛的范围,没有任何充分的理由,特别是当 Spring 已经提供了多种方法来满足任何下游组件的事务性需求时。但是,receive() 方法包含在事务边界中是轮询器的“充分理由”。

任何时候配置轮询器时,都可以使用 <transactional> 子元素及其属性提供事务配置,如下面示例所示:

<int:poller max-messages-per-poll="1" fixed-rate="1000">
    <transactional transaction-manager="txManager"
                   isolation="DEFAULT"
                   propagation="REQUIRED"
                   read-only="true"
                   timeout="1000"/>
</poller>

前面的配置看起来类似于原生 Spring 事务配置。您仍然必须提供事务管理器的引用,并指定事务属性或依靠默认值(例如,如果未指定“transaction-manager”属性,则其默认为名为“transactionManager”的 bean)。在内部,该进程包含在 Spring 的原生事务中,其中 TransactionInterceptor 负责处理事务。有关如何配置事务管理器、事务管理器类型(例如 JTA、Datasource 等)以及与事务配置相关的其他详细信息的更多信息,请参见 Spring Framework Reference Guide

使用前面的配置,此轮询程序启动的所有消息流都是事务性的。有关轮询程序的事务配置的更多信息和详细信息,请参见 Polling and Transactions

除了事务之外,在运行轮询程序时,您可能还需要解决其他多个交叉切入问题。为了提供帮助,轮询程序元素接受 <advice-chain> 子元素,该元素允许您定义要应用于该轮询程序的自定义建议实例链。(有关详细信息,请参见 Pollable Message Source。)在 Spring 集成 2.0 中,轮询程序经历了一次重构工作,现在使用代理机制来解决事务问题以及其他交叉问题。这种努力产生的重大变化之一是,我们让 <transactional><advice-chain> 元素互斥。其背后的原因是,如果您需要多条建议并且其中一条是事务建议,您可以将其包含在 <advice-chain> 中,其便利程度与之前相同,但控制力更大,因为您现在可以选择按所需顺序放置建议。以下示例展示了如何做到这一点:

<int:poller max-messages-per-poll="1" fixed-rate="10000">
  <advice-chain>
    <ref bean="txAdvice"/>
    <ref bean="someOtherAdviceBean" />
    <beans:bean class="foo.bar.SampleAdvice"/>
  </advice-chain>
</poller>

<tx:advice id="txAdvice" transaction-manager="txManager">
  <tx:attributes>
    <tx:method name="get*" read-only="true"/>
    <tx:method name="*"/>
  </tx:attributes>
</tx:advice>

前面的示例展示了 Spring 事务建议 (txAdvice) 的基本基于 XML 的配置,并将其包含在轮询器定义的 <advice-chain> 中。如果您只需要解决轮询器的交易问题,仍然可以使用 <transactional> 元素作为一种方便的方法。

Transaction Boundaries

另一个重要因素是消息流中事务的边界。启动事务时,事务上下文将绑定到当前线程。因此,无论消息流中有多少端点和通道,只要您确保流在同一线程上继续,事务上下文都将保留。一旦您通过在一些服务中引入 Pollable ChannelExecutor Channel 或手动启动一个新线程来破坏它,事务边界也将被破坏。从本质上讲,事务将结束,并且如果线程之间已经成功实现移交,则该流将被视为成功,并将发送 COMMIT 信号,即使流将继续并且仍然可能在某个下游位置导致异常。如果这样的流是同步的,则该异常可以抛回到消息流的发起者,它也是事务上下文的发起者,并且事务将导致回滚。折衷的办法是在破坏线程边界的任何点使用事务通道。例如,您可以使用委派给事务消息存储策略的队列支持通道,或者可以使用 JMS 支持通道。

Transaction Synchronization

在某些环境中,通过包含整个流的事务来同步操作非常有帮助。例如,考虑流开始处的 <file:inbound-channel-adapter/>,该流执行一些数据库更新。如果事务提交,我们可能希望将文件移动到 success 目录,而如果事务回滚,我们可能希望将其移动到 failure 目录中。

Spring 集成 2.2 引入了将这些操作与事务同步的能力。此外,如果您没有“实际”事务,但仍然希望在成功或失败时执行不同的操作,则可以配置 PseudoTransactionManager。有关详细信息,请参见 Pseudo Transactions

以下清单显示了此功能的关键策略接口:

public interface TransactionSynchronizationFactory {

    TransactionSynchronization create(Object key);
}

public interface TransactionSynchronizationProcessor {

    void processBeforeCommit(IntegrationResourceHolder holder);

    void processAfterCommit(IntegrationResourceHolder holder);

    void processAfterRollback(IntegrationResourceHolder holder);

}

工厂负责创建 TransactionSynchronization 对象。您可以实现您自己的工厂,或使用框架提供的工厂: DefaultTransactionSynchronizationFactory。此实现返回一个 TransactionSynchronization,该 TransactionSynchronization 委托给 TransactionSynchronizationProcessor 的默认实现: ExpressionEvaluatingTransactionSynchronizationProcessor。此处理器支持三个 SpEL 表达式: beforeCommitExpressionafterCommitExpressionafterRollbackExpression

对于熟悉交易的人来说,这些操作应该是可以自解释的。在每种情况下,#root 变量都是原始 Message。在某些情况下,会根据轮询程序轮询的 MessageSource 提供其他 SpEL 变量。例如,MongoDbMessageSource 提供 #mongoTemplate 变量,它引用消息源的 MongoTemplate。同样,RedisStoreMessageSource 提供 #store 变量,它引用轮询创建的 RedisStore

若要为特定轮询器启用该功能,你可以通过使用 synchronization-factory 属性在轮询器的 <transactional/> 元素中提供对 TransactionSynchronizationFactory 的引用。

从 5.0 版本开始,Spring Integration 提供 PassThroughTransactionSynchronizationFactory,它在没有配置 TransactionSynchronizationFactory 但在 Advice 链中存在 TransactionInterceptor 类型的建议时默认应用于轮询端点。在使用任何现成的 TransactionSynchronizationFactory 实现时,轮询端点将轮询的消息绑定到当前事务上下文,并在事务建议抛出异常后将其作为 MessagingException 中的 failedMessage 提供。在使用未实现 TransactionInterceptor 的自定义事务建议时,你可以显式配置 PassThroughTransactionSynchronizationFactory 来实现这种行为。在这两种情况下,MessagingException 成为发送到 errorChannelErrorMessage 的有效负载,原因是建议抛出的原始异常。以前,ErrorMessage 的有效负载是建议抛出的原始异常,并且不提供对 failedMessage 信息的引用,这使得难以确定事务提交问题的原因。

为了简化这些组件的配置,Spring Integration 为默认工厂提供了命名空间支持。以下示例显示了如何使用命名空间配置文件入站通道适配器:

<int-file:inbound-channel-adapter id="inputDirPoller"
    channel="someChannel"
    directory="/foo/bar"
    filter="filter"
    comparator="testComparator">
    <int:poller fixed-rate="5000">
        <int:transactional transaction-manager="transactionManager" synchronization-factory="syncFactory" />
    </int:poller>
</int-file:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="payload.renameTo(new java.io.File('/success/' + payload.name))"
        channel="committedChannel" />
    <int:after-rollback expression="payload.renameTo(new java.io.File('/failed/' + payload.name))"
        channel="rolledBackChannel" />
</int:transaction-synchronization-factory>

SpEL 评估的结果作为有效负载发送到 committedChannelrolledBackChannel(在这种情况下,这将是 Boolean.TRUEBoolean.FALSE — java.io.File.renameTo() 方法调用的结果)。

如果你希望发送整个有效负载以进行进一步的 Spring Integration 处理,请使用“payload”表达式。

了解将这些操作与事务同步非常重要。它不会使本来不是事务的资源实际上成为事务。相反,事务(无论是 JDBC 还是其他事务)都会在轮询之前启动,并在流程完成后提交或回滚,然后执行同步操作。 如果你提供自定义 TransactionSynchronizationFactory,它负责创建资源同步,以便事务完成后自动取消绑定绑定的资源。默认 TransactionSynchronizationFactory 通过返回 ResourceHolderSynchronization 的子类来实现此目的,其中默认 shouldUnbindAtCompletion() 返回 true

除了 after-commitafter-rollback 表达式之外,还支持 before-commit。在这种情况下,如果评估(或下游处理)抛出异常,则事务将回滚,而不是提交。

Pseudo Transactions

在阅读 Transaction Synchronization 部分之后,您可能会认为,在流完成时采取这些“成功”或“失败”操作非常有用,即使轮询程序的下游没有 “real” 事务性资源(例如 JDBC)。例如,考虑一个 “<file:inbound-channel-adapter/>”,后跟一个 “<ftp:outbout-channel-adapter/>”。这两个组件都不是事务性的,但我们可能希望根据 FTP 传输的成功或失败,将输入文件移至不同的目录。

为了提供此功能,该框架提供了一个 PseudoTransactionManager,即使没有涉及真正的交易资源,也可以启用上述配置。如果流程正常完成,则将调用 beforeCommitafterCommit 同步。发生故障时,将调用 afterRollback 同步。因为它不是真正的交易,所以不会发生实际的提交或回滚。伪交易是一个用于启用同步功能的媒介。

若要使用 PseudoTransactionManager,你可以将它定义为 <bean/>,就像配置真正的交易管理器一样。以下示例展示了如何执行此操作:

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager" />

Reactive Transactions

从 5.3 版本开始,ReactiveTransactionManager 也可以与 TransactionInterceptor 建议一起用于返回响应类型的端点。这包括产生带有 FluxMono 有效负载的消息的 MessageSourceReactiveMessageHandler 实现(例如 ReactiveMongoDbMessageSource)。当它们的答复有效负载也是某种响应类型时,所有其他产生答复的消息处理程序实现都可以依赖于 ReactiveTransactionManager