Annotation-driven Listener Endpoints
以异步方式接收消息最简单的方法是使用注解监听器端点基础架构。简而言之,它让您将受管 Bean 的方法作为 Rabbit 监听器端点公开。以下示例展示了如何使用 @RabbitListener
注解:
The easiest way to receive a message asynchronously is to use the annotated listener endpoint infrastructure.
In a nutshell, it lets you expose a method of a managed bean as a Rabbit listener endpoint.
The following example shows how to use the @RabbitListener
annotation:
@Component
public class MyService {
@RabbitListener(queues = "myQueue")
public void processOrder(String data) {
...
}
}
上述示例的想法就是,每当名为 myQueue
的队列有消息可用时,processOrder
方法就会被相应调用(在本例中,使用消息的有效负载进行调用)。
The idea of the preceding example is that, whenever a message is available on the queue named myQueue
, the processOrder
method is invoked accordingly (in this case, with the payload of the message).
注解端点基础架构在幕后为每个注释的方法创建一个消息监听器容器,方法是使用 RabbitListenerContainerFactory
。
The annotated endpoint infrastructure creates a message listener container behind the scenes for each annotated method, by using a RabbitListenerContainerFactory
.
在上述示例中,myQueue
必须已存在并且绑定到某个交换机。只要应用程序上下文中存在 RabbitAdmin
,就可以自动声明和绑定队列。
In the preceding example, myQueue
must already exist and be bound to some exchange.
The queue can be declared and bound automatically, as long as a RabbitAdmin
exists in the application context.
属性占位符 ( |
Property placeholders ( |
@Component
public class MyService {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "myQueue", durable = "true"),
exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
key = "orderRoutingKey")
)
public void processOrder(Order order) {
...
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "auto.exch"),
key = "invoiceRoutingKey")
)
public void processInvoice(Invoice invoice) {
...
}
@RabbitListener(queuesToDeclare = @Queue(name = "${my.queue}", durable = "true"))
public String handleWithSimpleDeclare(String data) {
...
}
}
在第一个示例中,如果需要,将自动声明队列 myQueue
(持久)连同交换机,并使用路由键绑定到交换机。在第二个示例中,将声明和绑定一个匿名(独占、自动删除)队列;队列名称由框架使用 `Base64UrlNamingStrategy`创建。您无法使用此技术声明由代理命名的队列;它们需要声明为 bean 定义;请参阅 Containers and Broker-Named queues。可以提供多个 `QueueBinding`条目,让侦听器侦听多个队列。在第三个示例中,将声明具有从属性 `my.queue`检索的名称的队列(如果需要),并使用队列名称作为路由键,使用默认绑定绑定到默认交换机。
In the first example, a queue myQueue
is declared automatically (durable) together with the exchange, if needed,
and bound to the exchange with the routing key.
In the second example, an anonymous (exclusive, auto-delete) queue is declared and bound; the queue name is created by the framework using the Base64UrlNamingStrategy
.
You cannot declare broker-named queues using this technique; they need to be declared as bean definitions; see Containers and Broker-Named queues.
Multiple QueueBinding
entries can be provided, letting the listener listen to multiple queues.
In the third example, a queue with the name retrieved from property my.queue
is declared, if necessary, with the default binding to the default exchange using the queue name as the routing key.
从 2.0 版本开始,`@Exchange`注释支持任何交换机类型,包括自定义交换机类型。有关详细信息,请参阅 AMQP Concepts。
Since version 2.0, the @Exchange
annotation supports any exchange types, including custom.
For more information, see AMQP Concepts.
当您需要更高级的配置时,您可以使用常规的 @Bean
定义。
You can use normal @Bean
definitions when you need more advanced configuration.
请注意第一个示例中交换机上的 ignoreDeclarationExceptions
。这允许例如绑定到可能具有不同设置(如 internal
)的现有交换机。默认情况下,现有交换机的属性必须匹配。
Notice ignoreDeclarationExceptions
on the exchange in the first example.
This allows, for example, binding to an existing exchange that might have different settings (such as internal
).
By default, the properties of an existing exchange must match.
从 2.0 版本开始,您现在可以使用多个路由键将队列绑定到交换机,如下例所示:
Starting with version 2.0, you can now bind a queue to an exchange with multiple routing keys, as the following example shows:
...
key = { "red", "yellow" }
...
您还可以为队列、交换机和绑定指定 @QueueBinding
注释中的参数,如下例所示:
You can also specify arguments within @QueueBinding
annotations for queues, exchanges,
and bindings, as the following example shows:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "auto.headers", autoDelete = "true",
arguments = @Argument(name = "x-message-ttl", value = "10000",
type = "java.lang.Integer")),
exchange = @Exchange(value = "auto.headers", type = ExchangeTypes.HEADERS, autoDelete = "true"),
arguments = {
@Argument(name = "x-match", value = "all"),
@Argument(name = "thing1", value = "somevalue"),
@Argument(name = "thing2")
})
)
public String handleWithHeadersExchange(String foo) {
...
}
请注意,x-message-ttl
参数已为队列设置为 10 秒。由于参数类型不是 String
,因此我们必须指定其类型,在本例中为 Integer
。与所有此类声明一样,如果队列已存在,则参数必须与队列上的参数匹配。对于头交换机,我们将绑定参数设置为匹配将 thing1
头设置为 somevalue
的消息,并且 thing2
头必须存在任何值。x-match
参数表示必须满足这两个条件。
Notice that the x-message-ttl
argument is set to 10 seconds for the queue.
Since the argument type is not String
, we have to specify its type — in this case, Integer
.
As with all such declarations, if the queue already exists, the arguments must match those on the queue.
For the header exchange, we set the binding arguments to match messages that have the thing1
header set to somevalue
, and
the thing2
header must be present with any value.
The x-match
argument means both conditions must be satisfied.
参数名称、值和类型可以是属性占位符(${…}
)或 SpEL 表达式(#{…}
)。name
必须解析为 String
。type
的表达式必须解析为 Class
或类的完全限定名称。value
必须解析为可以通过 DefaultConversionService
转换为该类型的对象(例如上述示例中的 x-message-ttl
)。
The argument name, value, and type can be property placeholders (${…}
) or SpEL expressions (#{…}
).
The name
must resolve to a String
.
The expression for type
must resolve to a Class
or the fully-qualified name of a class.
The value
must resolve to something that can be converted by the DefaultConversionService
to the type (such as the x-message-ttl
in the preceding example).
如果名称解析为 null
或空 String
,则忽略该 @Argument
。
If a name resolves to null
or an empty String
, that @Argument
is ignored.