Polling Consumer

AmqpTemplate 本身可用于轮询 Message 接收。默认情况下,如果消息不可用,则立即返回 null。不会阻塞。从 1.5 版本开始,您可以设置接收超时时间(以毫秒为单位),接收方法会阻塞最长这么长时间,等待接收消息。小于零的值表示无限期阻塞(或至少持续到与代理的连接丢失为止)。1.6 版引入了 receive 方法的变体,支持在每次调用中传递超时时间。

由于接收操作会为每条消息创建一个新的 QueueingConsumer,因此此技术不适用于高容量环境。请考虑针对这些用例使用异步使用者或 receiveTimeout 为零。

从 2.4.8 版开始,当使用非零超时时,您可以指定传递到 basicConsume 方法中的参数,该方法用于将使用者与通道关联。例如:template.addConsumerArg("x-priority", 10)

有四种可用的简单 receive 方法。与发送方的 Exchange 一样,有一个方法要求在模板本身上直接设置默认队列属性,还有一个方法在运行时接受队列参数。1.6 版引入了变体来接受 timeoutMillis,以在每次请求的基础上覆盖 receiveTimeout。以下列表显示了这四个方法的定义:

Message receive() throws AmqpException;

Message receive(String queueName) throws AmqpException;

Message receive(long timeoutMillis) throws AmqpException;

Message receive(String queueName, long timeoutMillis) throws AmqpException;

与发送信息一样,AmqpTemplate 有一些使用方便的方法来接收 POJO,而不是 Message 实例,实现方式提供了一种方法来自定义用于创建返回的 ObjectMessageConverter:以下列表显示了这些方法:

Object receiveAndConvert() throws AmqpException;

Object receiveAndConvert(String queueName) throws AmqpException;

Object receiveAndConvert(long timeoutMillis) throws AmqpException;

Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;

从版本 2.0 开始,有这些方法的变体,它们采用一个额外的 ParameterizedTypeReference 参数来转换复杂类型。必须使用 SmartMessageConverter 配置模板。有关详细信息,请参见 xref:amqp/message-converters.adoc#json-complex[Converting From a MessageRabbitTemplate

类似于 sendAndReceive 方法,从 1.3 版开始,AmqpTemplate 具有几个使用方便的 receiveAndReply 方法,用于同步接收、处理和回复消息。以下列表显示了那些方法定义:

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)
       throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback)
     throws AmqpException;

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
    String replyExchange, String replyRoutingKey) throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
    String replyExchange, String replyRoutingKey) throws AmqpException;

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
     ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
            ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

AmqpTemplate 实现负责 receivereply 阶段。大多数情况下,您应该仅提供 ReceiveAndReplyCallback 的实现,以便针对接收的消息执行某些业务逻辑并在需要时构建回复对象或消息。请注意,ReceiveAndReplyCallback 可能会返回 null。在这种情况下,不发送回复,receiveAndReply 就像 receive 方法。这允许对各种消息使用同一个队列,其中一些消息可能不需要回复。

只有当提供的回调不是 ReceiveAndReplyMessageCallback(提供原始消息交换契约)的实例时,才会应用自动消息(请求和回复)转换。

ReplyToAddressCallback 适用于需要自定义逻辑以根据接收的消息和 ReceiveAndReplyCallback 的回复来确定运行时的 replyTo 地址的情况。默认情况下,请求消息中的 replyTo 信息用于路由回复。

以下列表显示了基于 POJO 的接收和回复的示例:

boolean received =
        this.template.receiveAndReply(ROUTE, new ReceiveAndReplyCallback<Order, Invoice>() {

                public Invoice handle(Order order) {
                        return processOrder(order);
                }
        });
if (received) {
        log.info("We received an order!");
}