Resilience: Recovering from Errors and Broker Failures

  • 自动声明和连接重新建立:CachingConnectionFactory 和 RabbitAdmin 允许在启动或重新连接时自动声明和重新建立交换、队列和绑定。

  • 异常处理和重试:Spring AMQP 抛出异常来指示连接问题,并支持使用 Spring Retry 进行自定义重试逻辑,包括有状态和无状态重试。

  • 消息侦听器恢复:SimpleMessageListenerContainer 自动重新启动消息侦听器以处理连接中断。

  • 业务异常处理:StatefulRetryOperationsInterceptor 和 RepublishMessageRecoverer 允许自定义业务异常处理,执行自定义恢复动作或在重试失败时重新发布消息。

Spring AMQP 提供的一些关键(也是最受欢迎的)高级功能与在发生协议错误或代理故障时的恢复和自动重新连接有关。我们在此指南中已经看到了所有相关的组件,但将它们全部放在一起并分别调用这些功能和恢复方案会有所帮助。 主要重新连接功能由 CachingConnectionFactory 本身启用。通常还建议使用 RabbitAdmin 自动声明功能。此外,如果你关心有保证的传递,你可能还需要在 RabbitTemplateSimpleMessageListenerContainer 中使用 channelTransacted 标志,并在 SimpleMessageListenerContainer 中使用 AcknowledgeMode.AUTO (或在你自己进行确认的情况下进行人工确认)。

Automatic Declaration of Exchanges, Queues, and Bindings

RabbitAdmin 组件可以在启动时声明交易所、队列和绑定。它通过 ConnectionListener 延迟执行此操作。因此,如果代理在启动时不存在,则无关紧要。当首次使用 Connection 时(例如,发送消息),侦听器会触发并应用管理功能。在侦听器中进行自动声明的另一个好处是,如果连接因任何原因断开(例如,代理死亡、网络故障等),则在重新建立连接时会再次应用它们。

以这种方式声明的队列必须有固定的名称——要么显式声明,要么由 AnonymousQueue 实例的框架生成。匿名队列是非持久的、独占的和自动删除的。

只有当 CachingConnectionFactory 缓存模式为 CHANNEL(默认)时,才会执行自动声明。存在此限制,因为独占队列和自动删除队列绑定到该连接。

从 2.2.2 版本开始,RabbitAdmin 将检测类型为 DeclarableCustomizer 的 bean,并在实际处理声明之前应用该函数。例如,这对于在框架内具有头等支持之前设置新参数(属性)很有用。

@Bean
public DeclarableCustomizer customizer() {
    return dec -> {
        if (dec instanceof Queue && ((Queue) dec).getName().equals("my.queue")) {
            dec.addArgument("some.new.queue.argument", true);
        }
        return dec;
    };
}

它在无法直接访问 Declarable bean 定义的项目中也很有用。

Failures in Synchronous Operations and Options for Retry

如果你在使用 RabbitTemplate(例如)时在同步序列中失去与代理的连接,Spring AMQP 会抛出 AmqpException(通常,但并非总是,AmqpIOException)。我们不会试图掩盖存在问题的事实,因此你必须能够捕获并响应异常。如果你怀疑连接已断开(并且这不是你的错),最简单的方法是再次尝试该操作。你可以手动执行此操作,也可以使用 Spring Retry 来处理重试(命令式或声明式)。

Spring Retry 提供了一些 AOP 拦截器和极大的灵活性来指定重试的参数(尝试次数、异常类型、退避算法等)。Spring AMQP 还提供了一些便捷的工厂 bean,用于以适合 AMQP 用例的便利形式创建 Spring Retry 拦截器,并通过强类型回调接口来实现自定义恢复逻辑。有关更多详细信息,请参阅 StatefulRetryOperationsInterceptorStatelessRetryOperationsInterceptor 的 Javadoc 和属性。如果不存在事务或在重试回调内部启动事务,则无状态重试是合适的。请注意,与有状态重试相比,无状态重试更容易配置和分析,但如果存在必须回滚或肯定要回滚的正在进行的事务,通常不合适。在事务中间断开的连接应该与回滚具有相同的效果。因此,对于在堆栈上更高级别启动事务的重新连接,有状态重试通常是最佳选择。有状态重试需要一种机制来唯一标识一条消息。最简单的方法是让发送方将一个唯一值放入 MessageId 消息属性。提供的消息转换器提供了一个执行此操作的选项:你可以将 createMessageIds 设置为 true。否则,你可以将 MessageKeyGenerator 实现注入到拦截器中。该键生成器必须为每条消息返回一个唯一键。在 2.0 版本之前的版本中,提供了一个 MissingMessageIdAdvice。它允许没有 messageId 属性的消息重试一次(忽略重试设置)。不再提供此建议,因为连同 Spring-retry 1.2 版本一起,它的功能内置于拦截器和消息侦听器容器中。

