Asynchronous Consumer
Spring AMQP 还通过使用 @RabbitListener
注释来支持带注释的侦听器端点,并提供一个开放的基础设施以编程方式注册端点。这是迄今为止设置异步消费者的最便捷方式。有关更多详细信息,请参阅 Annotation-driven Listener Endpoints。
预取默认值为 1,这可能导致效率较低的使用者利用率不足。从版本 2.0 开始,默认预取值现在为 250,这应该使使用者在大多数常见场景中保持忙碌,从而提高吞吐量。 不过,有几种情况下应该降低预取值:
-
对于大消息,尤其是处理较慢时(消息可能会在客户端进程中累积大量的内存)。
-
当需要严格的消息排序时(在这种情况下,预取值应设置为 1)。
-
Other special cases
此外,对于低容量消息传递和多个使用者(包括单个侦听器容器实例中的并发性),你可能希望降低预取以使消息跨多个使用者获得更均匀的分布。 请参阅 Message Listener Container Configuration。 有关预先获取的更多背景信息,请参阅有关 consumer utilization in RabbitMQ 的此文章和有关 queuing theory 的此文章。
Message Listener
对于异步 Message
接收,会涉及一个专用组件(而不是 AmqpTemplate
)。该组件是一个用于 Message
使用回调的容器。我们会在本文档的后面部分考虑容器及其属性。不过,我们应该首先看看回调,因为这是你的应用程序代码与消息系统集成的地方。对于回调有一些选择,从实现 MessageListener
接口开始,如下清单所示:
public interface MessageListener {
void onMessage(Message message);
}
如果你的回调逻辑因任何原因依赖于 AMQP Channel 实例,那么你可以改用 ChannelAwareMessageListener
。它看起来很相似,但有一个额外的参数。以下清单显示了 ChannelAwareMessageListener
接口定义:
public interface ChannelAwareMessageListener {
void onMessage(Message message, Channel channel) throws Exception;
}
在版本 2.1 中,此界面从包 o.s.amqp.rabbit.core
移至 o.s.amqp.rabbit.listener.api
。
MessageListenerAdapter
如果你希望在你应用程序逻辑和消息 API 之间保持更严格的分离,你可以依赖框架提供的适配器实现。这通常称为“`消息驱动的 POJO`" 支持。
1.5 版本引入了更灵活的 POJO 消息传递机制,即 |
当使用适配器时,只需提供对适配器本身应该调用的实例的引用。以下示例说明了如何执行此操作:
MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");
你可以对适配器进行子类化并提供 getListenerMethodName()
的实现以根据消息动态选择不同的方法。此方法有两个参数,originalMessage
和 extractedMessage
,后者是任何转换的结果。默认情况下,将配置 SimpleMessageConverter
。有关更多信息以及其他可用转换器的信息,请参阅 xref:amqp/message-converters.adoc#simple-message-converter[SimpleMessageConverter
。
从 1.4.2 版本开始,原始消息包含`consumerQueue`和`consumerTag`属性,可用于确定接收消息的队列。
从 1.5 版本开始,您可以配置消费者队列或标记到方法名称的映射,以动态选择要调用的方法。如果映射中没有条目,我们将退回到默认侦听器方法。默认侦听器方法(如果未设置)为`handleMessage`。
从 2.0 版本开始,提供了一个便利的`FunctionalInterface`。以下清单显示了`FunctionalInterface`的定义:
@FunctionalInterface
public interface ReplyingMessageListener<T, R> {
R handleMessage(T t);
}
此接口有助于使用 Java 8 lambda 方便地配置适配器,如下例所示:
new MessageListenerAdapter((ReplyingMessageListener<String, String>) data -> {
...
return result;
}));
从 2.2 版本开始,已弃用`buildListenerArguments(Object),并引入了新的`buildListenerArguments(Object, Channel, Message)
。新方法帮助侦听器获取`Channel`和`Message`参数以执行更多操作,例如在手动确认模式下调用`channel.basicReject(long, boolean)`。以下清单显示最基本的示例:
public class ExtendedListenerAdapter extends MessageListenerAdapter {
@Override
protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) {
return new Object[]{extractedMessage, channel, message};
}
}
如果您需要接收“channel”和“message”,现在您可以配置 ExtendedListenerAdapter
,其配置方式与配置 MessageListenerAdapter
相同。侦听器的参数应设置为 buildListenerArguments(Object, Channel, Message)
返回的值,如下面的侦听器示例所示:
public void handleMessage(Object object, Channel channel, Message message) throws IOException {
...
}
Container
现在您已经了解了有关 Message
侦听回调的各种选项,我们可以把注意力转向容器。基本上,容器处理“活动”职责,以便侦听器回调可以保持被动。该容器是“生命周期”组件的一个示例。它提供了用于启动和停止的方法。在配置容器时,您实质上可以缩小 AMQP 队列和 MessageListener
实例之间的差距。您必须提供对 ConnectionFactory
和侦听器应从其使用消息的队列名称或队列实例的引用。
在 2.0 版本之前,有一个监听器容器,SimpleMessageListenerContainer
。 现在有一个第二个容器 DirectMessageListenerContainer
。 Choosing a Container中描述了容器之间的差异以及在选择要使用的容器时可能应用的条件。
以下清单显示了使用 SimpleMessageListenerContainer
的最基本示例:
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueueNames("some.queue");
container.setMessageListener(new MessageListenerAdapter(somePojo));
作为“活动”组件,最常见的做法是使用 bean 定义创建侦听器容器,以便它可以在后台运行。以下示例显示了使用 XML 执行此操作的一种方法:
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>
以下清单显示了使用 XML 执行此操作的另一种方法:
<rabbit:listener-container connection-factory="rabbitConnectionFactory" type="direct">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>
以上两个示例都创建了一个 DirectMessageListenerContainer
(注意 type
属性 — 其默认值为 simple
)。
或者,您也可以更喜欢使用 Java 配置,其与前面的代码片段类似:
@Configuration
public class ExampleAmqpConfiguration {
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}
@Bean
public CachingConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public MessageListener exampleListener() {
return new MessageListener() {
public void onMessage(Message message) {
System.out.println("received: " + message);
}
};
}
}
Consumer Priority
从 RabbitMQ 版本 3.2 开始,代理现在支持使用者优先级(请参见 Using Consumer Priorities with RabbitMQ)。 这是通过在消费者上设置 `x-priority`参数来实现的。 `SimpleMessageListenerContainer`现在支持设置使用者参数,如下例所示:
container.setConsumerArguments(Collections.
<String, Object> singletonMap("x-priority", Integer.valueOf(10)));
为了方便起见,命名空间在 listener
元素中提供了 priority
属性,如下面的示例所示:
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" />
</rabbit:listener-container>
从版本 1.3 开始,您可以在运行时修改容器侦听的队列。参见 Listener Container Queues。
auto-delete
Queues
当容器配置为侦听`auto-delete`队列时,如果队列具有`x-expires`选项,或者代理上配置了 Time-To-Live策略,则当容器停止(即当最后一个使用者被取消)时,代理会删除该队列。 在 1.3 版本之前,由于队列丢失,因此无法重新启动容器。 `RabbitAdmin`仅在连接关闭或打开时自动重新声明队列,而容器停止并启动时不会发生这种情况。
从版本 1.3 开始,容器使用 RabbitAdmin
在启动期间重新声明任何丢失的队列。
您还可以将条件声明(参见 Conditional Declaration)与 auto-startup="false"
管理员结合使用,以推迟队列声明,直到容器启动。以下示例演示了如何操作:
<rabbit:queue id="otherAnon" declared-by="containerAdmin" />
<rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
<rabbit:bindings>
<rabbit:binding queue="otherAnon" key="otherAnon" />
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:listener-container id="container2" auto-startup="false">
<rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin" />
</rabbit:listener-container>
<rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory"
auto-startup="false" />
在这种情况下,队列和交换由具有 auto-startup="false"
的 containerAdmin
声明,以便在上下文初始化期间不会声明这些元素。而且,容器出于同样的原因也没有启动。当容器稍后启动时,它使用对 containerAdmin
的引用来声明这些元素。