Sending Messages

在发送消息时,您可以使用以下任何方法:

When sending a message, you can use any of the following methods:

void send(Message message) throws AmqpException;

void send(String routingKey, Message message) throws AmqpException;

void send(String exchange, String routingKey, Message message) throws AmqpException;

我们可以从此前列表中的最后一个方法开始讨论,因为这实际上是最明确的。它允许在运行时提供 AMQP 交换名称(连同路由键)。最后一个参数是要负责实际创建消息实例的回调。使用此方法发送消息的一个示例可能如下所示:以下示例演示如何使用 send 方法发送消息:

We can begin our discussion with the last method in the preceding listing, since it is actually the most explicit. It lets an AMQP exchange name (along with a routing key)be provided at runtime. The last parameter is the callback that is responsible for actual creating the message instance. An example of using this method to send a message might look like this: The following example shows how to use the send method to send a message:

amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1",
    new Message("12.34".getBytes(), someProperties));

如果您打算使用该模板实例在大多数或所有时间向同一个交换发送消息,则可以在模板本身上设置 exchange 属性。在这些情况下,您可以使用此前列表中的第二个方法。以下示例在功能上等同于之前的示例:

You can set the exchange property on the template itself if you plan to use that template instance to send to the same exchange most or all of the time. In such cases, you can use the second method in the preceding listing. The following example is functionally equivalent to the previous example:

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));

如果模板上同时设置了 exchangeroutingKey 属性,则可以使用只接受 Message 的该方法。以下示例演示如何执行此操作:

If both the exchange and routingKey properties are set on the template, you can use the method that accepts only the Message. The following example shows how to do so:

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));

考虑交换和路由键属性的一个更好的方法是,显式方法参数始终会覆盖模板的默认值。事实上,即使您不在模板上显式设置这些属性,也会始终有默认值。在这两种情况下,默认值都是一个空的 String,但这实际上是一个明智的默认值。就路由键而言,首先它并不总是有必要(例如,对于 Fanout 交换)。此外,队列可能绑定到一个具有空 String 的交换。对于模板的路由键属性,依赖空 String 默认值这两个都是合理的情况。就交换名称而言,经常使用空 String,因为 AMQP 规范将“default exchange” 定义为没有名称。由于所有队列都自动绑定到该默认交换(这是一个直接交换),并且使用其名称作为绑定值,因此前面列表中的第二个方法可用于通过默认交换进行简单的点对点消息传递给任何队列。您可以将队列名称作为 routingKey 提供,方法是在运行时提供方法参数。以下示例演示如何执行此操作:

A better way of thinking about the exchange and routing key properties is that the explicit method parameters always override the template’s default values. In fact, even if you do not explicitly set those properties on the template, there are always default values in place. In both cases, the default is an empty String, but that is actually a sensible default. As far as the routing key is concerned, it is not always necessary in the first place (for example, for a Fanout exchange). Furthermore, a queue may be bound to an exchange with an empty String. Those are both legitimate scenarios for reliance on the default empty String value for the routing key property of the template. As far as the exchange name is concerned, the empty String is commonly used because the AMQP specification defines the “default exchange” as having no name. Since all queues are automatically bound to that default exchange (which is a direct exchange), using their name as the binding value, the second method in the preceding listing can be used for simple point-to-point messaging to any queue through the default exchange. You can provide the queue name as the routingKey, either by providing the method parameter at runtime. The following example shows how to do so:

RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));

或者,您可以创建一个模板,该模板主要或完全用于发布到单个队列。以下示例演示如何执行此操作:

Alternately, you can create a template that can be used for publishing primarily or exclusively to a single Queue. The following example shows how to do so:

RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));

Message Builder API

从版本 1.3 开始,MessageBuilderMessagePropertiesBuilder 提供了消息构建器 API。这些方法提供了一种便捷的“fluent”方式来创建消息或消息属性。以下示例展示了 Fluent API 的实际应用:

