Message

Spring Integration `Message`是用于存储数据的通用容器。任何对象都可以做为有效负载,而每一个`Message`示例都包含带有键值对的用户可扩展属性。

The Spring Integration Message is a generic container for data. Any object can be provided as the payload, and each Message instance includes headers containing user-extensible properties as key-value pairs.

The Message Interface

以下列表展示了`Message`接口的定义:

The following listing shows the definition of the Message interface:

public interface Message<T> {

    T getPayload();

    MessageHeaders getHeaders();

}

`Message`接口是 API 的核心组成部分。通过将数据封装在一个通用封装器中,消息系统可以在没有任何数据类型信息的情况下传输数据。当应用程序发展为支持新类型或者当类型本身被修改或扩展时,消息系统不会受到影响。另一方面,如果消息系统中的一些组件需要访问`Message`的相关信息,例如元数据通常可以存储在消息头的元数据中,并从中检索得到。

The Message interface is a core part of the API. By encapsulating the data in a generic wrapper, the messaging system can pass it around without any knowledge of the data’s type. As an application evolves to support new types or when the types themselves are modified or extended, the messaging system is not affected. On the other hand, when some component in the messaging system does require access to information about the Message, such metadata can typically be stored to and retrieved from the metadata in the message headers.

Message Headers

就像 Spring Integration允许使用任何`对象`作为`消息`的有效负载一样,它还支持任何`对象`类型作为头值。事实上,`MessageHeaders`类实现了`java.util.Map_`接口,正如以下类定义所示:

Just as Spring Integration lets any Object be used as the payload of a Message, it also supports any Object types as header values. In fact, the MessageHeaders class implements the java.util.Map_ interface, as the following class definition shows:

public final class MessageHeaders implements Map<String, Object>, Serializable {
  ...
}

虽然 MessageHeaders 类实现了 Map,但它实际上是一个只读实现。任何尝试 put Map 中值的尝试都将导致 UnsupportedOperationExceptionremoveclear 也适用。由于消息可能会传递给多个使用者,因此无法修改 Map 的结构。同样,在最初创建后无法 set 消息的有效负载 Object。但是,标头值本身(或有效负载对象)的可变性故意留给框架用户决策。

Even though the MessageHeaders class implements Map, it is effectively a read-only implementation. Any attempt to put a value in the Map results in an UnsupportedOperationException. The same applies for remove and clear. Since messages may be passed to multiple consumers, the structure of the Map cannot be modified. Likewise, the message’s payload Object can not be set after the initial creation. However, the mutability of the header values themselves (or the payload Object) is intentionally left as a decision for the framework user.

由于实现了`Map`,所以可以通过使用头的名称调用`get(..)来检索头。或者,你也可以提供额外的参数,即预期的`类。更好的是,当检索某个预先定义的值时,有方便的 getter 可用。以下示例展示这三种方法中的每一种:

As an implementation of Map, the headers can be retrieved by calling get(..) with the name of the header. Alternatively, you can provide the expected Class as an additional parameter. Even better, when retrieving one of the pre-defined values, convenient getters are available. The following example shows each of these three options:

Object someValue = message.getHeaders().get("someKey");

CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);

Long timestamp = message.getHeaders().getTimestamp();

下表描述了预先定义的消息头:

The following table describes the pre-defined message headers:

Table 1. Pre-defined Message Headers
Header Name Header Type Usage
  MessageHeaders.ID
  java.util.UUID

An identifier for this message instance. Changes each time a message is mutated.

  MessageHeaders.
TIMESTAMP
  java.lang.Long

The time the message was created. Changes each time a message is mutated.

  MessageHeaders.
REPLY_CHANNEL
  java.lang.Object
(String or
MessageChannel)

A channel to which a reply (if any) is sent when no explicit output channel is configured and there is no ROUTING_SLIP or the ROUTING_SLIP is exhausted. If the value is a String, it must represent a bean name or have been generated by a ChannelRegistry.

  MessageHeaders.
ERROR_CHANNEL
  java.lang.Object
(String or
MessageChannel)

A channel to which errors are sent. If the value is a String, it must represent a bean name or have been generated by a ChannelRegistry.

许多入站和出站适配器实现也会提供或预期某些头,你可以配置其他用户自定义头。这些头的常量可以在存在此类头的模块中找到,例如`AmqpHeaders`、`JmsHeaders`等。