为了向后兼容性,带有空消息 ID 的消息在默认情况下(在重试一次后)被视为对使用者的错误(使用者已停止)。要复制 MissingMessageIdAdvice 提供的功能,可以在侦听器容器中将 statefulRetryFatalWithNullMessageId 属性设置为 false。有了该设置,使用者将继续运行,并且该消息将被拒绝(在重试一次后)。它将被丢弃或路由到错误队列(如果配置了一个)。

从 1.3 版本开始,提供了一个构建器 API 以使用 Java(在 @Configuration 类中)帮助组装这些拦截器。以下示例演示如何执行此操作:

@Bean
public StatefulRetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateful()
            .maxAttempts(5)
            .backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
            .build();
}

只有部分重试功能可以按此方式配置。 更高级的功能需要将`RetryTemplate`配置为 Spring bean。 有关可用策略及其配置的完整信息,请参见 Spring Retry Javadoc

Retry with Batch Listeners

不建议使用批处理侦听器配置重试,除非批处理是由生产者创建的,并存在于单个记录中。 有关使用者和生产者创建的批处理的信息,请参见 Batched Messages。 对于使用者创建的批处理,框架不知道批处理中的哪条消息导致故障,因此在用尽重试次数后无法恢复。 对于生产者创建的批处理,由于实际上只有一条消息失败,因此可以恢复整个消息。 应用程序可能希望告知自定义恢复器批处理中发生故障的位置,也许是通过设置抛出异常的索引属性。

批处理侦听器的重试恢复器必须实现 MessageBatchRecoverer

Message Listeners and the Asynchronous Case

如果 MessageListener 因业务异常而失败,则异常将由消息侦听器容器处理,然后消息侦听器容器将继续侦听另一条消息。如果故障是由断开的连接引起的(不是业务异常),则必须取消和重新启动收集侦听器消息的使用者。SimpleMessageListenerContainer 会无缝处理此问题,并留下一个日志来说明该侦听器正在重新启动。事实上,它会无限循环,尝试重新启动使用者。只有当使用者的行为确实非常糟糕时才会放弃。一个副作用是,如果在容器启动时代理已关闭,它会一直尝试直到建立连接。

业务异常处理,与协议错误和连接中断相反,可能需要更多的思考和一些自定义配置,特别是在使用事务或容器确认的情况下。在 2.8.x 之前,RabbitMQ 没有对死信行为的定义。因此,默认情况下,由于业务异常而被拒绝或回滚的消息可以无限次地重新发送。为了限制客户端重新发送的次数,一种选择是在侦听器的提示链中使用 StatefulRetryOperationsInterceptor。拦截器可以有一个恢复回调来实施自定义的死信动作,无论什么都适合你的特定环境。

另一种选择是将容器的 defaultRequeueRejected 属性设置为 false。这将导致丢弃所有失败的消息。当使用 RabbitMQ 2.8.x 或更高版本时,这也方便了将消息发送到死信交换。

或者,你可以抛出一个 AmqpRejectAndDontRequeueException。这样做可以防止重新排队消息,无论 defaultRequeueRejected 属性的设置如何。

从版本 2.1 开始,引入了 ImmediateRequeueAmqpException 来执行完全相反的逻辑:无论 defaultRequeueRejected 属性的设置如何,该消息都将被重新排队。

通常,这两种技术的组合会被使用。你可以在提示链中使用`StatefulRetryOperationsInterceptor` 和一个抛出 AmqpRejectAndDontRequeueExceptionMessageRecoverer。当所有重试都已耗尽时,将调用 MessageRecoverRejectAndDontRequeueRecoverer 正好这样做。默认的 MessageRecoverer 会消耗错误的消息并发送 WARN 消息。

