Transactions

Spring Rabbit 框架支持在具有多种不同语义的同步和异步用例中自动事务管理,这些语义可以通过声明性方式进行选择,这对于 Spring 事务的现有用户来说很熟悉。这让许多甚至大多数常见的消息传递模式得以轻松实现。 有两种方法可以向框架指示所需的事务语义。在 RabbitTemplateSimpleMessageListenerContainer 中,都有一个标志位 channelTransacted,如果为 true,则会指示框架使用事务通道,并根据结果以提交或回滚操作(发送或接收)的结尾,其中异常指示回滚。另一个指示是使用某个 Spring PlatformTransactionManager 实现来提供外部事务作为正在进行操作的上下文。如果框架在发送或接收消息时已有事务在进行,且 channelTransacted 标志位为 true,则会将消息传递事务的提交或回滚操作推迟到当前事务结束时。如果 channelTransacted 标志位为 false,则不会将任何事务语义应用到消息传递操作(会自动确认)。 channelTransacted 标志位是配置时间设置。它在创建 AMQP 组件时会声明和处理一次,通常在应用程序启动时。从原则上讲,外部事务更动态,因为系统会在运行时响应当前线程状态。然而,在实践中,当事务以声明性方式分层到应用程序时,它通常也是一个配置设置。 针对 RabbitTemplate 使用同步用例时,外部事务由调用者提供,可以根据喜好以声明性方式或命令性方式提供(常见的 Spring 事务模型)。以下示例展示了声明性方式(通常较受青睐,因为它是非侵入性的),其中该模板已配置为 channelTransacted=true

@Transactional
public void doSomething() {
    String incoming = rabbitTemplate.receiveAndConvert();
    // do some more database processing...
    String outgoing = processInDatabaseAndExtractReply(incoming);
    rabbitTemplate.convertAndSend(outgoing);
}

在上一个示例中,String 负载会被接收、转换,并以消息正文的形式发送到一个标记为 @Transactional 的方法内部。如果数据库处理过程出现异常,入站消息会被返回到代理,且不会发送出站消息。这适用于事务方法链中的任何操作(RabbitTemplate,除非,例如,Channel 被直接处理为提前提交该事务)。 针对于 SimpleMessageListenerContainer 使用异步用例时,如果需要外部事务,则务必在设置侦听器时由容器请求该事务。为了指示需要外部事务,用户会在配置时向容器提供 PlatformTransactionManager 实现。以下示例展示了如何执行此操作:

@Configuration
public class ExampleExternalTransactionAmqpConfiguration {

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setTransactionManager(transactionManager());
        container.setChannelTransacted(true);
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }

}

在上一个示例中,事务管理器被添加为从其他 Bean 定义(未显示)注入的依赖,并且还将 channelTransacted 标志位设置为 true。其效果是,如果侦听器出现异常,该事务会被回滚,并且消息也会返回到代理。重要的是,如果事务提交失败(例如,由于数据库约束错误或连接问题),则 AMQP 事务也会回滚,并且该消息会被返回到代理。这有时被称为“尽量而为 1 阶段提交”,并且是用于进行可靠消息传递的一种非常有效的方法。如果在上一个示例中将 channelTransacted 标志位设置为 false(默认值),则仍然会为监听器提供外部事务,但所有消息传递操作都将自动确认,因此其效果是即使业务操作回滚,也会提交消息传递操作。

Conditional Rollback

在版本 1.6.6 之前,当使用外部事务管理器(例如 JDBC)时,向容器的 transactionAttribute 添加回滚规则不会产生任何效果。异常始终会回滚事务。

此外,在容器的建议链中使用 transaction advice 时,有条件回滚不太有用,因为所有侦听器异常都包含在 ListenerExecutionFailedException 中。

