Annotation-driven Listener Endpoints

以异步方式接收消息最简单的方法是使用注解监听器端点基础架构。简而言之,它让您将受管 Bean 的方法作为 Rabbit 监听器端点公开。以下示例展示了如何使用 @RabbitListener 注解:

@Component
public class MyService {

    @RabbitListener(queues = "myQueue")
    public void processOrder(String data) {
        ...
    }

}

上述示例的想法就是,每当名为 myQueue 的队列有消息可用时,processOrder 方法就会被相应调用(在本例中,使用消息的有效负载进行调用)。

注解端点基础架构在幕后为每个注释的方法创建一个消息监听器容器,方法是使用 RabbitListenerContainerFactory

在上述示例中,myQueue 必须已存在并且绑定到某个交换机。只要应用程序上下文中存在 RabbitAdmin,就可以自动声明和绑定队列。

属性占位符 (${some.property}) 或 SpEL 表达式 (#{someExpression}) 可为注释属性 (queues 等)指定。请参阅 Listening to Multiple Queues 了解您可能为何使用 SpEL 而不是属性占位符的示例。以下清单显示了声明 Rabbit 侦听器的三个示例:

@Component
public class MyService {

  @RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "myQueue", durable = "true"),
        exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
        key = "orderRoutingKey")
  )
  public void processOrder(Order order) {
    ...
  }

  @RabbitListener(bindings = @QueueBinding(
        value = @Queue,
        exchange = @Exchange(value = "auto.exch"),
        key = "invoiceRoutingKey")
  )
  public void processInvoice(Invoice invoice) {
    ...
  }

  @RabbitListener(queuesToDeclare = @Queue(name = "${my.queue}", durable = "true"))
  public String handleWithSimpleDeclare(String data) {
      ...
  }

}

在第一个示例中,如果需要,将自动声明队列 myQueue(持久)连同交换机,并使用路由键绑定到交换机。在第二个示例中,将声明和绑定一个匿名(独占、自动删除)队列;队列名称由框架使用 `Base64UrlNamingStrategy`创建。您无法使用此技术声明由代理命名的队列;它们需要声明为 bean 定义;请参阅 Containers and Broker-Named queues。可以提供多个 `QueueBinding`条目,让侦听器侦听多个队列。在第三个示例中,将声明具有从属性 `my.queue`检索的名称的队列(如果需要),并使用队列名称作为路由键,使用默认绑定绑定到默认交换机。

从 2.0 版本开始,`@Exchange`注释支持任何交换机类型,包括自定义交换机类型。有关详细信息,请参阅 AMQP Concepts

当您需要更高级的配置时,您可以使用常规的 @Bean 定义。

请注意第一个示例中交换机上的 ignoreDeclarationExceptions。这允许例如绑定到可能具有不同设置(如 internal)的现有交换机。默认情况下,现有交换机的属性必须匹配。

从 2.0 版本开始,您现在可以使用多个路由键将队列绑定到交换机,如下例所示:

...
    key = { "red", "yellow" }
...

您还可以为队列、交换机和绑定指定 @QueueBinding 注释中的参数,如下例所示:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "auto.headers", autoDelete = "true",
                        arguments = @Argument(name = "x-message-ttl", value = "10000",
                                                type = "java.lang.Integer")),
        exchange = @Exchange(value = "auto.headers", type = ExchangeTypes.HEADERS, autoDelete = "true"),
        arguments = {
                @Argument(name = "x-match", value = "all"),
                @Argument(name = "thing1", value = "somevalue"),
                @Argument(name = "thing2")
        })
)
public String handleWithHeadersExchange(String foo) {
    ...
}

请注意,x-message-ttl 参数已为队列设置为 10 秒。由于参数类型不是 String,因此我们必须指定其类型,在本例中为 Integer。与所有此类声明一样,如果队列已存在,则参数必须与队列上的参数匹配。对于头交换机,我们将绑定参数设置为匹配将 thing1 头设置为 somevalue 的消息,并且 thing2 头必须存在任何值。x-match 参数表示必须满足这两个条件。

参数名称、值和类型可以是属性占位符(${…​})或 SpEL 表达式(#{…​})。name 必须解析为 Stringtype 的表达式必须解析为 Class 或类的完全限定名称。value 必须解析为可以通过 DefaultConversionService 转换为该类型的对象(例如上述示例中的 x-message-ttl)。

如果名称解析为 null 或空 String,则忽略该 @Argument