Connection and Resource Management

虽然我们在上一节中描述的 AMQP 模型是通用的,适用于所有实现,但是当我们进入资源管理时,详细信息特定于代理实现。因此,在本节中,我们重点介绍仅存在于我们的“spring-rabbit”模块中的代码,因为此时,RabbitMQ 是唯一支持的实现。 用于管理与 RabbitMQ 代理的连接的核心组件是 ConnectionFactory 接口。ConnectionFactory 实现的责任是提供 org.springframework.amqp.rabbit.connection.Connection 的一个实例,它是一个 com.rabbitmq.client.Connection 的包装器。

Choosing a Connection Factory

有三个连接工厂可供选择

  • PooledChannelConnectionFactory

  • ThreadChannelConnectionFactory

  • CachingConnectionFactory

前两个在版本 2.3 中添加。

对于大多数用例,应使用 CachingConnectionFactory 。如果要确保严格的消息排序而不必使用 Scoped Operations ,则可以使用 ThreadChannelConnectionFactoryPooledChannelConnectionFactory 类似于 CachingConnectionFactory ,因为它使用一个连接和一个通道池。其实现更简单,但它不支持关联的发布者确认。

所有三个工厂都支持简单的发布者确认。

在将 RabbitTemplate 配置为使用 separate connection 时,你可以在从版本 2.3.2 开始将发布连接工厂配置为不同类型。默认情况下,发布工厂是相同类型,主工厂上设置的任何属性也会传播到发布工厂。

从版本 3.1 开始,AbstractConnectionFactory 包含 connectionCreatingBackOff 属性,该属性支持连接模块中的延迟策略。目前,createChannel() 的行为支持在达到 channelMax 限制时处理发生的异常,基于尝试和间隔实现延迟策略。

PooledChannelConnectionFactory

该工厂基于 Apache Pool2 管理单个连接和两个通道池。一个池用于事务通道,另一个池用于非事务通道。该池是具有默认配置的 GenericObjectPool s;提供了一个回调来配置这些池;有关更多信息,请参阅 Apache 文档。

必须在类路径上放置 Apache commons-pool2 jar 才能使用此工厂。

@Bean
PooledChannelConnectionFactory pcf() throws Exception {
    ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
    rabbitConnectionFactory.setHost("localhost");
    PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
    pcf.setPoolConfigurer((pool, tx) -> {
        if (tx) {
            // configure the transactional pool
        }
        else {
            // configure the non-transactional pool
        }
    });
    return pcf;
}

ThreadChannelConnectionFactory

此工厂管理一个连接和两个 ThreadLocal ,一个用于事务通道,另一个用于非事务通道。此工厂可确保同一线程上的所有操作使用同一个通道(只要它仍然处于打开状态)。这有助于严格的消息排序,而无需 Scoped Operations 。为避免内存泄露,如果应用程序使用许多短生存期的线程,则必须调用工厂的 closeThreadChannel() 以释放通道资源。从版本 2.3.7 开始,一个线程可以将其通道转移到另一个线程。有关更多信息,请参见 Strict Message Ordering in a Multi-Threaded Environment

CachingConnectionFactory

提供的第三个实现是 CachingConnectionFactory,它默认建立一个应用程序可以共享的单个连接代理。可以共享连接,因为 AMQP 消息传递的“处理单元”实际上是“通道”(在某些方面,这类似于 JMS 中连接与会话之间的关系)。连接实例提供了一个 createChannel 方法。CachingConnectionFactory 实现支持这些通道的缓存,并且根据通道是否具有事务性为通道维护单独的缓存。创建 CachingConnectionFactory 实例时,可以通过构造函数提供“主机名”。你还应该提供“用户名”和“密码”属性。要配置通道缓存的大小(默认为 25),可以调用 setChannelCacheSize() 方法。

从版本 1.3 开始,可以将 CachingConnectionFactory 配置为缓存连接以及通道。在这种情况下,对 createConnection() 的每次调用都会创建一个新连接(或从缓存中检索一个空闲连接)。关闭连接会将其返回到缓存(如果未达到缓存大小)。在这样的连接上创建的通道也会被缓存。在某些环境中可能有用单独的连接,例如从 HA 群集使用负载均衡器来连接到不同的群集成员,以及其他连接。要缓存连接,请将 cacheMode 设置为 CacheMode.CONNECTION

