Reply Management
MessageListenerAdapter`中现有的支持已经让你的方法具有非空返回值类型。在这种情况下,调用的结果封装在发送到原始消息的`ReplyToAddress`头中指定的目标地址的消息中,或封装在侦听器上配置的默认地址中。你可以使用消息传递抽象的
@SendTo`注解设置该默认地址。
假设我们的`processOrder`方法现在应该返回`OrderStatus`,我们可以按如下方式编写它以自动发送回复:
@RabbitListener(destination = "myQueue")
@SendTo("status")
public OrderStatus processOrder(Order order) {
// order processing
return status;
}
如果你需要以与传输无关的方式设置其他头,你可以返回一个`Message`,如下所示:
@RabbitListener(destination = "myQueue")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
// order processing
return MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
}
另外,你可以在`beforeSendReplyMessagePostProcessors`容器工厂属性中使用`MessagePostProcessor`来添加更多头。从版本 2.2.3 开始,在回复消息中提供已调用的 Bean/方法,它可以在消息后处理器中用于将信息传达给调用者:
factory.setBeforeSendReplyPostProcessors(msg -> {
msg.getMessageProperties().setHeader("calledBean",
msg.getMessageProperties().getTargetBean().getClass().getSimpleName());
msg.getMessageProperties().setHeader("calledMethod",
msg.getMessageProperties().getTargetMethod().getName());
return m;
});
从 2.2.5 版本开始,你可以配置 ReplyPostProcessor
来修改回复消息在发送之前的信息;在针对请求设置 correlationId
标头之后会调用它。
@RabbitListener(queues = "test.header", group = "testGroup", replyPostProcessor = "echoCustomHeader")
public String capitalizeWithHeader(String in) {
return in.toUpperCase();
}
@Bean
public ReplyPostProcessor echoCustomHeader() {
return (req, resp) -> {
resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
return resp;
};
}
从 3.0 版本开始,可以在容器工厂中配置后置处理器,而不是通过注释进行配置。
factory.setReplyPostProcessorProvider(id -> (req, resp) -> {
resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
return resp;
});
id
参数是监听器的 ID。
注释中的设置可以替代工厂设置。
@SendTo
值被视为遵循 exchange/routingKey
模式的回复 exchange
和 routingKey
对,其中一部分可以省略。有效值如下:
-
thing1/thing2
:replyTo
交换和routingKey
.thing1/
:replyTo
交换和默认(空)routingKey
.thing2
或/thing2
:replyTo
routingKey
和默认(空)交换。/
或空:replyTo
默认交换和默认routingKey
。
此外,你可以在没有 value
属性的情况下使用 @SendTo
。在这种情况下,等效于空的 sendTo
模式。仅在入站消息没有 replyToAddress
属性的情况下才使用 @SendTo
。
从 1.5 版本开始,@SendTo
值可以是 bean 初始化 SpEL 表达式,如下例所示:
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
return foo.toUpperCase();
}
...
@Bean
public String spelReplyTo() {
return "test.sendTo.reply.spel";
}
该表达式必须计算为一个 String
,该值可以是简单的队列名称(发送到默认交换器)或如在前一个示例之前讨论的那样采用 exchange/routingKey
的形式。
|
对于动态回复路由,消息发送方应包含 reply_to
消息属性或使用交替运行时 SpEL 表达式(在下一个示例之后描述)。
从 1.6 版本开始,@SendTo
可以是针对请求和回复计算的 SpEL 表达式,如下例所示:
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("!{'some.reply.queue.with.' + result.queueName}")
public Bar capitalizeWithSendToSpel(Foo foo) {
return processTheFooAndReturnABar(foo);
}
SpEL 表达式的运行时性质用 !{…}
标识符表示。表达式 #root
对象的计算环境有三个属性:
-
request
:o.s.amqp.core.Message
请求对象。 -
source
:o.s.messaging.Message<?>
(转换之后)。 -
result
: The method result.
环境具有一个映射属性访问器、一个标准类型转换器和一个 bean 解析器,允许引用环境中的其他 bean(例如,@someBeanName.determineReplyQ(request, result)
)。
总之,在初始化期间使用 root
对象计算 {…}
一次,该对象表示应用程序环境。Bean 由其名称引用。为每个消息计算 !{…}
一次,其中根对象包含前面列出的属性。Bean 用其名称引用,并带有 @
前缀。
从 2.1 版本开始,也支持简单的属性占位符(例如,${some.reply.to}
).对于较早的版本,可以使用以下内容作为解决方法,如下例所示:
@RabbitListener(queues = "foo")
@SendTo("#{environment['my.send.to']}")
public String listen(Message in) {
...
return ...
}