Request/Reply Messaging

AmqpTemplate 提供多种 sendAndReceive 方法,用于实现请求-应答模式。这些方法配置必要的消息属性(如相关数据和回复队列),并在内部侦听回复消息。最新版本还支持 direct reply-to,从而消除了固定回复队列的需要。同时,AsyncRabbitTemplate 提供了异步 sendAndReceive 方法,返回 CompletableFuture 以处理请求-应答操作,并支持 direct reply-to。

AmqpTemplate 还提供多种 sendAndReceive 方法,它接受先前为单路发送操作(exchangeroutingKeyMessage)描述的相同参数选项。这些方法对于请求答复场景非常有用,因为它们在发送之前处理必要的 reply-to 特性的配置,并且可以在内部为该目的创建的独占队列中侦听答复消息。 同样可用的请求-回复方法是 MessageConverter 同时应用于请求和回复。这些方法命名为 convertSendAndReceive。有关更多详细信息,请参阅 Javadoc of AmqpTemplate。 从版本 1.5.0 开始,每个 sendAndReceive 方法变量都有一个重载版本,其中包含 CorrelationData。与经过适当配置的连接工厂结合使用后,这将启用接收操作发送侧的发布者确认。有关更多信息,请参见 xref:amqp/template.adoc#template-confirms[Correlated Publisher Confirms and ReturnsJavadoc for RabbitOperations。 从版本 2.0 开始,这些方法(convertSendAndReceiveAsType)的变体需要一个附加 ParameterizedTypeReference 参数来转换复杂的返回类型。模板必须使用 SmartMessageConverter 进行配置。有关更多详细信息,请参阅 xref:amqp/message-converters.adoc#json-complex[Converting From a Message With RabbitTemplate。 从 2.1 版开始,您可以使用 noLocalReplyConsumer 选项配置 RabbitTemplate 以控制答复消费者的 noLocal 标志。默认情况下,此值为 false

Reply Timeout

默认情况下,发送和接收方法在五秒后超时并返回 null。您可以通过设置 replyTimeout 属性修改此行为。从 1.5 版本开始,如果您将 mandatory 属性设置为 true (或 mandatory-expression 对特定消息求值为 true),则如果无法将消息发送到队列,则会引发 AmqpMessageReturnedException。此异常具有 returnedMessagereplyCodereplyText 属性,以及用于发送的 exchangeroutingKey

此功能使用发布者返回。可以通过将 CachingConnectionFactory 上的 publisherReturns 设置为 true 来启用此功能(请参阅 Publisher Confirms and Returns)。此外,您一定不能使用 RabbitTemplate 注册自己的 ReturnCallback

从版本 2.1.2 开始,已添加 replyTimedOut 方法,让子类可以得知超时,以便它们可以清除任何保留的状态。

从 2.0.11 和 2.1.3 版本开始,当您使用默认 DirectReplyToMessageListenerContainer 时,可以通过设置模板的 replyErrorHandler 属性来添加错误处理程序。对于任何失败的交付(例如:延迟答复和未关联消息头收到的消息),都会调用此错误处理程序。传递的异常是 ListenerExecutionFailedException,它具有 failedMessage 属性。

RabbitMQ Direct reply-to

从 3.4.0 版本开始,RabbitMQ 服务器支持 direct reply-to。这消除了固定应答队列的主要原因(为了避免为每个请求创建临时队列)。从 Spring AMQP 1.4.1 版本开始,如果服务器支持,会默认使用直接应答接收(direct reply-to),而不会创建临时应答队列。当未提供 replyQueue(或将名称设置为 amq.rabbitmq.reply-to)时,RabbitTemplate 会自动检测是否支持直接应答接收,进而使用该功能或使用临时应答队列作为后备。当使用直接应答接收时,不需要 reply-listener,也不应进行配置。

命名队列(而不是 amq.rabbitmq.reply-to)仍然支持答复侦听器,从而实现对答复并发性的控制,等等。

从版本 1.6 开始,如果您希望为每个答复使用临时、独占、自动删除队列,请将 useTemporaryReplyQueues 属性设置为 true。如果您设置 replyAddress,则将忽略此属性。

您可以更改决定是否使用直接答复的条件,通过子类化 RabbitTemplate 并覆盖 useDirectReplyTo() 以检查不同的条件。此方法仅在第一次发送请求时调用一次。

在版本 2.0 之前,RabbitTemplate 为每个请求创建一个新消费者,并在收到答复(或超时)时取消该消费者。现在,模板转而使用 DirectReplyToMessageListenerContainer,让消费者可以重复使用。模板仍然负责关联答复,因此不存在延迟答复转到其他发送者的风险。如果您想要恢复到以前的行为,请将 useDirectReplyToContainer(在使用 XML 配置时为 direct-reply-to-container)属性设置为 false。

AsyncRabbitTemplate 没有这样的选项。当使用直接答复时,它始终针对答复使用 DirectReplyToContainer

