Configuring the Broker

public interface AmqpAdmin { // Exchange Operations void declareExchange(Exchange exchange); void deleteExchange(String exchangeName); // Queue Operations Queue declareQueue(); String declareQueue(Queue queue); void deleteQueue(String queueName); void deleteQueue(String queueName, boolean unused, boolean empty); void purgeQueue(String queueName, boolean noWait); // Binding Operations void declareBinding(Binding binding); void removeBinding(Binding binding); Properties getQueueProperties(String queueName); }

AMQP 规范描述了如何使用该协议在经纪人上配置队列、交换机和绑定。这些操作(从 0.8 规范及更高版本可移植)存在于`org.springframework.amqp.core`包中的`AmqpAdmin`接口中。该类的 RabbitMQ 实现是位于`org.springframework.amqp.rabbit.core`包中的`RabbitAdmin`。 `AmqpAdmin`接口基于使用 Spring AMQP 域抽象,并显示在以下列表中:

public interface AmqpAdmin {

    // Exchange Operations

    void declareExchange(Exchange exchange);

    void deleteExchange(String exchangeName);

    // Queue Operations

    Queue declareQueue();

    String declareQueue(Queue queue);

    void deleteQueue(String queueName);

    void deleteQueue(String queueName, boolean unused, boolean empty);

    void purgeQueue(String queueName, boolean noWait);

    // Binding Operations

    void declareBinding(Binding binding);

    void removeBinding(Binding binding);

