Message

Spring Integration `Message`是用于存储数据的通用容器。任何对象都可以做为有效负载,而每一个`Message`示例都包含带有键值对的用户可扩展属性。

The Message Interface

以下列表展示了`Message`接口的定义:

public interface Message<T> {

    T getPayload();

    MessageHeaders getHeaders();

}

`Message`接口是 API 的核心组成部分。通过将数据封装在一个通用封装器中,消息系统可以在没有任何数据类型信息的情况下传输数据。当应用程序发展为支持新类型或者当类型本身被修改或扩展时,消息系统不会受到影响。另一方面,如果消息系统中的一些组件需要访问`Message`的相关信息,例如元数据通常可以存储在消息头的元数据中,并从中检索得到。

Message Headers

就像 Spring Integration允许使用任何`对象`作为`消息`的有效负载一样,它还支持任何`对象`类型作为头值。事实上,`MessageHeaders`类实现了`java.util.Map_`接口,正如以下类定义所示:

public final class MessageHeaders implements Map<String, Object>, Serializable {
  ...
}

虽然 MessageHeaders 类实现了 Map,但它实际上是一个只读实现。任何尝试 put Map 中值的尝试都将导致 UnsupportedOperationExceptionremoveclear 也适用。由于消息可能会传递给多个使用者,因此无法修改 Map 的结构。同样,在最初创建后无法 set 消息的有效负载 Object。但是,标头值本身(或有效负载对象)的可变性故意留给框架用户决策。

由于实现了`Map`,所以可以通过使用头的名称调用`get(..)来检索头。或者,你也可以提供额外的参数,即预期的`类。更好的是,当检索某个预先定义的值时,有方便的 getter 可用。以下示例展示这三种方法中的每一种:

Object someValue = message.getHeaders().get("someKey");

CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);

Long timestamp = message.getHeaders().getTimestamp();

下表描述了预先定义的消息头:

Table 1. Pre-defined Message Headers
Header Name Header Type Usage
  MessageHeaders.ID
  java.util.UUID

用于此消息实例的标识符。每次消息发生变化时都会更改。

  MessageHeaders.
TIMESTAMP
  java.lang.Long

消息的创建时间。每次消息发生变化时都会更改。

  MessageHeaders.
REPLY_CHANNEL
  java.lang.Object
(String or
MessageChannel)

如果未配置明确的输出信道且没有 ROUTING_SLIPROUTING_SLIP 耗尽,则向其发送回复(如果有)的信道。如果值为 String,则必须表示 Bean 名称或由 ChannelRegistry. 生成。

  MessageHeaders.
ERROR_CHANNEL
  java.lang.Object
(String or
MessageChannel)

发送错误的信道。如果值为 String,则必须表示 Bean 名称或由 ChannelRegistry. 生成。

许多入站和出站适配器实现也会提供或预期某些头,你可以配置其他用户自定义头。这些头的常量可以在存在此类头的模块中找到,例如`AmqpHeaders`、`JmsHeaders`等。

MessageHeaderAccessor API

从 Spring Framework 4.0 和 Spring Integration 4.0 开始,核心消息抽象已被移至`spring-messaging`模块,并引入了`MessageHeaderAccessor`API,以便在消息实现上提供额外的抽象。所有(核心)特定于 Spring Integration 的消息头常量现在都在`IntegrationMessageHeaderAccessor`类中声明。下表描述了预先定义的消息头:

Table 2. Pre-defined Message Headers
Header Name Header Type Usage
  IntegrationMessageHeaderAccessor.
CORRELATION_ID
  java.lang.Object

用于关联两个或多个消息。

  IntegrationMessageHeaderAccessor.
SEQUENCE_NUMBER
  java.lang.Integer

通常是带有 SEQUENCE_SIZE 组消息的序列号,但也可以在 &lt;resequencer/&gt; 中用于对无界组消息重新排序。

  IntegrationMessageHeaderAccessor.
SEQUENCE_SIZE
  java.lang.Integer

关联消息组内的消息数。

  IntegrationMessageHeaderAccessor.
EXPIRATION_DATE
  java.lang.Long

指示消息过期时间。框架不会直接使用它,但可以设置带头信息的富集器,并在配置有 UnexpiredMessageSelector&lt;filter/&gt; 中使用它。

  IntegrationMessageHeaderAccessor.
PRIORITY
  java.lang.Integer

消息优先级——例如,在 PriorityChannel 内。

  IntegrationMessageHeaderAccessor.
DUPLICATE_MESSAGE
  java.lang.Boolean

如果幂等接收器拦截器将消息检测为重复项,则为 True。参见 Idempotent Receiver Enterprise Integration Pattern

  IntegrationMessageHeaderAccessor.
CLOSEABLE_RESOURCE
  java.io.Closeable

