Connection and Resource Management
虽然我们在上一节中描述的 AMQP 模型是通用的,适用于所有实现,但是当我们进入资源管理时,详细信息特定于代理实现。因此,在本节中,我们重点介绍仅存在于我们的“spring-rabbit
”模块中的代码,因为此时,RabbitMQ 是唯一支持的实现。
Whereas the AMQP model we described in the previous section is generic and applicable to all implementations, when we get into the management of resources, the details are specific to the broker implementation. Therefore, in this section, we focus on code that exists only within our “spring-rabbit” module since, at this point, RabbitMQ is the only supported implementation.
用于管理与 RabbitMQ 代理的连接的核心组件是 ConnectionFactory
接口。ConnectionFactory
实现的责任是提供 org.springframework.amqp.rabbit.connection.Connection
的一个实例,它是一个 com.rabbitmq.client.Connection
的包装器。
The central component for managing a connection to the RabbitMQ broker is the ConnectionFactory
interface.
The responsibility of a ConnectionFactory
implementation is to provide an instance of org.springframework.amqp.rabbit.connection.Connection
, which is a wrapper for com.rabbitmq.client.Connection
.
Choosing a Connection Factory
有三个连接工厂可供选择
There are three connection factories to chose from
-
PooledChannelConnectionFactory
-
ThreadChannelConnectionFactory
-
CachingConnectionFactory
前两个在版本 2.3 中添加。
The first two were added in version 2.3.
对于大多数用例,应使用 CachingConnectionFactory
。如果要确保严格的消息排序而不必使用 Scoped Operations ,则可以使用 ThreadChannelConnectionFactory
。 PooledChannelConnectionFactory
类似于 CachingConnectionFactory
,因为它使用一个连接和一个通道池。其实现更简单,但它不支持关联的发布者确认。
For most use cases, the CachingConnectionFactory
should be used.
The ThreadChannelConnectionFactory
can be used if you want to ensure strict message ordering without the need to use Scoped Operations.
The PooledChannelConnectionFactory
is similar to the CachingConnectionFactory
in that it uses a single connection and a pool of channels.
It’s implementation is simpler but it doesn’t support correlated publisher confirmations.
所有三个工厂都支持简单的发布者确认。
Simple publisher confirmations are supported by all three factories.
在将 RabbitTemplate
配置为使用 separate connection 时,你可以在从版本 2.3.2 开始将发布连接工厂配置为不同类型。默认情况下,发布工厂是相同类型,主工厂上设置的任何属性也会传播到发布工厂。
When configuring a RabbitTemplate
to use a separate connection, you can now, starting with version 2.3.2, configure the publishing connection factory to be a different type.
By default, the publishing factory is the same type and any properties set on the main factory are also propagated to the publishing factory.
从版本 3.1 开始,AbstractConnectionFactory
包含 connectionCreatingBackOff
属性,该属性支持连接模块中的延迟策略。目前,createChannel()
的行为支持在达到 channelMax
限制时处理发生的异常,基于尝试和间隔实现延迟策略。
Starting with version 3.1, the AbstractConnectionFactory
includes the connectionCreatingBackOff
property, which supports a backoff policy in the connection module.
Currently, there is support in the behavior of createChannel()
to handle exceptions that occur when the channelMax
limit is reached, implementing a backoff strategy based on attempts and intervals.
PooledChannelConnectionFactory
该工厂基于 Apache Pool2 管理单个连接和两个通道池。一个池用于事务通道,另一个池用于非事务通道。该池是具有默认配置的 GenericObjectPool
s;提供了一个回调来配置这些池;有关更多信息,请参阅 Apache 文档。
This factory manages a single connection and two pools of channels, based on the Apache Pool2.
One pool is for transactional channels, the other is for non-transactional channels.
The pools are GenericObjectPool
s with default configuration; a callback is provided to configure the pools; refer to the Apache documentation for more information.
必须在类路径上放置 Apache commons-pool2
jar 才能使用此工厂。
The Apache commons-pool2
jar must be on the class path to use this factory.
@Bean
PooledChannelConnectionFactory pcf() throws Exception {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
pcf.setPoolConfigurer((pool, tx) -> {
if (tx) {
// configure the transactional pool
}
else {
// configure the non-transactional pool
}
});
return pcf;
}
ThreadChannelConnectionFactory
此工厂管理一个连接和两个 ThreadLocal
,一个用于事务通道,另一个用于非事务通道。此工厂可确保同一线程上的所有操作使用同一个通道(只要它仍然处于打开状态)。这有助于严格的消息排序,而无需 Scoped Operations 。为避免内存泄露,如果应用程序使用许多短生存期的线程,则必须调用工厂的 closeThreadChannel()
以释放通道资源。从版本 2.3.7 开始,一个线程可以将其通道转移到另一个线程。有关更多信息,请参见 Strict Message Ordering in a Multi-Threaded Environment 。
This factory manages a single connection and two ThreadLocal
s, one for transactional channels, the other for non-transactional channels.
This factory ensures that all operations on the same thread use the same channel (as long as it remains open).
This facilitates strict message ordering without the need for Scoped Operations.
To avoid memory leaks, if your application uses many short-lived threads, you must call the factory’s closeThreadChannel()
to release the channel resource.
Starting with version 2.3.7, a thread can transfer its channel(s) to another thread.
See Strict Message Ordering in a Multi-Threaded Environment for more information.
CachingConnectionFactory
提供的第三个实现是 CachingConnectionFactory
,它默认建立一个应用程序可以共享的单个连接代理。可以共享连接,因为 AMQP 消息传递的“处理单元”实际上是“通道”(在某些方面,这类似于 JMS 中连接与会话之间的关系)。连接实例提供了一个 createChannel
方法。CachingConnectionFactory
实现支持这些通道的缓存,并且根据通道是否具有事务性为通道维护单独的缓存。创建 CachingConnectionFactory
实例时,可以通过构造函数提供“主机名”。你还应该提供“用户名”和“密码”属性。要配置通道缓存的大小(默认为 25),可以调用 setChannelCacheSize()
方法。
The third implementation provided is the CachingConnectionFactory
, which, by default, establishes a single connection proxy that can be shared by the application.
Sharing of the connection is possible since the “unit of work” for messaging with AMQP is actually a “channel” (in some ways, this is similar to the relationship between a connection and a session in JMS).
The connection instance provides a createChannel
method.
The CachingConnectionFactory
implementation supports caching of those channels, and it maintains separate caches for channels based on whether they are transactional.
When creating an instance of CachingConnectionFactory
, you can provide the 'hostname' through the constructor.
You should also provide the 'username' and 'password' properties.
To configure the size of the channel cache (the default is 25), you can call the
setChannelCacheSize()
method.
从版本 1.3 开始,可以将 CachingConnectionFactory
配置为缓存连接以及通道。在这种情况下,对 createConnection()
的每次调用都会创建一个新连接(或从缓存中检索一个空闲连接)。关闭连接会将其返回到缓存(如果未达到缓存大小)。在这样的连接上创建的通道也会被缓存。在某些环境中可能有用单独的连接,例如从 HA 群集使用负载均衡器来连接到不同的群集成员,以及其他连接。要缓存连接,请将 cacheMode
设置为 CacheMode.CONNECTION
。
Starting with version 1.3, you can configure the CachingConnectionFactory
to cache connections as well as only channels.
In this case, each call to createConnection()
creates a new connection (or retrieves an idle one from the cache).
Closing a connection returns it to the cache (if the cache size has not been reached).
Channels created on such connections are also cached.
The use of separate connections might be useful in some environments, such as consuming from an HA cluster, in
conjunction with a load balancer, to connect to different cluster members, and others.
To cache connections, set the cacheMode
to CacheMode.CONNECTION
.
这并没有限制连接数。相反,它指定了允许的空闲打开连接数。 |
This does not limit the number of connections. Rather, it specifies how many idle open connections are allowed. |
从版本 1.5.5 开始,提供了一个名为 connectionLimit
的新属性。当设置此属性时,它将限制允许的连接总数。如果设置,则当达到限制时,channelCheckoutTimeLimit
用于等待连接变为闲置状态。如果超过时间,则会抛出 AmqpTimeoutException
。
Starting with version 1.5.5, a new property called connectionLimit
is provided.
When this property is set, it limits the total number of connections allowed.
When set, if the limit is reached, the channelCheckoutTimeLimit
is used to wait for a connection to become idle.
If the time is exceeded, an AmqpTimeoutException
is thrown.
当高速缓存模式为 CONNECTION
时,不支持自动声明队列和其他队列(请参见 Automatic Declaration of Exchanges, Queues, and Bindings )。
When the cache mode is CONNECTION
, automatic declaration of queues and others
(See Automatic Declaration of Exchanges, Queues, and Bindings) is NOT supported.
此外,在撰写本文时,amqp-client
库默认情况下会为每个连接创建一个固定线程池(默认大小:Runtime.getRuntime().availableProcessors() * 2
线程)。在使用大量连接时,应考虑在 CachingConnectionFactory
上设置一个自定义 executor
。然后,所有连接都可以使用相同的执行程序,并且可以共享其线程。执行程序的线程池应该是无界的,或者应根据预期的用途(通常,每个连接至少一个线程)进行适当设置。如果在每个连接上创建多个通道,则池大小会影响并发性,因此可变(或简单缓存)线程池执行程序最合适。
Also, at the time of this writing, the amqp-client
library by default creates a fixed thread pool for each connection (default size: Runtime.getRuntime().availableProcessors() * 2
threads).
When using a large number of connections, you should consider setting a custom executor
on the CachingConnectionFactory
.
Then, the same executor can be used by all connections and its threads can be shared.
The executor’s thread pool should be unbounded or set appropriately for the expected use (usually, at least one thread per connection).
If multiple channels are created on each connection, the pool size affects the concurrency, so a variable (or simple cached) thread pool executor would be most suitable.
了解以下内容很重要:缓存大小(默认情况下)不是限制,而是仅仅可以缓存的通道数量。如果缓存大小为 10,实际上可以使用任意数量的通道。如果使用了 10 个以上的通道并将它们全部返回到缓存,则 10 个通道将进入缓存。其余的会被物理关闭。
It is important to understand that the cache size is (by default) not a limit but is merely the number of channels that can be cached. With a cache size of, say, 10, any number of channels can actually be in use. If more than 10 channels are being used and they are all returned to the cache, 10 go in the cache. The remainder are physically closed.
从版本 1.6 开始,默认通道缓存大小已从 1 增加到 25。在高容量多线程环境中,小缓存意味着通道以高比率创建和关闭。增加默认缓存大小可以避免这种情况开销。你应该通过 RabbitMQ 管理员 UI 监视正在使用的通道,如果你看到许多通道被创建和关闭,请考虑进一步增加缓存大小。缓存仅按需增长(以满足应用程序的并发性要求),因此此更改不会影响现有的低容量应用程序。
Starting with version 1.6, the default channel cache size has been increased from 1 to 25. In high volume, multi-threaded environments, a small cache means that channels are created and closed at a high rate. Increasing the default cache size can avoid this overhead. You should monitor the channels in use through the RabbitMQ Admin UI and consider increasing the cache size further if you see many channels being created and closed. The cache grows only on-demand (to suit the concurrency requirements of the application), so this change does not impact existing low-volume applications.
从版本 1.4.2 开始,CachingConnectionFactory
有一个名为 channelCheckoutTimeout
的属性。当此属性大于零时,channelCacheSize
变成可以在连接上创建的通道数量的限制。如果达到限制,则调用线程将阻塞,直到通道可用或达到此超时时间,在这种情况下,将抛出 AmqpTimeoutException
。
Starting with version 1.4.2, the CachingConnectionFactory
has a property called channelCheckoutTimeout
.
When this property is greater than zero, the channelCacheSize
becomes a limit on the number of channels that can be created on a connection.
If the limit is reached, calling threads block until a channel is available or this timeout is reached, in which case a AmqpTimeoutException
is thrown.
在框架内使用的通道(例如 RabbitTemplate
)会可靠地返回到缓存。如果在框架外创建通道(例如,通过直接访问连接并调用 createChannel()
),则必须可靠地返回这些通道(通过关闭),也许是在 finally
块中,以避免用尽通道。
Channels used within the framework (for example,
RabbitTemplate
) are reliably returned to the cache.
If you create channels outside of the framework, (for example,
by accessing the connections directly and invoking createChannel()
), you must return them (by closing) reliably, perhaps in a finally
block, to avoid running out of channels.
以下示例展示了如何创建一个新的 connection
:
The following example shows how to create a new connection
:
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.createConnection();
当使用 XML 时,配置可能看起来像以下示例:
When using XML, the configuration might look like the following example:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
</bean>
还有一个仅在框架单元测试代码中可用的 |
There is also a |
可以使用以下 rabbit 名称空间快速便创建一个 ConnectionFactory
:
A ConnectionFactory
can be created quickly and conveniently by using the rabbit namespace, as follows:
<rabbit:connection-factory id="connectionFactory"/>
在大多数情况下,这种方法更可取,因为框架可以为您选择最佳默认设置。创建的实例是 CachingConnectionFactory
。请记住,通道的默认缓存大小为 25。如果要缓存更多通道,请通过设置“channelCacheSize”属性来设置更大的值。在 XML 中,它看起来如下:
In most cases, this approach is preferable, since the framework can choose the best defaults for you.
The created instance is a CachingConnectionFactory
.
Keep in mind that the default cache size for channels is 25.
If you want more channels to be cached, set a larger value by setting the 'channelCacheSize' property.
In XML it would look like as follows:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>
此外,使用名称空间,您可以添加“channel-cache-size”属性,如下所示:
Also, with the namespace, you can add the 'channel-cache-size' attribute, as follows:
<rabbit:connection-factory
id="connectionFactory" channel-cache-size="50"/>
默认缓存模式是 CHANNEL
,但您可以将其配置为缓存连接。在以下示例中,我们使用 connection-cache-size
:
The default cache mode is CHANNEL
, but you can configure it to cache connections instead.
In the following example, we use connection-cache-size
:
<rabbit:connection-factory
id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>
您可以使用名称空间来提供主机和端口属性,如下所示:
You can provide host and port attributes by using the namespace, as follows:
<rabbit:connection-factory
id="connectionFactory" host="somehost" port="5672"/>
或者,如果在集群环境中运行,可以使用地址属性,如下所示:
Alternatively, if running in a clustered environment, you can use the addresses attribute, as follows:
<rabbit:connection-factory
id="connectionFactory" addresses="host1:5672,host2:5672" address-shuffle-mode="RANDOM"/>
有关 address-shuffle-mode
的信息,请参见 Connecting to a Cluster 。
See Connecting to a Cluster for information about address-shuffle-mode
.
以下示例使用自定义线程工厂,用 rabbitmq-
为线程名称添加前缀:
The following example with a custom thread factory that prefixes thread names with rabbitmq-
:
<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
thread-factory="tf"
channel-cache-size="10" username="user" password="password" />
<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
<constructor-arg value="rabbitmq-" />
</bean>
AddressResolver
从 2.1.15 版开始,您现在可以使用 AddressResolver
来解析连接地址。这将覆盖 addresses
和 host/port
属性的任何设置。
Starting with version 2.1.15, you can now use an AddressResolver
to resolve the connection address(es).
This will override any settings of the addresses
and host/port
properties.
Naming Connections
从 1.7 版开始,ConnectionNameStrategy
是为注入到 AbstractionConnectionFactory
中而提供的。生成的名字用于目标 RabbitMQ 连接的特定于应用程序的识别。如果 RabbitMQ 服务器支持,连接名称将显示在管理界面中。该值不必唯一,不能用作连接标识符——例如,在 HTTP API 请求中。这个值应该是人类可读的,并且是 ClientProperties
的一部分,其键为 connection_name
。您可以使用一个简单的 Lambda,如下所示:
Starting with version 1.7, a ConnectionNameStrategy
is provided for the injection into the AbstractionConnectionFactory
.
The generated name is used for the application-specific identification of the target RabbitMQ connection.
The connection name is displayed in the management UI if the RabbitMQ server supports it.
This value does not have to be unique and cannot be used as a connection identifier — for example, in HTTP API requests.
This value is supposed to be human-readable and is a part of ClientProperties
under the connection_name
key.
You can use a simple Lambda, as follows:
connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");
ConnectionFactory
参数可用于通过一些逻辑区分目标连接名称。默认情况下,将使用 AbstractConnectionFactory
的 beanName
、表示对象的十六进制字符串和内部计数器来生成 connection_name
。<rabbit:connection-factory>
名称空间组件还会随 connection-name-strategy
属性一起提供。
The ConnectionFactory
argument can be used to distinguish target connection names by some logic.
By default, the beanName
of the AbstractConnectionFactory
, a hex string representing the object, and an internal counter are used to generate the connection_name
.
The <rabbit:connection-factory>
namespace component is also supplied with the connection-name-strategy
attribute.
SimplePropertyValueConnectionNameStrategy
实现将连接名称设置为应用程序属性。您可以将其声明为 @Bean
并将其注入到连接工厂中,如下例所示:
An implementation of SimplePropertyValueConnectionNameStrategy
sets the connection name to an application property.
You can declare it as a @Bean
and inject it into the connection factory, as the following example shows:
@Bean
public SimplePropertyValueConnectionNameStrategy cns() {
return new SimplePropertyValueConnectionNameStrategy("spring.application.name");
}
@Bean
public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
...
connectionFactory.setConnectionNameStrategy(cns);
return connectionFactory;
}
该属性必须存在于应用程序上下文的 Environment
中。
The property must exist in the application context’s Environment
.
在使用 Spring Boot 及其自动配置的连接工厂时,您只需要声明 |
When using Spring Boot and its autoconfigured connection factory, you need only declare the |
Blocked Connections and Resource Constraints
连接可能会被阻止与对应于 Memory Alarm的代理进行交互。从版本 2.0 开始,可以为 org.springframework.amqp.rabbit.connection.Connection`提供 `com.rabbitmq.client.BlockedListener`实例,以便在连接被阻止和取消阻止事件时收到通知。此外,`AbstractConnectionFactory`通过其内部 `BlockedListener`实现分别发出 `ConnectionBlockedEvent`和 `ConnectionUnblockedEvent
。通过这些信息,您可以提供应用程序逻辑来对代理上的问题做出适当的反应,例如采取一些纠正措施。
The connection might be blocked for interaction from the broker that corresponds to the Memory Alarm.
Starting with version 2.0, the org.springframework.amqp.rabbit.connection.Connection
can be supplied with com.rabbitmq.client.BlockedListener
instances to be notified for connection blocked and unblocked events.
In addition, the AbstractConnectionFactory
emits a ConnectionBlockedEvent
and ConnectionUnblockedEvent
, respectively, through its internal BlockedListener
implementation.
These let you provide application logic to react appropriately to problems on the broker and (for example) take some corrective actions.
当应用程序配置为具有单个 CachingConnectionFactory
时,就像 Spring Boot 自动配置中的默认设置一样,当连接被代理程序阻塞时,该应用程序便停止工作。当连接被代理程序阻塞时,其任何客户端都会停止工作。如果我们同时已经在同一个应用程序中有生产者和消费者,当生产者阻塞连接(因为代理程序上不再有资源)而消费者无法释放连接(因为连接被阻塞)时,最终可能会导致死锁。为了缓解该问题,我们建议再使用一个拥有相同选项的独立 CachingConnectionFactory
实例——一个用于生产者,一个用于消费者。对于在消费者线程上执行的事务性生产者来说,单独的 CachingConnectionFactory
是不可能的,因为它们应该重新使用与消费者事务关联的 Channel
。
When the application is configured with a single CachingConnectionFactory
, as it is by default with Spring Boot auto-configuration, the application stops working when the connection is blocked by the Broker.
And when it is blocked by the Broker, any of its clients stop to work.
If we have producers and consumers in the same application, we may end up with a deadlock when producers are blocking the connection (because there are no resources on the Broker any more) and consumers cannot free them (because the connection is blocked).
To mitigate the problem, we suggest having one more separate CachingConnectionFactory
instance with the same options — one for producers and one for consumers.
A separate CachingConnectionFactory
is not possible for transactional producers that execute on a consumer thread, since they should reuse the Channel
associated with the consumer transactions.
从版本 2.0.2 开始, RabbitTemplate
具有一个配置选项,除非正在使用事务,否则可自动使用第二个连接工厂。有关更多信息,请参见 Using a Separate Connection 。发布者连接的 ConnectionNameStrategy
与主策略相同, .publisher
附加到了调用该方法的结果上。
Starting with version 2.0.2, the RabbitTemplate
has a configuration option to automatically use a second connection factory, unless transactions are being used.
See Using a Separate Connection for more information.
The ConnectionNameStrategy
for the publisher connection is the same as the primary strategy with .publisher
appended to the result of calling the method.
从 1.7.7 版开始,提供了 AmqpResourceNotAvailableException
,当 SimpleConnection.createChannel()
无法创建 Channel
时会抛出该异常(例如,因为达到了 channelMax
限制并且缓存中没有可用的通道)。您可以在 RetryPolicy
中使用该异常,以便在恢复一段时间后恢复操作。
Starting with version 1.7.7, an AmqpResourceNotAvailableException
is provided, which is thrown when SimpleConnection.createChannel()
cannot create a Channel
(for example, because the channelMax
limit is reached and there are no available channels in the cache).
You can use this exception in the RetryPolicy
to recover the operation after some back-off.
Configuring the Underlying Client Connection Factory
CachingConnectionFactory
使用 Rabbit 客户端实例 ConnectionFactory
。在设置 CachingConnectionFactory
上的等效属性时,将传递许多配置属性(例如 host
、 port
、 userName
、 password
、 requestedHeartBeat
和 connectionTimeout
)。要设置其他属性(例如 clientProperties
),可以定义 Rabbit 工厂实例,并通过使用 CachingConnectionFactory
的相应构造函数提供对它的引用。在使用命名空间 (as described earlier ) 时,需要在 connection-factory
属性中提供对已配置工厂的引用。为了方便起见,提供了一个工厂 Bean,以帮助在 Spring 应用上下文中配置连接工厂,如 the next section 中所述。
The CachingConnectionFactory
uses an instance of the Rabbit client ConnectionFactory
.
A number of configuration properties are passed through (host
, port
, userName
, password
, requestedHeartBeat
, and connectionTimeout
for example) when setting the equivalent property on the CachingConnectionFactory
.
To set other properties (clientProperties
, for example), you can define an instance of the Rabbit factory and provide a reference to it by using the appropriate constructor of the CachingConnectionFactory
.
When using the namespace (as described earlier), you need to provide a reference to the configured factory in the connection-factory
attribute.
For convenience, a factory bean is provided to assist in configuring the connection factory in a Spring application context, as discussed in the next section.
<rabbit:connection-factory
id="connectionFactory" connection-factory="rabbitConnectionFactory"/>
4.0.x 客户端默认启用自动恢复。虽然与该功能兼容,Spring AMQP 拥有自己的恢复机制并且通常不需要客户端恢复功能。我们建议禁用`amqp-client` 自动恢复,以避免在代理可用但连接尚未恢复时遇到`AutoRecoverConnectionNotCurrentlyOpenException` 实例。例如,当在`RabbitTemplate` 中配置`RetryTemplate` 时,可能会看到此异常,即使故障切换到集群中的另一个代理时也是如此。由于自动恢复连接在计时器上恢复,因此可以通过 Spring AMQP 的恢复机制更快速地恢复连接。从版本 1.7.1 开始,除非您显式创建自己的 RabbitMQ 连接工厂并将其提供给`CachingConnectionFactory`,否则 Spring AMQP 禁用`amqp-client` 自动恢复。 |
The 4.0.x client enables automatic recovery by default.
While compatible with this feature, Spring AMQP has its own recovery mechanisms and the client recovery feature generally is not needed.
We recommend disabling |
RabbitConnectionFactoryBean
and Configuring SSL
从 1.4 版开始,提供了一个方便的 RabbitConnectionFactoryBean
,以便使用依赖项注入对底层客户端连接工厂上的 SSL 属性进行方便的配置。其他 setter 委托给底层工厂。以前,您必须以编程方式配置 SSL 选项。以下示例说明如何配置 RabbitConnectionFactoryBean
:
Starting with version 1.4, a convenient RabbitConnectionFactoryBean
is provided to enable convenient configuration of SSL properties on the underlying client connection factory by using dependency injection.
Other setters delegate to the underlying factory.
Previously, you had to configure the SSL options programmatically.
The following example shows how to configure a RabbitConnectionFactoryBean
:
@Bean
RabbitConnectionFactoryBean rabbitConnectionFactory() {
RabbitConnectionFactoryBean factoryBean = new RabbitConnectionFactoryBean();
factoryBean.setUseSSL(true);
factoryBean.setSslPropertiesLocation(new ClassPathResource("secrets/rabbitSSL.properties"));
return factoryBean;
}
@Bean
CachingConnectionFactory connectionFactory(ConnectionFactory rabbitConnectionFactory) {
CachingConnectionFactory ccf = new CachingConnectionFactory(rabbitConnectionFactory);
ccf.setHost("...");
// ...
return ccf;
}
spring.rabbitmq.ssl.enabled:true
spring.rabbitmq.ssl.keyStore=...
spring.rabbitmq.ssl.keyStoreType=jks
spring.rabbitmq.ssl.keyStorePassword=...
spring.rabbitmq.ssl.trustStore=...
spring.rabbitmq.ssl.trustStoreType=jks
spring.rabbitmq.ssl.trustStorePassword=...
spring.rabbitmq.host=...
...
<rabbit:connection-factory id="rabbitConnectionFactory"
connection-factory="clientConnectionFactory"
host="${host}"
port="${port}"
virtual-host="${vhost}"
username="${username}" password="${password}" />
<bean id="clientConnectionFactory"
class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
<property name="useSSL" value="true" />
<property name="sslPropertiesLocation" value="classpath:secrets/rabbitSSL.properties"/>
</bean>
有关配置 SSL 的信息,请参见 RabbitMQ Documentation 。省略 keyStore
和 trustStore
配置以在未经证书验证的情况下通过 SSL 连接。以下示例显示了如何提供密钥和信托库配置。
See the RabbitMQ Documentation for information about configuring SSL.
Omit the keyStore
and trustStore
configuration to connect over SSL without certificate validation.
The next example shows how you can provide key and trust store configuration.
sslPropertiesLocation
属性是一个 Spring Resource
,指向包含以下键的属性文件:
The sslPropertiesLocation
property is a Spring Resource
pointing to a properties file containing the following keys:
keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret
keyStore
和 truststore
是 Spring Resources
,指向存储点。通常,此属性文件由操作系统保护,应用程序具有读取权限。
The keyStore
and truststore
are Spring Resources
pointing to the stores.
Typically this properties file is secured by the operating system with the application having read access.
从 Spring AMQP 1.5 版开始,你可以在工厂 Bean 直接设置这些属性。如果同时提供了离散属性和 sslPropertiesLocation
,后者的属性会覆盖离散值。
Starting with Spring AMQP version 1.5,you can set these properties directly on the factory bean.
If both discrete properties and sslPropertiesLocation
is provided, properties in the latter override the
discrete values.
从版本 2.0 开始,默认验证服务器证书,因为它更安全。如果您出于某种原因希望跳过此验证,请将工厂 Bean 的`skipServerCertificateValidation` 属性设置为`true`。从版本 2.1 开始,RabbitConnectionFactoryBean
现在默认调用`enableHostnameVerification()。要恢复到以前的行为,请将`enableHostnameVerification
属性设置为`false`。
Starting with version 2.0, the server certificate is validated by default because it is more secure.
If you wish to skip this validation for some reason, set the factory bean’s skipServerCertificateValidation
property to true
.
Starting with version 2.1, the RabbitConnectionFactoryBean
now calls enableHostnameVerification()
by default.
To revert to the previous behavior, set the enableHostnameVerification
property to false
.
从 2.2.5 版本开始,工厂 Bean 默认情况下始终使用 TLS v1.2;以前,它在某些情况下使用 v1.1 并在其他情况下使用 v1.2(取决于其他属性)。如果您出于某种原因需要使用 v1.1,请设置 sslAlgorithm
属性:setSslAlgorithm("TLSv1.1")
。
Starting with version 2.2.5, the factory bean will always use TLS v1.2 by default; previously, it used v1.1 in some cases and v1.2 in others (depending on other properties).
If you need to use v1.1 for some reason, set the sslAlgorithm
property: setSslAlgorithm("TLSv1.1")
.
Connecting to a Cluster
要连接到集群,可以在 CachingConnectionFactory
的 addresses
属性中配置:
To connect to a cluster, configure the addresses
property on the CachingConnectionFactory
:
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
return ccf;
}
从 3.0 版本开始,底层的连接工厂会尝试连接到一个主机,只要建立新连接就会选择一个随机地址。要恢复到从第一个到最后一个依次尝试连接的上一个行为,将 addressShuffleMode
属性设置为 AddressShuffleMode.NONE
。
Starting with version 3.0, the underlying connection factory will attempt to connect to a host, by choosing a random address, whenever a new connection is established.
To revert to the previous behavior of attempting to connect from first to last, set the addressShuffleMode
property to AddressShuffleMode.NONE
.
从版本 2.3 开始,添加了 INORDER
shuffle 模式,这意味着在创建连接后,第一个地址将移至末尾。如果希望使用 CacheMode.CONNECTION
和适当的并发性从所有节点上的所有分片进行消耗,则可能希望将此模式与 RabbitMQ Sharding Plugin 一起使用。
Starting with version 2.3, the INORDER
shuffle mode was added, which means the first address is moved to the end after a connection is created.
You may wish to use this mode with the RabbitMQ Sharding Plugin with CacheMode.CONNECTION
and suitable concurrency if you wish to consume from all shards on all nodes.
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
ccf.setAddressShuffleMode(AddressShuffleMode.INORDER);
return ccf;
}
Routing Connection Factory
从 1.3 版本开始,引入了 AbstractRoutingConnectionFactory
。此工厂提供了一种机制,用于配置多个 ConnectionFactories
的映射,并在运行时通过某个 lookupKey
确定目标 ConnectionFactory
。通常,实现会检查线程绑定的上下文。为方便起见,Spring AMQP 提供了 SimpleRoutingConnectionFactory
,它从 SimpleResourceHolder
中获取当前线程绑定的 lookupKey
。以下示例展示了如何在 XML 和 Java 中配置 SimpleRoutingConnectionFactory
:
Starting with version 1.3, the AbstractRoutingConnectionFactory
has been introduced.
This factory provides a mechanism to configure mappings for several ConnectionFactories
and determine a target ConnectionFactory
by some lookupKey
at runtime.
Typically, the implementation checks a thread-bound context.
For convenience, Spring AMQP provides the SimpleRoutingConnectionFactory
, which gets the current thread-bound lookupKey
from the SimpleResourceHolder
.
The following examples shows how to configure a SimpleRoutingConnectionFactory
in both XML and Java:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
<property name="targetConnectionFactories">
<map>
<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
</map>
</property>
</bean>
<rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void service(String vHost, String payload) {
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
rabbitTemplate.convertAndSend(payload);
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
}
}
使用后取消绑定资源非常重要。有关详细信息,请参见 AbstractRoutingConnectionFactory
的 JavaDoc 。
It is important to unbind the resource after use.
For more information, see the JavaDoc for AbstractRoutingConnectionFactory
.
从 1.4 版本开始,RabbitTemplate
支持 SpEL sendConnectionFactorySelectorExpression
和 receiveConnectionFactorySelectorExpression
属性,在每一次 AMQP 协议交互操作(send
、sendAndReceive
、receive
或 receiveAndReply
)中都会对其进行评估,从而解析出所提供的 AbstractRoutingConnectionFactory
的 lookupKey
值。你可以在表达式中使用 Bean 引用,例如 @vHostResolver.getVHost(#root)
。对于 send
操作,要发送的消息是根评估对象。对于 receive
操作,queueName
是根评估对象。
Starting with version 1.4, RabbitTemplate
supports the SpEL sendConnectionFactorySelectorExpression
and receiveConnectionFactorySelectorExpression
properties, which are evaluated on each AMQP protocol interaction operation (send
, sendAndReceive
, receive
, or receiveAndReply
), resolving to a lookupKey
value for the provided AbstractRoutingConnectionFactory
.
You can use bean references, such as @vHostResolver.getVHost(#root)
in the expression.
For send
operations, the message to be sent is the root evaluation object.
For receive
operations, the queueName
is the root evaluation object.
路由算法如下:如果选择器表达式为 null
或评估结果为 null
或者提供的 ConnectionFactory
不是 AbstractRoutingConnectionFactory
的实例,则一切都像以前一样,依赖于提供的 ConnectionFactory
实现。如果评估结果不是 null
,但对于该 lookupKey
没有目标 ConnectionFactory
,且 AbstractRoutingConnectionFactory
配置了 lenientFallback = true
,则也会发生这种情况。在 AbstractRoutingConnectionFactory
的情况下,它会回退到基于 determineCurrentLookupKey()
的 routing
实现。但是,如果 lenientFallback = false
,则会抛出 IllegalStateException
。
The routing algorithm is as follows: If the selector expression is null
or is evaluated to null
or the provided ConnectionFactory
is not an instance of AbstractRoutingConnectionFactory
, everything works as before, relying on the provided ConnectionFactory
implementation.
The same occurs if the evaluation result is not null
, but there is no target ConnectionFactory
for that lookupKey
and the AbstractRoutingConnectionFactory
is configured with lenientFallback = true
.
In the case of an AbstractRoutingConnectionFactory
, it does fallback to its routing
implementation based on determineCurrentLookupKey()
.
However, if lenientFallback = false
, an IllegalStateException
is thrown.
命名空间支持还提供 <rabbit:template>
组件上的 send-connection-factory-selector-expression
和 receive-connection-factory-selector-expression
属性。
The namespace support also provides the send-connection-factory-selector-expression
and receive-connection-factory-selector-expression
attributes on the <rabbit:template>
component.
此外,从版本 1.4 开始,您可以在侦听器容器中配置一个路由连接工厂。在这种情况下,队列名列表用作查找键。例如,如果您使用 setQueueNames("thing1", "thing2")`配置容器,则查找键为 `[thing1,thing]"
(请注意,密钥中没有空格)。
Also, starting with version 1.4, you can configure a routing connection factory in a listener container.
In that case, the list of queue names is used as the lookup key.
For example, if you configure the container with setQueueNames("thing1", "thing2")
, the lookup key is [thing1,thing]"
(note that there is no space in the key).
从 1.6.9 版本开始,你可以通过在侦听器容器上使用 setLookupKeyQualifier
向查找键添加限定符。这样做会使你能够侦听同名但位于不同虚拟主机中的队列(你可以在其中为每个队列使用一个连接工厂)。
Starting with version 1.6.9, you can add a qualifier to the lookup key by using setLookupKeyQualifier
on the listener container.
Doing so enables, for example, listening to queues with the same name but in a different virtual host (where you would have a connection factory for each).
例如,对于查找键限定符 thing1`和侦听队列 `thing2`的容器,您用来注册目标连接工厂的查找键可以为 `thing1[thing2]
。
For example, with lookup key qualifier thing1
and a container listening to queue thing2
, the lookup key you could register the target connection factory with could be thing1[thing2]
.
目标(和默认,如果提供)连接工厂必须具有发布确认和返回的相同设置。请参阅 Publisher Confirms and Returns。
The target (and default, if provided) connection factories must have the same settings for publisher confirms and returns. See Publisher Confirms and Returns.
从 2.4.4 版本开始,可以禁用此验证。如果你需要确认和返回之间的值不相等的情况,则可以使用 AbstractRoutingConnectionFactory#setConsistentConfirmsReturns
关闭验证。请注意,第一个添加到 AbstractRoutingConnectionFactory
的连接工厂将确定 confirms
和 returns
的常规值。
Starting with version 2.4.4, this validation can be disabled.
If you have a case that the values between confirms and returns need to be unequal, you can use AbstractRoutingConnectionFactory#setConsistentConfirmsReturns
to turn of the validation.
Note that the first connection factory added to AbstractRoutingConnectionFactory
will determine the general values of confirms
and returns
.
如果你需要检查一些消息的确认/返回而另一些消息则不需要,这可能会派上用场。例如:
It may be useful if you have a case that certain messages you would to check confirms/returns and others you don’t. For example:
@Bean
public RabbitTemplate rabbitTemplate() {
final com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
cf.setHost("localhost");
cf.setPort(5672);
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);
cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
PooledChannelConnectionFactory pooledChannelConnectionFactory = new PooledChannelConnectionFactory(cf);
final Map<Object, ConnectionFactory> connectionFactoryMap = new HashMap<>(2);
connectionFactoryMap.put("true", cachingConnectionFactory);
connectionFactoryMap.put("false", pooledChannelConnectionFactory);
final AbstractRoutingConnectionFactory routingConnectionFactory = new SimpleRoutingConnectionFactory();
routingConnectionFactory.setConsistentConfirmsReturns(false);
routingConnectionFactory.setDefaultTargetConnectionFactory(pooledChannelConnectionFactory);
routingConnectionFactory.setTargetConnectionFactories(connectionFactoryMap);
final RabbitTemplate rabbitTemplate = new RabbitTemplate(routingConnectionFactory);
final Expression sendExpression = new SpelExpressionParser().parseExpression(
"messageProperties.headers['x-use-publisher-confirms'] ?: false");
rabbitTemplate.setSendConnectionFactorySelectorExpression(sendExpression);
}
这样,带有头 x-use-publisher-confirms: true
的消息将通过缓存连接发送,并且你可以确保消息传递。有关确保消息传递的更多信息,请参见 Publisher Confirms and Returns 。
This way messages with the header x-use-publisher-confirms: true
will be sent through the caching connection, and you can ensure the message delivery.
See Publisher Confirms and Returns for more information about ensuring message delivery.
Queue Affinity and the LocalizedQueueConnectionFactory
在集群中使用高可用性 (HA) 队列时,为了获得最佳性能,你可能希望连接到领导队列所在的物理代理。CachingConnectionFactory
可以配置为具有多个代理地址。这是为了进行故障转移,客户端会尝试与已配置的 AddressShuffleMode
顺序相一致进行连接。LocalizedQueueConnectionFactory
使用管理插件提供的 REST API 来确定哪个节点是队列的领导者。然后,它会创建一个(或从缓存中检索)CachingConnectionFactory
来连接到该节点。如果连接失败,则确定新的领导节点,并且使用者连接到它。LocalizedQueueConnectionFactory
配置了一个默认的连接工厂,以防无法确定队列的物理位置,在这种情况下,它会像往常一样连接到集群。
When using HA queues in a cluster, for the best performance, you may want to connect to the physical broker
where the lead queue resides.
The CachingConnectionFactory
can be configured with multiple broker addresses.
This is to fail over and the client attempts to connect in accordance with the configured AddressShuffleMode
order.
The LocalizedQueueConnectionFactory
uses the REST API provided by the management plugin to determine which node is the lead for the queue.
It then creates (or retrieves from a cache) a CachingConnectionFactory
that connects to just that node.
If the connection fails, the new lead node is determined and the consumer connects to it.
The LocalizedQueueConnectionFactory
is configured with a default connection factory, in case the physical location of the queue cannot be determined, in which case it connects as normal to the cluster.
LocalizedQueueConnectionFactory
是 RoutingConnectionFactory
,而 SimpleMessageListenerContainer
使用队列名称作为查找键,正如上文 Routing Connection Factory 中所讨论的。
The LocalizedQueueConnectionFactory
is a RoutingConnectionFactory
and the SimpleMessageListenerContainer
uses the queue names as the lookup key as discussed in Routing Connection Factory above.
出于此原因(用于查找的队列名称), |
For this reason (the use of the queue name for the lookup), the |
必须在每个节点上启用 RabbitMQ 管理插件。 |
The RabbitMQ management plugin must be enabled on each node. |
此连接工厂适用于长时间连接,例如 SimpleMessageListenerContainer
使用的连接。由于在建立连接之前调用 REST API 会产生开销,因此它不适用于短连接使用,例如与 RabbitTemplate
配合使用。此外,对于发布操作,队列是未知的,并且消息会发布到所有群集成员,因此查找节点的逻辑价值不大。
This connection factory is intended for long-lived connections, such as those used by the SimpleMessageListenerContainer
.
It is not intended for short connection use, such as with a RabbitTemplate
because of the overhead of invoking the REST API before making the connection.
Also, for publish operations, the queue is unknown, and the message is published to all cluster members anyway, so the logic of looking up the node has little value.
以下示例配置显示了如何配置工厂:
The following example configuration shows how to configure the factories:
@Autowired
private ConfigurationProperties props;
@Bean
public CachingConnectionFactory defaultConnectionFactory() {
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.props.getAddresses());
cf.setUsername(this.props.getUsername());
cf.setPassword(this.props.getPassword());
cf.setVirtualHost(this.props.getVirtualHost());
return cf;
}
@Bean
public LocalizedQueueConnectionFactory queueAffinityCF(
@Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
return new LocalizedQueueConnectionFactory(defaultCF,
StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
StringUtils.commaDelimitedListToStringArray(this.props.getAdminUris()),
StringUtils.commaDelimitedListToStringArray(this.props.getNodes()),
this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
false, null);
}
请注意,前三个参数是 addresses
、adminUris
和 nodes
的数组。它们的位置很重要,即当容器尝试连接到队列时,它会使用管理 API 来确定哪个节点是队列的领导者,并连接到与该节点在同一数组位置的地址。
Notice that the first three parameters are arrays of addresses
, adminUris
, and nodes
.
These are positional in that, when a container attempts to connect to a queue, it uses the admin API to determine which node is the lead for the queue and connects to the address in the same array position as that node.
从版本 3.0 开始,RabbitMQ http-client
不再用于访问 Rest API。相反,如果 spring-webflux
在类路径上,则默认使用 WebClient
from Spring Webflux;否则,则使用 RestTemplate
。
Starting with version 3.0, the RabbitMQ http-client
is no longer used to access the Rest API.
Instead, by default, the WebClient
from Spring Webflux is used if spring-webflux
is on the class path; otherwise a RestTemplate
is used.
要在类路径中添加 WebFlux
:
To add WebFlux
to the class path:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
compile 'org.springframework.amqp:spring-rabbit'
你还可以通过执行`LocalizedQueueConnectionFactory.NodeLocator`并覆盖它的`createClient`、``restCall`和(可选地)`close`方法,使用其他 REST 技术。
You can also use other REST technology by implementing LocalizedQueueConnectionFactory.NodeLocator
and overriding its createClient, ``restCall
, and optionally, close
methods.
lqcf.setNodeLocator(new NodeLocator<MyClient>() {
@Override
public MyClient createClient(String userName, String password) {
...
}
@Override
public HashMap<String, Object> restCall(MyClient client, URI uri) {
...
});
});
框架提供了`WebFluxNodeLocator`和`RestTemplateNodeLocator`,其默认值如上所述。
The framework provides the WebFluxNodeLocator
and RestTemplateNodeLocator
, with the default as discussed above.
Publisher Confirms and Returns
通过将`CachingConnectionFactory`属性`publisherConfirmType`设置为`ConfirmType.CORRELATED`,并将`publisherReturns`属性设置为`true`可以支持已确认(带相关性)和返回的消息。
Confirmed (with correlation) and returned messages are supported by setting the CachingConnectionFactory
property publisherConfirmType
to ConfirmType.CORRELATED
and the publisherReturns
property to 'true'.
当设置了这些选项时,工厂创建的`Channel`实例会包装在一个`PublisherCallbackChannel`中,该`PublisherCallbackChannel`用于促进回调。获得此类通道后,客户端可以在`Channel`中注册一个`PublisherCallbackChannel.Listener`。`PublisherCallbackChannel`实现包含将确认或返回路由到适当侦听器的逻辑。以下部分将进一步解释这些功能。
When these options are set, Channel
instances created by the factory are wrapped in an PublisherCallbackChannel
, which is used to facilitate the callbacks.
When such a channel is obtained, the client can register a PublisherCallbackChannel.Listener
with the Channel
.
The PublisherCallbackChannel
implementation contains logic to route a confirm or return to the appropriate listener.
These features are explained further in the following sections.
另请参阅 Scoped Operations 中的 Correlated Publisher Confirms and Returns 和 simplePublisherConfirms
。
See also Correlated Publisher Confirms and Returns and simplePublisherConfirms
in Scoped Operations.
有关更多背景信息,请参阅 RabbitMQ 团队题为 Introducing Publisher Confirms 的博客文章。 |
For some more background information, see the blog post by the RabbitMQ team titled Introducing Publisher Confirms. |
Connection and Channel Listeners
连接工厂支持注册 ConnectionListener
和 ChannelListener
实现。这使你可以接收连接和通道相关事件的通知。(连接建立后, ConnectionListener
用于 RabbitAdmin
执行声明 - 有关更多信息,请参见 Automatic Declaration of Exchanges, Queues, and Bindings )。以下清单显示了 ConnectionListener
接口定义:
The connection factory supports registering ConnectionListener
and ChannelListener
implementations.
This allows you to receive notifications for connection and channel related events.
(A ConnectionListener
is used by the RabbitAdmin
to perform declarations when the connection is established - see Automatic Declaration of Exchanges, Queues, and Bindings for more information).
The following listing shows the ConnectionListener
interface definition:
@FunctionalInterface
public interface ConnectionListener {
void onCreate(Connection connection);
default void onClose(Connection connection) {
}
default void onShutDown(ShutdownSignalException signal) {
}
}
从 2.0 版本开始,向`org.springframework.amqp.rabbit.connection.Connection`对象提供 `com.rabbitmq.client.BlockedListener`实例,以获取连接阻塞和取消阻塞事件的通知。以下示例显示了`ChannelListener`接口定义:
Starting with version 2.0, the org.springframework.amqp.rabbit.connection.Connection
object can be supplied with com.rabbitmq.client.BlockedListener
instances to be notified for connection blocked and unblocked events.
The following example shows the ChannelListener interface definition:
@FunctionalInterface
public interface ChannelListener {
void onCreate(Channel channel, boolean transactional);
default void onShutDown(ShutdownSignalException signal) {
}
}
查询 Publishing is Asynchronous — How to Detect Successes and Failures,了解可能想要注册 `ChannelListener`的一种情况。
See Publishing is Asynchronous — How to Detect Successes and Failures for one scenario where you might want to register a ChannelListener
.
Logging Channel Close Events
1.5 版本引入了一种机制,使用户能够控制日志级别。
Version 1.5 introduced a mechanism to enable users to control logging levels.
`AbstractConnectionFactory`使用默认策略以下列方式记录通道关闭:
The AbstractConnectionFactory
uses a default strategy to log channel closures as follows:
-
Normal channel closes (200 OK) are not logged.
-
If a channel is closed due to a failed passive queue declaration, it is logged at DEBUG level.
-
If a channel is closed because the
basic.consume
is refused due to an exclusive consumer condition, it is logged at DEBUG level (since 3.1, previously INFO). -
All others are logged at ERROR level.
要修改此行为,可以将自定义`ConditionalExceptionLogger`注入到`CachingConnectionFactory`的`closeExceptionLogger`属性中。
To modify this behavior, you can inject a custom ConditionalExceptionLogger
into the
CachingConnectionFactory
in its closeExceptionLogger
property.
此外,`AbstractConnectionFactory.DefaultChannelCloseLogger`现在是公开的,允许对它进行子类化。
Also, the AbstractConnectionFactory.DefaultChannelCloseLogger
is now public, allowing it to be sub classed.
另请参见 Consumer Events。
See also Consumer Events.
Runtime Cache Properties
从 1.6 版本开始,`CachingConnectionFactory`现在通过`getCacheProperties()`方法提供缓存统计信息。这些统计信息可用于调整缓存,以便在生产中对其进行优化。例如,高水位标记可用于确定是否应增加缓存大小。如果它等于缓存大小,则可以考虑进一步增加。下表描述了`CacheMode.CHANNEL`属性:
Staring with version 1.6, the CachingConnectionFactory
now provides cache statistics through the getCacheProperties()
method.
These statistics can be used to tune the cache to optimize it in production.
For example, the high water marks can be used to determine whether the cache size should be increased.
If it equals the cache size, you might want to consider increasing further.
The following table describes the CacheMode.CHANNEL
properties:
Property | Meaning |
---|---|
connectionName |
The name of the connection generated by the |
channelCacheSize |
The currently configured maximum channels that are allowed to be idle. |
localPort |
The local port for the connection (if available). This can be used to correlate with connections and channels on the RabbitMQ Admin UI. |
idleChannelsTx |
The number of transactional channels that are currently idle (cached). |
idleChannelsNotTx |
The number of non-transactional channels that are currently idle (cached). |
idleChannelsTxHighWater |
The maximum number of transactional channels that have been concurrently idle (cached). |
idleChannelsNotTxHighWater |
The maximum number of non-transactional channels have been concurrently idle (cached). |
下表描述了`CacheMode.CONNECTION`属性:
The following table describes the CacheMode.CONNECTION
properties:
Property | Meaning |
---|---|
connectionName:<localPort> |
The name of the connection generated by the |
openConnections |
The number of connection objects representing connections to brokers. |
channelCacheSize |
The currently configured maximum channels that are allowed to be idle. |
connectionCacheSize |
The currently configured maximum connections that are allowed to be idle. |
idleConnections |
The number of connections that are currently idle. |
idleConnectionsHighWater |
The maximum number of connections that have been concurrently idle. |
idleChannelsTx:<localPort> |
The number of transactional channels that are currently idle (cached) for this connection.
You can use the |
idleChannelsNotTx:<localPort> |
The number of non-transactional channels that are currently idle (cached) for this connection.
The |
idleChannelsTxHighWater:<localPort> |
The maximum number of transactional channels that have been concurrently idle (cached). The localPort part of the property name can be used to correlate with connections and channels on the RabbitMQ Admin UI. |
idleChannelsNotTxHighWater:<localPort> |
The maximum number of non-transactional channels have been concurrently idle (cached).
You can use the |
还包括`cacheMode`属性(CHANNEL`或`CONNECTION
)。
The cacheMode
property (CHANNEL
or CONNECTION
) is also included.
RabbitMQ Automatic Connection/Topology recovery
从 Spring AMQP 的第一个版本开始,该框架已在自己内部提供连接和通道恢复以防代理故障。此外,正如在 Configuring the Broker中所讨论的,当连接重新建立时,RabbitAdmin`重新声明所有基础设施 bean(队列和其他队列)。因此,它不依赖于 `amqp-client`库现在提供的 auto-recovery。`amqp-client`在默认情况下已启用自动恢复。这两种恢复机制之间存在一些不兼容性,因此默认情况下,Spring 将底层 `RabbitMQ connectionFactory`上的 `automaticRecoveryEnabled`属性设置为 `false
。即使属性为 true
,Spring 也会立即关闭所有已恢复的连接,从而有效地禁用它。
Since the first version of Spring AMQP, the framework has provided its own connection and channel recovery in the event of a broker failure.
Also, as discussed in Configuring the Broker, the RabbitAdmin
re-declares any infrastructure beans (queues and others) when the connection is re-established.
It therefore does not rely on the auto-recovery that is now provided by the amqp-client
library.
The amqp-client
, has auto recovery enabled by default.
There are some incompatibilities between the two recovery mechanisms so, by default, Spring sets the automaticRecoveryEnabled
property on the underlying RabbitMQ connectionFactory
to false
.
Even if the property is true
, Spring effectively disables it, by immediately closing any recovered connections.
默认情况下,仅将定义为 bean 的元素(队列、交换、绑定)在连接故障后重新声明。请参阅 Recovering Auto-Delete Declarations 更改该行为。
By default, only elements (queues, exchanges, bindings) that are defined as beans will be re-declared after a connection failure. See Recovering Auto-Delete Declarations for how to change that behavior.