Transactions
Spring Rabbit 框架支持在具有多种不同语义的同步和异步用例中自动事务管理,这些语义可以通过声明性方式进行选择,这对于 Spring 事务的现有用户来说很熟悉。这让许多甚至大多数常见的消息传递模式得以轻松实现。
有两种方法可以向框架指示所需的事务语义。在 RabbitTemplate
和 SimpleMessageListenerContainer
中,都有一个标志位 channelTransacted
,如果为 true
,则会指示框架使用事务通道,并根据结果以提交或回滚操作(发送或接收)的结尾,其中异常指示回滚。另一个指示是使用某个 Spring PlatformTransactionManager
实现来提供外部事务作为正在进行操作的上下文。如果框架在发送或接收消息时已有事务在进行,且 channelTransacted
标志位为 true
,则会将消息传递事务的提交或回滚操作推迟到当前事务结束时。如果 channelTransacted
标志位为 false
,则不会将任何事务语义应用到消息传递操作(会自动确认)。
channelTransacted
标志位是配置时间设置。它在创建 AMQP 组件时会声明和处理一次,通常在应用程序启动时。从原则上讲,外部事务更动态,因为系统会在运行时响应当前线程状态。然而,在实践中,当事务以声明性方式分层到应用程序时,它通常也是一个配置设置。
针对 RabbitTemplate
使用同步用例时,外部事务由调用者提供,可以根据喜好以声明性方式或命令性方式提供(常见的 Spring 事务模型)。以下示例展示了声明性方式(通常较受青睐,因为它是非侵入性的),其中该模板已配置为 channelTransacted=true
:
@Transactional
public void doSomething() {
String incoming = rabbitTemplate.receiveAndConvert();
// do some more database processing...
String outgoing = processInDatabaseAndExtractReply(incoming);
rabbitTemplate.convertAndSend(outgoing);
}
在上一个示例中,String
负载会被接收、转换,并以消息正文的形式发送到一个标记为 @Transactional
的方法内部。如果数据库处理过程出现异常,入站消息会被返回到代理,且不会发送出站消息。这适用于事务方法链中的任何操作(RabbitTemplate
,除非,例如,Channel
被直接处理为提前提交该事务)。
针对于 SimpleMessageListenerContainer
使用异步用例时,如果需要外部事务,则务必在设置侦听器时由容器请求该事务。为了指示需要外部事务,用户会在配置时向容器提供 PlatformTransactionManager
实现。以下示例展示了如何执行此操作:
@Configuration
public class ExampleExternalTransactionAmqpConfiguration {
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setTransactionManager(transactionManager());
container.setChannelTransacted(true);
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}
}
在上一个示例中,事务管理器被添加为从其他 Bean 定义(未显示)注入的依赖,并且还将 channelTransacted
标志位设置为 true
。其效果是,如果侦听器出现异常,该事务会被回滚,并且消息也会返回到代理。重要的是,如果事务提交失败(例如,由于数据库约束错误或连接问题),则 AMQP 事务也会回滚,并且该消息会被返回到代理。这有时被称为“尽量而为 1 阶段提交
”,并且是用于进行可靠消息传递的一种非常有效的方法。如果在上一个示例中将 channelTransacted
标志位设置为 false
(默认值),则仍然会为监听器提供外部事务,但所有消息传递操作都将自动确认,因此其效果是即使业务操作回滚,也会提交消息传递操作。
Conditional Rollback
在版本 1.6.6 之前,当使用外部事务管理器(例如 JDBC)时,向容器的 transactionAttribute
添加回滚规则不会产生任何效果。异常始终会回滚事务。
此外,在容器的建议链中使用 transaction advice 时,有条件回滚不太有用,因为所有侦听器异常都包含在 ListenerExecutionFailedException
中。
第一个问题已得到修正,现在可以正常地应用这些规则。此外,现在提供了 ListenerFailedRuleBasedTransactionAttribute
。它是 RuleBasedTransactionAttribute
的一个子类,唯一的区别在于它感知 ListenerExecutionFailedException
,并使用此类异常的原因应用规则。此事务属性可以直接在容器中使用,或通过事务忠告使用。
以下示例使用此规则:
@Bean
public AbstractMessageListenerContainer container() {
...
container.setTransactionManager(transactionManager);
RuleBasedTransactionAttribute transactionAttribute =
new ListenerFailedRuleBasedTransactionAttribute();
transactionAttribute.setRollbackRules(Collections.singletonList(
new NoRollbackRuleAttribute(DontRollBackException.class)));
container.setTransactionAttribute(transactionAttribute);
...
}
A note on Rollback of Received Messages
AMQP 事务仅适用于发送到代理的消息和确认。因此,当 Spring 事务回滚且已收到消息时,Spring AMQP 不仅要回滚事务,还必须手动拒绝消息(类似 nack,但这并不是规范称之为的内容)。对消息拒绝采取的操作与事务无关,而取决于 defaultRequeueRejected
属性(默认值:true
)。有关拒绝失败消息的更多信息,请参阅 Message Listeners and the Asynchronous Case。
有关 RabbitMQ 事务及其限制的详细信息,请参阅 RabbitMQ Broker Semantics。
在 RabbitMQ 2.7.0 之前,此类消息(以及在信道关闭或中止时未确认的消息)会在 Rabbit 代理上进入队列的末尾。自 2.7.0 起,被拒绝的消息会进入队列的开头,类似于 JMS 回滚的消息。 |
以前,事务回滚时的消息重新排队在本地事务和提供 |
Using RabbitTransactionManager
RabbitTransactionManager 是在外部事务中执行 Rabbit 操作并与其同步的替代方案。此事务管理器是 PlatformTransactionManager
接口的实现,并且应与单个 Rabbit ConnectionFactory
一起使用。
此策略无法提供 XA 事务——例如,无法在消息传递和数据库访问之间共享事务。
使用 ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean)
应用程序代码需要检索事务性 Rabbit 资源,而不是使用 Connection.createChannel()
标准调用和后续信道创建。在使用 Spring AMQP 的 RabbitTemplate 时,它将自动检测线程绑定信道并自动参与其中事务。
借助 Java 配置,你可以使用以下 Bean 来设置一个新的 RabbitTransactionManager:
@Bean
public RabbitTransactionManager rabbitTransactionManager() {
return new RabbitTransactionManager(connectionFactory);
}
如果你更喜欢 XML 配置,可以在 XML 应用程序上下文文件中声明以下 Bean:
<bean id="rabbitTxManager"
class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
Transaction Synchronization
将 RabbitMQ 事务与其他某些事务(例如 DBMS 事务)同步提供“尽力而为的一阶段提交”语义。RabbitMQ 事务在事务同步的完成后的阶段中可能无法提交。这是由 spring-tx
基础结构记录为一个错误,但没有将异常抛转给调用代码。从版本 2.3.10 开始,你可以在事务在处理了事务的同一线程上提交后,调用 ConnectionUtils.checkAfterCompletion()
。如果没有发生异常,它将简单地返回;否则它将抛出一个 AfterCompletionFailedException
,其中将有一个属性代表完成的同步状态。
通过调用 ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true)
来启用此功能;这是一个全局标志,适用于所有线程。