Sending Messages

在发送消息时,您可以使用以下任何方法:

void send(Message message) throws AmqpException;

void send(String routingKey, Message message) throws AmqpException;

void send(String exchange, String routingKey, Message message) throws AmqpException;

我们可以从此前列表中的最后一个方法开始讨论,因为这实际上是最明确的。它允许在运行时提供 AMQP 交换名称(连同路由键)。最后一个参数是要负责实际创建消息实例的回调。使用此方法发送消息的一个示例可能如下所示:以下示例演示如何使用 send 方法发送消息:

amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1",
    new Message("12.34".getBytes(), someProperties));

如果您打算使用该模板实例在大多数或所有时间向同一个交换发送消息,则可以在模板本身上设置 exchange 属性。在这些情况下,您可以使用此前列表中的第二个方法。以下示例在功能上等同于之前的示例:

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));

如果模板上同时设置了 exchangeroutingKey 属性,则可以使用只接受 Message 的该方法。以下示例演示如何执行此操作:

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));

考虑交换和路由键属性的一个更好的方法是,显式方法参数始终会覆盖模板的默认值。事实上,即使您不在模板上显式设置这些属性,也会始终有默认值。在这两种情况下,默认值都是一个空的 String,但这实际上是一个明智的默认值。就路由键而言,首先它并不总是有必要(例如,对于 Fanout 交换)。此外,队列可能绑定到一个具有空 String 的交换。对于模板的路由键属性,依赖空 String 默认值这两个都是合理的情况。就交换名称而言,经常使用空 String,因为 AMQP 规范将“default exchange” 定义为没有名称。由于所有队列都自动绑定到该默认交换(这是一个直接交换),并且使用其名称作为绑定值,因此前面列表中的第二个方法可用于通过默认交换进行简单的点对点消息传递给任何队列。您可以将队列名称作为 routingKey 提供,方法是在运行时提供方法参数。以下示例演示如何执行此操作:

RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));

或者,您可以创建一个模板,该模板主要或完全用于发布到单个队列。以下示例演示如何执行此操作:

RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));

Message Builder API

从版本 1.3 开始,MessageBuilderMessagePropertiesBuilder 提供了消息构建器 API。这些方法提供了一种便捷的“fluent”方式来创建消息或消息属性。以下示例展示了 Fluent API 的实际应用:

Message message = MessageBuilder.withBody("foo".getBytes())
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();
MessageProperties props = MessagePropertiesBuilder.newInstance()
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();
Message message = MessageBuilder.withBody("foo".getBytes())
    .andProperties(props)
    .build();

可以在 MessageProperties上定义的每个属性进行设置。 其他方法包括`setHeader(String key, String value), `removeHeader(String key), removeHeaders(), 和 copyProperties(MessageProperties properties)。 每个属性设置方法都有`set*IfAbsent()变体。 如果存在默认初始值,则方法名为 `set*IfAbsentOrDefault()

提供五个静态方法来创建初始消息构建器:

public static MessageBuilder withBody(byte[] body) 1

public static MessageBuilder withClonedBody(byte[] body) 2

public static MessageBuilder withBody(byte[] body, int from, int to) 3

public static MessageBuilder fromMessage(Message message) 4

public static MessageBuilder fromClonedMessage(Message message) 5
1 由生成器创建的消息有一个主体,该主体是对该参数的直接引用。
2 由生成器创建的消息有一个主体,该主体是一个包含参数中字节副本的新数组。
3 由构建器创建的消息主体是一个新数组,其中包含来自参数的字节范围。有关更多详细信息,请参见 Arrays.copyOfRange()
4 由生成器创建的消息有一个主体,该主体是对参数主体直接的引用。参数的属性复制到一个新的 MessageProperties 对象。
5 由生成器创建的消息有一个主体,该主体是一个包含参数的主体副本的新数组。参数的属性复制到一个新的 MessageProperties 对象。

提供三个静态方法来创建 MessagePropertiesBuilder 实例:

public static MessagePropertiesBuilder newInstance() 1

public static MessagePropertiesBuilder fromProperties(MessageProperties properties) 2

public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) 3
1 一个新的消息属性对象使用默认值初始化。
2 生成器使用提供的属性对象初始化,并且 build() 将返回该对象。
3 参数的属性复制到一个新的 MessageProperties 对象。

使用 RabbitTemplateAmqpTemplate 的实现,每个 send() 方法都有一个重载版本,该版本接受一个附加的 CorrelationData 对象。当发布者确认已启用时,此对象将在 AmqpTemplate 中描述的回调中返回。这可以让发送方将确认(acknack)与发送的消息关联。

从版本 1.6.7 开始,引入了 CorrelationAwareMessagePostProcessor 接口,允许在消息转换后修改关联数据。以下示例演示如何使用它:

Message postProcessMessage(Message message, Correlation correlation);

在版本 2.0 中,弃用了此接口。该方法已移动到 MessagePostProcessor,带有委托到 postProcessMessage(Message message) 的默认实现。

从版本 1.6.7 开始,还提供了名为 CorrelationDataPostProcessor 的新回调接口。它在所有 MessagePostProcessor 实例(在 send() 方法中提供,以及在 setBeforePublishPostProcessors() 中提供)之后被调用。实现可以更新或替换 send() 方法中提供的关联数据(如果存在)。提供 Message 和原始 CorrelationData(如果存在)作为参数。以下示例演示如何使用 postProcess 方法:

CorrelationData postProcess(Message message, CorrelationData correlationData);

Publisher Returns

当模板的 mandatory 属性为 true 时,返回的消息将由 AmqpTemplate 中描述的回调提供。

从 1.4 版开始,RabbitTemplate 支持 SpEL mandatoryExpression 属性,该属性针对每个请求消息(作为根评估对象)进行评估,从而解析为一个 boolean 值。可以在表达式中使用 Bean 引用,例如 @myBean.isMandatory(#root)

发布者返回还可以由 RabbitTemplate 在发送和接收操作中内部使用。有关更多信息,请参阅 Reply Timeout

Batching

1.4.2 版引入了 BatchingRabbitTemplate。这是一个 RabbitTemplate 的子类,其重写了 send 方法,该方法根据 BatchingStrategy 对消息进行批处理。只有在一个批次完成时,才会将消息发送到 RabbitMQ。以下清单显示了 BatchingStrategy 接口的定义:

public interface BatchingStrategy {

    MessageBatch addToBatch(String exchange, String routingKey, Message message);

    Date nextRelease();

    Collection<MessageBatch> releaseBatches();

}

批处理数据保存在内存中。如果系统出现故障,则待发送的消息可能会丢失。

SimpleBatchingStrategy 已提供。它支持将消息发送到单个 exchange 或路由密钥。它具有以下属性:

  • batchSize:批量中消息的数量,在消息发送之前。

  • bufferLimit:批处理消息的最大大小。如果超过该大小,将会抢占 batchSize,并且会发送一个部分批处理。

  • timeout:没有新活动向批处理添加消息后,发送部分批处理的时间。

SimpleBatchingStrategy 通过在每个嵌入的消息前加上一个四字节的二进制长度来格式化批次。通过将 springBatchFormat 消息属性设置为 lengthHeader4 来向接收系统传达这一点。

在默认情况下,侦听器容器会自动取消批处理消息(通过使用 springBatchFormat 消息头)。拒绝批量中的任何消息将导致拒绝整个批量。

但是,有关详细信息,请参阅 @RabbitListener with Batching