从版本 1.3 开始,提供了一个新的 RepublishMessageRecoverer,允许在重试耗尽后发布失败的消息。

当一个恢复器消耗最终异常时,消息被确认,并且如果已配置,不会被代理发送到死信交换。

当在使用者端使用 RepublishMessageRecoverer 时,接收到的消息在 receivedDeliveryMode 消息属性中具有 deliveryMode。在此情况下 deliveryModenull。这意味着代理上的 NON_PERSISTENT 传递模式。从版本 2.0 开始,可以通过配置 deliveryModeRepublishMessageRecoverer 为要重新发布到消息中而设置它,如果它为 null。默认情况下,它使用 MessageProperties 默认值 - MessageDeliveryMode.PERSISTENT

以下示例展示了如何将 RepublishMessageRecoverer 设置为恢复器:

@Bean
RetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateless()
            .maxAttempts(5)
            .recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
            .build();
}

RepublishMessageRecoverer 会发布带有时序信息的消息,例如异常信息,堆栈跟踪,原始交换和路由键。可以通过创建子类并覆盖 additionalHeaders() 来添加额外的时序信息。deliveryMode(或任何其他属性)也可以在 additionalHeaders() 中更改,如下例所示:

RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(amqpTemplate, "error") {

    protected Map<? extends String, ? extends Object> additionalHeaders(Message message, Throwable cause) {
        message.getMessageProperties()
            .setDeliveryMode(message.getMessageProperties().getReceivedDeliveryMode());
        return null;
    }

};

从版本 2.0.5 开始,如果堆栈跟踪太大,可能会被截断;这是因为所有时序信息都必须放在一个帧中。默认情况下,如果堆栈跟踪会使得其他时序信息可用性低于 20,000 字节(“净空”),它将被截断。如果你需要更多的或者更少的填充其他时序信息的“净空”,可以通过设置恢复器的 frameMaxHeadroom 属性来调整。从版本 2.1.13,2.2.3 开始,异常信息会包含在这个计算中,并且使用以下算法最大化堆栈跟踪的数量:

  • 如果堆栈跟踪本身超出了限制,则异常消息头将被截断为 97 个字节加上 &#8230;&#8203;,堆栈跟踪也被截断。

  • 如果堆栈追踪较小,消息将被截断 (加上 &#8230;&#8203;) 以适应可用字节数(但堆栈追踪中的消息本身被截断为 97 个字节加上 &#8230;&#8203;)。

每当发生任何类型的截断时,都会记录原始异常以保留完整的信息。在增强时序信息后,进行评估,以便在表达式中使用异常类型等信息。

从版本 2.4.8 开始,可以使用 SpEL 表达式提供错误交换和路由键,其中 Message 是要进行评估的根对象。

从版本 2.3.3 开始,提供了一个新的子类 RepublishMessageRecovererWithConfirms;它同时支持发布者的确认样式,并且会在返回之前(或者如果没有确认或返回消息时抛出一个异常)等待确认。

如果确认类型为 CORRELATED,子类还将检测消息是否被返回并抛出一个 AmqpMessageReturnedException;如果发布被否定确认,它将抛出一个 AmqpNackReceivedException

如果确认类型为 SIMPLE,子类将在该通道上调用 waitForConfirmsOrDie 方法。

有关确认和返回的更多信息,请参阅 Publisher Confirms and Returns

从版本 2.1 开始,添加了一个 ImmediateRequeueMessageRecoverer 来抛出一个 ImmediateRequeueAmqpException,它会通知侦听器容器重新排队当前的失败消息。

Exception Classification for Spring Retry

Spring Retry 具有很大的灵活性来确定哪些异常可以调用重试。默认配置重试所有异常。由于用户异常被包装在一个 ListenerExecutionFailedException,我们需要确保分类检查异常原因。默认分类器仅查看顶层的异常。

从 Spring Retry 1.0.3 开始,BinaryExceptionClassifier 有一个名为 traverseCauses 的属性(默认:false)。当 true 时,它将遍历异常的原因,直到找到匹配项或没有原因。

要将此分类器用于重试,你可以使用使用采用最大尝试次数,Exception 实例的 Map 和布尔值(traverseCauses)构造的 SimpleRetryPolicy,并将此策略注入到 RetryTemplate 中。