这并没有限制连接数。相反,它指定了允许的空闲打开连接数。

从版本 1.5.5 开始,提供了一个名为 connectionLimit 的新属性。当设置此属性时,它将限制允许的连接总数。如果设置,则当达到限制时,channelCheckoutTimeLimit 用于等待连接变为闲置状态。如果超过时间,则会抛出 AmqpTimeoutException

当高速缓存模式为 CONNECTION 时,不支持自动声明队列和其他队列(请参见 Automatic Declaration of Exchanges, Queues, and Bindings )。 此外,在撰写本文时,amqp-client 库默认情况下会为每个连接创建一个固定线程池(默认大小:Runtime.getRuntime().availableProcessors() * 2 线程)。在使用大量连接时,应考虑在 CachingConnectionFactory 上设置一个自定义 executor。然后,所有连接都可以使用相同的执行程序,并且可以共享其线程。执行程序的线程池应该是无界的,或者应根据预期的用途(通常,每个连接至少一个线程)进行适当设置。如果在每个连接上创建多个通道,则池大小会影响并发性,因此可变(或简单缓存)线程池执行程序最合适。

了解以下内容很重要:缓存大小(默认情况下)不是限制,而是仅仅可以缓存的通道数量。如果缓存大小为 10,实际上可以使用任意数量的通道。如果使用了 10 个以上的通道并将它们全部返回到缓存,则 10 个通道将进入缓存。其余的会被物理关闭。

从版本 1.6 开始,默认通道缓存大小已从 1 增加到 25。在高容量多线程环境中,小缓存意味着通道以高比率创建和关闭。增加默认缓存大小可以避免这种情况开销。你应该通过 RabbitMQ 管理员 UI 监视正在使用的通道,如果你看到许多通道被创建和关闭,请考虑进一步增加缓存大小。缓存仅按需增长(以满足应用程序的并发性要求),因此此更改不会影响现有的低容量应用程序。

从版本 1.4.2 开始,CachingConnectionFactory 有一个名为 channelCheckoutTimeout 的属性。当此属性大于零时,channelCacheSize 变成可以在连接上创建的通道数量的限制。如果达到限制,则调用线程将阻塞,直到通道可用或达到此超时时间,在这种情况下,将抛出 AmqpTimeoutException

在框架内使用的通道(例如 RabbitTemplate)会可靠地返回到缓存。如果在框架外创建通道(例如,通过直接访问连接并调用 createChannel()),则必须可靠地返回这些通道(通过关闭),也许是在 finally 块中,以避免用尽通道。

以下示例展示了如何创建一个新的 connection

CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

Connection connection = connectionFactory.createConnection();

当使用 XML 时,配置可能看起来像以下示例:

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="somehost"/>
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
</bean>

还有一个仅在框架单元测试代码中可用的 SingleConnectionFactory 实现。它比 CachingConnectionFactory 要简单,因为它不缓存通道,但它不适用于简单的测试之外的实际使用,因为它缺乏性能和恢复能力。如果您出于某种原因需要实现您自己的 ConnectionFactory,那么 AbstractConnectionFactory 基本类可能会提供一个不错的起点。

可以使用以下 rabbit 名称空间快速便创建一个 ConnectionFactory

<rabbit:connection-factory id="connectionFactory"/>

在大多数情况下,这种方法更可取,因为框架可以为您选择最佳默认设置。创建的实例是 CachingConnectionFactory。请记住,通道的默认缓存大小为 25。如果要缓存更多通道,请通过设置“channelCacheSize”属性来设置更大的值。在 XML 中,它看起来如下:

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="somehost"/>
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
    <property name="channelCacheSize" value="50"/>
</bean>

此外,使用名称空间,您可以添加“channel-cache-size”属性,如下所示:

<rabbit:connection-factory
    id="connectionFactory" channel-cache-size="50"/>

默认缓存模式是 CHANNEL,但您可以将其配置为缓存连接。在以下示例中,我们使用 connection-cache-size

<rabbit:connection-factory
    id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>

您可以使用名称空间来提供主机和端口属性,如下所示:

<rabbit:connection-factory
    id="connectionFactory" host="somehost" port="5672"/>

或者,如果在集群环境中运行,可以使用地址属性,如下所示:

