Reply Management

MessageListenerAdapter`中现有的支持已经让你的方法具有非空返回值类型。在这种情况下,调用的结果封装在发送到原始消息的`ReplyToAddress`头中指定的目标地址的消息中,或封装在侦听器上配置的默认地址中。你可以使用消息传递抽象的@SendTo`注解设置该默认地址。

假设我们的`processOrder`方法现在应该返回`OrderStatus`,我们可以按如下方式编写它以自动发送回复:

@RabbitListener(destination = "myQueue")
@SendTo("status")
public OrderStatus processOrder(Order order) {
    // order processing
    return status;
}

如果你需要以与传输无关的方式设置其他头,你可以返回一个`Message`,如下所示:

@RabbitListener(destination = "myQueue")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
    // order processing
    return MessageBuilder
        .withPayload(status)
        .setHeader("code", 1234)
        .build();
}

另外,你可以在`beforeSendReplyMessagePostProcessors`容器工厂属性中使用`MessagePostProcessor`来添加更多头。从版本 2.2.3 开始,在回复消息中提供已调用的 Bean/方法,它可以在消息后处理器中用于将信息传达给调用者:

factory.setBeforeSendReplyPostProcessors(msg -> {
    msg.getMessageProperties().setHeader("calledBean",
            msg.getMessageProperties().getTargetBean().getClass().getSimpleName());
    msg.getMessageProperties().setHeader("calledMethod",
            msg.getMessageProperties().getTargetMethod().getName());
    return m;
});

从 2.2.5 版本开始,你可以配置 ReplyPostProcessor 来修改回复消息在发送之前的信息;在针对请求设置 correlationId 标头之后会调用它。

@RabbitListener(queues = "test.header", group = "testGroup", replyPostProcessor = "echoCustomHeader")
public String capitalizeWithHeader(String in) {
    return in.toUpperCase();
}

@Bean
public ReplyPostProcessor echoCustomHeader() {
    return (req, resp) -> {
        resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
        return resp;
    };
}

从 3.0 版本开始,可以在容器工厂中配置后置处理器,而不是通过注释进行配置。

factory.setReplyPostProcessorProvider(id -> (req, resp) -> {
    resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
    return resp;
});

id 参数是监听器的 ID。

注释中的设置可以替代工厂设置。

@SendTo 值被视为遵循 exchange/routingKey 模式的回复 exchangeroutingKey 对,其中一部分可以省略。有效值如下:

  • thing1/thing2replyTo 交换和 routingKey.thing1/replyTo 交换和默认(空)routingKey.thing2/thing2replyTo routingKey 和默认(空)交换。/ 或空:replyTo 默认交换和默认 routingKey

此外,你可以在没有 value 属性的情况下使用 @SendTo。在这种情况下,等效于空的 sendTo 模式。仅在入站消息没有 replyToAddress 属性的情况下才使用 @SendTo

从 1.5 版本开始,@SendTo 值可以是 bean 初始化 SpEL 表达式,如下例所示:

@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
    return foo.toUpperCase();
}
...
@Bean
public String spelReplyTo() {
    return "test.sendTo.reply.spel";
}

该表达式必须计算为一个 String,该值可以是简单的队列名称(发送到默认交换器)或如在前一个示例之前讨论的那样采用 exchange/routingKey 的形式。

#{…​} 表达式在初始化期间计算一次。

对于动态回复路由,消息发送方应包含 reply_to 消息属性或使用交替运行时 SpEL 表达式(在下一个示例之后描述)。

从 1.6 版本开始,@SendTo 可以是针对请求和回复计算的 SpEL 表达式,如下例所示:

@RabbitListener(queues = "test.sendTo.spel")
@SendTo("!{'some.reply.queue.with.' + result.queueName}")
public Bar capitalizeWithSendToSpel(Foo foo) {
    return processTheFooAndReturnABar(foo);
}

SpEL 表达式的运行时性质用 !{…​} 标识符表示。表达式 #root 对象的计算环境有三个属性:

  • requesto.s.amqp.core.Message 请求对象。

  • sourceo.s.messaging.Message&lt;?&gt;(转换之后)。

  • result: The method result.

环境具有一个映射属性访问器、一个标准类型转换器和一个 bean 解析器,允许引用环境中的其他 bean(例如,@someBeanName.determineReplyQ(request, result))。

总之,在初始化期间使用 root 对象计算 {…​} 一次,该对象表示应用程序环境。Bean 由其名称引用。为每个消息计算 !{…​} 一次,其中根对象包含前面列出的属性。Bean 用其名称引用,并带有 @ 前缀。

从 2.1 版本开始,也支持简单的属性占位符(例如,${some.reply.to}).对于较早的版本,可以使用以下内容作为解决方法,如下例所示:

@RabbitListener(queues = "foo")
@SendTo("#{environment['my.send.to']}")
public String listen(Message in) {
    ...
    return ...
}