从版本 2.3.7 开始,模板具有一个新属性 useChannelForCorrelation。当此值为 true 时,服务器不必将关联 ID 从请求消息头复制到答复消息。相反,用于发送请求的通道用于关联答复到请求。

Message Correlation With A Reply Queue

在使用固定回复队列(而不是 amq.rabbitmq.reply-to)时,您必须提供相关性数据,以便将回复与请求关联起来。请参见 RabbitMQ Remote Procedure Call (RPC)。默认情况下,使用标准 correlationId 属性来保存相关性数据。但是,如果您希望使用自定义属性来保存相关性数据,则可以在 <rabbit-template/> 上设置 correlation-key 属性。显式将属性设置为 correlationId 与省略属性相同。客户端和服务器必须使用相同的标头来获取相关性数据。

Spring AMQP 版本 1.1 为此数据使用了一个名为 spring_reply_correlation 的自定义属性。如果您希望使用当前版本恢复此行为(可能为了与另一个使用 1.1 的应用程序保持兼容性),则必须将属性设置为 spring_reply_correlation

默认情况下,模板生成自己的关联 ID(忽略任何用户提供的数值)。如果您希望使用自己的关联 ID,请将 RabbitTemplate 实例的 userCorrelationId 属性设置为 true

关联 ID 必须唯一,以避免为请求返回错误应答的可能性。

Reply Listener Container

在使用低于 3.4.0 版本的 RabbitMQ 时,会为每个回复使用新的临时队列。 但是,可以在模板上配置一个单独的回复队列,这可以更有效,并且还可以让您在该队列上设置参数。 然而,在这种情况下,您还必须提供一个<reply-listener/>子元素。 该元素为回复队列提供了一个监听器容器,模板是监听器。 <listener-container/>上允许的所有Message Listener Container Configuration属性都允许在该元素上使用,除了`connection-factory`和`message-converter`,它们是从模板的配置中继承的。

如果您运行多个应用程序实例或使用多个 RabbitTemplate 实例,则 MUST 为每个应用程序实例使用一个唯一的应答队列。RabbitMQ 无法从队列中选择消息,因此,如果它们都使用相同的队列,则每个实例都会争夺应答,而不一定能够收到它们自己的应答。

以下示例定义了一个带有连接工厂的 Rabbit 模板:

<rabbit:template id="amqpTemplate"
        connection-factory="connectionFactory"
        reply-queue="replies"
        reply-address="replyEx/routeReply">
    <rabbit:reply-listener/>
</rabbit:template>

虽然容器和模板共享一个连接工厂,但它们并不共享通道。因此,请求和答复不在同一事务中执行(如果它具有事务性)。

在版本 1.5.0 之前,reply-address 属性不可用。应答总是通过使用默认交换机和 reply-queue 名称作为路由键进行路由的。这仍然是默认设置,但是您现在可以指定新的 reply-address 属性。reply-address 可以包含格式为 <exchange>/<routingKey> 的地址,并且应答路由到指定交换机并路由到与路由键绑定的队列。reply-address 优先于 reply-queue。当仅使用 reply-address 时,必须将 <reply-listener> 配置为一个单独的 <listener-container> 组件。reply-addressreply-queue(或 <listener-container> 上的 queues 属性)必须在逻辑上参考同一个队列。

通过此配置,SimpleListenerContainer 用于接收答复,其中 RabbitTemplateMessageListener。在模板中定义一个带有 <rabbit:template/> 命名空间元素时,如前一个示例所示,解析器将定义容器并将模板作为侦听器连接起来。

当模板不使用固定 replyQueue(或正在使用直接应答接收——请参阅 RabbitMQ Direct reply-to)时,不需要侦听器容器。使用 RabbitMQ 3.4.0 或更高版本时,直接 reply-to 是首选机制。

如果您将 RabbitTemplate 定义为 <bean/> 或使用 @Configuration 类来将其定义为 @Bean 或以编程方式创建模板,则需要自行定义和连接回复侦听器容器。如果您未执行此操作,则模板永远不会收到答复,最终超时并将 null 返回为 sendAndReceive 方法调用的答复。

从 1.5 版开始,RabbitTemplate 检测是否已将它配置为 MessageListener 以接收答复。如果否,则尝试使用答复地址发送和接收消息会导致 IllegalStateException(因为永远不会收到答复)。

此外,如果使用了简单的 replyAddress(队列名称),则回复侦听器容器将验证它是否正在侦听具有相同名称的队列。如果答复地址是交换机和路由密钥,则无法执行此检查,并且会写入调试日志消息。

在自行连接回复侦听器和模板时,请务必确保模板的 replyAddress 和容器的 queues(或 queueNames)属性引用同一队列,模板将回复地址插入出站消息 replyTo 属性中。

以下清单显示了如何手动连接 bean 的示例:

<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <constructor-arg ref="connectionFactory" />
    <property name="exchange" value="foo.exchange" />
    <property name="routingKey" value="foo" />
    <property name="replyQueue" ref="replyQ" />
    <property name="replyTimeout" value="600000" />
    <property name="useDirectReplyToContainer" value="false" />
</bean>

<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <constructor-arg ref="connectionFactory" />
    <property name="queues" ref="replyQ" />
    <property name="messageListener" ref="amqpTemplate" />
</bean>

<rabbit:queue id="replyQ" name="my.reply.queue" />
    @Bean
    public RabbitTemplate amqpTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(msgConv());
        rabbitTemplate.setReplyAddress(replyQueue().getName());
        rabbitTemplate.setReplyTimeout(60000);
        rabbitTemplate.setUseDirectReplyToContainer(false);
        return rabbitTemplate;
    }

    @Bean
    public SimpleMessageListenerContainer replyListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueues(replyQueue());
        container.setMessageListener(amqpTemplate());
        return container;
    }

    @Bean
    public Queue replyQueue() {
        return new Queue("my.reply.queue");
    }

一个使用固定回复队列连接的 RabbitTemplate 的完整示例,以及处理请求并返回回复的 “remote” 侦听器容器显示在 this test case 中。

当回复超时(replyTimeout)时,sendAndReceive() 方法返回 null。

在 1.3.6 版之前,超时的消息的延迟答复只会记录下来。现在,如果收到延迟答复,则会将其拒绝(模板会抛出 AmqpRejectAndDontRequeueException)。如果回复队列配置为将被拒绝的消息发送到死信交换机,则可以检索答复以进行后续分析。为此,请使用与回复队列名称相等的路由密钥将队列绑定到配置的死信交换机。

有关配置死信的更多信息,请参见 RabbitMQ Dead Letter Documentation。您还可以查看 FixedReplyQueueDeadLetterTests 测试用例作为示例。

Async Rabbit Template

版本 1.6 引入了`AsyncRabbitTemplate`。 这具有与 AmqpTemplate上类似的`sendAndReceive`(和`convertSendAndReceive`)方法。 但是,它们不阻塞,而是返回`CompletableFuture`。

sendAndReceive 方法返回 RabbitMessageFutureconvertSendAndReceive 方法返回 RabbitConverterFuture

您可以通过在 future 上调用 get() 来同步检索结果,或者您可以注册一个回调,该回调将异步地使用结果调用。以下清单显示了两种方法:

@Autowired
private AsyncRabbitTemplate template;

...

public void doSomeWorkAndGetResultLater() {

    ...

    CompletableFuture<String> future = this.template.convertSendAndReceive("foo");

    // do some more work

    String reply = null;
    try {
        reply = future.get(10, TimeUnit.SECONDS);
    }
    catch (ExecutionException e) {
        ...
    }

    ...

}

public void doSomeWorkAndGetResultAsync() {

    ...

    RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            // success
        }
        else {
            // failure
        }
    });

    ...

}

如果设置了 mandatory 且无法传递消息,则 future 会抛出 ExecutionException,其原因是 AmqpMessageReturnedException,它封装了返回的消息和有关返回的信息。

如果设置了 enableConfirms,则 future 具有一个名为 confirm 的属性,它本身是一个 CompletableFuture<Boolean>,其中 true 表示发布成功。如果确认 future 为 false,则 RabbitFuture 有一个名为 nackCause 的进一步属性,其中包含失败原因(如果可用)。

如果在回复之后收到发布确认,则将其丢弃,因为回复表示成功发布。

您可以设置模板上的 receiveTimeout 属性以使回复超时(它默认为 30000 - 30 秒)。如果发生超时,则 future 将完成 AmqpReplyTimeoutException

此模板实现 SmartLifecycle。在有待处理的答复时停止模板会导致待处理的 Future 实例被取消。

从版本 2.0 开始,异步模板现在支持 direct reply-to,而不是配置的回复队列。要启用此功能,请使用以下构造函数之一:

public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)

public AsyncRabbitTemplate(RabbitTemplate template)

请参见 RabbitMQ Direct reply-to,直接使用回复到与同步 RabbitTemplate 一起使用。

2.0 版引入了这些方法变体(convertSendAndReceiveAsType),该变体需要一个附加 ParameterizedTypeReference 参数来转换复杂的返回类型。你必须使用 SmartMessageConverter 对底层 RabbitTemplate 进行配置。有关更多详细信息,请参阅 xref:amqp/message-converters.adoc#json-complex[Converting From a Message With RabbitTemplate

从版本 3.0 开始,AsyncRabbitTemplate 方法现在返回 CompletableFuture 而不是 ListenableFuture

Spring Remoting with AMQP

不再支持 Spring remoting,因为该功能已从 Spring 框架中移除。

改为使用 RabbitTemplate(客户端)和 @RabbitListener 上的 sendAndReceive 操作。