Many inbound and outbound adapter implementations also provide or expect certain headers, and you can configure additional user-defined headers. Constants for these headers can be found in those modules where such headers exist — for example. AmqpHeaders, JmsHeaders, and so on.

MessageHeaderAccessor API

从 Spring Framework 4.0 和 Spring Integration 4.0 开始,核心消息抽象已被移至`spring-messaging`模块,并引入了`MessageHeaderAccessor`API,以便在消息实现上提供额外的抽象。所有(核心)特定于 Spring Integration 的消息头常量现在都在`IntegrationMessageHeaderAccessor`类中声明。下表描述了预先定义的消息头:

Starting with Spring Framework 4.0 and Spring Integration 4.0, the core messaging abstraction has been moved to the spring-messaging module, and the MessageHeaderAccessor API has been introduced to provide additional abstraction over messaging implementations. All (core) Spring Integration-specific message headers constants are now declared in the IntegrationMessageHeaderAccessor class. The following table describes the pre-defined message headers:

Table 2. Pre-defined Message Headers
Header Name Header Type Usage
  IntegrationMessageHeaderAccessor.
CORRELATION_ID
  java.lang.Object

Used to correlate two or more messages.

  IntegrationMessageHeaderAccessor.
SEQUENCE_NUMBER
  java.lang.Integer

Usually a sequence number with a group of messages with a SEQUENCE_SIZE but can also be used in a <resequencer/> to resequence an unbounded group of messages.

  IntegrationMessageHeaderAccessor.
SEQUENCE_SIZE
  java.lang.Integer

The number of messages within a group of correlated messages.

  IntegrationMessageHeaderAccessor.
EXPIRATION_DATE
  java.lang.Long

Indicates when a message is expired. Not used by the framework directly but can be set with a header enricher and used in a <filter/> that is configured with an UnexpiredMessageSelector.

  IntegrationMessageHeaderAccessor.
PRIORITY
  java.lang.Integer

Message priority — for example, within a PriorityChannel.

  IntegrationMessageHeaderAccessor.
DUPLICATE_MESSAGE
  java.lang.Boolean

True if a message was detected as a duplicate by an idempotent receiver interceptor. See Idempotent Receiver Enterprise Integration Pattern.

  IntegrationMessageHeaderAccessor.
CLOSEABLE_RESOURCE
  java.io.Closeable

This header is present if the message is associated with a Closeable that should be closed when message processing is complete. An example is the Session associated with a streamed file transfer using FTP, SFTP, and so on.

  IntegrationMessageHeaderAccessor.
DELIVERY_ATTEMPT
  java.lang.
AtomicInteger

If a message-driven channel adapter supports the configuration of a RetryTemplate, this header contains the current delivery attempt.

  IntegrationMessageHeaderAccessor.
ACKNOWLEDGMENT_CALLBACK
  o.s.i.support.
Acknowledgment
Callback

If an inbound endpoint supports it, a call back to accept, reject, or requeue a message. See Deferred Acknowledgment Pollable Message Source and MQTT Manual Acks.

对于其中一些头,在`IntegrationMessageHeaderAccessor`类上提供了类型化的 getter,如下例所示:

Convenient typed getters for some of these headers are provided on the IntegrationMessageHeaderAccessor class, as the following example shows:

IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...

下表描述了在`IntegrationMessageHeaderAccessor`中也出现的头,但通常不适用于用户代码(也就是说,这些头通常用于 Spring Integration 的内部部分,这里包含它们是为了完整性):

The following table describes headers that also appear in the IntegrationMessageHeaderAccessor but are generally not used by user code (that is, they are generally used by internal parts of Spring Integration — their inclusion here is for completeness):

Table 3. Pre-defined Message Headers
Header Name Header Type Usage
  IntegrationMessageHeaderAccessor.
SEQUENCE_DETAILS
  java.util.
List&lt;List&lt;Object&gt;&gt;

A stack of correlation data used when nested correlation is needed (for example, splitter→…​→splitter→…​→aggregator→…​→aggregator).

  IntegrationMessageHeaderAccessor.
ROUTING_SLIP
  java.util.
Map&lt;List&lt;Object&gt;, Integer&gt;

See Routing Slip.

Message ID Generation

当消息在一个应用程序中转换时,每次它发生变化(例如,由转换器)时,都会分配一个新的消息 ID。消息 ID 是一个`UUID`。从 Spring Integration 3.0 开始,针对 ID 生成所使用的默认策略比先前的`java.util.UUID.randomUUID()`实现更有效。它基于安全的随机种子使用简单的随机数,而不是每次都创建一个安全的随机数。