如果消息与应该在消息处理完成后关闭的 Closeable 相关联,则存在此头信息。一个示例是与使用 FTP、SFTP 等进行流式文件传输所关联的 Session

  IntegrationMessageHeaderAccessor.
DELIVERY_ATTEMPT
  java.lang.
AtomicInteger

如果消息驱动通道适配器支持 RetryTemplate 的配置,此头信息将包含当前的传递尝试。

  IntegrationMessageHeaderAccessor.
ACKNOWLEDGMENT_CALLBACK
  o.s.i.support.
Acknowledgment
Callback

如果入站端点支持,则回调接受、拒绝或重新加入一条消息。参见 Deferred Acknowledgment Pollable Message SourceMQTT Manual Acks

对于其中一些头,在`IntegrationMessageHeaderAccessor`类上提供了类型化的 getter,如下例所示:

IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...

下表描述了在`IntegrationMessageHeaderAccessor`中也出现的头,但通常不适用于用户代码(也就是说,这些头通常用于 Spring Integration 的内部部分,这里包含它们是为了完整性):

Table 3. Pre-defined Message Headers
Header Name Header Type Usage
  IntegrationMessageHeaderAccessor.
SEQUENCE_DETAILS
  java.util.
List&lt;List&lt;Object&gt;&gt;