<rabbit:connection-factory
    id="connectionFactory" addresses="host1:5672,host2:5672" address-shuffle-mode="RANDOM"/>

有关 address-shuffle-mode 的信息,请参见 Connecting to a Cluster

以下示例使用自定义线程工厂,用 rabbitmq- 为线程名称添加前缀:

<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
    thread-factory="tf"
    channel-cache-size="10" username="user" password="password" />

<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
    <constructor-arg value="rabbitmq-" />
</bean>

AddressResolver

从 2.1.15 版开始,您现在可以使用 AddressResolver 来解析连接地址。这将覆盖 addresseshost/port 属性的任何设置。

Naming Connections

从 1.7 版开始,ConnectionNameStrategy 是为注入到 AbstractionConnectionFactory 中而提供的。生成的名字用于目标 RabbitMQ 连接的特定于应用程序的识别。如果 RabbitMQ 服务器支持,连接名称将显示在管理界面中。该值不必唯一,不能用作连接标识符——例如,在 HTTP API 请求中。这个值应该是人类可读的,并且是 ClientProperties 的一部分,其键为 connection_name。您可以使用一个简单的 Lambda,如下所示:

connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");

ConnectionFactory 参数可用于通过一些逻辑区分目标连接名称。默认情况下,将使用 AbstractConnectionFactorybeanName、表示对象的十六进制字符串和内部计数器来生成 connection_name<rabbit:connection-factory> 名称空间组件还会随 connection-name-strategy 属性一起提供。

SimplePropertyValueConnectionNameStrategy 实现将连接名称设置为应用程序属性。您可以将其声明为 @Bean 并将其注入到连接工厂中,如下例所示:

@Bean
public SimplePropertyValueConnectionNameStrategy cns() {
    return new SimplePropertyValueConnectionNameStrategy("spring.application.name");
}

@Bean
public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    ...
    connectionFactory.setConnectionNameStrategy(cns);
    return connectionFactory;
}

该属性必须存在于应用程序上下文的 Environment 中。

在使用 Spring Boot 及其自动配置的连接工厂时,您只需要声明 ConnectionNameStrategy @Bean。Boot 会自动检测 bean 并将其连接到工厂中。

Blocked Connections and Resource Constraints

连接可能会被阻止与对应于 Memory Alarm的代理进行交互。从版本 2.0 开始,可以为 org.springframework.amqp.rabbit.connection.Connection`提供 `com.rabbitmq.client.BlockedListener`实例,以便在连接被阻止和取消阻止事件时收到通知。此外,`AbstractConnectionFactory`通过其内部 `BlockedListener`实现分别发出 `ConnectionBlockedEvent`和 `ConnectionUnblockedEvent。通过这些信息,您可以提供应用程序逻辑来对代理上的问题做出适当的反应,例如采取一些纠正措施。

当应用程序配置为具有单个 CachingConnectionFactory 时,就像 Spring Boot 自动配置中的默认设置一样,当连接被代理程序阻塞时,该应用程序便停止工作。当连接被代理程序阻塞时,其任何客户端都会停止工作。如果我们同时已经在同一个应用程序中有生产者和消费者,当生产者阻塞连接(因为代理程序上不再有资源)而消费者无法释放连接(因为连接被阻塞)时,最终可能会导致死锁。为了缓解该问题,我们建议再使用一个拥有相同选项的独立 CachingConnectionFactory 实例——一个用于生产者,一个用于消费者。对于在消费者线程上执行的事务性生产者来说,单独的 CachingConnectionFactory 是不可能的,因为它们应该重新使用与消费者事务关联的 Channel

从版本 2.0.2 开始, RabbitTemplate 具有一个配置选项,除非正在使用事务,否则可自动使用第二个连接工厂。有关更多信息,请参见 Using a Separate Connection 。发布者连接的 ConnectionNameStrategy 与主策略相同, .publisher 附加到了调用该方法的结果上。

从 1.7.7 版开始,提供了 AmqpResourceNotAvailableException,当 SimpleConnection.createChannel() 无法创建 Channel 时会抛出该异常(例如,因为达到了 channelMax 限制并且缓存中没有可用的通道)。您可以在 RetryPolicy 中使用该异常,以便在恢复一段时间后恢复操作。

