Transaction Support
本章涵盖 Spring Integration 对事务的支持。它涵盖以下主题:
This chapter covers Spring Integration’s support for transactions. It covers the following topics:
Understanding Transactions in Message flows
Spring Integration 暴露了几个挂钩来满足消息流的事务性需求。要更好地理解这些挂钩以及如何从中受益,我们首先必须重新访问可以用来启动消息流的六种机制,并了解如何在每个机制中满足这些流的事务性需求。
Spring Integration exposes several hooks to address the transactional needs of your message flows. To better understand these hooks and how you can benefit from them, we must first revisit the six mechanisms that you can use to initiate message flows and see how you can address the transactional needs of these flows within each of these mechanisms.
以下六种机制启动消息流(本手册中提供了每种机制的详细信息):
The following six mechanisms initiate a message flow (details for each are provided throughout this manual):
-
Gateway proxy: A basic messaging gateway.
-
Message channel: Direct interactions with
MessageChannel
methods (for example,channel.send(message)
). -
Message publisher: The way to initiate a message flow as the by-product of method invocations on Spring beans.
-
Inbound channel adapters and gateways: The way to initiate a message flow based on connecting third-party system with the Spring Integration messaging system (for example,
[JmsMessage] → Jms Inbound Adapter[SI Message] → SI Channel
). -
Scheduler: The way to initiate a message flow based on scheduling events distributed by a pre-configured scheduler.
-
Poller: Similar to the scheduler, this is the way to initiate message flow based on scheduling or interval-based events distributed by a pre-configured poller.
我们可以将这六种机制分成两大类:
We can split these six mechanisms into two general categories:
-
Message flows initiated by a user process: Example scenarios in this category would be invoking a gateway method or explicitly sending a
Message
to aMessageChannel
. In other words, these message flows depend on a third party process (such as some code that you wrote) to be initiated. -
Message flows initiated by a daemon process: Example scenarios in this category include a Poller polling a message queue to initiate a new message flow with the polled message or a scheduler scheduling the process by creating a new message and initiating a message flow at a predefined time.
显然,Gateway 代理、MessageChannel.send(…)
和 MessagePublisher
都属于第一类,入站适配器和网关、计划程序和轮询器属于第二类。
Clearly the gateway proxy, MessageChannel.send(…)
and MessagePublisher
all belong to the first category, and inbound adapters and gateways, scheduler, and poller belong to the second category.
那么,您如何在每种类别的各种场景中解决事务性需求,Spring Integration 是否需要针对特定场景明确提供事务相关的内容?还是您可以使用 Spring 的事务支持?
So, how can you address transactional needs in various scenarios within each category, and is there a need for Spring Integration to provide something explicit with regard to transactions for a particular scenario? Or can you use Spring’s transaction support instead?
Spring 本身提供了对事务管理的一流支持。因此我们的目标不是提供新的内容,而是使用 Spring 来受益于其对事务的现有支持。换句话说,作为一个框架,我们必须向 Spring 的事务管理功能公开挂钩。但是,由于 Spring Integration 配置基于 Spring 配置,因此我们不必总是公开这些挂钩,因为 Spring 已经公开它们了。毕竟,每个 Spring Integration 组件都是 Spring Bean。
Spring itself provides first class support for transaction management. So our goal here is not to provide something new but rather use Spring to benefit from its existing support for transactions. In other words, as a framework, we must expose hooks to Spring’s transaction management functionality. However, since Spring Integration configuration is based on Spring configuration, we need not always expose these hooks, because Spring already exposes them . After all, every Spring Integration component is a Spring Bean.
有了这个目标,我们可以再次考虑这两种方案:由用户进程启动的消息流和由守护进程启动的消息流。
With this goal in mind, we can again consider the two scenarios: message flows initiated by a user process and message flows initiated by a daemon.
由用户进程启动并配置在 Spring 应用程序上下文中,消息流会受到此类进程的通常事务性配置的影响。因此,Spring Integration 不需要明确配置它们来支持事务。事务可以通过 Spring 的标准事务支持来启动并应该这样做。Spring Integration 消息流自然会遵守组件的事务性语义,因为它本身是由 Spring 配置的。例如,网关或服务激活器方法可以用 @Transactional
进行注释,或者可以在 XML 配置中定义一个 TransactionInterceptor
,其中带有指向应该具有事务性的特定方法的切入点表达式。最重要的是,您可以在这些场景中完全控制事务配置和边界。
Message flows that are initiated by a user process and configured in a Spring application context are subject to the usual transactional configuration of such processes.
Therefore, they need not be explicitly configured by Spring Integration to support transactions.
The transaction could and should be initiated through Spring’s standard transaction support.
The Spring Integration message flow naturally honors the transactional semantics of the components, because it is itself configured by Spring.
For example, a gateway or service activator method could be annotated with @Transactional
, or a TransactionInterceptor
could be defined in an XML configuration with a pointcut expression that points to specific methods that should be transactional.
The bottom line is that you have full control over transaction configuration and boundaries in these scenarios.
但是,对于由守护进程启动的消息流来说,情况稍微有些不同。尽管是由开发人员配置,但这些流不会直接涉及要启动的人或其他进程。这些是由触发进程(守护进程)根据进程配置启动的基于触发的流。例如,我们可以让计划程序在每个星期五晚上启动消息流。我们还可以配置一个每秒启动一次消息流的触发器,以此类推。因此,我们需要一种方法让这些基于触发器的进程知道我们打算使结果消息流具有事务性,以便可以在启动新的消息流时创建事务上下文。换句话说,我们需要公开一些事务配置,但仅限于委派给 Spring 已提供的交易支持(正如我们在其他场景中所做的那样)。
However, things are a bit different when it comes to message flows initiated by a daemon process. Although configured by the developer, these flows do not directly involve a human or some other process to be initiated. These are trigger-based flows that are initiated by a trigger process (a daemon process) based on the configuration of the process. For example, we could have a scheduler initiate a message flow every Friday night. We can also configure a trigger that initiates a message flow every second and so on. As a result, we need a way to let these trigger-based processes know of our intention to make the resulting message flows be transactional, so that a Transaction context can be created whenever a new message flow is initiated. In other words, we need to expose some transaction configuration, but only enough to delegate to the transaction support already provided by Spring (as we do in other scenarios).
Poller Transaction Support
Spring Integration 提供了对轮询器的交易支持。轮询器是一种特殊类型的组件,因为在轮询器任务内,我们可以对本身具有事务性的资源调用 receive()
,从而将 receive()
调用包括在事务的边界内,这使其在任务失败时可以回滚。如果我们为通道添加相同的支持,添加的事务将影响从 send()
调用开始的所有下游组件。这为事务划分提供了相当广泛的范围,没有任何充分的理由,特别是当 Spring 已经提供了多种方法来满足任何下游组件的事务性需求时。但是,receive()
方法包含在事务边界中是轮询器的“充分理由”。
Spring Integration provides transactional support for pollers.
Pollers are a special type of component because, within a poller task, we can call receive()
against a resource that is itself transactional, thus including the receive()
call in the boundaries of the transaction, which lets it be rolled back in case of a task failure.
If we were to add the same support for channels, the added transactions would affect all downstream components starting with the send()
call.
That provides a rather wide scope for transaction demarcation without any strong reason, especially when Spring already provides several ways to address the transactional needs of any component downstream.
However, the receive()
method being included in a transaction boundary is the “strong reason” for pollers.
任何时候配置轮询器时,都可以使用 <transactional>
子元素及其属性提供事务配置,如下面示例所示:
Any time you configure a Poller, you can provide transactional configuration by using the transactional
child element and its attributes,as the following example shows:
<int:poller max-messages-per-poll="1" fixed-rate="1000">
<transactional transaction-manager="txManager"
isolation="DEFAULT"
propagation="REQUIRED"
read-only="true"
timeout="1000"/>
</poller>
前面的配置看起来类似于原生 Spring 事务配置。您仍然必须提供事务管理器的引用,并指定事务属性或依靠默认值(例如,如果未指定“transaction-manager”属性,则其默认为名为“transactionManager”的 bean)。在内部,该进程包含在 Spring 的原生事务中,其中 TransactionInterceptor
负责处理事务。有关如何配置事务管理器、事务管理器类型(例如 JTA、Datasource 等)以及与事务配置相关的其他详细信息的更多信息,请参见 Spring Framework Reference Guide。
The preceding configuration looks similar to a native Spring transaction configuration.
You must still provide a reference to a transaction manager and either specify transaction attributes or rely on defaults (for example, if the 'transaction-manager' attribute is not specified, it defaults to the bean named 'transactionManager').
Internally, the process is wrapped in Spring’s native transaction, where TransactionInterceptor
is responsible for handling transactions.
For more information on how to configure a transaction manager, the types of transaction managers (such as JTA, Datasource, and others), and other details related to transaction configuration, see the Spring Framework Reference Guide.
使用前面的配置,此轮询程序启动的所有消息流都是事务性的。有关轮询程序的事务配置的更多信息和详细信息,请参见 Polling and Transactions。
With the preceding configuration, all message flows initiated by this poller are transactional. For more information and details on a poller’s transactional configuration, see Polling and Transactions.
除了事务之外,在运行轮询程序时,您可能还需要解决其他多个交叉切入问题。为了提供帮助,轮询程序元素接受 <advice-chain>
子元素,该元素允许您定义要应用于该轮询程序的自定义建议实例链。(有关详细信息,请参见 Pollable Message Source。)在 Spring 集成 2.0 中,轮询程序经历了一次重构工作,现在使用代理机制来解决事务问题以及其他交叉问题。这种努力产生的重大变化之一是,我们让 <transactional>
和 <advice-chain>
元素互斥。其背后的原因是,如果您需要多条建议并且其中一条是事务建议,您可以将其包含在 <advice-chain>
中,其便利程度与之前相同,但控制力更大,因为您现在可以选择按所需顺序放置建议。以下示例展示了如何做到这一点:
Along with transactions, you might need to address several more cross-cutting concerns when you run a poller.
To help with that, the poller element accepts an <advice-chain>
child element, which lets you define a custom chain of advice instances to be applied on the Poller.
(See Pollable Message Source for more details.)
In Spring Integration 2.0, the Poller went through a refactoring effort and now uses a proxy mechanism to address transactional concerns as well as other cross-cutting concerns.
One of the significant changes evolving from this effort is that we made the <transactional>
and <advice-chain>
elements be mutually exclusive.
The rationale behind this is that, if you need more than one advice and one of them is Transaction advice, you can include it in the <advice-chain>
with the same convenience as before but with much more control, since you now have an option to position the advice in the desired order.
The following example shows how to do so:
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<advice-chain>
<ref bean="txAdvice"/>
<ref bean="someOtherAdviceBean" />
<beans:bean class="foo.bar.SampleAdvice"/>
</advice-chain>
</poller>
<tx:advice id="txAdvice" transaction-manager="txManager">
<tx:attributes>
<tx:method name="get*" read-only="true"/>
<tx:method name="*"/>
</tx:attributes>
</tx:advice>
前面的示例展示了 Spring 事务建议 (txAdvice
) 的基本基于 XML 的配置,并将其包含在轮询器定义的 <advice-chain>
中。如果您只需要解决轮询器的交易问题,仍然可以使用 <transactional>
元素作为一种方便的方法。
The preceding example shows a basic XML-based configuration of Spring Transaction advice (txAdvice
) and included it within the <advice-chain>
defined by the Poller.
If you need to address only the transactional concerns of the poller, you can still use the <transactional>
element as a convenience.
Transaction Boundaries
另一个重要因素是消息流中事务的边界。启动事务时,事务上下文将绑定到当前线程。因此,无论消息流中有多少端点和通道,只要您确保流在同一线程上继续,事务上下文都将保留。一旦您通过在一些服务中引入 Pollable Channel 或 Executor Channel 或手动启动一个新线程来破坏它,事务边界也将被破坏。从本质上讲,事务将结束,并且如果线程之间已经成功实现移交,则该流将被视为成功,并将发送 COMMIT 信号,即使流将继续并且仍然可能在某个下游位置导致异常。如果这样的流是同步的,则该异常可以抛回到消息流的发起者,它也是事务上下文的发起者,并且事务将导致回滚。折衷的办法是在破坏线程边界的任何点使用事务通道。例如,您可以使用委派给事务消息存储策略的队列支持通道,或者可以使用 JMS 支持通道。
Another important factor is the boundaries of Transactions within a Message flow. When a transaction is started, the transaction context is bound to the current thread. So regardless of how many endpoints and channels you have in your Message flow your transaction context will be preserved as long as you are ensuring that the flow continues on the same thread. As soon as you break it by introducing a Pollable Channel or Executor Channel or initiate a new thread manually in some service, the Transactional boundary will be broken as well. Essentially the Transaction will END right there, and if a successful handoff has transpired between the threads, the flow would be considered a success and a COMMIT signal would be sent even though the flow will continue and might still result in an Exception somewhere downstream. If such a flow were synchronous, that Exception could be thrown back to the initiator of the Message flow who is also the initiator of the transactional context and the transaction would result in a ROLLBACK. The middle ground is to use transactional channels at any point where a thread boundary is being broken. For example, you can use a Queue-backed Channel that delegates to a transactional MessageStore strategy, or you could use a JMS-backed channel.
Transaction Synchronization
在某些环境中,通过包含整个流的事务来同步操作非常有帮助。例如,考虑流开始处的 <file:inbound-channel-adapter/>
,该流执行一些数据库更新。如果事务提交,我们可能希望将文件移动到 success
目录,而如果事务回滚,我们可能希望将其移动到 failure
目录中。
In some environments, it helps to synchronize operations with a transaction that encompasses the entire flow.
For example, consider a <file:inbound-channel-adapter/>
at the start of a flow that performs a number of database updates.
If the transaction commits, we might want to move the file to a success
directory, while we might want to move it to a failure
directory if the transaction rolls back.
Spring 集成 2.2 引入了将这些操作与事务同步的能力。此外,如果您没有“实际”事务,但仍然希望在成功或失败时执行不同的操作,则可以配置 PseudoTransactionManager
。有关详细信息,请参见 Pseudo Transactions。
Spring Integration 2.2 introduced the capability of synchronizing these operations with a transaction.
In addition, you can configure a PseudoTransactionManager
if you do not have a 'real' transaction but still want to perform different actions on success or failure.
For more information, see Pseudo Transactions.
以下清单显示了此功能的关键策略接口:
The following listing shows the key strategy interfaces for this feature:
public interface TransactionSynchronizationFactory {
TransactionSynchronization create(Object key);
}
public interface TransactionSynchronizationProcessor {
void processBeforeCommit(IntegrationResourceHolder holder);
void processAfterCommit(IntegrationResourceHolder holder);
void processAfterRollback(IntegrationResourceHolder holder);
}
工厂负责创建 TransactionSynchronization
对象。您可以实现您自己的工厂,或使用框架提供的工厂: DefaultTransactionSynchronizationFactory
。此实现返回一个 TransactionSynchronization
,该 TransactionSynchronization
委托给 TransactionSynchronizationProcessor
的默认实现: ExpressionEvaluatingTransactionSynchronizationProcessor
。此处理器支持三个 SpEL 表达式: beforeCommitExpression
、afterCommitExpression
和 afterRollbackExpression
。
The factory is responsible for creating a TransactionSynchronization
object.
You can implement your own or use the one provided by the framework: DefaultTransactionSynchronizationFactory
.
This implementation returns a TransactionSynchronization
that delegates to a default implementation of TransactionSynchronizationProcessor
: ExpressionEvaluatingTransactionSynchronizationProcessor
.
This processor supports three SpEL expressions: beforeCommitExpression
, afterCommitExpression
, and afterRollbackExpression
.
对于熟悉交易的人来说,这些操作应该是可以自解释的。在每种情况下,#root
变量都是原始 Message
。在某些情况下,会根据轮询程序轮询的 MessageSource
提供其他 SpEL 变量。例如,MongoDbMessageSource
提供 #mongoTemplate
变量,它引用消息源的 MongoTemplate
。同样,RedisStoreMessageSource
提供 #store
变量,它引用轮询创建的 RedisStore
。
These actions should be self-explanatory to those familiar with transactions.
In each case, the #root
variable is the original Message
.
In some cases, other SpEL variables are made available, depending on the MessageSource
being polled by the poller.
For example, the MongoDbMessageSource
provides the #mongoTemplate
variable, which references the message source’s MongoTemplate
.
Similarly, the RedisStoreMessageSource
provides the #store
variable, which references the RedisStore
created by the poll.
若要为特定轮询器启用该功能,你可以通过使用 synchronization-factory
属性在轮询器的 <transactional/>
元素中提供对 TransactionSynchronizationFactory
的引用。
To enable the feature for a particular poller, you can provide a reference to the TransactionSynchronizationFactory
on the poller’s <transactional/>
element by using the synchronization-factory
attribute.
从 5.0 版本开始,Spring Integration 提供 PassThroughTransactionSynchronizationFactory
,它在没有配置 TransactionSynchronizationFactory
但在 Advice 链中存在 TransactionInterceptor
类型的建议时默认应用于轮询端点。在使用任何现成的 TransactionSynchronizationFactory
实现时,轮询端点将轮询的消息绑定到当前事务上下文,并在事务建议抛出异常后将其作为 MessagingException
中的 failedMessage
提供。在使用未实现 TransactionInterceptor
的自定义事务建议时,你可以显式配置 PassThroughTransactionSynchronizationFactory
来实现这种行为。在这两种情况下,MessagingException
成为发送到 errorChannel
的 ErrorMessage
的有效负载,原因是建议抛出的原始异常。以前,ErrorMessage
的有效负载是建议抛出的原始异常,并且不提供对 failedMessage
信息的引用,这使得难以确定事务提交问题的原因。
Starting with version 5.0, Spring Integration provides PassThroughTransactionSynchronizationFactory
, which is applied by default to polling endpoints when no TransactionSynchronizationFactory
is configured but an advice of type TransactionInterceptor
exists in the advice chain.
When using any out-of-the-box TransactionSynchronizationFactory
implementation, polling endpoints bind a polled message to the current transactional context and provide it as a failedMessage
in a MessagingException
if an exception is thrown after the transaction advice.
When using a custom transaction advice that does not implement TransactionInterceptor
, you can explicitly configure a PassThroughTransactionSynchronizationFactory
to achieve this behavior.
In either case, the MessagingException
becomes the payload of the ErrorMessage
that is sent to the errorChannel
, and the cause is the raw exception thrown by the advice.
Previously, the ErrorMessage
had a payload that was the raw exception thrown by the advice and did not provide a reference to the failedMessage
information, making it difficult to determine the reasons for the transaction commit problem.
为了简化这些组件的配置,Spring Integration 为默认工厂提供了命名空间支持。以下示例显示了如何使用命名空间配置文件入站通道适配器:
To simplify configuration of these components, Spring Integration provides namespace support for the default factory. The following example shows how to use the namespace to configure a file inbound channel adapter:
<int-file:inbound-channel-adapter id="inputDirPoller"
channel="someChannel"
directory="/foo/bar"
filter="filter"
comparator="testComparator">
<int:poller fixed-rate="5000">
<int:transactional transaction-manager="transactionManager" synchronization-factory="syncFactory" />
</int:poller>
</int-file:inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.renameTo(new java.io.File('/success/' + payload.name))"
channel="committedChannel" />
<int:after-rollback expression="payload.renameTo(new java.io.File('/failed/' + payload.name))"
channel="rolledBackChannel" />
</int:transaction-synchronization-factory>
SpEL 评估的结果作为有效负载发送到 committedChannel
或 rolledBackChannel
(在这种情况下,这将是 Boolean.TRUE
或 Boolean.FALSE
— java.io.File.renameTo()
方法调用的结果)。
The result of the SpEL evaluation is sent as the payload to either committedChannel
or rolledBackChannel
(in this case, this would be Boolean.TRUE
or Boolean.FALSE
— the result of the java.io.File.renameTo()
method call).
如果你希望发送整个有效负载以进行进一步的 Spring Integration 处理,请使用“payload”表达式。
If you wish to send the entire payload for further Spring Integration processing, use the 'payload' expression.
了解将这些操作与事务同步非常重要。它不会使本来不是事务的资源实际上成为事务。相反,事务(无论是 JDBC 还是其他事务)都会在轮询之前启动,并在流程完成后提交或回滚,然后执行同步操作。
It is important to understand that this synchronizes the actions with a transaction. It does not make a resource that is not inherently transactional actually be transactional. Instead, the transaction (be it JDBC or otherwise) is started before the poll and either committed or rolled back when the flow completes, followed by the synchronized action.
如果你提供自定义 TransactionSynchronizationFactory
,它负责创建资源同步,以便事务完成后自动取消绑定绑定的资源。默认 TransactionSynchronizationFactory
通过返回 ResourceHolderSynchronization
的子类来实现此目的,其中默认 shouldUnbindAtCompletion()
返回 true
。
If you provide a custom TransactionSynchronizationFactory
, it is responsible for creating a resource synchronization that causes the bound resource to be unbound automatically when the transaction completes.
The default TransactionSynchronizationFactory
does so by returning a subclass of ResourceHolderSynchronization
, with the default shouldUnbindAtCompletion()
returning true
.
除了 after-commit
和 after-rollback
表达式之外,还支持 before-commit
。在这种情况下,如果评估(或下游处理)抛出异常,则事务将回滚,而不是提交。
In addition to the after-commit
and after-rollback
expressions, before-commit
is also supported.
In that case, if the evaluation (or downstream processing) throws an exception, the transaction is rolled back instead of being committed.
Pseudo Transactions
在阅读 Transaction Synchronization 部分之后,您可能会认为,在流完成时采取这些“成功”或“失败”操作非常有用,即使轮询程序的下游没有 “real” 事务性资源(例如 JDBC)。例如,考虑一个 “<file:inbound-channel-adapter/>”,后跟一个 “<ftp:outbout-channel-adapter/>”。这两个组件都不是事务性的,但我们可能希望根据 FTP 传输的成功或失败,将输入文件移至不同的目录。
After reading the Transaction Synchronization section, you might think it would be useful to take these 'success' or 'failure' actions when a flow completes, even if there is no “real” transactional resources (such as JDBC) downstream of the poller. For example, consider a “<file:inbound-channel-adapter/>” followed by an “<ftp:outbout-channel-adapter/>”. Neither of these components is transactional, but we might want to move the input file to different directories, based on the success or failure of the FTP transfer.
为了提供此功能,该框架提供了一个 PseudoTransactionManager
,即使没有涉及真正的交易资源,也可以启用上述配置。如果流程正常完成,则将调用 beforeCommit
和 afterCommit
同步。发生故障时,将调用 afterRollback
同步。因为它不是真正的交易,所以不会发生实际的提交或回滚。伪交易是一个用于启用同步功能的媒介。
To provide this functionality, the framework provides a PseudoTransactionManager
, enabling the above configuration even when there is no real transactional resource involved.
If the flow completes normally, the beforeCommit
and afterCommit
synchronizations are called.
On failure, the afterRollback
synchronization is called.
Because it is not a real transaction, no actual commit or rollback occurs.
The pseudo transaction is a vehicle used to enable the synchronization features.
若要使用 PseudoTransactionManager
,你可以将它定义为 <bean/>,就像配置真正的交易管理器一样。以下示例展示了如何执行此操作:
To use a PseudoTransactionManager
, you can define it as a <bean/>, in the same way you would configure a real transaction manager.
The following example shows how to do so:
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager" />
Reactive Transactions
从 5.3 版本开始,ReactiveTransactionManager
也可以与 TransactionInterceptor
建议一起用于返回响应类型的端点。这包括产生带有 Flux
或 Mono
有效负载的消息的 MessageSource
和 ReactiveMessageHandler
实现(例如 ReactiveMongoDbMessageSource
)。当它们的答复有效负载也是某种响应类型时,所有其他产生答复的消息处理程序实现都可以依赖于 ReactiveTransactionManager
。
Starting with version 5.3, a ReactiveTransactionManager
can also be used together with a TransactionInterceptor
advice for endpoints returning a reactive type.
This includes MessageSource
and ReactiveMessageHandler
implementations (e.g. ReactiveMongoDbMessageSource
) which produce a message with a Flux
or Mono
payload.
All other reply producing message handler implementations can rely on a ReactiveTransactionManager
when their reply payload is also some reactive type.