嵌套关联需要时使用的关联数据堆栈(例如,splitter&#8594;&#8230;&#8203;&#8594;splitter&#8594;&#8230;&#8203;&#8594;aggregator&#8594;&#8230;&#8203;&#8594;aggregator)。

  IntegrationMessageHeaderAccessor.
ROUTING_SLIP
  java.util.
Map&lt;List&lt;Object&gt;, Integer&gt;

See Routing Slip.

Message ID Generation

当消息在一个应用程序中转换时,每次它发生变化(例如,由转换器)时,都会分配一个新的消息 ID。消息 ID 是一个`UUID`。从 Spring Integration 3.0 开始,针对 ID 生成所使用的默认策略比先前的`java.util.UUID.randomUUID()`实现更有效。它基于安全的随机种子使用简单的随机数,而不是每次都创建一个安全的随机数。

可以通过在应用程序上下文中声明实现`org.springframework.util.IdGenerator`的 bean 来选择不同的 UUID 生成策略。

一个类加载器中只能使用一个 UUID 生成策略。这意味着,如果两个或更多应用程序上下文在同一个类加载器中运行,那么它们共享同一个策略。如果其中一个上下文更改了策略,那么所有上下文都会使用它。如果同一个类加载器中的两个或多个上下文声明了类型为 org.springframework.util.IdGenerator 的 bean,那么它们都必须是同一类的实例。否则,尝试替换自定义策略的上下文将无法初始化。如果策略相同,但参数化,那么将使用第一个要初始化的上下文中策略。

除了默认策略之外,还提供了两个附加的`IdGenerators`。org.springframework.util.JdkIdGenerator`使用以前的`UUID.randomUUID()`机制。如果你实际上并不需要 UUID,并且一个简单的递增值就足够了,那么你可以使用`o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator

Read-only Headers

`MessageHeaders.ID`和`MessageHeaders.TIMESTAMP`是只读头,不能被覆盖。

自版本 4.3.2 起,MessageBuilder`提供 `readOnlyHeaders(String…​ readOnlyHeaders)`API 来定制不应从上游 `Message`复制的一系列标头。默认情况下,只有 `MessageHeaders.ID`和 `MessageHeaders.TIMESTAMP`为只读。全局 `spring.integration.readOnly.headers`属性(见 Global Properties)用于为框架组件定制 `DefaultMessageBuilderFactory。当您希望不填入某些现成的标头(例如,ObjectToJsonTransformer`的 `contentType)时,这将很有用(见 JSON Transformers)。

当你尝试使用`MessageBuilder`构建一个新消息时,这种报头将被忽略,而特定的`INFO`消息将被发送到日志中。

从版本 5.0 起,Messaging GatewayHeader EnricherContent EnricherHeader Filter不再允许您在使用 DefaultMessageBuilderFactory`时配置 `MessageHeaders.ID`和 `MessageHeaders.TIMESTAMP`标头名称,并抛出 `BeanInitializationException

Header Propagation

当消息由消息生成端点(例如 service activator)处理(并修改)时,通常传入标头会传播到传出消息。其中一个例外是 transformer,当完整消息返回到框架时。在这种情况下,用户代码负责整个传出消息。当转换器仅返回有效负载时,将会传播传入标头。此外,仅当传入标头在传出消息中不存在时才会进行传播,从而允许您根据需要更改标头值。

从 4.3.10 开始,你可以将消息处理器(修改消息并生成输出)配置为不传播特定头。若要配置不想复制的头,请在`MessageProducingMessageHandler`抽象类上调用`setNotPropagatedHeaders()`或`addNotPropagatedHeaders()`方法。

你还可以通过在`META-INF/spring.integration.properties`中将`readOnlyHeaders`属性设置为用逗号分隔的头列表,在全局范围内禁止传播特定的消息头。

从 5.0 版本开始,AbstractMessageProducingHandler 上的 setNotPropagatedHeaders() 实现应用简单的模式(xxx*xxx, *xxxxxx*yyy)以允许使用公共后缀或前缀筛选标头。更多信息,请参见 link:https://docs.spring.io/spring-integration/api/org/springframework/integration/util/PatternMatchUtils.html[PatternMatchUtils Javadoc。当其中一个模式是 *(星号)时,则不会传播标头。所有其他模式都会被忽略。在这种情况下,服务激活器的行为与转换器相同,并且任何必需的标头都必须在从服务方法返回的 Message 中提供。ConsumerEndpointSpec 中为 Java DSL 提供了 notPropagatedHeaders() 选项。它还可用于作为 not-propagated-headers 属性对 <service-activator> 组件的 XML 配置。

标头传播抑制不适用于那些不会修改消息的端点,如 bridgesrouters

Message Implementations

Message 界面的基础实现是 GenericMessage<T>,它提供了两个构造函数,如下面的清单所示:

new GenericMessage<T>(T payload);

new GenericMessage<T>(T payload, Map<String, Object> headers)

创建 Message 时,会生成一个随机的唯一 ID。接受标头 Map 的构造函数会将提供的标头复制到新创建的 Message 中。

还有一种 Message 的便捷实现,它旨在告知错误状况。该实现以 Throwable 对象作为其负载,如下面的示例所示:

ErrorMessage message = new ErrorMessage(someThrowable);

Throwable t = message.getPayload();

请注意,此实现利用了 GenericMessage 基类是参数化的这一事实。因此,如同在两个示例中所示的那样,在检索 Message 负载 Object 时不需要进行强制转换。

The MessageBuilder Helper Class

您可能会注意到,Message 接口定义了用于检索其负载和标头的检索方法,但不提供 set 方法。原因是 Message 在其最初创建后就不能进行修改。因此,当 Message 实例发送给多个使用者(例如,通过发布/订阅通道)时,如果其中一个使用者需要使用不同的负载类型发送答复,则它必须创建一个新的 Message。因此,其他使用者不受这些更改的影响。请记住,多个使用者可以访问相同的负载实例或标头值,而此实例本身是否不可变的决定权留给了您。换句话说,Message 实例的契约与不可修改 Collection 的契约类似,并且 MessageHeaders 映射进一步说明了这一点。尽管 MessageHeaders 类实现了 java.util.Map,但在 MessageHeaders 实例上调用 put 操作(或“remove”或“clear”)的任何尝试都会导致 UnsupportedOperationException

Spring Integration 没有要求创建和填充 Map 以传递到 GenericMessage 构造函数中,而是提供了一种更方便的方法来构建 Message:MessageBuilderMessageBuilder 提供了两个工厂方法,用于从现有 Message 或带有负载 ObjectMessage 中创建 Message 实例。从现有 Message 构建时,该 Message 的标头和负载将复制到新 Message 中,如下面的示例所示:

Message<String> message1 = MessageBuilder.withPayload("test")
        .setHeader("foo", "bar")
        .build();

Message<String> message2 = MessageBuilder.fromMessage(message1).build();

assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));

如果您需要使用新负载创建一个 Message,但仍然希望从现有 Message 中复制标头,则可以使用其中一种“copy”方法,如下面的示例所示:

Message<String> message3 = MessageBuilder.withPayload("test3")
        .copyHeaders(message1.getHeaders())
        .build();

Message<String> message4 = MessageBuilder.withPayload("test4")
        .setHeader("foo", 123)
        .copyHeadersIfAbsent(message1.getHeaders())
        .build();

assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));

请注意,copyHeadersIfAbsent 方法不会覆盖现有值。此外,在前面的示例中,您可以看到如何使用 setHeader 设置任何用户定义的标头。最后,还有可用于预定义标头的 set 方法,以及用于设置任何标头的非破坏性方法(MessageHeaders 还定义了预定义标头名的常量)。

您还可以使用 MessageBuilder 设置消息的优先级,如下面的示例所示:

Message<Integer> importantMessage = MessageBuilder.withPayload(99)
        .setPriority(5)
        .build();

assertEquals(5, importantMessage.getHeaders().getPriority());

Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
        .setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
        .build();

assertEquals(2, lessImportantMessage.getHeaders().getPriority());

仅在使用 PriorityChannel(正如下一章中所述)时才考虑 priority 标头。它被定义为 java.lang.Integer