第一个问题已得到修正,现在可以正常地应用这些规则。此外,现在提供了 ListenerFailedRuleBasedTransactionAttribute。它是 RuleBasedTransactionAttribute 的一个子类,唯一的区别在于它感知 ListenerExecutionFailedException,并使用此类异常的原因应用规则。此事务属性可以直接在容器中使用,或通过事务忠告使用。

以下示例使用此规则:

@Bean
public AbstractMessageListenerContainer container() {
    ...
    container.setTransactionManager(transactionManager);
    RuleBasedTransactionAttribute transactionAttribute =
        new ListenerFailedRuleBasedTransactionAttribute();
    transactionAttribute.setRollbackRules(Collections.singletonList(
        new NoRollbackRuleAttribute(DontRollBackException.class)));
    container.setTransactionAttribute(transactionAttribute);
    ...
}

A note on Rollback of Received Messages

AMQP 事务仅适用于发送到代理的消息和确认。因此,当 Spring 事务回滚且已收到消息时,Spring AMQP 不仅要回滚事务,还必须手动拒绝消息(类似 nack,但这并不是规范称之为的内容)。对消息拒绝采取的操作与事务无关,而取决于 defaultRequeueRejected 属性(默认值:true)。有关拒绝失败消息的更多信息,请参阅 Message Listeners and the Asynchronous Case

有关 RabbitMQ 事务及其限制的详细信息,请参阅 RabbitMQ Broker Semantics

在 RabbitMQ 2.7.0 之前,此类消息(以及在信道关闭或中止时未确认的消息)会在 Rabbit 代理上进入队列的末尾。自 2.7.0 起,被拒绝的消息会进入队列的开头,类似于 JMS 回滚的消息。

以前,事务回滚时的消息重新排队在本地事务和提供 TransactionManager 时不一致。在第一种情况下,应用了常规的重新排队逻辑(AmqpRejectAndDontRequeueExceptiondefaultRequeueRejected=false)(请参阅 Message Listeners and the Asynchronous Case)。如果使用事务管理器,则消息将在回滚时无条件重新排队。从 2.0 版本开始,行为保持一致,并且在两种情况下都应用了常规的重新排队逻辑。要恢复为之前的行为,您可以将容器的 alwaysRequeueWithTxManagerRollback 属性设置为 true。请参阅 Message Listener Container Configuration

Using RabbitTransactionManager

RabbitTransactionManager 是在外部事务中执行 Rabbit 操作并与其同步的替代方案。此事务管理器是 PlatformTransactionManager 接口的实现,并且应与单个 Rabbit ConnectionFactory 一起使用。

此策略无法提供 XA 事务——例如,无法在消息传递和数据库访问之间共享事务。

使用 ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean) 应用程序代码需要检索事务性 Rabbit 资源,而不是使用 Connection.createChannel() 标准调用和后续信道创建。在使用 Spring AMQP 的 RabbitTemplate 时,它将自动检测线程绑定信道并自动参与其中事务。

借助 Java 配置,你可以使用以下 Bean 来设置一个新的 RabbitTransactionManager:

@Bean
public RabbitTransactionManager rabbitTransactionManager() {
    return new RabbitTransactionManager(connectionFactory);
}

如果你更喜欢 XML 配置,可以在 XML 应用程序上下文文件中声明以下 Bean:

<bean id="rabbitTxManager"
      class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>

Transaction Synchronization

将 RabbitMQ 事务与其他某些事务(例如 DBMS 事务)同步提供“尽力而为的一阶段提交”语义。RabbitMQ 事务在事务同步的完成后的阶段中可能无法提交。这是由 spring-tx 基础结构记录为一个错误,但没有将异常抛转给调用代码。从版本 2.3.10 开始,你可以在事务在处理了事务的同一线程上提交后,调用 ConnectionUtils.checkAfterCompletion()。如果没有发生异常,它将简单地返回;否则它将抛出一个 AfterCompletionFailedException,其中将有一个属性代表完成的同步状态。

通过调用 ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true) 来启用此功能;这是一个全局标志,适用于所有线程。