AMQP Abstractions

Spring AMQP 由两个模块组成(分发中每个模块都以 JAR 表示):spring-amqpspring-rabbit。“spring-amqp”模块包含 org.springframework.amqp.core 软件包。在该软件包中,你可以找到表示核心 AMQP “model” 的类。我们的目的是提供不依赖于任何特定 AMQP 代理实现或客户端库的通用抽象。最终用户代码可以跨供应商实现更便携,因为它只能针对抽象层开发。这些抽象随后由特定于代理的模块(例如“spring-rabbit”)实现。目前仅提供 RabbitMQ 实现。但是,除了 RabbitMQ 之外,这些抽象已在 .NET 中使用 Apache Qpid 进行了验证。由于 AMQP 在协议级别运行,因此原则上,你可以将 RabbitMQ 客户端与支持相同协议版本的任何代理一起使用,但我们目前不测试任何其他代理。 该概述假定您已经熟悉 AMQP 规范的基础知识。如果不熟悉,请参阅 Other Resources 处列出的资源。

Message

0-9-1 AMQP 规范未定义 Message 类或接口。相反,在执行诸如 basicPublish() 这样的操作时,内容作为字节数组参数传递,其他属性作为单独的参数传递。Spring AMQP 将 Message 类定义为更通用的 AMQP 域模型表示的一部分。Message 类的目的是将 body 和属性封装在单个实例中,以便 API 进而变得更简单。以下示例显示了 Message 类定义:

public class Message {

    private final MessageProperties messageProperties;

    private final byte[] body;

    public Message(byte[] body, MessageProperties messageProperties) {
        this.body = body;
        this.messageProperties = messageProperties;
    }

    public byte[] getBody() {
        return this.body;
    }

    public MessageProperties getMessageProperties() {
        return this.messageProperties;
    }
}

MessageProperties 接口定义了一些常见属性,例如“messageId”、“timestamp”、“contentType”等。你还可以通过调用 setHeader(String key, Object value) 方法使用自定义“headers” 来扩展这些属性。

1.5.71.6.111.7.42.0.0 版本开始,如果消息正文是序列化 Serializable java 对象,在执行 toString() 操作(如在日志消息中)时,它将不再被反序列化(默认为)。这是为了防止不安全的反序列化。默认情况下,只有 java.utiljava.lang 类会被反序列化。要恢复为之前的行为,您可以通过调用 Message.addAllowedListPatterns(…​) 来添加允许的类/包模式。简单的 wildcard is supported, for example com.something., *.MyClass。在日志消息中,不可反序列化的正文由 byte[<size>] 表示。

Exchange

Exchange 接口表示 AMQP Exchange,这是消息生产者发送到的内容。代理的虚拟主机中的每个 Exchange 都具有唯一名称以及其他一些属性。以下示例显示了 Exchange 接口:

public interface Exchange {

    String getName();

    String getExchangeType();

    boolean isDurable();

    boolean isAutoDelete();

    Map<String, Object> getArguments();

}

正如您所见,Exchange 也有一个“类型”,由 ExchangeTypes 中定义的常量表示。基本类型为:directtopicfanout`和 `headers。在核心包中,您可以找到针对上述每种类型的 Exchange 接口的实现。这些 Exchange 类型在绑定到队列的方式上有所不同。例如,Direct 交换允许队列使用固定的路由键(通常是队列的名称)进行绑定。Topic 交换支持绑定路由模式,其中可能包含通配符“*”和“#”,分别表示“恰好一个”和“零个或多个”。`Fanout`交换将发布到绑定到它的所有队列,且不考虑任何路由键。有关这些值以及其他交换类型的信息,请参阅 Other Resources

AMQP 规范还需要任何代理提供一个没有名称的 “default” 直接交换。声明的所有队列都绑定到该默认 Exchange,其名称为路由键。您可以在 xref:amqp/template.adoc[AmqpTemplate 中了解有关在 Spring AMQP 中使用默认交换的更多信息。

Queue

Queue 类表示消息使用者接收消息的组件。与各种 Exchange 类一样,我们的实现旨在成为此核心 AMQP 类型的抽象表示。以下清单显示了 Queue 类:

public class Queue  {

    private final String name;

    private volatile boolean durable;

    private volatile boolean exclusive;

    private volatile boolean autoDelete;

    private volatile Map<String, Object> arguments;

    /**
     * The queue is durable, non-exclusive and non auto-delete.
     *
     * @param name the name of the queue.
     */
    public Queue(String name) {
        this(name, true, false, false);
    }

    // Getters and Setters omitted for brevity

}

请注意,构造函数采用队列名称。根据实现,管理模板可能会提供用于生成唯一命名队列的方法。此类队列可用作“reply-to”地址或其他*临时*情况下。因此,自动生成队列的“exclusive”和“autoDelete”属性都将设置为“true”。

请参阅 Configuring the Broker 中关于队列的章节,了解有关使用命名空间支持声明队列(包括队列参数)的信息。

Binding

鉴于生产者发送到 exchange 并且使用者从队列接收,因此将队列连接到 exchange 的绑定对于通过消息传送连接生产者和使用者至关重要。在 Spring AMQP 中,我们定义了一个 Binding 类来表示这些连接。本节回顾了将队列绑定到 exchange 的基本选项。

你可以使用固定的路由密钥将队列绑定到 DirectExchange,如下例所示:

new Binding(someQueue, someDirectExchange, "foo.bar");

你还可以使用路由模式将队列绑定到 TopicExchange,如下例所示:

new Binding(someQueue, someTopicExchange, "foo.*");

你可以使用没有路由密钥将队列绑定到 FanoutExchange,如下例所示:

new Binding(someQueue, someFanoutExchange);

我们还提供了 BindingBuilder 以便于使用“流畅 API”风格,如以下示例所示:

Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");

为了清楚起见,前面的示例展示了 BindingBuilder 类,但此样式在对“bind()”方法使用静态导入时也适用。

Binding 类的实例本身只保存有关连接的数据。换句话说,它不是 “active” 组件。但是,正如您将在 Configuring the Broker 中看到的,AmqpAdmin 类可以使用 Binding 实例在 Broker 上实际触发绑定操作。此外,正如您在同一段落中看到的,您可以使用 Spring 的 @Bean 标记在 @Configuration 类中定义 Binding 实例。还有一个便捷的基本类,它进一步简化了用于生成与 AMQP 相关 Bean 定义的方法,并识别队列、交换和绑定,以便它们在应用程序启动时全部在 AMQP Broker 上声明。

AmqpTemplate 也在核心包中定义。作为实际 AMQP 消息传递所涉及的主要组件之一,它在其自己的部分中进行了详细讨论(请参阅 AmqpTemplate)。