Polling Consumer
AmqpTemplate
本身可用于轮询 Message
接收。默认情况下,如果消息不可用,则立即返回 null
。不会阻塞。从 1.5 版本开始,您可以设置接收超时时间(以毫秒为单位),接收方法会阻塞最长这么长时间,等待接收消息。小于零的值表示无限期阻塞(或至少持续到与代理的连接丢失为止)。1.6 版引入了 receive
方法的变体,支持在每次调用中传递超时时间。
The AmqpTemplate
itself can be used for polled Message
reception.
By default, if no message is available, null
is returned immediately.
There is no blocking.
Starting with version 1.5, you can set a receiveTimeout
, in milliseconds, and the receive methods block for up to that long, waiting for a message.
A value less than zero means block indefinitely (or at least until the connection to the broker is lost).
Version 1.6 introduced variants of the receive
methods that allows the timeout be passed in on each call.
由于接收操作会为每条消息创建一个新的 QueueingConsumer
,因此此技术不适用于高容量环境。请考虑针对这些用例使用异步使用者或 receiveTimeout
为零。
Since the receive operation creates a new QueueingConsumer
for each message, this technique is not really appropriate for high-volume environments.
Consider using an asynchronous consumer or a receiveTimeout
of zero for those use cases.
从 2.4.8 版开始,当使用非零超时时,您可以指定传递到 basicConsume
方法中的参数,该方法用于将使用者与通道关联。例如:template.addConsumerArg("x-priority", 10)
。
Starting with version 2.4.8, when using a non-zero timeout, you can specify arguments passed into the basicConsume
method used to associate the consumer with the channel.
For example: template.addConsumerArg("x-priority", 10)
.
有四种可用的简单 receive
方法。与发送方的 Exchange
一样,有一个方法要求在模板本身上直接设置默认队列属性,还有一个方法在运行时接受队列参数。1.6 版引入了变体来接受 timeoutMillis
,以在每次请求的基础上覆盖 receiveTimeout
。以下列表显示了这四个方法的定义:
There are four simple receive
methods available.
As with the Exchange
on the sending side, there is a method that requires that a default queue property has been set
directly on the template itself, and there is a method that accepts a queue parameter at runtime.
Version 1.6 introduced variants to accept timeoutMillis
to override receiveTimeout
on a per-request basis.
The following listing shows the definitions of the four methods:
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
与发送信息一样,AmqpTemplate
有一些使用方便的方法来接收 POJO,而不是 Message
实例,实现方式提供了一种方法来自定义用于创建返回的 Object
的 MessageConverter
:以下列表显示了这些方法:
As in the case of sending messages, the AmqpTemplate
has some convenience methods for receiving POJOs instead of Message
instances, and implementations provide a way to customize the MessageConverter
used to create the Object
returned:
The following listing shows those methods:
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
从版本 2.0 开始,有这些方法的变体,它们采用一个额外的 ParameterizedTypeReference
参数来转换复杂类型。必须使用 SmartMessageConverter
配置模板。有关详细信息,请参见 xref:amqp/message-converters.adoc#json-complex[Converting From a Message
带 RabbitTemplate
Starting with version 2.0, there are variants of these methods that take an additional ParameterizedTypeReference
argument to convert complex types.
The template must be configured with a SmartMessageConverter
.
See Converting From a Message
With RabbitTemplate
for more information.
类似于 sendAndReceive
方法,从 1.3 版开始,AmqpTemplate
具有几个使用方便的 receiveAndReply
方法,用于同步接收、处理和回复消息。以下列表显示了那些方法定义:
Similar to sendAndReceive
methods, beginning with version 1.3, the AmqpTemplate
has several convenience receiveAndReply
methods for synchronously receiving, processing and replying to messages.
The following listing shows those method definitions:
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
AmqpTemplate
实现负责 receive
和 reply
阶段。大多数情况下,您应该仅提供 ReceiveAndReplyCallback
的实现,以便针对接收的消息执行某些业务逻辑并在需要时构建回复对象或消息。请注意,ReceiveAndReplyCallback
可能会返回 null
。在这种情况下,不发送回复,receiveAndReply
就像 receive
方法。这允许对各种消息使用同一个队列,其中一些消息可能不需要回复。
The AmqpTemplate
implementation takes care of the receive
and reply
phases.
In most cases, you should provide only an implementation of ReceiveAndReplyCallback
to perform some business logic for the received message and build a reply object or message, if needed.
Note, a ReceiveAndReplyCallback
may return null
.
In this case, no reply is sent and receiveAndReply
works like the receive
method.
This lets the same queue be used for a mixture of messages, some of which may not need a reply.
只有当提供的回调不是 ReceiveAndReplyMessageCallback
(提供原始消息交换契约)的实例时,才会应用自动消息(请求和回复)转换。
Automatic message (request and reply) conversion is applied only if the provided callback is not an instance of ReceiveAndReplyMessageCallback
, which provides a raw message exchange contract.
ReplyToAddressCallback
适用于需要自定义逻辑以根据接收的消息和 ReceiveAndReplyCallback
的回复来确定运行时的 replyTo
地址的情况。默认情况下,请求消息中的 replyTo
信息用于路由回复。
The ReplyToAddressCallback
is useful for cases requiring custom logic to determine the replyTo
address at runtime against the received message and reply from the ReceiveAndReplyCallback
.
By default, replyTo
information in the request message is used to route the reply.
以下列表显示了基于 POJO 的接收和回复的示例:
The following listing shows an example of POJO-based receive and reply:
boolean received =
this.template.receiveAndReply(ROUTE, new ReceiveAndReplyCallback<Order, Invoice>() {
public Invoice handle(Order order) {
return processOrder(order);
}
});
if (received) {
log.info("We received an order!");
}