Configuring the Underlying Client Connection Factory

CachingConnectionFactory 使用 Rabbit 客户端实例 ConnectionFactory 。在设置 CachingConnectionFactory 上的等效属性时,将传递许多配置属性(例如 hostportuserNamepasswordrequestedHeartBeatconnectionTimeout )。要设置其他属性(例如 clientProperties ),可以定义 Rabbit 工厂实例,并通过使用 CachingConnectionFactory 的相应构造函数提供对它的引用。在使用命名空间 (as described earlier ) 时,需要在 connection-factory 属性中提供对已配置工厂的引用。为了方便起见,提供了一个工厂 Bean,以帮助在 Spring 应用上下文中配置连接工厂,如 the next section 中所述。

<rabbit:connection-factory
      id="connectionFactory" connection-factory="rabbitConnectionFactory"/>

4.0.x 客户端默认启用自动恢复。虽然与该功能兼容,Spring AMQP 拥有自己的恢复机制并且通常不需要客户端恢复功能。我们建议禁用`amqp-client` 自动恢复,以避免在代理可用但连接尚未恢复时遇到`AutoRecoverConnectionNotCurrentlyOpenException` 实例。例如,当在`RabbitTemplate` 中配置`RetryTemplate` 时,可能会看到此异常,即使故障切换到集群中的另一个代理时也是如此。由于自动恢复连接在计时器上恢复,因此可以通过 Spring AMQP 的恢复机制更快速地恢复连接。从版本 1.7.1 开始,除非您显式创建自己的 RabbitMQ 连接工厂并将其提供给`CachingConnectionFactory`,否则 Spring AMQP 禁用`amqp-client` 自动恢复。RabbitConnectionFactoryBean 创建的 RabbitMQ`ConnectionFactory` 实例默认情况下也已禁用该选项。

RabbitConnectionFactoryBean and Configuring SSL

从 1.4 版开始,提供了一个方便的 RabbitConnectionFactoryBean,以便使用依赖项注入对底层客户端连接工厂上的 SSL 属性进行方便的配置。其他 setter 委托给底层工厂。以前,您必须以编程方式配置 SSL 选项。以下示例说明如何配置 RabbitConnectionFactoryBean

Java
@Bean
RabbitConnectionFactoryBean rabbitConnectionFactory() {
    RabbitConnectionFactoryBean factoryBean = new RabbitConnectionFactoryBean();
    factoryBean.setUseSSL(true);
    factoryBean.setSslPropertiesLocation(new ClassPathResource("secrets/rabbitSSL.properties"));
    return factoryBean;
}

@Bean
CachingConnectionFactory connectionFactory(ConnectionFactory rabbitConnectionFactory) {
    CachingConnectionFactory ccf = new CachingConnectionFactory(rabbitConnectionFactory);
    ccf.setHost("...");
    // ...
    return ccf;
}
Boot application.properties
spring.rabbitmq.ssl.enabled:true
spring.rabbitmq.ssl.keyStore=...
spring.rabbitmq.ssl.keyStoreType=jks
spring.rabbitmq.ssl.keyStorePassword=...
spring.rabbitmq.ssl.trustStore=...
spring.rabbitmq.ssl.trustStoreType=jks
spring.rabbitmq.ssl.trustStorePassword=...
spring.rabbitmq.host=...
...
XML
<rabbit:connection-factory id="rabbitConnectionFactory"
    connection-factory="clientConnectionFactory"
    host="${host}"
    port="${port}"
    virtual-host="${vhost}"
    username="${username}" password="${password}" />

<bean id="clientConnectionFactory"
        class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
    <property name="useSSL" value="true" />
    <property name="sslPropertiesLocation" value="classpath:secrets/rabbitSSL.properties"/>
</bean>

有关配置 SSL 的信息,请参见 RabbitMQ Documentation 。省略 keyStoretrustStore 配置以在未经证书验证的情况下通过 SSL 连接。以下示例显示了如何提供密钥和信托库配置。

sslPropertiesLocation 属性是一个 Spring Resource,指向包含以下键的属性文件:

keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret

keyStoretruststore 是 Spring Resources,指向存储点。通常,此属性文件由操作系统保护,应用程序具有读取权限。