    Properties getQueueProperties(String queueName);

}

另请参阅 Scoped OperationsgetQueueProperties() 方法会返回一些关于队列的有限信息(消息数量和使用者数量)。可在 RabbitTemplate(QUEUE_NAME,QUEUE_MESSAGE_COUNT`和 `QUEUE_CONSUMER_COUNT) 中以常数形式获取已返回属性的键。RabbitMQ REST APIQueueInfo 对象中提供了更多信息。 无参数 declareQueue() 方法使用自动生成的名称在代理上定义队列。此自动生成队列的其他属性为 exclusive=trueautoDelete=truedurable=falsedeclareQueue(Queue queue) 方法采用一个 Queue 对象并返回已声明队列的名称。如果所提供 Queue 的 name 属性为空 String,代理将声明带有生成名称的队列。该名称被返回给调用方。该名称也添加到 Queue 的 actualName 属性中。你只能通过直接调用 RabbitAdmin 编程方式使用此功能。当使用自动声明由管理员在应用程序上下文中声明队列时,你可以将名称属性设置为 ""(空字符串)。随后,代理会创建该名称。从版本 2.1 开始,侦听器容器可以使用此类型的队列。有关更多信息,请参阅容器和代理命名的队列。 这与 AnonymousQueue 形成对比,其中框架会生成一个唯一(UUID)名称并设置 durablefalseexclusiveautoDeletetrue。一个带有空(或丢失)name 属性的 <rabbit:queue/> 总会创建一个 AnonymousQueue。 请参阅 AnonymousQueue,了解为何优先选择 AnonymousQueue 而不是经纪人生成的队列名称,以及如何控制名称格式。从 2.1 版开始,会将匿名队列声明为默认情况下将参数 Queue.X_QUEUE_LEADER_LOCATOR 设置为 client-local。这可确保在向其连接应用程序的节点上声明队列。声明性队列必须具有固定名称,因为它们可能在上下文中被其他位置引用,例如在以下示例中显示的侦听器中:

<rabbit:listener-container>
    <rabbit:listener ref="listener" queue-names="#{someQueue.name}" />
</rabbit:listener-container>

请参阅 Automatic Declaration of Exchanges, Queues, and Bindings。 RabbitMQ 的此接口实现是 RabbitAdmin,它在使用 Spring XML 配置时类似于以下示例:

<rabbit:connection-factory id="connectionFactory"/>

<rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>

当 CachingConnectionFactory 缓存模式是 CHANNEL(默认)时,RabbitAdmin 实现会自动延迟声明在同一 ApplicationContext 中声明的队列、交换机和绑定。这些组件会在向代理打开连接后立即声明。有一些名称空间功能让这非常方便,例如在 Stocks 示例应用程序中,我们有以下内容:

<rabbit:queue id="tradeQueue"/>

<rabbit:queue id="marketDataQueue"/>

<fanout-exchange name="broadcast.responses"
                 xmlns="http://www.springframework.org/schema/rabbit">
    <bindings>
        <binding queue="tradeQueue"/>
    </bindings>
</fanout-exchange>

<topic-exchange name="app.stock.marketdata"
                xmlns="http://www.springframework.org/schema/rabbit">
    <bindings>
        <binding queue="marketDataQueue" pattern="${stocks.quote.pattern}"/>
    </bindings>
</topic-exchange>

在前面的示例中,我们使用匿名队列(实际上在内部,只是具有由框架生成名称的队列,不是由代理生成)并通过 ID 引用它们。我们还可以声明具有显式名称的队列,它也可以用作其在上下文中的 bean 定义的标识符。以下示例配置一个具有显式名称的队列:

<rabbit:queue name="stocks.trade.queue"/>

你可以提供 idname 属性。这让你可以通过与队列名称无关的 ID 来引用队列(例如,在绑定中)。它还允许标准 Spring 特性(例如,队列名称的属性占位符和 SpEL 表达式)。当使用名称作为 bean 标识符时,这些特性不可用。

队列可以使用额外的参数进行配置,例如 x-message-ttl。当你使用名称空间支持时,它们以参数名/参数值对的形式提供,使用 <rabbit:queue-arguments> 元素进行定义。以下示例演示如何执行此操作:

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="myDLX"/>
        <entry key="x-dead-letter-routing-key" value="dlqRK"/>
    </rabbit:queue-arguments>
</rabbit:queue>

默认情况下,假设参数是字符串。对于其他类型的参数,你必须提供类型。以下示例演示如何指定类型:

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments value-type="java.lang.Long">
        <entry key="x-message-ttl" value="100"/>
    </rabbit:queue-arguments>
</rabbit:queue>

当提供混合类型的参数时,你必须为每个条目元素提供类型。以下示例演示如何执行此操作:

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl">
            <value type="java.lang.Long">100</value>
        </entry>
        <entry key="x-dead-letter-exchange" value="myDLX"/>
        <entry key="x-dead-letter-routing-key" value="dlqRK"/>
    </rabbit:queue-arguments>
</rabbit:queue>

使用 Spring Framework 3.2 及更高版本,可以更简洁地声明此内容,如下所示:

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl" value="100" value-type="java.lang.Long"/>
        <entry key="x-ha-policy" value="all"/>
    </rabbit:queue-arguments>
</rabbit:queue>

当你使用 Java 配置时,Queue.X_QUEUE_LEADER_LOCATOR 参数作为一类属性通过 Queue 类上的 setLeaderLocator() 方法得到支持。从版本 2.1 开始,匿名队列使用此属性默认设置为 client-local 进行声明。这确保在应用程序连接到的节点上声明队列。

RabbitMQ 代理不允许声明具有不匹配参数的队列。例如,如果 queue 已经存在且没有 time to live 参数,并且你尝试使用(例如)key="x-message-ttl" value="100" 来声明它,则会引发异常。

默认情况下,当出现任何异常时,RabbitAdmin 会立即停止处理所有声明。这可能会导致下游问题,例如侦听器容器无法初始化,因为另一个队列(在出错的队列之后定义的队列)未声明。 可以通过将 ignore-declaration-exceptions 属性设置为 true 在 RabbitAdmin 实例上修改此行为。此选项指示 RabbitAdmin 记录异常并继续声明其他元素。当使用 Java 配置 RabbitAdmin 时,此属性称为 ignoreDeclarationExceptions。这是一个应用于所有元素的全局设置。队列、交换机和绑定具有类似的仅应用于这些元素的属性。 在版本 1.6 之前,此属性仅在通道上发生 IOException 时才生效,例如当当前和所需属性之间不匹配时。现在,此属性对任何异常都有效,包括 TimeoutException。 此外,任何声明异常都会导致发布 DeclarationExceptionEvent,它是一个 ApplicationEvent,可以由上下文中任何 ApplicationListener 使用。该事件包含对 admin、正在声明的元素和 Throwable 的引用。

Headers Exchange

从版本 1.3 开始,你可以配置 HeadersExchange 以匹配多个标头。你也可以指定是否必须匹配任何或所有标头。以下示例演示如何执行此操作:

<rabbit:headers-exchange name="headers-test">
    <rabbit:bindings>
        <rabbit:binding queue="bucket">
            <rabbit:binding-arguments>
                <entry key="foo" value="bar"/>
                <entry key="baz" value="qux"/>
                <entry key="x-match" value="all"/>
            </rabbit:binding-arguments>
        </rabbit:binding>
    </rabbit:bindings>
</rabbit:headers-exchange>

从版本 1.6 开始,你可以使用 internal 标志(默认为 false)配置 Exchanges,此类 Exchange 通过 RabbitAdmin 正确配置在代理上(如果在应用程序上下文中存在)。如果交换机的 internal 标志为 true,RabbitMQ 不会允许客户端使用该交换机。这对于死信交换机或交换机到交换机绑定非常有用,其中你不希望发布者直接使用该交换机。

要了解如何使用 Java 配置 AMQP 基础架构,请查看 Stock 示例应用程序,其中有 @ConfigurationAbstractStockRabbitConfiguration,它反过来又包含 RabbitClientConfigurationRabbitServerConfiguration 子类。以下清单显示了 AbstractStockRabbitConfiguration 的代码:

@Configuration
public abstract class AbstractStockAppRabbitConfiguration {

    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory =
            new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMessageConverter(jsonMessageConverter());
        configureRabbitTemplate(template);
        return template;
    }

    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public TopicExchange marketDataExchange() {
        return new TopicExchange("app.stock.marketdata");
    }