When a message transitions through an application, each time it is mutated (for example, by a transformer) a new message ID is assigned. The message ID is a UUID. Beginning with Spring Integration 3.0, the default strategy used for IS generation is more efficient than the previous java.util.UUID.randomUUID() implementation. It uses simple random numbers based on a secure random seed instead of creating a secure random number each time.

可以通过在应用程序上下文中声明实现`org.springframework.util.IdGenerator`的 bean 来选择不同的 UUID 生成策略。

A different UUID generation strategy can be selected by declaring a bean that implements org.springframework.util.IdGenerator in the application context.

一个类加载器中只能使用一个 UUID 生成策略。这意味着,如果两个或更多应用程序上下文在同一个类加载器中运行,那么它们共享同一个策略。如果其中一个上下文更改了策略,那么所有上下文都会使用它。如果同一个类加载器中的两个或多个上下文声明了类型为 org.springframework.util.IdGenerator 的 bean,那么它们都必须是同一类的实例。否则,尝试替换自定义策略的上下文将无法初始化。如果策略相同,但参数化,那么将使用第一个要初始化的上下文中策略。

Only one UUID generation strategy can be used in a classloader. This means that, if two or more application contexts run in the same classloader, they share the same strategy. If one of the contexts changes the strategy, it is used by all contexts. If two or more contexts in the same classloader declare a bean of type org.springframework.util.IdGenerator, they must all be an instance of the same class. Otherwise, the context attempting to replace a custom strategy fails to initialize. If the strategy is the same, but parameterized, the strategy in the first context to be initialized is used.

除了默认策略之外,还提供了两个附加的`IdGenerators`。org.springframework.util.JdkIdGenerator`使用以前的`UUID.randomUUID()`机制。如果你实际上并不需要 UUID,并且一个简单的递增值就足够了,那么你可以使用`o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator

In addition to the default strategy, two additional IdGenerators are provided. org.springframework.util.JdkIdGenerator uses the previous UUID.randomUUID() mechanism. You can use o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator when a UUID is not really needed and a simple incrementing value is sufficient.

Read-only Headers

`MessageHeaders.ID`和`MessageHeaders.TIMESTAMP`是只读头,不能被覆盖。

The MessageHeaders.ID and MessageHeaders.TIMESTAMP are read-only headers and cannot be overridden.

自版本 4.3.2 起,MessageBuilder`提供 `readOnlyHeaders(String…​ readOnlyHeaders)`API 来定制不应从上游 `Message`复制的一系列标头。默认情况下,只有 `MessageHeaders.ID`和 `MessageHeaders.TIMESTAMP`为只读。全局 `spring.integration.readOnly.headers`属性(见 Global Properties)用于为框架组件定制 `DefaultMessageBuilderFactory。当您希望不填入某些现成的标头(例如,ObjectToJsonTransformer`的 `contentType)时,这将很有用(见 JSON Transformers)。

Since version 4.3.2, the MessageBuilder provides the readOnlyHeaders(String…​ readOnlyHeaders) API to customize a list of headers that should not be copied from an upstream Message. Only the MessageHeaders.ID and MessageHeaders.TIMESTAMP are read only by default. The global spring.integration.readOnly.headers property (see Global Properties) is provided to customize DefaultMessageBuilderFactory for framework components. This can be useful when you would like do not populate some out-of-the-box headers, such as contentType by the ObjectToJsonTransformer (see JSON Transformers).

当你尝试使用`MessageBuilder`构建一个新消息时,这种报头将被忽略,而特定的`INFO`消息将被发送到日志中。

When you try to build a new message using MessageBuilder, this kind of header is ignored and a particular INFO message is emitted to logs.

从版本 5.0 起,Messaging GatewayHeader EnricherContent EnricherHeader Filter不再允许您在使用 DefaultMessageBuilderFactory`时配置 `MessageHeaders.ID`和 `MessageHeaders.TIMESTAMP`标头名称,并抛出 `BeanInitializationException

Starting with version 5.0, Messaging Gateway, Header Enricher, Content Enricher and Header Filter do not let you configure the MessageHeaders.ID and MessageHeaders.TIMESTAMP header names when DefaultMessageBuilderFactory is used, and they throw BeanInitializationException.

Header Propagation