Starting with version 1.3, a message builder API is provided by the MessageBuilder and MessagePropertiesBuilder. These methods provide a convenient “fluent” means of creating a message or message properties. The following examples show the fluent API in action:

Message message = MessageBuilder.withBody("foo".getBytes())
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();
MessageProperties props = MessagePropertiesBuilder.newInstance()
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();
Message message = MessageBuilder.withBody("foo".getBytes())
    .andProperties(props)
    .build();

可以在 MessageProperties上定义的每个属性进行设置。 其他方法包括`setHeader(String key, String value), `removeHeader(String key), removeHeaders(), 和 copyProperties(MessageProperties properties)。 每个属性设置方法都有`set*IfAbsent()变体。 如果存在默认初始值,则方法名为 `set*IfAbsentOrDefault()

Each of the properties defined on the MessageProperties can be set. Other methods include setHeader(String key, String value), removeHeader(String key), removeHeaders(), and copyProperties(MessageProperties properties). Each property setting method has a set*IfAbsent() variant. In the cases where a default initial value exists, the method is named set*IfAbsentOrDefault().

提供五个静态方法来创建初始消息构建器:

Five static methods are provided to create an initial message builder:

public static MessageBuilder withBody(byte[] body) 1

public static MessageBuilder withClonedBody(byte[] body) 2

public static MessageBuilder withBody(byte[] body, int from, int to) 3

public static MessageBuilder fromMessage(Message message) 4

public static MessageBuilder fromClonedMessage(Message message) 5
1 The message created by the builder has a body that is a direct reference to the argument.
2 The message created by the builder has a body that is a new array containing a copy of bytes in the argument.
3 The message created by the builder has a body that is a new array containing the range of bytes from the argument. See Arrays.copyOfRange() for more details.
4 The message created by the builder has a body that is a direct reference to the body of the argument. The argument’s properties are copied to a new MessageProperties object.
5 The message created by the builder has a body that is a new array containing a copy of the argument’s body. The argument’s properties are copied to a new MessageProperties object.

提供三个静态方法来创建 MessagePropertiesBuilder 实例:

Three static methods are provided to create a MessagePropertiesBuilder instance:

public static MessagePropertiesBuilder newInstance() 1

public static MessagePropertiesBuilder fromProperties(MessageProperties properties) 2

public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) 3
1 A new message properties object is initialized with default values.
2 The builder is initialized with, and build() will return, the provided properties object.,
3 The argument’s properties are copied to a new MessageProperties object.

使用 RabbitTemplateAmqpTemplate 的实现,每个 send() 方法都有一个重载版本,该版本接受一个附加的 CorrelationData 对象。当发布者确认已启用时,此对象将在 AmqpTemplate 中描述的回调中返回。这可以让发送方将确认(acknack)与发送的消息关联。

With the RabbitTemplate implementation of AmqpTemplate, each of the send() methods has an overloaded version that takes an additional CorrelationData object. When publisher confirms are enabled, this object is returned in the callback described in AmqpTemplate. This lets the sender correlate a confirm (ack or nack) with the sent message.

从版本 1.6.7 开始,引入了 CorrelationAwareMessagePostProcessor 接口,允许在消息转换后修改关联数据。以下示例演示如何使用它:

Starting with version 1.6.7, the CorrelationAwareMessagePostProcessor interface was introduced, allowing the correlation data to be modified after the message has been converted. The following example shows how to use it:

Message postProcessMessage(Message message, Correlation correlation);

在版本 2.0 中,弃用了此接口。该方法已移动到 MessagePostProcessor,带有委托到 postProcessMessage(Message message) 的默认实现。

In version 2.0, this interface is deprecated. The method has been moved to MessagePostProcessor with a default implementation that delegates to postProcessMessage(Message message).

