Resilience: Recovering from Errors and Broker Failures
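- Automatic declaration and reconnection: CachingConnectionFactory and RabbitAdmin can automatically declare exchanges, queues, and bindings on startup and declare them again when a connection is re-established.
- Exception handling and retry: Spring AMQP throws exceptions to signal connection problems and supports custom retry logic through Spring Retry, both stateful and stateless.
- Message listener recovery: SimpleMessageListenerContainer automatically restarts the consumer behind a message listener when the connection is interrupted.
- Business exception handling: StatefulRetryOperationsInterceptor and RepublishMessageRecoverer allow custom handling of business exceptions, either performing a custom recovery action or republishing the message once retries are exhausted.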
Some of the key (and most popular) high-level features that Spring AMQP provides are to do with recovery and automatic re-connection in the event of a protocol error or broker failure. We have seen all the relevant components already in this guide, but it should help to bring them all together here and call out the features and recovery scenarios individually.
The primary reconnection features are enabled by the CachingConnectionFactory itself.
It is also often beneficial to use the RabbitAdmin auto-declaration features.
In addition, if you care about guaranteed delivery, you probably also need to use the channelTransacted flag in RabbitTemplate and SimpleMessageListenerContainer, and AcknowledgeMode.AUTO (or manual, if you do the acks yourself) in the SimpleMessageListenerContainer.
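As a rough sketch of how these pieces fit together, the following Java configuration creates a CachingConnectionFactory, a transacted RabbitTemplate, and a transacted SimpleMessageListenerContainer with AcknowledgeMode.AUTO. The host name "localhost", the queue name "orders", and the printing listener are placeholder assumptions, not recommendations.

```java
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ResilientRabbitConfig {

    @Bean
    public CachingConnectionFactory connectionFactory() {
        // Assumed broker host; reconnection is handled by the factory itself.
        CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
        // CHANNEL is already the default; RabbitAdmin auto-declaration only runs in this mode.
        cf.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
        return cf;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory cf) {
        RabbitTemplate template = new RabbitTemplate(cf);
        template.setChannelTransacted(true); // publish inside a channel transaction
        return template;
    }

    @Bean
    public SimpleMessageListenerContainer container(CachingConnectionFactory cf) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
        container.setQueueNames("orders"); // hypothetical queue name
        container.setChannelTransacted(true);
        // AUTO: the container acks when the listener returns normally and rejects when it throws.
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        MessageListener listener = message ->
                System.out.println("Received " + message.getBody().length + " bytes");
        container.setMessageListener(listener);
        return container;
    }
}
```

Setting the cache mode explicitly is redundant, but it makes the dependency of auto-declaration on CHANNEL mode visible in the configuration.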
Automatic Declaration of Exchanges, Queues, and Bindings
The RabbitAdmin component can declare exchanges, queues, and bindings on startup.
It does this lazily, through a ConnectionListener.
Consequently, if the broker is not present on startup, it does not matter.
The first time a Connection is used (for example, by sending a message), the listener fires and the admin features are applied.
A further benefit of doing the auto declarations in a listener is that, if the connection is dropped for any reason (for example, broker death or a network glitch), they are applied again when the connection is re-established.
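A minimal sketch of declarations that RabbitAdmin would pick up and apply on first connection use and again after each reconnection. The exchange, queue, and routing key names are hypothetical.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DeclarationConfig {

    // RabbitAdmin finds the Declarable beans below and declares them through a
    // ConnectionListener, so declaration is repeated whenever a new connection is created.
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public Queue ordersQueue() {
        return new Queue("orders"); // fixed name, durable by default
    }

    @Bean
    public DirectExchange ordersExchange() {
        return new DirectExchange("orders.exchange");
    }

    @Bean
    public Binding ordersBinding(Queue ordersQueue, DirectExchange ordersExchange) {
        return BindingBuilder.bind(ordersQueue).to(ordersExchange).with("orders.created");
    }
}
```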
Queues declared this way must have fixed names, either explicitly declared or generated by the framework for …
Automatic declaration is performed only when the CachingConnectionFactory cache mode is CHANNEL (the default).
This limitation exists because exclusive and auto-delete queues are bound to the connection.
Starting with version 2.2.2, the RabbitAdmin will detect beans of type DeclarableCustomizer and apply the function before actually processing the declaration.
This is useful, for example, to set a new argument (property) before it has first class support within the framework.
```java
@Bean
public DeclarableCustomizer customizer() {
    return dec -> {
        // Only touch the queue named "my.queue"; every other Declarable passes through unchanged.
        if (dec instanceof Queue && ((Queue) dec).getName().equals("my.queue")) {
            dec.addArgument("some.new.queue.argument", true);
        }
        return dec;
    };
}
```
It is also useful in projects that don’t provide direct access to the Declarable bean definitions.
Failures in Synchronous Operations and Options for Retry
If you lose your connection to the broker in a synchronous sequence when using RabbitTemplate (for instance), Spring AMQP throws an AmqpException (usually, but not always, AmqpIOException).
We do not try to hide the fact that there was a problem, so you have to be able to catch and respond to the exception.
The easiest thing to do if you suspect that the connection was lost (and it was not your fault) is to try the operation again.
You can do this manually, or you could look at using Spring Retry to handle the retry (imperatively or declaratively).
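For the imperative option, a sketch along these lines wraps a send in a Spring Retry RetryTemplate. It assumes spring-retry is on the classpath; the RetryingSender class, the attempt count, and the back-off values are illustrative choices.

```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

public class RetryingSender {

    private final RabbitTemplate rabbitTemplate;
    private final RetryTemplate retryTemplate = new RetryTemplate();

    public RetryingSender(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        // Retry only AmqpException and its subclasses (such as AmqpIOException), 3 attempts in total.
        Map<Class<? extends Throwable>, Boolean> retryable = new HashMap<>();
        retryable.put(AmqpException.class, true);
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3, retryable));
        // Wait 500 ms, then 1 s, between attempts (capped at 5 s).
        ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
        backOff.setInitialInterval(500L);
        backOff.setMultiplier(2.0);
        backOff.setMaxInterval(5000L);
        retryTemplate.setBackOffPolicy(backOff);
    }

    public void send(String exchange, String routingKey, Object payload) {
        // If every attempt fails, the last exception is rethrown to the caller.
        retryTemplate.execute(context -> {
            rabbitTemplate.convertAndSend(exchange, routingKey, payload);
            return null;
        });
    }
}
```

RabbitTemplate can also take a RetryTemplate directly through setRetryTemplate(...), which applies retry to the template's own broker operations; the explicit wrapper above just keeps the retry boundary visible in application code.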
Spring Retry provides a couple of AOP interceptors and a great deal of flexibility to specify the parameters of the retry (number of attempts, exception types, backoff algorithm, and others).
Spring AMQP also provides some convenience factory beans for creating Spring Retry interceptors in a convenient form for AMQP use cases, with strongly typed callback interfaces that you can use to implement custom recovery logic.
See the Javadoc and properties of StatefulRetryOperationsInterceptor and StatelessRetryOperationsInterceptor for more detail.
Stateless retry is appropriate if there is no transaction or if a transaction is started inside the retry callback.
Note that stateless retry is simpler to configure and analyze than stateful retry, but it is not usually appropriate if there is an ongoing transaction that must be rolled back or definitely is going to roll back.
A dropped connection in the middle of a transaction should have the same effect as a rollback.
Consequently, for reconnections where the transaction is started higher up the stack, stateful retry is usually the best choice.
Stateful retry needs a mechanism to uniquely identify a message.
The simplest approach is to have the sender put a unique value in the MessageId message property.
The provided message converters offer an option to do this: you can set createMessageIds to true.
Otherwise, you can inject a MessageKeyGenerator implementation into the interceptor.
The key generator must return a unique key for each message.
In versions prior to version 2.0, a MissingMessageIdAdvice was provided.
It enabled messages without a messageId property to be retried exactly once (ignoring the retry settings).
This advice is no longer provided, since, along with spring-retry version 1.2, its functionality is built into the interceptor and message listener containers.
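On the sender side, enabling createMessageIds on a converter is one way to supply the unique key; on the consumer side, a MessageKeyGenerator can derive it instead. The sketch shows both. SimpleMessageConverter is one of the provided converters; the "orderId" header used as a business key is a hypothetical example.

```java
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor;

@Configuration
public class StatefulRetryKeyConfig {

    // Sender side: the converter sets a unique messageId on every message it creates.
    @Bean
    public SimpleMessageConverter messageConverter() {
        SimpleMessageConverter converter = new SimpleMessageConverter();
        converter.setCreateMessageIds(true);
        return converter;
    }

    // Consumer side alternative: derive the retry key from a hypothetical "orderId" header.
    @Bean
    public StatefulRetryOperationsInterceptor statefulRetryInterceptor() {
        return RetryInterceptorBuilder.stateful()
                .maxAttempts(3)
                .messageKeyGenerator(message -> message.getMessageProperties().getHeaders().get("orderId"))
                .build();
    }
}
```

The converter only takes effect if the sending RabbitTemplate uses it (for example, through rabbitTemplate.setMessageConverter(...)).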
For backwards compatibility, a message with a null message ID is considered fatal for the consumer (consumer is stopped) by default (after one retry).
To replicate the functionality provided by the …
Starting with version 1.3, a builder API is provided to aid in assembling these interceptors by using Java (in @Configuration classes).
The following example shows how to do so:
```java
@Bean
public StatefulRetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateful()
            .maxAttempts(5)
            .backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
            .build();
}
```
Only a subset of retry capabilities can be configured this way.
More advanced features would need the configuration of a RetryTemplate as a Spring bean.
See the Spring Retry Javadoc for complete information about available policies and their configuration.
Retry with Batch Listeners
It is not recommended to configure retry with a batch listener, unless the batch was created by the producer, in a single record. See Batched Messages for information about consumer and producer-created batches. With a consumer-created batch, the framework has no knowledge about which message in the batch caused the failure so recovery after the retries are exhausted is not possible. With producer-created batches, since there is only one message that actually failed, the whole message can be recovered. Applications may want to inform a custom recoverer where in the batch the failure occurred, perhaps by setting an index property of the thrown exception.
A retry recoverer for a batch listener must implement MessageBatchRecoverer.
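A hedged sketch of such a recoverer follows. BatchElementFailedException is a hypothetical application exception carrying the index of the failing element, as suggested above; LoggingBatchRecoverer only logs, where a real recoverer might republish or persist the failing message.

```java
import java.util.List;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.retry.MessageBatchRecoverer;

public class LoggingBatchRecoverer implements MessageBatchRecoverer {

    /** Hypothetical exception a batch listener throws to report which element failed. */
    public static class BatchElementFailedException extends RuntimeException {
        private final int index;

        public BatchElementFailedException(String message, int index) {
            super(message);
            this.index = index;
        }

        public int getIndex() {
            return index;
        }
    }

    @Override
    public void recover(List<Message> messages, Throwable cause) {
        // The container may wrap the listener's exception, so walk the cause chain.
        Throwable t = cause;
        while (t != null && !(t instanceof BatchElementFailedException)) {
            t = t.getCause();
        }
        if (t != null) {
            int index = ((BatchElementFailedException) t).getIndex();
            if (index >= 0 && index < messages.size()) {
                System.err.println("Batch element " + index + " failed: " + messages.get(index));
                return;
            }
        }
        System.err.println("Batch of " + messages.size() + " messages failed: " + cause);
    }
}
```

It can be plugged in like any other recoverer, for example with RetryInterceptorBuilder.stateless().maxAttempts(3).recoverer(new LoggingBatchRecoverer()).build().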
Message Listeners and the Asynchronous Case
If a MessageListener fails because of a business exception, the exception is handled by the message listener container, which then goes back to listening for another message.
If the failure is caused by a dropped connection (not a business exception), the consumer that is collecting messages for the listener has to be cancelled and restarted.
The SimpleMessageListenerContainer handles this seamlessly, and it leaves a log to say that the listener is being restarted.
In fact, it loops endlessly, trying to restart the consumer.
Only if the consumer is very badly behaved indeed will it give up.
One side effect is that if the broker is down when the container starts, it keeps trying until a connection can be established.
Business exception handling, as opposed to protocol errors and dropped connections, might need more thought and some custom configuration, especially if transactions or container acks are in use.
Prior to 2.8.x, RabbitMQ had no definition of dead letter behavior.
Consequently, by default, a message that is rejected or rolled back because of a business exception can be redelivered endlessly.
To limit the number of redeliveries on the client side, one choice is a StatefulRetryOperationsInterceptor in the advice chain of the listener.
The interceptor can have a recovery callback that implements a custom dead letter action, whatever is appropriate for your particular environment.
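To make the idea of stateful retry concrete, here is a small framework-free model. It is not Spring Retry's implementation: StatefulRetrySketch and its Outcome values are hypothetical names, and the listener is modelled as a Runnable. What it illustrates is that the attempt count lives outside the listener invocation, keyed by message ID, so it survives each rollback and redelivery, and the recovery callback runs once the limit is reached.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical, simplified model of stateful retry (not the Spring Retry source).
final class StatefulRetrySketch {

    enum Outcome { SUCCESS, REQUEUE, RECOVERED }

    private final int maxAttempts;
    private final BiConsumer<String, RuntimeException> recoverer;
    // Attempt counts survive between redeliveries, keyed by message ID.
    private final Map<String, Integer> attempts = new HashMap<>();

    StatefulRetrySketch(int maxAttempts, BiConsumer<String, RuntimeException> recoverer) {
        this.maxAttempts = maxAttempts;
        this.recoverer = recoverer;
    }

    /** Called once per delivery of the message with the given ID. */
    Outcome onDelivery(String messageId, Runnable listener) {
        try {
            listener.run();
            attempts.remove(messageId);
            return Outcome.SUCCESS;
        }
        catch (RuntimeException ex) {
            int count = attempts.merge(messageId, 1, Integer::sum);
            if (count >= maxAttempts) {
                attempts.remove(messageId);
                // Custom dead-letter action; afterwards the message would be acked.
                recoverer.accept(messageId, ex);
                return Outcome.RECOVERED;
            }
            // In a real container the exception propagates and the broker redelivers.
            return Outcome.REQUEUE;
        }
    }
}
```

This is also why stateful retry needs a unique message ID: without a stable key, each redelivery would look like a new message and the count would never reach the limit.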
Another alternative is to set the container’s defaultRequeueRejected property to false.
This causes all failed messages to be discarded.
When using RabbitMQ 2.8.x or higher, this also facilitates delivering the message to a dead letter exchange.
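A sketch of this approach: the queue is declared with the standard RabbitMQ x-dead-letter-exchange argument, and the container has defaultRequeueRejected set to false, so a listener exception rejects the message without requeueing and the broker dead-letters it. All names and the empty-body check are hypothetical, and a RabbitAdmin is assumed to be present to declare the queues and exchange.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DeadLetterConfig {

    @Bean
    public Queue ordersQueue() {
        // Messages rejected without requeue are dead-lettered by the broker to "orders.dlx".
        return QueueBuilder.durable("orders")
                .withArgument("x-dead-letter-exchange", "orders.dlx")
                .build();
    }

    @Bean
    public FanoutExchange deadLetterExchange() {
        return new FanoutExchange("orders.dlx");
    }

    @Bean
    public Queue deadLetterQueue() {
        return new Queue("orders.dlq");
    }

    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
    }

    @Bean
    public SimpleMessageListenerContainer ordersContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("orders");
        // Failed messages are rejected with requeue=false instead of being redelivered.
        container.setDefaultRequeueRejected(false);
        MessageListener listener = message -> {
            if (message.getBody().length == 0) {
                throw new IllegalArgumentException("Empty order message"); // hypothetical business rule
            }
        };
        container.setMessageListener(listener);
        return container;
    }
}
```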
Alternatively, you can throw an AmqpRejectAndDontRequeueException.
Doing so prevents message requeuing, regardless of the setting of the defaultRequeueRejected property.
Starting with version 2.1, an ImmediateRequeueAmqpException is introduced to perform exactly the opposite logic: the message will be requeued, regardless of the setting of the defaultRequeueRejected property.
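The following listener sketch shows where each exception would be thrown. It assumes annotation-driven listeners are enabled (@EnableRabbit, or Spring Boot auto-configuration); the queue name and the isDownstreamTemporarilyUnavailable() check are hypothetical placeholders.

```java
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.ImmediateRequeueAmqpException;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class OrderListener {

    @RabbitListener(queues = "orders")
    public void handle(String payload) {
        if (payload.isEmpty()) {
            // Never requeued, whatever defaultRequeueRejected says:
            // dead-lettered if the queue has a DLX, otherwise discarded.
            throw new AmqpRejectAndDontRequeueException("Empty payload cannot be processed");
        }
        if (isDownstreamTemporarilyUnavailable()) {
            // Always requeued, even when defaultRequeueRejected is false.
            throw new ImmediateRequeueAmqpException("Downstream service unavailable");
        }
        System.out.println("Processing order: " + payload);
    }

    // Hypothetical health check standing in for real application logic.
    private boolean isDownstreamTemporarilyUnavailable() {
        return false;
    }
}
```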
Often, a combination of both techniques is used.
You can use a StatefulRetryOperationsInterceptor in the advice chain with a MessageRecoverer that throws an AmqpRejectAndDontRequeueException.
The MessageRecoverer is called when all retries have been exhausted.
The RejectAndDontRequeueRecoverer does exactly that.
The default MessageRecoverer consumes the errant message and emits a WARN message.
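Put together, the combination might look like this sketch: a stateful interceptor with a RejectAndDontRequeueRecoverer in the container's advice chain. The queue name, attempt count, and back-off values are assumptions; the MessageListener bean is assumed to be defined elsewhere; and, because retry is stateful, incoming messages need a message ID (or the interceptor needs a MessageKeyGenerator).

```java
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor;

@Configuration
public class RetryThenRejectConfig {

    @Bean
    public StatefulRetryOperationsInterceptor retryThenReject() {
        return RetryInterceptorBuilder.stateful()
                .maxAttempts(3)
                .backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
                // After the last attempt the message is rejected without requeue,
                // so the broker dead-letters it if the queue has a DLX.
                .recoverer(new RejectAndDontRequeueRecoverer())
                .build();
    }

    @Bean
    public SimpleMessageListenerContainer ordersContainer(ConnectionFactory connectionFactory,
            StatefulRetryOperationsInterceptor retryThenReject, MessageListener ordersListener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("orders"); // hypothetical queue name
        container.setAdviceChain(retryThenReject);
        container.setMessageListener(ordersListener);
        return container;
    }
}
```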
Starting with version 1.3, a new RepublishMessageRecoverer is provided, to allow publishing of failed messages after retries are exhausted.
When a recoverer consumes the final exception, the message is ack’d and is not sent to the dead letter exchange by the broker, if configured.
When … is used on the consumer side …
The following example shows how to set a RepublishMessageRecoverer as the recoverer:
```java
@Bean
RetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateless()
            .maxAttempts(5)
            .recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
            .build();
}
```
The RepublishMessageRecoverer publishes the message with additional information in message headers, such as the exception message, stack trace, original exchange, and routing key.
Additional headers can be added by creating a subclass and overriding additionalHeaders().
The deliveryMode (or any other properties) can also be changed in additionalHeaders(), as the following example shows:
```java
RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(amqpTemplate, "error") {

    @Override
    protected Map<? extends String, ? extends Object> additionalHeaders(Message message, Throwable cause) {
        // Republish with the delivery mode the message originally arrived with.
        message.getMessageProperties()
                .setDeliveryMode(message.getMessageProperties().getReceivedDeliveryMode());
        return null; // no extra headers
    }

};
```
Starting with version 2.0.5, the stack trace may be truncated if it is too large; this is because all headers have to fit in a single frame.
By default, if the stack trace would cause less than 20,000 bytes ('headroom') to be available for other headers, it will be truncated.
This can be adjusted by setting the recoverer’s frameMaxHeadroom property, if you need more or less space for other headers.
Starting with versions 2.1.13 and 2.2.3, the exception message is included in this calculation, and the amount of stack trace will be maximized using the following algorithm:
- If the stack trace alone would exceed the limit, the exception message header will be truncated to 97 bytes plus "…", and the stack trace is truncated too.
- If the stack trace is small, the message will be truncated (plus "…") to fit in the available bytes (but the message within the stack trace itself is truncated to 97 bytes plus "…").
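The two cases can be modelled roughly as follows. This is a simplified, hypothetical sketch, not the library's code: it treats one character as one byte (ASCII), uses "..." in place of the "…" marker, does not model how the library marks a truncated stack trace, and ignores the 97-byte truncation of the message embedded inside the stack trace. The available parameter stands for the bytes left for these two headers after the headroom is reserved.

```java
// Hypothetical sketch of the header budgeting described above (ASCII assumed).
final class HeaderBudgetSketch {

    static final int MESSAGE_LIMIT = 97;
    static final String MARKER = "...";

    /** Returns {exceptionMessageHeader, stackTraceHeader} fitting in {@code available} bytes. */
    static String[] fit(String message, String stackTrace, int available) {
        if (stackTrace.length() > available) {
            // Case 1: the stack trace alone exceeds the limit.
            String msg = message.length() > MESSAGE_LIMIT
                    ? message.substring(0, MESSAGE_LIMIT) + MARKER
                    : message;
            int traceRoom = Math.max(0, available - msg.length());
            return new String[] { msg, stackTrace.substring(0, traceRoom) };
        }
        // Case 2: the stack trace fits whole; the message gets whatever space remains.
        int messageRoom = available - stackTrace.length();
        String msg;
        if (message.length() <= messageRoom) {
            msg = message;
        }
        else if (messageRoom < MARKER.length()) {
            msg = ""; // not even room for the marker
        }
        else {
            msg = message.substring(0, messageRoom - MARKER.length()) + MARKER;
        }
        return new String[] { msg, stackTrace };
    }
}
```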
Whenever a truncation of any kind occurs, the original exception will be logged to retain the complete information.
Starting with version 2.4.8, the error exchange and routing key can be provided as SpEL expressions, with the Message being the root object for the evaluation.
The evaluation is performed after the headers are enhanced, so information such as the exception type can be used in the expressions.
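A sketch of the expression-based variant, assuming the (AmqpTemplate, Expression, Expression) constructor that the 2.4.8 note implies; check the RepublishMessageRecoverer Javadoc for your version. The "errors" exchange and the ".failed" suffix are hypothetical; messageProperties.consumerQueue resolves against the failed Message, the root object.

```java
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;

public final class ExpressionRoutedRecoverers {

    private static final SpelExpressionParser PARSER = new SpelExpressionParser();

    private ExpressionRoutedRecoverers() {
    }

    public static RepublishMessageRecoverer create(AmqpTemplate errorTemplate) {
        // Literal expression: every failed message goes to the "errors" exchange.
        Expression exchange = PARSER.parseExpression("'errors'");
        // Evaluated against the failed Message: route by the queue it was consumed from.
        Expression routingKey = PARSER.parseExpression("messageProperties.consumerQueue + '.failed'");
        // Assumed constructor (AmqpTemplate, Expression, Expression), per the 2.4.8 note.
        return new RepublishMessageRecoverer(errorTemplate, exchange, routingKey);
    }
}
```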
Starting with version 2.3.3, a new subclass, RepublishMessageRecovererWithConfirms, is provided; this supports both styles of publisher confirms and will wait for the confirmation before returning (or throw an exception if not confirmed or the message is returned).
If the confirm type is CORRELATED, the subclass will also detect if a message is returned and throw an AmqpMessageReturnedException; if the publication is negatively acknowledged, it will throw an AmqpNackReceivedException.
If the confirm type is SIMPLE, the subclass will invoke the waitForConfirmsOrDie method on the channel.
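A configuration sketch for the CORRELATED case follows. It assumes the four-argument constructor taking a RabbitTemplate, the error exchange, the error routing key, and the ConfirmType; the host, exchange, and routing key names are hypothetical. Publisher returns and a mandatory template are enabled so that an unroutable republish is returned rather than silently dropped.

```java
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.ConfirmType;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecovererWithConfirms;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmedRepublishConfig {

    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory cf = new CachingConnectionFactory("localhost"); // assumed host
        cf.setPublisherConfirmType(ConfirmType.CORRELATED); // correlated publisher confirms
        cf.setPublisherReturns(true);                        // deliver returned messages to the template
        return cf;
    }

    @Bean
    public RepublishMessageRecovererWithConfirms confirmingRecoverer(CachingConnectionFactory cf) {
        RabbitTemplate errorTemplate = new RabbitTemplate(cf);
        errorTemplate.setMandatory(true); // unroutable messages are returned instead of dropped
        // Waits for the confirm; throws AmqpNackReceivedException on a nack
        // or AmqpMessageReturnedException if the message is returned.
        return new RepublishMessageRecovererWithConfirms(errorTemplate, "errors", "errors.failed",
                ConfirmType.CORRELATED);
    }
}
```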
See Publisher Confirms and Returns for more information about confirms and returns.
Starting with version 2.1, an ImmediateRequeueMessageRecoverer is added to throw an ImmediateRequeueAmqpException, which notifies a listener container to requeue the current failed message.
Exception Classification for Spring Retry
Spring Retry has a great deal of flexibility for determining which exceptions can invoke retry.
The default configuration retries on all exceptions.
Given that user exceptions are wrapped in a ListenerExecutionFailedException, we need to ensure that the classification examines the exception causes.
The default classifier looks only at the top-level exception.
Since Spring Retry 1.0.3, the BinaryExceptionClassifier has a property called traverseCauses (default: false).
When true, it traverses exception causes until it finds a match or there is no cause.
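The effect of traverseCauses can be illustrated with a small framework-free model. CauseTraversalSketch is hypothetical and much simpler than BinaryExceptionClassifier (there is no per-type true/false map, only a set of retryable types with "not retryable" as the default); a plain RuntimeException stands in for ListenerExecutionFailedException as the wrapper.

```java
import java.util.Set;

// Hypothetical sketch of cause traversal; not the BinaryExceptionClassifier source.
final class CauseTraversalSketch {

    static boolean isRetryable(Throwable thrown, Set<Class<? extends Throwable>> retryableTypes,
            boolean traverseCauses) {
        Throwable current = thrown;
        while (current != null) {
            for (Class<? extends Throwable> type : retryableTypes) {
                if (type.isInstance(current)) { // subclasses match too
                    return true;
                }
            }
            if (!traverseCauses) {
                return false; // only the top-level exception is examined
            }
            current = current.getCause();
        }
        return false; // no match anywhere in the cause chain
    }
}
```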
To use this classifier for retry, you can use a SimpleRetryPolicy created with the constructor that takes the max attempts, the Map of Exception instances, and the boolean (traverseCauses), and inject this policy into the RetryTemplate.
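A sketch of that wiring, assuming spring-retry and Spring AMQP are on the classpath. TransientBusinessException is a hypothetical application exception; the RetryTemplate is handed to the interceptor builder through retryOperations(...), used instead of the builder's maxAttempts(...) and backOffOptions(...) settings.

```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
public class CauseAwareRetryConfig {

    /** Hypothetical application exception that is worth retrying. */
    public static class TransientBusinessException extends RuntimeException {
        public TransientBusinessException(String message) {
            super(message);
        }
    }

    @Bean
    public RetryTemplate causeAwareRetryTemplate() {
        Map<Class<? extends Throwable>, Boolean> retryable = new HashMap<>();
        retryable.put(TransientBusinessException.class, true);
        // maxAttempts = 5, retryable types, traverseCauses = true: the wrapping
        // ListenerExecutionFailedException is looked through to find the real cause.
        SimpleRetryPolicy policy = new SimpleRetryPolicy(5, retryable, true);
        RetryTemplate template = new RetryTemplate();
        template.setRetryPolicy(policy);
        return template;
    }

    @Bean
    public StatefulRetryOperationsInterceptor causeAwareRetryInterceptor(RetryTemplate causeAwareRetryTemplate) {
        return RetryInterceptorBuilder.stateful()
                .retryOperations(causeAwareRetryTemplate)
                .build();
    }
}
```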