从 Spring AMQP 1.5 版开始,你可以在工厂 Bean 直接设置这些属性。如果同时提供了离散属性和 sslPropertiesLocation,后者的属性会覆盖离散值。

从版本 2.0 开始,默认验证服务器证书,因为它更安全。如果您出于某种原因希望跳过此验证,请将工厂 Bean 的`skipServerCertificateValidation` 属性设置为`true`。从版本 2.1 开始,RabbitConnectionFactoryBean 现在默认调用`enableHostnameVerification()。要恢复到以前的行为,请将`enableHostnameVerification 属性设置为`false`。

从 2.2.5 版本开始,工厂 Bean 默认情况下始终使用 TLS v1.2;以前,它在某些情况下使用 v1.1 并在其他情况下使用 v1.2(取决于其他属性)。如果您出于某种原因需要使用 v1.1,请设置 sslAlgorithm 属性:setSslAlgorithm("TLSv1.1")

Connecting to a Cluster

要连接到集群,可以在 CachingConnectionFactoryaddresses 属性中配置:

@Bean
public CachingConnectionFactory ccf() {
    CachingConnectionFactory ccf = new CachingConnectionFactory();
    ccf.setAddresses("host1:5672,host2:5672,host3:5672");
    return ccf;
}

从 3.0 版本开始,底层的连接工厂会尝试连接到一个主机,只要建立新连接就会选择一个随机地址。要恢复到从第一个到最后一个依次尝试连接的上一个行为,将 addressShuffleMode 属性设置为 AddressShuffleMode.NONE

从版本 2.3 开始,添加了 INORDER shuffle 模式,这意味着在创建连接后,第一个地址将移至末尾。如果希望使用 CacheMode.CONNECTION 和适当的并发性从所有节点上的所有分片进行消耗,则可能希望将此模式与 RabbitMQ Sharding Plugin 一起使用。

@Bean
public CachingConnectionFactory ccf() {
    CachingConnectionFactory ccf = new CachingConnectionFactory();
    ccf.setAddresses("host1:5672,host2:5672,host3:5672");
    ccf.setAddressShuffleMode(AddressShuffleMode.INORDER);
    return ccf;
}

Routing Connection Factory

从 1.3 版本开始,引入了 AbstractRoutingConnectionFactory。此工厂提供了一种机制,用于配置多个 ConnectionFactories 的映射,并在运行时通过某个 lookupKey 确定目标 ConnectionFactory。通常,实现会检查线程绑定的上下文。为方便起见,Spring AMQP 提供了 SimpleRoutingConnectionFactory,它从 SimpleResourceHolder 中获取当前线程绑定的 lookupKey。以下示例展示了如何在 XML 和 Java 中配置 SimpleRoutingConnectionFactory

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
    <property name="targetConnectionFactories">
        <map>
            <entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
            <entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
        </map>
    </property>
</bean>

<rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void service(String vHost, String payload) {
        SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
        rabbitTemplate.convertAndSend(payload);
        SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
    }

}

使用后取消绑定资源非常重要。有关详细信息,请参见 AbstractRoutingConnectionFactoryJavaDoc

