Reply Management

MessageListenerAdapter`中现有的支持已经让你的方法具有非空返回值类型。在这种情况下,调用的结果封装在发送到原始消息的`ReplyToAddress`头中指定的目标地址的消息中,或封装在侦听器上配置的默认地址中。你可以使用消息传递抽象的@SendTo`注解设置该默认地址。

The existing support in MessageListenerAdapter already lets your method have a non-void return type. When that is the case, the result of the invocation is encapsulated in a message sent to the address specified in the ReplyToAddress header of the original message, or to the default address configured on the listener. You can set that default address by using the @SendTo annotation of the messaging abstraction.

假设我们的`processOrder`方法现在应该返回`OrderStatus`,我们可以按如下方式编写它以自动发送回复:

Assuming our processOrder method should now return an OrderStatus, we can write it as follows to automatically send a reply:

@RabbitListener(destination = "myQueue")
@SendTo("status")
public OrderStatus processOrder(Order order) {
    // order processing
    return status;
}

如果你需要以与传输无关的方式设置其他头,你可以返回一个`Message`,如下所示:

If you need to set additional headers in a transport-independent manner, you could return a Message instead, something like the following:

@RabbitListener(destination = "myQueue")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
    // order processing
    return MessageBuilder
        .withPayload(status)
        .setHeader("code", 1234)
        .build();
}

另外,你可以在`beforeSendReplyMessagePostProcessors`容器工厂属性中使用`MessagePostProcessor`来添加更多头。从版本 2.2.3 开始,在回复消息中提供已调用的 Bean/方法,它可以在消息后处理器中用于将信息传达给调用者:

Alternatively, you can use a MessagePostProcessor in the beforeSendReplyMessagePostProcessors container factory property to add more headers. Starting with version 2.2.3, the called bean/method is made available in the reply message, which can be used in a message post processor to communicate the information back to the caller:

factory.setBeforeSendReplyPostProcessors(msg -> {
    msg.getMessageProperties().setHeader("calledBean",
            msg.getMessageProperties().getTargetBean().getClass().getSimpleName());
    msg.getMessageProperties().setHeader("calledMethod",
            msg.getMessageProperties().getTargetMethod().getName());
    return m;
});

从 2.2.5 版本开始,你可以配置 ReplyPostProcessor 来修改回复消息在发送之前的信息;在针对请求设置 correlationId 标头之后会调用它。

Starting with version 2.2.5, you can configure a ReplyPostProcessor to modify the reply message before it is sent; it is called after the correlationId header has been set up to match the request.

@RabbitListener(queues = "test.header", group = "testGroup", replyPostProcessor = "echoCustomHeader")
public String capitalizeWithHeader(String in) {
    return in.toUpperCase();
}

@Bean
public ReplyPostProcessor echoCustomHeader() {
    return (req, resp) -> {
        resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
        return resp;
    };
}

从 3.0 版本开始,可以在容器工厂中配置后置处理器,而不是通过注释进行配置。

Starting with version 3.0, you can configure the post processor on the container factory instead of on the annotation.

factory.setReplyPostProcessorProvider(id -> (req, resp) -> {
    resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
    return resp;
});

id 参数是监听器的 ID。

The id parameter is the listener id.

注释中的设置可以替代工厂设置。

A setting on the annotation will supersede the factory setting.

@SendTo 值被视为遵循 exchange/routingKey 模式的回复 exchangeroutingKey 对,其中一部分可以省略。有效值如下:

The @SendTo value is assumed as a reply exchange and routingKey pair that follows the exchange/routingKey pattern, where one of those parts can be omitted. The valid values are as follows:

  • thing1/thing2: The replyTo exchange and the routingKey. thing1/: The replyTo exchange and the default (empty) routingKey. thing2 or /thing2: The replyTo routingKey and the default (empty) exchange. / or empty: The replyTo default exchange and the default routingKey.

此外,你可以在没有 value 属性的情况下使用 @SendTo。在这种情况下,等效于空的 sendTo 模式。仅在入站消息没有 replyToAddress 属性的情况下才使用 @SendTo

Also, you can use @SendTo without a value attribute. This case is equal to an empty sendTo pattern. @SendTo is used only if the inbound message does not have a replyToAddress property.

从 1.5 版本开始,@SendTo 值可以是 bean 初始化 SpEL 表达式,如下例所示:

Starting with version 1.5, the @SendTo value can be a bean initialization SpEL Expression, as shown in the following example:

@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
    return foo.toUpperCase();
}
...
@Bean
public String spelReplyTo() {
    return "test.sendTo.reply.spel";
}

该表达式必须计算为一个 String,该值可以是简单的队列名称(发送到默认交换器)或如在前一个示例之前讨论的那样采用 exchange/routingKey 的形式。

The expression must evaluate to a String, which can be a simple queue name (sent to the default exchange) or with the form exchange/routingKey as discussed prior to the preceding example.

#{…​} 表达式在初始化期间计算一次。

The #{…​} expression is evaluated once, during initialization.

对于动态回复路由,消息发送方应包含 reply_to 消息属性或使用交替运行时 SpEL 表达式(在下一个示例之后描述)。

For dynamic reply routing, the message sender should include a reply_to message property or use the alternate runtime SpEL expression (described after the next example).

从 1.6 版本开始,@SendTo 可以是针对请求和回复计算的 SpEL 表达式,如下例所示:

Starting with version 1.6, the @SendTo can be a SpEL expression that is evaluated at runtime against the request and reply, as the following example shows:

@RabbitListener(queues = "test.sendTo.spel")
@SendTo("!{'some.reply.queue.with.' + result.queueName}")
public Bar capitalizeWithSendToSpel(Foo foo) {
    return processTheFooAndReturnABar(foo);
}

SpEL 表达式的运行时性质用 !{…​} 标识符表示。表达式 #root 对象的计算环境有三个属性:

The runtime nature of the SpEL expression is indicated with !{…​} delimiters. The evaluation context #root object for the expression has three properties:

  • request: The o.s.amqp.core.Message request object.

  • source: The o.s.messaging.Message<?> after conversion.

  • result: The method result.

环境具有一个映射属性访问器、一个标准类型转换器和一个 bean 解析器,允许引用环境中的其他 bean(例如,@someBeanName.determineReplyQ(request, result))。

The context has a map property accessor, a standard type converter, and a bean resolver, which lets other beans in the context be referenced (for example, @someBeanName.determineReplyQ(request, result)).

总之,在初始化期间使用 root 对象计算 {…​} 一次,该对象表示应用程序环境。Bean 由其名称引用。为每个消息计算 !{…​} 一次,其中根对象包含前面列出的属性。Bean 用其名称引用,并带有 @ 前缀。

In summary, #{…​} is evaluated once during initialization, with the #root object being the application context. Beans are referenced by their names. !{…​} is evaluated at runtime for each message, with the root object having the properties listed earlier. Beans are referenced with their names, prefixed by @.

从 2.1 版本开始,也支持简单的属性占位符(例如,${some.reply.to}).对于较早的版本,可以使用以下内容作为解决方法,如下例所示:

Starting with version 2.1, simple property placeholders are also supported (for example, ${some.reply.to}). With earlier versions, the following can be used as a work around, as the following example shows:

@RabbitListener(queues = "foo")
@SendTo("#{environment['my.send.to']}")
public String listen(Message in) {
    ...
    return ...
}