Polling Consumer

AmqpTemplate 本身可用于轮询 Message 接收。默认情况下,如果消息不可用,则立即返回 null。不会阻塞。从 1.5 版本开始,您可以设置接收超时时间(以毫秒为单位),接收方法会阻塞最长这么长时间,等待接收消息。小于零的值表示无限期阻塞(或至少持续到与代理的连接丢失为止)。1.6 版引入了 receive 方法的变体,支持在每次调用中传递超时时间。

The AmqpTemplate itself can be used for polled Message reception. By default, if no message is available, null is returned immediately. There is no blocking. Starting with version 1.5, you can set a receiveTimeout, in milliseconds, and the receive methods block for up to that long, waiting for a message. A value less than zero means block indefinitely (or at least until the connection to the broker is lost). Version 1.6 introduced variants of the receive methods that allows the timeout be passed in on each call.

由于接收操作会为每条消息创建一个新的 QueueingConsumer,因此此技术不适用于高容量环境。请考虑针对这些用例使用异步使用者或 receiveTimeout 为零。

Since the receive operation creates a new QueueingConsumer for each message, this technique is not really appropriate for high-volume environments. Consider using an asynchronous consumer or a receiveTimeout of zero for those use cases.

从 2.4.8 版开始,当使用非零超时时,您可以指定传递到 basicConsume 方法中的参数,该方法用于将使用者与通道关联。例如:template.addConsumerArg("x-priority", 10)

Starting with version 2.4.8, when using a non-zero timeout, you can specify arguments passed into the basicConsume method used to associate the consumer with the channel. For example: template.addConsumerArg("x-priority", 10).

有四种可用的简单 receive 方法。与发送方的 Exchange 一样,有一个方法要求在模板本身上直接设置默认队列属性,还有一个方法在运行时接受队列参数。1.6 版引入了变体来接受 timeoutMillis,以在每次请求的基础上覆盖 receiveTimeout。以下列表显示了这四个方法的定义:

There are four simple receive methods available. As with the Exchange on the sending side, there is a method that requires that a default queue property has been set directly on the template itself, and there is a method that accepts a queue parameter at runtime. Version 1.6 introduced variants to accept timeoutMillis to override receiveTimeout on a per-request basis. The following listing shows the definitions of the four methods:

Message receive() throws AmqpException;

Message receive(String queueName) throws AmqpException;

Message receive(long timeoutMillis) throws AmqpException;

Message receive(String queueName, long timeoutMillis) throws AmqpException;

与发送信息一样,AmqpTemplate 有一些使用方便的方法来接收 POJO,而不是 Message 实例,实现方式提供了一种方法来自定义用于创建返回的 ObjectMessageConverter:以下列表显示了这些方法:

As in the case of sending messages, the AmqpTemplate has some convenience methods for receiving POJOs instead of Message instances, and implementations provide a way to customize the MessageConverter used to create the Object returned: The following listing shows those methods:

Object receiveAndConvert() throws AmqpException;

Object receiveAndConvert(String queueName) throws AmqpException;

Object receiveAndConvert(long timeoutMillis) throws AmqpException;

Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;

从版本 2.0 开始,有这些方法的变体,它们采用一个额外的 ParameterizedTypeReference 参数来转换复杂类型。必须使用 SmartMessageConverter 配置模板。有关详细信息,请参见 xref:amqp/message-converters.adoc#json-complex[Converting From a MessageRabbitTemplate

Starting with version 2.0, there are variants of these methods that take an additional ParameterizedTypeReference argument to convert complex types. The template must be configured with a SmartMessageConverter. See Converting From a Message With RabbitTemplate for more information.

类似于 sendAndReceive 方法,从 1.3 版开始,AmqpTemplate 具有几个使用方便的 receiveAndReply 方法,用于同步接收、处理和回复消息。以下列表显示了那些方法定义:

Similar to sendAndReceive methods, beginning with version 1.3, the AmqpTemplate has several convenience receiveAndReply methods for synchronously receiving, processing and replying to messages. The following listing shows those method definitions:

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)
       throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback)
     throws AmqpException;

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
    String replyExchange, String replyRoutingKey) throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
    String replyExchange, String replyRoutingKey) throws AmqpException;

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
     ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
            ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

AmqpTemplate 实现负责 receivereply 阶段。大多数情况下,您应该仅提供 ReceiveAndReplyCallback 的实现,以便针对接收的消息执行某些业务逻辑并在需要时构建回复对象或消息。请注意,ReceiveAndReplyCallback 可能会返回 null。在这种情况下,不发送回复,receiveAndReply 就像 receive 方法。这允许对各种消息使用同一个队列,其中一些消息可能不需要回复。

The AmqpTemplate implementation takes care of the receive and reply phases. In most cases, you should provide only an implementation of ReceiveAndReplyCallback to perform some business logic for the received message and build a reply object or message, if needed. Note, a ReceiveAndReplyCallback may return null. In this case, no reply is sent and receiveAndReply works like the receive method. This lets the same queue be used for a mixture of messages, some of which may not need a reply.

只有当提供的回调不是 ReceiveAndReplyMessageCallback(提供原始消息交换契约)的实例时,才会应用自动消息(请求和回复)转换。

Automatic message (request and reply) conversion is applied only if the provided callback is not an instance of ReceiveAndReplyMessageCallback, which provides a raw message exchange contract.

ReplyToAddressCallback 适用于需要自定义逻辑以根据接收的消息和 ReceiveAndReplyCallback 的回复来确定运行时的 replyTo 地址的情况。默认情况下,请求消息中的 replyTo 信息用于路由回复。

The ReplyToAddressCallback is useful for cases requiring custom logic to determine the replyTo address at runtime against the received message and reply from the ReceiveAndReplyCallback. By default, replyTo information in the request message is used to route the reply.

以下列表显示了基于 POJO 的接收和回复的示例:

The following listing shows an example of POJO-based receive and reply:

boolean received =
        this.template.receiveAndReply(ROUTE, new ReceiveAndReplyCallback<Order, Invoice>() {

                public Invoice handle(Order order) {
                        return processOrder(order);
                }
        });
if (received) {
        log.info("We received an order!");
}