从 1.4 版本开始,RabbitTemplate 支持 SpEL sendConnectionFactorySelectorExpressionreceiveConnectionFactorySelectorExpression 属性,在每一次 AMQP 协议交互操作(sendsendAndReceivereceivereceiveAndReply)中都会对其进行评估,从而解析出所提供的 AbstractRoutingConnectionFactorylookupKey 值。你可以在表达式中使用 Bean 引用,例如 @vHostResolver.getVHost(#root)。对于 send 操作,要发送的消息是根评估对象。对于 receive 操作,queueName 是根评估对象。

路由算法如下:如果选择器表达式为 null 或评估结果为 null 或者提供的 ConnectionFactory 不是 AbstractRoutingConnectionFactory 的实例,则一切都像以前一样,依赖于提供的 ConnectionFactory 实现。如果评估结果不是 null,但对于该 lookupKey 没有目标 ConnectionFactory,且 AbstractRoutingConnectionFactory 配置了 lenientFallback = true,则也会发生这种情况。在 AbstractRoutingConnectionFactory 的情况下,它会回退到基于 determineCurrentLookupKey()routing 实现。但是,如果 lenientFallback = false,则会抛出 IllegalStateException

命名空间支持还提供 <rabbit:template> 组件上的 send-connection-factory-selector-expressionreceive-connection-factory-selector-expression 属性。

此外,从版本 1.4 开始,您可以在侦听器容器中配置一个路由连接工厂。在这种情况下,队列名列表用作查找键。例如,如果您使用 setQueueNames("thing1", "thing2")`配置容器,则查找键为 `[thing1,thing]"(请注意,密钥中没有空格)。

从 1.6.9 版本开始,你可以通过在侦听器容器上使用 setLookupKeyQualifier 向查找键添加限定符。这样做会使你能够侦听同名但位于不同虚拟主机中的队列(你可以在其中为每个队列使用一个连接工厂)。

例如,对于查找键限定符 thing1`和侦听队列 `thing2`的容器,您用来注册目标连接工厂的查找键可以为 `thing1[thing2]

目标(和默认,如果提供)连接工厂必须具有发布确认和返回的相同设置。请参阅 Publisher Confirms and Returns

从 2.4.4 版本开始,可以禁用此验证。如果你需要确认和返回之间的值不相等的情况,则可以使用 AbstractRoutingConnectionFactory#setConsistentConfirmsReturns 关闭验证。请注意,第一个添加到 AbstractRoutingConnectionFactory 的连接工厂将确定 confirmsreturns 的常规值。

如果你需要检查一些消息的确认/返回而另一些消息则不需要,这可能会派上用场。例如:

@Bean
public RabbitTemplate rabbitTemplate() {
    final com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
    cf.setHost("localhost");
    cf.setPort(5672);

    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);
    cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);

    PooledChannelConnectionFactory pooledChannelConnectionFactory = new PooledChannelConnectionFactory(cf);

    final Map<Object, ConnectionFactory> connectionFactoryMap = new HashMap<>(2);
    connectionFactoryMap.put("true", cachingConnectionFactory);
    connectionFactoryMap.put("false", pooledChannelConnectionFactory);

    final AbstractRoutingConnectionFactory routingConnectionFactory = new SimpleRoutingConnectionFactory();
    routingConnectionFactory.setConsistentConfirmsReturns(false);
    routingConnectionFactory.setDefaultTargetConnectionFactory(pooledChannelConnectionFactory);
    routingConnectionFactory.setTargetConnectionFactories(connectionFactoryMap);

    final RabbitTemplate rabbitTemplate = new RabbitTemplate(routingConnectionFactory);

    final Expression sendExpression = new SpelExpressionParser().parseExpression(
            "messageProperties.headers['x-use-publisher-confirms'] ?: false");
    rabbitTemplate.setSendConnectionFactorySelectorExpression(sendExpression);
}

这样,带有头 x-use-publisher-confirms: true 的消息将通过缓存连接发送,并且你可以确保消息传递。有关确保消息传递的更多信息,请参见 Publisher Confirms and Returns

Queue Affinity and the LocalizedQueueConnectionFactory

在集群中使用高可用性 (HA) 队列时,为了获得最佳性能,你可能希望连接到领导队列所在的物理代理。CachingConnectionFactory 可以配置为具有多个代理地址。这是为了进行故障转移,客户端会尝试与已配置的 AddressShuffleMode 顺序相一致进行连接。LocalizedQueueConnectionFactory 使用管理插件提供的 REST API 来确定哪个节点是队列的领导者。然后,它会创建一个(或从缓存中检索)CachingConnectionFactory 来连接到该节点。如果连接失败,则确定新的领导节点,并且使用者连接到它。LocalizedQueueConnectionFactory 配置了一个默认的连接工厂,以防无法确定队列的物理位置,在这种情况下,它会像往常一样连接到集群。

LocalizedQueueConnectionFactoryRoutingConnectionFactory,而 SimpleMessageListenerContainer 使用队列名称作为查找键,正如上文 Routing Connection Factory 中所讨论的。

出于此原因(用于查找的队列名称),LocalizedQueueConnectionFactory 只能在将容器配置为侦听单个队列时使用。

必须在每个节点上启用 RabbitMQ 管理插件。

此连接工厂适用于长时间连接,例如 SimpleMessageListenerContainer 使用的连接。由于在建立连接之前调用 REST API 会产生开销,因此它不适用于短连接使用,例如与 RabbitTemplate 配合使用。此外,对于发布操作,队列是未知的,并且消息会发布到所有群集成员,因此查找节点的逻辑价值不大。