当消息由消息生成端点(例如 service activator)处理(并修改)时,通常传入标头会传播到传出消息。其中一个例外是 transformer,当完整消息返回到框架时。在这种情况下,用户代码负责整个传出消息。当转换器仅返回有效负载时,将会传播传入标头。此外,仅当传入标头在传出消息中不存在时才会进行传播,从而允许您根据需要更改标头值。

When messages are processed (and modified) by message-producing endpoints (such as a service activator), in general, inbound headers are propagated to the outbound message. One exception to this is a transformer, when a complete message is returned to the framework. In that case, the user code is responsible for the entire outbound message. When a transformer just returns the payload, the inbound headers are propagated. Also, a header is only propagated if it does not already exist in the outbound message, letting you change header values as needed.

从 4.3.10 开始,你可以将消息处理器(修改消息并生成输出)配置为不传播特定头。若要配置不想复制的头,请在`MessageProducingMessageHandler`抽象类上调用`setNotPropagatedHeaders()`或`addNotPropagatedHeaders()`方法。

Starting with version 4.3.10, you can configure message handlers (that modify messages and produce output) to suppress the propagation of specific headers. To configure the header(s) you do not want to be copied, call the setNotPropagatedHeaders() or addNotPropagatedHeaders() methods on the MessageProducingMessageHandler abstract class.

你还可以通过在`META-INF/spring.integration.properties`中将`readOnlyHeaders`属性设置为用逗号分隔的头列表,在全局范围内禁止传播特定的消息头。

You can also globally suppress propagation of specific message headers by setting the readOnlyHeaders property in META-INF/spring.integration.properties to a comma-delimited list of headers.

从 5.0 版本开始,AbstractMessageProducingHandler 上的 setNotPropagatedHeaders() 实现应用简单的模式(xxx*xxx, *xxxxxx*yyy)以允许使用公共后缀或前缀筛选标头。更多信息,请参见 link:https://docs.spring.io/spring-integration/api/org/springframework/integration/util/PatternMatchUtils.html[PatternMatchUtils Javadoc。当其中一个模式是 *(星号)时,则不会传播标头。所有其他模式都会被忽略。在这种情况下,服务激活器的行为与转换器相同,并且任何必需的标头都必须在从服务方法返回的 Message 中提供。ConsumerEndpointSpec 中为 Java DSL 提供了 notPropagatedHeaders() 选项。它还可用于作为 not-propagated-headers 属性对 <service-activator> 组件的 XML 配置。

Starting with version 5.0, the setNotPropagatedHeaders() implementation on the AbstractMessageProducingHandler applies simple patterns (xxx*, xxx, *xxx, or xxx*yyy) to allow filtering headers with a common suffix or prefix. See PatternMatchUtils Javadoc for more information. When one of the patterns is * (asterisk), no headers are propagated. All other patterns are ignored. In that case, the service activator behaves the same way as a transformer and any required headers must be supplied in the Message returned from the service method. The notPropagatedHeaders() option is available in the ConsumerEndpointSpec for the Java DSL It is also available for XML configuration of the <service-activator> component as a not-propagated-headers attribute.

标头传播抑制不适用于那些不会修改消息的端点,如 bridgesrouters

Header propagation suppression does not apply to those endpoints that do not modify the message, such as bridges and routers.

Message Implementations

Message 界面的基础实现是 GenericMessage<T>,它提供了两个构造函数,如下面的清单所示:

The base implementation of the Message interface is GenericMessage<T>, and it provides two constructors, shown in the following listing:

new GenericMessage<T>(T payload);

new GenericMessage<T>(T payload, Map<String, Object> headers)

创建 Message 时,会生成一个随机的唯一 ID。接受标头 Map 的构造函数会将提供的标头复制到新创建的 Message 中。

When a Message is created, a random unique ID is generated. The constructor that accepts a Map of headers copies the provided headers to the newly created Message.

还有一种 Message 的便捷实现,它旨在告知错误状况。该实现以 Throwable 对象作为其负载,如下面的示例所示:

There is also a convenient implementation of Message designed to communicate error conditions. This implementation takes a Throwable object as its payload, as the following example shows:

ErrorMessage message = new ErrorMessage(someThrowable);

Throwable t = message.getPayload();

请注意,此实现利用了 GenericMessage 基类是参数化的这一事实。因此,如同在两个示例中所示的那样,在检索 Message 负载 Object 时不需要进行强制转换。

Note that this implementation takes advantage of the fact that the GenericMessage base class is parameterized. Therefore, as shown in both examples, no casting is necessary when retrieving the Message payload Object.

