Asynchronous Consumer

Spring AMQP 还通过使用 @RabbitListener 注释来支持带注释的侦听器端点,并提供一个开放的基础设施以编程方式注册端点。这是迄今为止设置异步消费者的最便捷方式。有关更多详细信息,请参阅 Annotation-driven Listener Endpoints

Spring AMQP also supports annotated listener endpoints through the use of the @RabbitListener annotation and provides an open infrastructure to register endpoints programmatically. This is by far the most convenient way to setup an asynchronous consumer. See Annotation-driven Listener Endpoints for more details.

预取默认值为 1,这可能导致效率较低的使用者利用率不足。从版本 2.0 开始,默认预取值现在为 250,这应该使使用者在大多数常见场景中保持忙碌,从而提高吞吐量。

The prefetch default value used to be 1, which could lead to under-utilization of efficient consumers. Starting with version 2.0, the default prefetch value is now 250, which should keep consumers busy in most common scenarios and thus improve throughput.

不过,有几种情况下应该降低预取值:

There are, nevertheless, scenarios where the prefetch value should be low:

  • For large messages, especially if the processing is slow (messages could add up to a large amount of memory in the client process)

  • When strict message ordering is necessary (the prefetch value should be set back to 1 in this case)

  • Other special cases

此外,对于低容量消息传递和多个使用者(包括单个侦听器容器实例中的并发性),你可能希望降低预取以使消息跨多个使用者获得更均匀的分布。

Also, with low-volume messaging and multiple consumers (including concurrency within a single listener container instance), you may wish to reduce the prefetch to get a more even distribution of messages across consumers.

有关预先获取的更多背景信息,请参阅有关 consumer utilization in RabbitMQ 的此文章和有关 queuing theory 的此文章。

For more background about prefetch, see this post about consumer utilization in RabbitMQ and this post about queuing theory.

Message Listener

对于异步 Message 接收,会涉及一个专用组件(而不是 AmqpTemplate)。该组件是一个用于 Message 使用回调的容器。我们会在本文档的后面部分考虑容器及其属性。不过,我们应该首先看看回调,因为这是你的应用程序代码与消息系统集成的地方。对于回调有一些选择,从实现 MessageListener 接口开始,如下清单所示:

For asynchronous Message reception, a dedicated component (not the AmqpTemplate) is involved. That component is a container for a Message-consuming callback. We consider the container and its properties later in this section. First, though, we should look at the callback, since that is where your application code is integrated with the messaging system. There are a few options for the callback, starting with an implementation of the MessageListener interface, which the following listing shows:

public interface MessageListener {
    void onMessage(Message message);
}

如果你的回调逻辑因任何原因依赖于 AMQP Channel 实例,那么你可以改用 ChannelAwareMessageListener。它看起来很相似,但有一个额外的参数。以下清单显示了 ChannelAwareMessageListener 接口定义:

If your callback logic depends on the AMQP Channel instance for any reason, you may instead use the ChannelAwareMessageListener. It looks similar but has an extra parameter. The following listing shows the ChannelAwareMessageListener interface definition:

public interface ChannelAwareMessageListener {
    void onMessage(Message message, Channel channel) throws Exception;
}

在版本 2.1 中,此界面从包 o.s.amqp.rabbit.core 移至 o.s.amqp.rabbit.listener.api

In version 2.1, this interface moved from package o.s.amqp.rabbit.core to o.s.amqp.rabbit.listener.api.

MessageListenerAdapter

如果你希望在你应用程序逻辑和消息 API 之间保持更严格的分离,你可以依赖框架提供的适配器实现。这通常称为“`消息驱动的 POJO`" 支持。

If you prefer to maintain a stricter separation between your application logic and the messaging API, you can rely upon an adapter implementation that is provided by the framework. This is often referred to as “Message-driven POJO” support.

1.5 版本引入了更灵活的 POJO 消息传递机制,即 @RabbitListener 注解。有关详细信息,请参阅 Annotation-driven Listener Endpoints

Version 1.5 introduced a more flexible mechanism for POJO messaging, the @RabbitListener annotation. See Annotation-driven Listener Endpoints for more information.

当使用适配器时,只需提供对适配器本身应该调用的实例的引用。以下示例说明了如何执行此操作:

When using the adapter, you need to provide only a reference to the instance that the adapter itself should invoke. The following example shows how to do so:

MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");

你可以对适配器进行子类化并提供 getListenerMethodName() 的实现以根据消息动态选择不同的方法。此方法有两个参数,originalMessageextractedMessage,后者是任何转换的结果。默认情况下,将配置 SimpleMessageConverter。有关更多信息以及其他可用转换器的信息,请参阅 xref:amqp/message-converters.adoc#simple-message-converter[SimpleMessageConverter

