AMQP

高级消息队列协议 (AMQP) 是一个与平台无关的线级协议,用于面向消息的中间件。Spring AMQP 项目将 Spring 核心概念应用于基于 AMQP 的消息解决方案的开发。Spring Boot 为通过 RabbitMQ 使用 AMQP 提供多种便利,包括 spring-boot-starter-amqp “Starter”。

RabbitMQ Support

RabbitMQ 是一个基于 AMQP 协议的轻量级、可靠、可扩展且可移植的消息代理。Spring 使用 RabbitMQ 通过 AMQP 协议实现通信。

RabbitMQ 配置由 spring.rabbitmq.* 中的外部配置属性控制。例如,您可以在 application.properties 中声明以下部分:

spring:
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "admin"
    password: "secret"

或者,您可以使用 addresses 属性配置相同的连接:

spring:
  rabbitmq:
    addresses: "amqp://admin:secret@localhost"

以这种方式指定地址时,将忽略 hostport 属性。如果地址使用 amqps 协议,则 SSL 支持将自动启用。

有关更多受支持的基于属性的配置选项,请参阅 {code-spring-boot-autoconfigure-src}/amqp/RabbitProperties.java[RabbitProperties]。要配置 Spring AMQP 使用的 RabbitMQ ConnectionFactory 的更低级别详细信息,请定义一个 ConnectionFactoryCustomizer bean。

如果上下文中存在 ConnectionNameStrategy bean,它将被自动用于命名由自动配置的 CachingConnectionFactory 创建的连接。

要对 RabbitTemplate 进行全应用程序的附加自定义,请使用一个 RabbitTemplateCustomizer bean。

有关更多详细信息,请参见 Understanding AMQP, the protocol used by RabbitMQ

Sending a Message

Spring 的 AmqpTemplateAmqpAdmin 已自动配置,您可以将它们直接自动装配到您自己的 bean 中,如下例所示:

{url-spring-amqp-javadoc}/rabbit/core/RabbitMessagingTemplate.html[RabbitMessagingTemplate] 可以以类似的方式注入。如果定义了 MessageConverter bean,它将自动关联到自动配置的 AmqpTemplate

如果需要,作为 bean 定义的任何 org.springframework.amqp.core.Queue 都将自动用于在 RabbitMQ 实例上声明一个对应的队列。

要重试操作,你可以启用 AmqpTemplate 上的重试(例如,在代理连接丢失时):

spring:
  rabbitmq:
    template:
      retry:
        enabled: true
        initial-interval: "2s"

重试默认处于禁用状态。你也可以通过声明一个 RabbitRetryTemplateCustomizer bean 以编程方式定制 RetryTemplate

如果您需要创建更多 RabbitTemplate 实例,或者您希望覆盖默认值,Spring Boot 提供了一个 RabbitTemplateConfigurer bean,您可以使用它来初始化一个 RabbitTemplate,其具有与自动配置所用的工厂相同的设置。

Sending a Message To A Stream

若要向特定流发送消息,请按以下示例所示指定流的名称:

spring:
  rabbitmq:
    stream:
      name: "my-stream"

如果已经定义了一个 MessageConverterStreamMessageConverter`或 `ProducerCustomizer bean,那么它会自动关联到自动配置的 RabbitStreamTemplate

如果您需要创建更多 RabbitStreamTemplate 实例,或者您希望覆盖默认值,Spring Boot 提供了一个 RabbitStreamTemplateConfigurer bean,您可以使用它来初始化一个 RabbitStreamTemplate,其具有与自动配置所用的工厂相同的设置。

Receiving a Message

当 Rabbit 基础设施存在时,可以通过 @RabbitListener`对任何 bean 进行注释以创建侦听器端点。如果尚未定义 `RabbitListenerContainerFactory,将自动配置一个默认的 SimpleRabbitListenerContainerFactory,并且您可以使用 configprop:spring.rabbitmq.listener.type[] 属性切换到直接容器。如果已经定义了一个 MessageConverterMessageRecoverer bean,它将自动关联到默认工厂。

以下示例组件在 someQueue 队列上创建一个侦听器端点:

有关更多详细信息,请参见 {url-spring-amqp-javadoc}/rabbit/annotation/EnableRabbit.html[ @EnableRabbit 的 Javadoc]。

如果您需要创建更多 RabbitListenerContainerFactory 实例,或者您希望覆盖默认值,Spring Boot 提供了一个 SimpleRabbitListenerContainerFactoryConfigurer 和一个 DirectRabbitListenerContainerFactoryConfigurer,您可以使用它们来初始化一个 SimpleRabbitListenerContainerFactory 和一个 DirectRabbitListenerContainerFactory,其具有与自动配置所用的工厂相同的设置。

无论您选择了哪种容器类型,都不重要。这两个 bean 都由自动配置公开。

例如,以下配置类公开另一个使用特定 MessageConverter 的工厂:

然后,您可以在任何带有 @RabbitListener 注释的方法中使用该工厂,如下所示:

您可以启用重试以处理侦听器抛出异常的情况。默认情况下,使用 RejectAndDontRequeueRecoverer,但您可以自定义自己的 MessageRecoverer。当重试用尽时,消息会被拒绝,如果代理配置了这样做,则会丢弃该消息或将其路由到死信交换。默认情况下,重试处于禁用状态。您也可以通过声明一个 RabbitRetryTemplateCustomizer bean 以编程方式定制 RetryTemplate

默认情况下,如果禁用了重试,并且侦听器抛出了异常,则会无限期地重试传递。您可以通过两种方式修改此行为:将 defaultRequeueRejected 属性设置为 false,以便尝试零次重新传递,或者抛出一个 AmqpRejectAndDontRequeueException 以指示该消息应该被拒绝。在启用重试并且达到最大传递尝试次数时,后者是所用的机制。