从版本 1.6.7 开始,还提供了名为 CorrelationDataPostProcessor 的新回调接口。它在所有 MessagePostProcessor 实例(在 send() 方法中提供,以及在 setBeforePublishPostProcessors() 中提供)之后被调用。实现可以更新或替换 send() 方法中提供的关联数据(如果存在)。提供 Message 和原始 CorrelationData(如果存在)作为参数。以下示例演示如何使用 postProcess 方法:

Also starting with version 1.6.7, a new callback interface called CorrelationDataPostProcessor is provided. This is invoked after all MessagePostProcessor instances (provided in the send() method as well as those provided in setBeforePublishPostProcessors()). Implementations can update or replace the correlation data supplied in the send() method (if any). The Message and original CorrelationData (if any) are provided as arguments. The following example shows how to use the postProcess method:

CorrelationData postProcess(Message message, CorrelationData correlationData);

Publisher Returns

当模板的 mandatory 属性为 true 时,返回的消息将由 AmqpTemplate 中描述的回调提供。

When the template’s mandatory property is true, returned messages are provided by the callback described in AmqpTemplate.

从 1.4 版开始,RabbitTemplate 支持 SpEL mandatoryExpression 属性,该属性针对每个请求消息(作为根评估对象)进行评估,从而解析为一个 boolean 值。可以在表达式中使用 Bean 引用,例如 @myBean.isMandatory(#root)

Starting with version 1.4, the RabbitTemplate supports the SpEL mandatoryExpression property, which is evaluated against each request message as the root evaluation object, resolving to a boolean value. Bean references, such as @myBean.isMandatory(#root), can be used in the expression.

发布者返回还可以由 RabbitTemplate 在发送和接收操作中内部使用。有关更多信息,请参阅 Reply Timeout

Publisher returns can also be used internally by the RabbitTemplate in send and receive operations. See Reply Timeout for more information.

Batching

1.4.2 版引入了 BatchingRabbitTemplate。这是一个 RabbitTemplate 的子类,其重写了 send 方法,该方法根据 BatchingStrategy 对消息进行批处理。只有在一个批次完成时,才会将消息发送到 RabbitMQ。以下清单显示了 BatchingStrategy 接口的定义:

Version 1.4.2 introduced the BatchingRabbitTemplate. This is a subclass of RabbitTemplate with an overridden send method that batches messages according to the BatchingStrategy. Only when a batch is complete is the message sent to RabbitMQ. The following listing shows the BatchingStrategy interface definition:

public interface BatchingStrategy {

    MessageBatch addToBatch(String exchange, String routingKey, Message message);

    Date nextRelease();

    Collection<MessageBatch> releaseBatches();

}

批处理数据保存在内存中。如果系统出现故障,则待发送的消息可能会丢失。

Batched data is held in memory. Unsent messages can be lost in the event of a system failure.

SimpleBatchingStrategy 已提供。它支持将消息发送到单个 exchange 或路由密钥。它具有以下属性:

A SimpleBatchingStrategy is provided. It supports sending messages to a single exchange or routing key. It has the following properties:

  • batchSize: The number of messages in a batch before it is sent.

  • bufferLimit: The maximum size of the batched message. This preempts the batchSize, if exceeded, and causes a partial batch to be sent.

  • timeout: A time after which a partial batch is sent when there is no new activity adding messages to the batch.

SimpleBatchingStrategy 通过在每个嵌入的消息前加上一个四字节的二进制长度来格式化批次。通过将 springBatchFormat 消息属性设置为 lengthHeader4 来向接收系统传达这一点。

The SimpleBatchingStrategy formats the batch by preceding each embedded message with a four-byte binary length. This is communicated to the receiving system by setting the springBatchFormat message property to lengthHeader4.

在默认情况下,侦听器容器会自动取消批处理消息(通过使用 springBatchFormat 消息头)。拒绝批量中的任何消息将导致拒绝整个批量。

Batched messages are automatically de-batched by listener containers by default (by using the springBatchFormat message header). Rejecting any message from a batch causes the entire batch to be rejected.

但是,有关详细信息,请参阅 @RabbitListener with Batching

However, see @RabbitListener with Batching for more information.