You can subclass the adapter and provide an implementation of getListenerMethodName() to dynamically select different methods based on the message. This method has two parameters, originalMessage and extractedMessage, the latter being the result of any conversion. By default, a SimpleMessageConverter is configured. See SimpleMessageConverter for more information and information about other converters available.

从 1.4.2 版本开始,原始消息包含`consumerQueue`和`consumerTag`属性,可用于确定接收消息的队列。

Starting with version 1.4.2, the original message has consumerQueue and consumerTag properties, which can be used to determine the queue from which a message was received.

从 1.5 版本开始,您可以配置消费者队列或标记到方法名称的映射,以动态选择要调用的方法。如果映射中没有条目,我们将退回到默认侦听器方法。默认侦听器方法(如果未设置)为`handleMessage`。

Starting with version 1.5, you can configure a map of consumer queue or tag to method name, to dynamically select the method to call. If no entry is in the map, we fall back to the default listener method. The default listener method (if not set) is handleMessage.

从 2.0 版本开始,提供了一个便利的`FunctionalInterface`。以下清单显示了`FunctionalInterface`的定义:

Starting with version 2.0, a convenient FunctionalInterface has been provided. The following listing shows the definition of FunctionalInterface:

@FunctionalInterface
public interface ReplyingMessageListener<T, R> {

    R handleMessage(T t);

}

此接口有助于使用 Java 8 lambda 方便地配置适配器,如下例所示:

This interface facilitates convenient configuration of the adapter by using Java 8 lambdas, as the following example shows:

new MessageListenerAdapter((ReplyingMessageListener<String, String>) data -> {
    ...
    return result;
}));

从 2.2 版本开始,已弃用`buildListenerArguments(Object),并引入了新的`buildListenerArguments(Object, Channel, Message)。新方法帮助侦听器获取`Channel`和`Message`参数以执行更多操作,例如在手动确认模式下调用`channel.basicReject(long, boolean)`。以下清单显示最基本的示例:

Starting with version 2.2, the buildListenerArguments(Object) has been deprecated and new buildListenerArguments(Object, Channel, Message) one has been introduced instead. The new method helps listener to get Channel and Message arguments to do more, such as calling channel.basicReject(long, boolean) in manual acknowledge mode. The following listing shows the most basic example:

public class ExtendedListenerAdapter extends MessageListenerAdapter {

    @Override
    protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) {
        return new Object[]{extractedMessage, channel, message};
    }

}

如果您需要接收“channel”和“message”,现在您可以配置 ExtendedListenerAdapter,其配置方式与配置 MessageListenerAdapter 相同。侦听器的参数应设置为 buildListenerArguments(Object, Channel, Message) 返回的值,如下面的侦听器示例所示:

Now you could configure ExtendedListenerAdapter as same as MessageListenerAdapter if you need to receive “channel” and “message”. Parameters of listener should be set as buildListenerArguments(Object, Channel, Message) returned, as the following example of listener shows:

public void handleMessage(Object object, Channel channel, Message message) throws IOException {
    ...
}

Container

现在您已经了解了有关 Message 侦听回调的各种选项,我们可以把注意力转向容器。基本上,容器处理“活动”职责,以便侦听器回调可以保持被动。该容器是“生命周期”组件的一个示例。它提供了用于启动和停止的方法。在配置容器时,您实质上可以缩小 AMQP 队列和 MessageListener 实例之间的差距。您必须提供对 ConnectionFactory 和侦听器应从其使用消息的队列名称或队列实例的引用。

Now that you have seen the various options for the Message-listening callback, we can turn our attention to the container. Basically, the container handles the “active” responsibilities so that the listener callback can remain passive. The container is an example of a “lifecycle” component. It provides methods for starting and stopping. When configuring the container, you essentially bridge the gap between an AMQP Queue and the MessageListener instance. You must provide a reference to the ConnectionFactory and the queue names or Queue instances from which that listener should consume messages.

在 2.0 版本之前,有一个监听器容器,SimpleMessageListenerContainer。 现在有一个第二个容器 DirectMessageListenerContainerChoosing a Container中描述了容器之间的差异以及在选择要使用的容器时可能应用的条件。

Prior to version 2.0, there was one listener container, the SimpleMessageListenerContainer. There is now a second container, the DirectMessageListenerContainer. The differences between the containers and criteria you might apply when choosing which to use are described in Choosing a Container.

以下清单显示了使用 SimpleMessageListenerContainer 的最基本示例:

The following listing shows the most basic example, which works by using the, SimpleMessageListenerContainer:

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueueNames("some.queue");
container.setMessageListener(new MessageListenerAdapter(somePojo));

