Request/Reply Messaging
AmqpTemplate 提供多种 sendAndReceive
方法,用于实现请求-应答模式。这些方法配置必要的消息属性(如相关数据和回复队列),并在内部侦听回复消息。最新版本还支持 direct reply-to,从而消除了固定回复队列的需要。同时,AsyncRabbitTemplate 提供了异步 sendAndReceive 方法,返回 CompletableFuture 以处理请求-应答操作,并支持 direct reply-to。
AmqpTemplate
还提供多种 sendAndReceive
方法,它接受先前为单路发送操作(exchange
、routingKey
和 Message
)描述的相同参数选项。这些方法对于请求答复场景非常有用,因为它们在发送之前处理必要的 reply-to
特性的配置,并且可以在内部为该目的创建的独占队列中侦听答复消息。
同样可用的请求-回复方法是 MessageConverter
同时应用于请求和回复。这些方法命名为 convertSendAndReceive
。有关更多详细信息,请参阅 Javadoc of AmqpTemplate
。
从版本 1.5.0 开始,每个 sendAndReceive
方法变量都有一个重载版本,其中包含 CorrelationData
。与经过适当配置的连接工厂结合使用后,这将启用接收操作发送侧的发布者确认。有关更多信息,请参见 xref:amqp/template.adoc#template-confirms[Correlated Publisher Confirms and Returns 和 Javadoc for RabbitOperations
。
从版本 2.0 开始,这些方法(convertSendAndReceiveAsType
)的变体需要一个附加 ParameterizedTypeReference
参数来转换复杂的返回类型。模板必须使用 SmartMessageConverter
进行配置。有关更多详细信息,请参阅 xref:amqp/message-converters.adoc#json-complex[Converting From a Message
With RabbitTemplate
。
从 2.1 版开始,您可以使用 noLocalReplyConsumer
选项配置 RabbitTemplate
以控制答复消费者的 noLocal
标志。默认情况下,此值为 false
。
Reply Timeout
默认情况下,发送和接收方法在五秒后超时并返回 null。您可以通过设置 replyTimeout
属性修改此行为。从 1.5 版本开始,如果您将 mandatory
属性设置为 true
(或 mandatory-expression
对特定消息求值为 true
),则如果无法将消息发送到队列,则会引发 AmqpMessageReturnedException
。此异常具有 returnedMessage
、replyCode
和 replyText
属性,以及用于发送的 exchange
和 routingKey
。
此功能使用发布者返回。可以通过将 |
从版本 2.1.2 开始,已添加 replyTimedOut
方法,让子类可以得知超时,以便它们可以清除任何保留的状态。
从 2.0.11 和 2.1.3 版本开始,当您使用默认 DirectReplyToMessageListenerContainer
时,可以通过设置模板的 replyErrorHandler
属性来添加错误处理程序。对于任何失败的交付(例如:延迟答复和未关联消息头收到的消息),都会调用此错误处理程序。传递的异常是 ListenerExecutionFailedException
,它具有 failedMessage
属性。
RabbitMQ Direct reply-to
从 3.4.0 版本开始,RabbitMQ 服务器支持 direct reply-to。这消除了固定应答队列的主要原因(为了避免为每个请求创建临时队列)。从 Spring AMQP 1.4.1 版本开始,如果服务器支持,会默认使用直接应答接收(direct reply-to),而不会创建临时应答队列。当未提供 replyQueue
(或将名称设置为 amq.rabbitmq.reply-to
)时,RabbitTemplate
会自动检测是否支持直接应答接收,进而使用该功能或使用临时应答队列作为后备。当使用直接应答接收时,不需要 reply-listener
,也不应进行配置。
命名队列(而不是 amq.rabbitmq.reply-to
)仍然支持答复侦听器,从而实现对答复并发性的控制,等等。
从版本 1.6 开始,如果您希望为每个答复使用临时、独占、自动删除队列,请将 useTemporaryReplyQueues
属性设置为 true
。如果您设置 replyAddress
,则将忽略此属性。
您可以更改决定是否使用直接答复的条件,通过子类化 RabbitTemplate
并覆盖 useDirectReplyTo()
以检查不同的条件。此方法仅在第一次发送请求时调用一次。
在版本 2.0 之前,RabbitTemplate
为每个请求创建一个新消费者,并在收到答复(或超时)时取消该消费者。现在,模板转而使用 DirectReplyToMessageListenerContainer
,让消费者可以重复使用。模板仍然负责关联答复,因此不存在延迟答复转到其他发送者的风险。如果您想要恢复到以前的行为,请将 useDirectReplyToContainer
(在使用 XML 配置时为 direct-reply-to-container
)属性设置为 false。
AsyncRabbitTemplate
没有这样的选项。当使用直接答复时,它始终针对答复使用 DirectReplyToContainer
。
从版本 2.3.7 开始,模板具有一个新属性 useChannelForCorrelation
。当此值为 true
时,服务器不必将关联 ID 从请求消息头复制到答复消息。相反,用于发送请求的通道用于关联答复到请求。
Message Correlation With A Reply Queue
在使用固定回复队列(而不是 amq.rabbitmq.reply-to
)时,您必须提供相关性数据,以便将回复与请求关联起来。请参见 RabbitMQ Remote Procedure Call (RPC)。默认情况下,使用标准 correlationId
属性来保存相关性数据。但是,如果您希望使用自定义属性来保存相关性数据,则可以在 <rabbit-template/> 上设置 correlation-key
属性。显式将属性设置为 correlationId
与省略属性相同。客户端和服务器必须使用相同的标头来获取相关性数据。
Spring AMQP 版本 1.1 为此数据使用了一个名为 |
默认情况下,模板生成自己的关联 ID(忽略任何用户提供的数值)。如果您希望使用自己的关联 ID,请将 RabbitTemplate
实例的 userCorrelationId
属性设置为 true
。
关联 ID 必须唯一,以避免为请求返回错误应答的可能性。
Reply Listener Container
在使用低于 3.4.0 版本的 RabbitMQ 时,会为每个回复使用新的临时队列。 但是,可以在模板上配置一个单独的回复队列,这可以更有效,并且还可以让您在该队列上设置参数。 然而,在这种情况下,您还必须提供一个<reply-listener/>子元素。 该元素为回复队列提供了一个监听器容器,模板是监听器。 <listener-container/>上允许的所有Message Listener Container Configuration属性都允许在该元素上使用,除了`connection-factory`和`message-converter`,它们是从模板的配置中继承的。
如果您运行多个应用程序实例或使用多个 RabbitTemplate
实例,则 MUST 为每个应用程序实例使用一个唯一的应答队列。RabbitMQ 无法从队列中选择消息,因此,如果它们都使用相同的队列,则每个实例都会争夺应答,而不一定能够收到它们自己的应答。
以下示例定义了一个带有连接工厂的 Rabbit 模板:
<rabbit:template id="amqpTemplate"
connection-factory="connectionFactory"
reply-queue="replies"
reply-address="replyEx/routeReply">
<rabbit:reply-listener/>
</rabbit:template>
虽然容器和模板共享一个连接工厂,但它们并不共享通道。因此,请求和答复不在同一事务中执行(如果它具有事务性)。
在版本 1.5.0 之前, |
通过此配置,SimpleListenerContainer
用于接收答复,其中 RabbitTemplate
是 MessageListener
。在模板中定义一个带有 <rabbit:template/>
命名空间元素时,如前一个示例所示,解析器将定义容器并将模板作为侦听器连接起来。
当模板不使用固定 |
如果您将 RabbitTemplate
定义为 <bean/>
或使用 @Configuration
类来将其定义为 @Bean
或以编程方式创建模板,则需要自行定义和连接回复侦听器容器。如果您未执行此操作,则模板永远不会收到答复,最终超时并将 null 返回为 sendAndReceive
方法调用的答复。
从 1.5 版开始,RabbitTemplate
检测是否已将它配置为 MessageListener
以接收答复。如果否,则尝试使用答复地址发送和接收消息会导致 IllegalStateException
(因为永远不会收到答复)。
此外,如果使用了简单的 replyAddress
(队列名称),则回复侦听器容器将验证它是否正在侦听具有相同名称的队列。如果答复地址是交换机和路由密钥,则无法执行此检查,并且会写入调试日志消息。
在自行连接回复侦听器和模板时,请务必确保模板的 replyAddress
和容器的 queues
(或 queueNames
)属性引用同一队列,模板将回复地址插入出站消息 replyTo
属性中。
以下清单显示了如何手动连接 bean 的示例:
<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="connectionFactory" />
<property name="exchange" value="foo.exchange" />
<property name="routingKey" value="foo" />
<property name="replyQueue" ref="replyQ" />
<property name="replyTimeout" value="600000" />
<property name="useDirectReplyToContainer" value="false" />
</bean>
<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<constructor-arg ref="connectionFactory" />
<property name="queues" ref="replyQ" />
<property name="messageListener" ref="amqpTemplate" />
</bean>
<rabbit:queue id="replyQ" name="my.reply.queue" />
@Bean
public RabbitTemplate amqpTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(msgConv());
rabbitTemplate.setReplyAddress(replyQueue().getName());
rabbitTemplate.setReplyTimeout(60000);
rabbitTemplate.setUseDirectReplyToContainer(false);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueues(replyQueue());
container.setMessageListener(amqpTemplate());
return container;
}
@Bean
public Queue replyQueue() {
return new Queue("my.reply.queue");
}
一个使用固定回复队列连接的 RabbitTemplate
的完整示例,以及处理请求并返回回复的 “remote” 侦听器容器显示在 this test case 中。
当回复超时(replyTimeout
)时,sendAndReceive()
方法返回 null。
在 1.3.6 版之前,超时的消息的延迟答复只会记录下来。现在,如果收到延迟答复,则会将其拒绝(模板会抛出 AmqpRejectAndDontRequeueException
)。如果回复队列配置为将被拒绝的消息发送到死信交换机,则可以检索答复以进行后续分析。为此,请使用与回复队列名称相等的路由密钥将队列绑定到配置的死信交换机。
有关配置死信的更多信息,请参见 RabbitMQ Dead Letter Documentation。您还可以查看 FixedReplyQueueDeadLetterTests
测试用例作为示例。
Async Rabbit Template
版本 1.6 引入了`AsyncRabbitTemplate`。 这具有与 AmqpTemplate
上类似的`sendAndReceive`(和`convertSendAndReceive`)方法。 但是,它们不阻塞,而是返回`CompletableFuture`。
sendAndReceive
方法返回 RabbitMessageFuture
。convertSendAndReceive
方法返回 RabbitConverterFuture
。
您可以通过在 future 上调用 get()
来同步检索结果,或者您可以注册一个回调,该回调将异步地使用结果调用。以下清单显示了两种方法:
@Autowired
private AsyncRabbitTemplate template;
...
public void doSomeWorkAndGetResultLater() {
...
CompletableFuture<String> future = this.template.convertSendAndReceive("foo");
// do some more work
String reply = null;
try {
reply = future.get(10, TimeUnit.SECONDS);
}
catch (ExecutionException e) {
...
}
...
}
public void doSomeWorkAndGetResultAsync() {
...
RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
future.whenComplete((result, ex) -> {
if (ex == null) {
// success
}
else {
// failure
}
});
...
}
如果设置了 mandatory
且无法传递消息,则 future 会抛出 ExecutionException
,其原因是 AmqpMessageReturnedException
,它封装了返回的消息和有关返回的信息。
如果设置了 enableConfirms
,则 future 具有一个名为 confirm
的属性,它本身是一个 CompletableFuture<Boolean>
,其中 true
表示发布成功。如果确认 future 为 false
,则 RabbitFuture
有一个名为 nackCause
的进一步属性,其中包含失败原因(如果可用)。
如果在回复之后收到发布确认,则将其丢弃,因为回复表示成功发布。
您可以设置模板上的 receiveTimeout
属性以使回复超时(它默认为 30000
- 30 秒)。如果发生超时,则 future 将完成 AmqpReplyTimeoutException
。
此模板实现 SmartLifecycle
。在有待处理的答复时停止模板会导致待处理的 Future
实例被取消。
从版本 2.0 开始,异步模板现在支持 direct reply-to,而不是配置的回复队列。要启用此功能,请使用以下构造函数之一:
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)
public AsyncRabbitTemplate(RabbitTemplate template)
请参见 RabbitMQ Direct reply-to,直接使用回复到与同步 RabbitTemplate
一起使用。
2.0 版引入了这些方法变体(convertSendAndReceiveAsType
),该变体需要一个附加 ParameterizedTypeReference
参数来转换复杂的返回类型。你必须使用 SmartMessageConverter
对底层 RabbitTemplate
进行配置。有关更多详细信息,请参阅 xref:amqp/message-converters.adoc#json-complex[Converting From a Message
With RabbitTemplate
。
从版本 3.0 开始,AsyncRabbitTemplate
方法现在返回 CompletableFuture
而不是 ListenableFuture
。