The MessageBuilder Helper Class

您可能会注意到,Message 接口定义了用于检索其负载和标头的检索方法,但不提供 set 方法。原因是 Message 在其最初创建后就不能进行修改。因此,当 Message 实例发送给多个使用者(例如,通过发布/订阅通道)时,如果其中一个使用者需要使用不同的负载类型发送答复,则它必须创建一个新的 Message。因此,其他使用者不受这些更改的影响。请记住,多个使用者可以访问相同的负载实例或标头值,而此实例本身是否不可变的决定权留给了您。换句话说,Message 实例的契约与不可修改 Collection 的契约类似,并且 MessageHeaders 映射进一步说明了这一点。尽管 MessageHeaders 类实现了 java.util.Map,但在 MessageHeaders 实例上调用 put 操作(或“remove”或“clear”)的任何尝试都会导致 UnsupportedOperationException

You may notice that the Message interface defines retrieval methods for its payload and headers but provides no setters. The reason for this is that a Message cannot be modified after its initial creation. Therefore, when a Message instance is sent to multiple consumers (for example, through a publish-subscribe Channel), if one of those consumers needs to send a reply with a different payload type, it must create a new Message. As a result, the other consumers are not affected by those changes. Keep in mind that multiple consumers may access the same payload instance or header value, and whether such an instance is itself immutable is a decision left to you. In other words, the contract for Message instances is similar to that of an unmodifiable Collection, and the MessageHeaders map further exemplifies that. Even though the MessageHeaders class implements java.util.Map, any attempt to invoke a put operation (or 'remove' or 'clear') on a MessageHeaders instance results in an UnsupportedOperationException.

Spring Integration 没有要求创建和填充 Map 以传递到 GenericMessage 构造函数中,而是提供了一种更方便的方法来构建 Message:MessageBuilderMessageBuilder 提供了两个工厂方法,用于从现有 Message 或带有负载 ObjectMessage 中创建 Message 实例。从现有 Message 构建时,该 Message 的标头和负载将复制到新 Message 中,如下面的示例所示:

Rather than requiring the creation and population of a Map to pass into the GenericMessage constructor, Spring Integration does provide a far more convenient way to construct Messages: MessageBuilder. The MessageBuilder provides two factory methods for creating Message instances from either an existing Message or with a payload Object. When building from an existing Message, the headers and payload of that Message are copied to the new Message, as the following example shows:

Message<String> message1 = MessageBuilder.withPayload("test")
        .setHeader("foo", "bar")
        .build();

Message<String> message2 = MessageBuilder.fromMessage(message1).build();

assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));

如果您需要使用新负载创建一个 Message,但仍然希望从现有 Message 中复制标头,则可以使用其中一种“copy”方法,如下面的示例所示:

If you need to create a Message with a new payload but still want to copy the headers from an existing Message, you can use one of the 'copy' methods, as the following example shows:

Message<String> message3 = MessageBuilder.withPayload("test3")
        .copyHeaders(message1.getHeaders())
        .build();

Message<String> message4 = MessageBuilder.withPayload("test4")
        .setHeader("foo", 123)
        .copyHeadersIfAbsent(message1.getHeaders())
        .build();

assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));

请注意,copyHeadersIfAbsent 方法不会覆盖现有值。此外,在前面的示例中,您可以看到如何使用 setHeader 设置任何用户定义的标头。最后,还有可用于预定义标头的 set 方法,以及用于设置任何标头的非破坏性方法(MessageHeaders 还定义了预定义标头名的常量)。

Note that the copyHeadersIfAbsent method does not overwrite existing values. Also, in the preceding example, you can see how to set any user-defined header with setHeader. Finally, there are set methods available for the predefined headers as well as a non-destructive method for setting any header (MessageHeaders also defines constants for the pre-defined header names).

您还可以使用 MessageBuilder 设置消息的优先级,如下面的示例所示:

You can also use MessageBuilder to set the priority of messages, as the following example shows:

Message<Integer> importantMessage = MessageBuilder.withPayload(99)
        .setPriority(5)
        .build();

assertEquals(5, importantMessage.getHeaders().getPriority());

Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
        .setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
        .build();

assertEquals(2, lessImportantMessage.getHeaders().getPriority());

仅在使用 PriorityChannel(正如下一章中所述)时才考虑 priority 标头。它被定义为 java.lang.Integer

The priority header is considered only when using a PriorityChannel (as described in the next chapter). It is defined as a java.lang.Integer.