以下示例配置显示了如何配置工厂:

@Autowired
private ConfigurationProperties props;

@Bean
public CachingConnectionFactory defaultConnectionFactory() {
    CachingConnectionFactory cf = new CachingConnectionFactory();
    cf.setAddresses(this.props.getAddresses());
    cf.setUsername(this.props.getUsername());
    cf.setPassword(this.props.getPassword());
    cf.setVirtualHost(this.props.getVirtualHost());
    return cf;
}

@Bean
public LocalizedQueueConnectionFactory queueAffinityCF(
        @Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
    return new LocalizedQueueConnectionFactory(defaultCF,
            StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
            StringUtils.commaDelimitedListToStringArray(this.props.getAdminUris()),
            StringUtils.commaDelimitedListToStringArray(this.props.getNodes()),
            this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
            false, null);
}

请注意,前三个参数是 addressesadminUrisnodes 的数组。它们的位置很重要,即当容器尝试连接到队列时,它会使用管理 API 来确定哪个节点是队列的领导者,并连接到与该节点在同一数组位置的地址。

从版本 3.0 开始,RabbitMQ http-client 不再用于访问 Rest API。相反,如果 spring-webflux 在类路径上,则默认使用 WebClient from Spring Webflux;否则,则使用 RestTemplate

要在类路径中添加 WebFlux

Maven
<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
</dependency>
Gradle
compile 'org.springframework.amqp:spring-rabbit'