    // additional code omitted for brevity

}

在 Stock 应用程序中,使用以下 @Configuration 类配置服务器:

@Configuration
public class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration  {

    @Bean
    public Queue stockRequestQueue() {
        return new Queue("app.stock.request");
    }
}

这是所有 @Configuration 类的继承链的终点。最终结果是启动应用程序时向代理程序声明了 TopicExchangeQueue。在服务器配置中没有将 TopicExchange 绑定到队列,因为这是在客户端应用程序中完成的。但是,股票请求队列会自动绑定到 AMQP 默认交换机。这种行为由规范定义。

客户端 @Configuration 类更有意思。以下是其声明:

@Configuration
public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration {

    @Value("${stocks.quote.pattern}")
    private String marketDataRoutingKey;

    @Bean
    public Queue marketDataQueue() {
        return amqpAdmin().declareQueue();
    }

    /**
     * Binds to the market data exchange.
     * Interested in any stock quotes
     * that match its routing key.
     */
    @Bean
    public Binding marketDataBinding() {
        return BindingBuilder.bind(
                marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
    }

    // additional code omitted for brevity

}

客户端通过 AmqpAdmin 上的 declareQueue() 方法声明另一个队列。它使用属性文件中外部化的路由模式将该队列绑定到市场数据交换机。

Builder API for Queues and Exchanges

版本 1.6 引入了用于使用 Java 配置配置 QueueExchange 对象的便捷 fluent API。以下示例展示了如何使用它:

@Bean
public Queue queue() {
    return QueueBuilder.nonDurable("foo")
        .autoDelete()
        .exclusive()
        .withArgument("foo", "bar")
        .build();
}

@Bean
public Exchange exchange() {
  return ExchangeBuilder.directExchange("foo")
      .autoDelete()
      .internal()
      .withArgument("foo", "bar")
      .build();
}

从版本 2.0 开始,ExchangeBuilder 现在默认创建持久交换机,以与单个 AbstractExchange 类上的简单构造函数保持一致。要使用构建器制作一个非持久交换机,请在调用 .build() 之前使用 .durable(false)。不再提供不带参数的 durable() 方法。

版本 2.2 引入了 fluent API 以添加“众所周知”的交换机和队列参数…

@Bean
public Queue allArgs1() {
    return QueueBuilder.nonDurable("all.args.1")
            .ttl(1000)
            .expires(200_000)
            .maxLength(42)
            .maxLengthBytes(10_000)
            .overflow(Overflow.rejectPublish)
            .deadLetterExchange("dlx")
            .deadLetterRoutingKey("dlrk")
            .maxPriority(4)
            .lazy()
            .leaderLocator(LeaderLocator.minLeaders)
            .singleActiveConsumer()
            .build();
}

@Bean
public DirectExchange ex() {
    return ExchangeBuilder.directExchange("ex.with.alternate")
            .durable(true)
            .alternate("alternate")
            .build();
}

Declaring Collections of Exchanges, Queues, and Bindings

你可以用 Declarables 对象包装 Declarable 对象(QueueExchangeBinding)的集合。RabbitAdmin 在应用程序上下文中检测到此类 Bean(以及离散的 Declarable Bean),并在建立连接(最初在连接失败之后)时在代理程序上声明包含的对象。以下示例演示了如何执行此操作:

@Configuration
public static class Config {

    @Bean
    public CachingConnectionFactory cf() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    public RabbitAdmin admin(ConnectionFactory cf) {
        return new RabbitAdmin(cf);
    }

    @Bean
    public DirectExchange e1() {
        return new DirectExchange("e1", false, true);
    }

    @Bean
    public Queue q1() {
        return new Queue("q1", false, false, true);
    }

    @Bean
    public Binding b1() {
        return BindingBuilder.bind(q1()).to(e1()).with("k1");
    }

    @Bean
    public Declarables es() {
        return new Declarables(
                new DirectExchange("e2", false, true),
                new DirectExchange("e3", false, true));
    }

    @Bean
    public Declarables qs() {
        return new Declarables(
                new Queue("q2", false, false, true),
                new Queue("q3", false, false, true));
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public Declarables prototypes() {
        return new Declarables(new Queue(this.prototypeQueueName, false, false, true));
    }

    @Bean
    public Declarables bs() {
        return new Declarables(
                new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
                new Binding("q3", DestinationType.QUEUE, "e3", "k3", null));
    }

    @Bean
    public Declarables ds() {
        return new Declarables(
                new DirectExchange("e4", false, true),
                new Queue("q4", false, false, true),
                new Binding("q4", DestinationType.QUEUE, "e4", "k4", null));
    }

}

在 2.1 之前版本中,可以通过定义输入 Collection<Declarable> 的 bean 来声明多个 Declarable 实例。这在某些情况下会导致不良的副作用,因为管理员必须遍历所有 Collection<?> bean。

版本 2.2 向 Declarables 添加了 getDeclarablesByType 方法;这可以作为一种便利方法,例如在声明侦听器容器 Bean 时使用。

public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
        Declarables mixedDeclarables, MessageListener listener) {

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(mixedDeclarables.getDeclarablesByType(Queue.class).toArray(new Queue[0]));
    container.setMessageListener(listener);
    return container;
}

Conditional Declaration

默认情况下,所有队列、交换机和绑定都由应用程序上下文中所有 RabbitAdmin 实例声明(假设它们具有 auto-startup="true")。

从版本 2.1.9 开始,RabbitAdmin 有了一个新属性 explicitDeclarationsOnly(默认为 false);当将其设置为 true 时,管理程序将只声明由该管理程序明确配置为声明的 Bean。

从 1.2 版本开始,你可以有条件地声明这些元素。这在应用程序连接到多个代理并需要指定在哪些代理中应该声明特定元素时特别有用。

表示这些元素的类实现 Declarable,它有两个方法:shouldDeclare()getDeclaringAdmins()RabbitAdmin 使用这些方法来确定特定实例是否应该在其 Connection 上实际处理声明。

这些属性在命名空间中作为属性可用,如下例所示:

<rabbit:admin id="admin1" connection-factory="CF1" />

<rabbit:admin id="admin2" connection-factory="CF2" />

<rabbit:admin id="admin3" connection-factory="CF3" explicit-declarations-only="true" />

<rabbit:queue id="declaredByAdmin1AndAdmin2Implicitly" />

<rabbit:queue id="declaredByAdmin1AndAdmin2" declared-by="admin1, admin2" />

<rabbit:queue id="declaredByAdmin1Only" declared-by="admin1" />

<rabbit:queue id="notDeclaredByAllExceptAdmin3" auto-declare="false" />

<rabbit:direct-exchange name="direct" declared-by="admin1, admin2">
    <rabbit:bindings>
        <rabbit:binding key="foo" queue="bar"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

默认情况下 auto-declare 属性是 true ,且如果未提供(或为空) declared-by ,则所有 RabbitAdmin 实例声明该对象(只要管理员的 auto-startup 属性为 true,这是默认,且管理员的 explicit-declarations-only 属性为 false)。

同样,你可以使用基于 Java 的 @Configuration 来实现相同的效果。在以下示例中,组件由 admin1 声明,但未由 admin2 声明:

@Bean
public RabbitAdmin admin1() {
    return new RabbitAdmin(cf1());
}

@Bean
public RabbitAdmin admin2() {
    return new RabbitAdmin(cf2());
}

@Bean
public Queue queue() {
    Queue queue = new Queue("foo");
    queue.setAdminsThatShouldDeclare(admin1());
    return queue;
}

@Bean
public Exchange exchange() {
    DirectExchange exchange = new DirectExchange("bar");
    exchange.setAdminsThatShouldDeclare(admin1());
    return exchange;
}

@Bean
public Binding binding() {
    Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
    binding.setAdminsThatShouldDeclare(admin1());
    return binding;
}

A Note On the id and name Attributes

<rabbit:queue/><rabbit:exchange/> 元素上的 name 属性反映代理中的实体名称。对于队列,如果省略 name,则会创建一个匿名队列(参见 xref:amqp/broker-configuration.adoc#anonymous-queue[AnonymousQueue)。

在 2.0 之前的版本中,name 也被注册为 Bean 名称别名(类似于 <bean/> 元素上的 name)。

这导致了两个问题:

  • 阻止声明拥有相同名称的队列和交换机。

  • 如果别名中包含 SpEL 表达式(#{&#8230;&#8203;}),则该表达式不会被解析。

从版本 2.0 开始,如果使用 idname 属性声明其中一个元素,则不再将名称声明为 Bean 名称别名。如果你希望声明具有相同 name 的队列和交换机,则必须提供 id

如果没有 name 属性,不会有任何更改。仍然可以通过 name 引用 bean,例如在 binding 声明中。但是,名称包含 SpEL 时仍然无法引用它的情况依然存在 — 你必须提供一个 id 以用于引用目的。

AnonymousQueue

通常,当你需要一个唯一的名称、排他的自动删除队列时,我们建议你使用 AnonymousQueue,而不是使用 Message Broker 定义的队列名称(将 Queue 名称用作 "" 会导致 Message Broker 生成队列名称)。

这是因为:

  1. 当与代理的连接建立后,队列才会实际声明。这在创建并连接 Bean 之后很长的时间。使用队列的 Bean 需要知道其名称。实际上,在启动应用程序时代理甚至可能未运行。

  2. 如果由于某种原因断开与代理的连接,那么管理员将重新声明拥有相同名称的 AnonymousQueue。如果我们使用代理声明的队列,那么队列名称将更改。

你可以控制 AnonymousQueue 实例使用的队列名称的格式。

默认情况下,队列名称添加 spring.gen- 前缀,后跟 UUID 的 base64 表示形式,例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g

你可以在构造函数参数中提供一个 AnonymousQueue.NamingStrategy 实现。以下示例展示了如何操作:

@Bean
public Queue anon1() {
    return new AnonymousQueue();
}

@Bean
public Queue anon2() {
    return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("something-"));
}

@Bean
public Queue anon3() {
    return new AnonymousQueue(AnonymousQueue.UUIDNamingStrategy.DEFAULT);
}

第一个 bean 生成的队列名称添加 spring.gen- 前缀,后跟 UUID 的 base64 表示形式,例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g。第二个 bean 生成的队列名称添加 something- 前缀,后跟 UUID 的 base64 表示形式。第三个 bean 生成的名称仅使用 UUID(没有 base64 转换),例如:f20c818a-006b-4416-bf91-643590fedb0e

base64 编码使用 RFC 4648 中的“URL 和文件名安全字母表”("URL and Filename Safe Alphabet")。尾部填充字符(=)会被删除。

你可以提供你自己的命名策略,从而将其他信息(例如应用程序名称或客户端主机)包含在队列名称中。

在使用 XML 配置时,你可以指定命名策略。<rabbit:queue> 元素上存在 naming-strategy 属性,该元素可以用于实现 AnonymousQueue.NamingStrategy 的 bean 引用。以下示例展示了如何以各种方式指定命名策略:

<rabbit:queue id="uuidAnon" />

<rabbit:queue id="springAnon" naming-strategy="uuidNamer" />

<rabbit:queue id="customAnon" naming-strategy="customNamer" />

<bean id="uuidNamer" class="org.springframework.amqp.core.AnonymousQueue.UUIDNamingStrategy" />

<bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy">
    <constructor-arg value="custom.gen-" />
</bean>

第一个示例创建类似于 spring.gen-MRBv9sqISkuCiPfOYfpo4g 的名称。第二个示例创建带有 UUID 字符串表示形式的名称。第三个示例创建类似于 custom.gen-MRBv9sqISkuCiPfOYfpo4g 的名称。

你还可以提供你自己的命名策略 bean。

从 2.1 版本开始,匿名队列的声明会将参数 Queue.X_QUEUE_LEADER_LOCATOR 默认设置为 client-local。这样可以确保队列在应用程序连接到的节点上声明。你在构建实例之后,可以调用 queue.setLeaderLocator(null) 来还原为之前的行为。

Recovering Auto-Delete Declarations

通常,RabbitAdmin 仅恢复在应用程序上下文中声明为 bean 的队列/交换/绑定;如果任何此类声明是自动删除的,那么它们会在丢失连接时被 Message Broker 删除。当重新建立连接时,admin 将重新声明实体。通常,调用 admin.declareQueue(…​), admin.declareExchange(…​)admin.declareBinding(…​) 不会恢复实体。

从 2.4 版本开始,admin 有一个新的属性 redeclareManualDeclarations;当它为 true 时,admin 将恢复应用程序上下文中 bean 之外的这些实体。

如果调用 deleteQueue(…​), deleteExchange(…​)removeBinding(…​),则不会执行单独声明的恢复。删除队列和交换时,关联的绑定也会从可恢复实体中删除。

最后,调用 resetAllManualDeclarations() 会阻止恢复任何先前声明的实体。