作为“活动”组件,最常见的做法是使用 bean 定义创建侦听器容器,以便它可以在后台运行。以下示例显示了使用 XML 执行此操作的一种方法:

As an “active” component, it is most common to create the listener container with a bean definition so that it can run in the background. The following example shows one way to do so with XML:

<rabbit:listener-container connection-factory="rabbitConnectionFactory">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>

以下清单显示了使用 XML 执行此操作的另一种方法:

The following listing shows another way to do so with XML:

<rabbit:listener-container connection-factory="rabbitConnectionFactory" type="direct">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>

以上两个示例都创建了一个 DirectMessageListenerContainer(注意 type 属性 — 其默认值为 simple)。

Both of the preceding examples create a DirectMessageListenerContainer (notice the type attribute — it defaults to simple).

或者,您也可以更喜欢使用 Java 配置,其与前面的代码片段类似:

Alternately, you may prefer to use Java configuration, which looks similar to the preceding code snippet:

@Configuration
public class ExampleAmqpConfiguration {

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }

    @Bean
    public CachingConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory =
            new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public MessageListener exampleListener() {
        return new MessageListener() {
            public void onMessage(Message message) {
                System.out.println("received: " + message);
            }
        };
    }
}

Consumer Priority

从 RabbitMQ 版本 3.2 开始,代理现在支持使用者优先级(请参见 Using Consumer Priorities with RabbitMQ)。 这是通过在消费者上设置 `x-priority`参数来实现的。 `SimpleMessageListenerContainer`现在支持设置使用者参数,如下例所示:

Starting with RabbitMQ Version 3.2, the broker now supports consumer priority (see Using Consumer Priorities with RabbitMQ). This is enabled by setting the x-priority argument on the consumer. The SimpleMessageListenerContainer now supports setting consumer arguments, as the following example shows:

container.setConsumerArguments(Collections.
<String, Object> singletonMap("x-priority", Integer.valueOf(10)));

为了方便起见,命名空间在 listener 元素中提供了 priority 属性,如下面的示例所示:

For convenience, the namespace provides the priority attribute on the listener element, as the following example shows:

<rabbit:listener-container connection-factory="rabbitConnectionFactory">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" />
</rabbit:listener-container>

从版本 1.3 开始,您可以在运行时修改容器侦听的队列。参见 Listener Container Queues

Starting with version 1.3, you can modify the queues on which the container listens at runtime. See Listener Container Queues.

auto-delete Queues

当容器配置为侦听`auto-delete`队列时,如果队列具有`x-expires`选项,或者代理上配置了 Time-To-Live策略,则当容器停止(即当最后一个使用者被取消)时,代理会删除该队列。 在 1.3 版本之前,由于队列丢失,因此无法重新启动容器。 `RabbitAdmin`仅在连接关闭或打开时自动重新声明队列,而容器停止并启动时不会发生这种情况。

When a container is configured to listen to auto-delete queues, the queue has an x-expires option, or the Time-To-Live policy is configured on the Broker, the queue is removed by the broker when the container is stopped (that is, when the last consumer is cancelled). Before version 1.3, the container could not be restarted because the queue was missing. The RabbitAdmin only automatically redeclares queues and so on when the connection is closed or when it opens, which does not happen when the container is stopped and started.

从版本 1.3 开始,容器使用 RabbitAdmin 在启动期间重新声明任何丢失的队列。

Starting with version 1.3, the container uses a RabbitAdmin to redeclare any missing queues during startup.

您还可以将条件声明(参见 Conditional Declaration)与 auto-startup="false" 管理员结合使用,以推迟队列声明,直到容器启动。以下示例演示了如何操作:

You can also use conditional declaration (see Conditional Declaration) together with an auto-startup="false" admin to defer queue declaration until the container is started. The following example shows how to do so:

<rabbit:queue id="otherAnon" declared-by="containerAdmin" />

<rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
    <rabbit:bindings>
        <rabbit:binding queue="otherAnon" key="otherAnon" />
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:listener-container id="container2" auto-startup="false">
    <rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin" />
</rabbit:listener-container>

<rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory"
    auto-startup="false" />

在这种情况下,队列和交换由具有 auto-startup="false"containerAdmin 声明,以便在上下文初始化期间不会声明这些元素。而且,容器出于同样的原因也没有启动。当容器稍后启动时,它使用对 containerAdmin 的引用来声明这些元素。

In this case, the queue and exchange are declared by containerAdmin, which has auto-startup="false" so that the elements are not declared during context initialization. Also, the container is not started for the same reason. When the container is later started, it uses its reference to containerAdmin to declare the elements.