你还可以通过执行`LocalizedQueueConnectionFactory.NodeLocator`并覆盖它的`createClient`、``restCall`和(可选地)`close`方法,使用其他 REST 技术。

lqcf.setNodeLocator(new NodeLocator<MyClient>() {

    @Override
    public MyClient createClient(String userName, String password) {
        ...
    }

    @Override
    public HashMap<String, Object> restCall(MyClient client, URI uri) {
        ...
    });

});

框架提供了`WebFluxNodeLocator`和`RestTemplateNodeLocator`,其默认值如上所述。

Publisher Confirms and Returns

通过将`CachingConnectionFactory`属性`publisherConfirmType`设置为`ConfirmType.CORRELATED`,并将`publisherReturns`属性设置为`true`可以支持已确认(带相关性)和返回的消息。

当设置了这些选项时,工厂创建的`Channel`实例会包装在一个`PublisherCallbackChannel`中,该`PublisherCallbackChannel`用于促进回调。获得此类通道后,客户端可以在`Channel`中注册一个`PublisherCallbackChannel.Listener`。`PublisherCallbackChannel`实现包含将确认或返回路由到适当侦听器的逻辑。以下部分将进一步解释这些功能。

另请参阅 Scoped Operations 中的 Correlated Publisher Confirms and ReturnssimplePublisherConfirms

有关更多背景信息,请参阅 RabbitMQ 团队题为 Introducing Publisher Confirms 的博客文章。

Connection and Channel Listeners

连接工厂支持注册 ConnectionListenerChannelListener 实现。这使你可以接收连接和通道相关事件的通知。(连接建立后, ConnectionListener 用于 RabbitAdmin 执行声明 - 有关更多信息,请参见 Automatic Declaration of Exchanges, Queues, and Bindings )。以下清单显示了 ConnectionListener 接口定义:

@FunctionalInterface
public interface ConnectionListener {

    void onCreate(Connection connection);

    default void onClose(Connection connection) {
    }

    default void onShutDown(ShutdownSignalException signal) {
    }

}

从 2.0 版本开始,向`org.springframework.amqp.rabbit.connection.Connection`对象提供 `com.rabbitmq.client.BlockedListener`实例,以获取连接阻塞和取消阻塞事件的通知。以下示例显示了`ChannelListener`接口定义:

@FunctionalInterface
public interface ChannelListener {

    void onCreate(Channel channel, boolean transactional);

    default void onShutDown(ShutdownSignalException signal) {
    }

}

查询 Publishing is Asynchronous — How to Detect Successes and Failures,了解可能想要注册 `ChannelListener`的一种情况。

Logging Channel Close Events

1.5 版本引入了一种机制,使用户能够控制日志级别。

`AbstractConnectionFactory`使用默认策略以下列方式记录通道关闭:

  • 正常的频道关闭(200 OK)不会记录。

  • 如果由于被动队列声明失败导致一个频道关闭,则记录为 DEBUG 级别。

  • 如果由于专属消费者条件导致 basic.consume 被拒绝而关闭频道,则记录为 DEBUG 级别(3.1 版起,以前为 INFO 级别)。

  • 所有其他操作都已记录在 ERROR 级别。

要修改此行为,可以将自定义`ConditionalExceptionLogger`注入到`CachingConnectionFactory`的`closeExceptionLogger`属性中。

此外,`AbstractConnectionFactory.DefaultChannelCloseLogger`现在是公开的,允许对它进行子类化。

另请参见 Consumer Events

Runtime Cache Properties

从 1.6 版本开始,`CachingConnectionFactory`现在通过`getCacheProperties()`方法提供缓存统计信息。这些统计信息可用于调整缓存,以便在生产中对其进行优化。例如,高水位标记可用于确定是否应增加缓存大小。如果它等于缓存大小,则可以考虑进一步增加。下表描述了`CacheMode.CHANNEL`属性:

Table 1. Cache properties for CacheMode.CHANNEL
Property Meaning
 connectionName

ConnectionNameStrategy 生成的连接名称。

 channelCacheSize

当前配置的最大空闲通道数。

 localPort

连接的本地端口(如果可用)。这可用于与 RabbitMQ 管理界面上的连接和通道相关联。

 idleChannelsTx

当前闲置(已缓存)的事务通道数。

 idleChannelsNotTx

当前闲置(已缓存)的非事务通道数。

 idleChannelsTxHighWater

已同时闲置(已缓存)的最大事务通道数。

 idleChannelsNotTxHighWater

已同时闲置(已缓存)的最大非事务通道数。

下表描述了`CacheMode.CONNECTION`属性:

Table 2. Cache properties for CacheMode.CONNECTION
Property Meaning
 connectionName:&lt;localPort&gt;

ConnectionNameStrategy 生成的连接名称。

 openConnections

表示与代理服务器的连接的连接对象数。

 channelCacheSize

当前配置的最大空闲通道数。

 connectionCacheSize

当前配置的最大空闲连接数。

 idleConnections

当前闲置的连接数。

 idleConnectionsHighWater

已同时闲置的最大连接数。

 idleChannelsTx:&lt;localPort&gt;

当前连接的当前空闲(已缓存)的事务通道数。您可以使用属性名称的 localPort 部分与 RabbitMQ 管理界面中的连接和通道关联。

 idleChannelsNotTx:&lt;localPort&gt;

目前空闲(缓存)为此连接的非事务通道的数量。属性名称的 localPort 部分可用于与 RabbitMQ 管理界面的连接和通道相关联。

 idleChannelsTxHighWater:&lt;localPort&gt;

并发空闲(缓存)的事务通道的最大数量。属性名称的 localPort 部分可用于与 RabbitMQ 管理界面的连接和通道相关联。

 idleChannelsNotTxHighWater:&lt;localPort&gt;

并发空闲(缓存)的非事务通道的最大数量。您可以使用属性名称的 localPort 部分与 RabbitMQ 管理界面的连接和通道相关联。

还包括`cacheMode`属性(CHANNEL`或`CONNECTION)。

cacheStats
Figure 1. JVisualVM Example

RabbitMQ Automatic Connection/Topology recovery

从 Spring AMQP 的第一个版本开始,该框架已在自己内部提供连接和通道恢复以防代理故障。此外,正如在 Configuring the Broker中所讨论的,当连接重新建立时,RabbitAdmin`重新声明所有基础设施 bean(队列和其他队列)。因此,它不依赖于 `amqp-client`库现在提供的 auto-recovery。`amqp-client`在默认情况下已启用自动恢复。这两种恢复机制之间存在一些不兼容性,因此默认情况下,Spring 将底层 `RabbitMQ connectionFactory`上的 `automaticRecoveryEnabled`属性设置为 `false。即使属性为 true,Spring 也会立即关闭所有已恢复的连接,从而有效地禁用它。

默认情况下,仅将定义为 bean 的元素(队列、交换、绑定)在连接故障后重新声明。请参阅 Recovering Auto-Delete Declarations 更改该行为。