Message
Spring Integration `Message`是用于存储数据的通用容器。任何对象都可以做为有效负载,而每一个`Message`示例都包含带有键值对的用户可扩展属性。
The Message
Interface
以下列表展示了`Message`接口的定义:
public interface Message<T> {
T getPayload();
MessageHeaders getHeaders();
}
`Message`接口是 API 的核心组成部分。通过将数据封装在一个通用封装器中,消息系统可以在没有任何数据类型信息的情况下传输数据。当应用程序发展为支持新类型或者当类型本身被修改或扩展时,消息系统不会受到影响。另一方面,如果消息系统中的一些组件需要访问`Message`的相关信息,例如元数据通常可以存储在消息头的元数据中,并从中检索得到。
Message Headers
就像 Spring Integration允许使用任何`对象`作为`消息`的有效负载一样,它还支持任何`对象`类型作为头值。事实上,`MessageHeaders`类实现了`java.util.Map_`接口,正如以下类定义所示:
public final class MessageHeaders implements Map<String, Object>, Serializable {
...
}
虽然 |
由于实现了`Map`,所以可以通过使用头的名称调用`get(..)来检索头。或者,你也可以提供额外的参数,即预期的`类
。更好的是,当检索某个预先定义的值时,有方便的 getter 可用。以下示例展示这三种方法中的每一种:
Object someValue = message.getHeaders().get("someKey");
CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);
Long timestamp = message.getHeaders().getTimestamp();
下表描述了预先定义的消息头:
Header Name | Header Type | Usage |
---|---|---|
MessageHeaders.ID |
java.util.UUID |
用于此消息实例的标识符。每次消息发生变化时都会更改。 |
MessageHeaders. TIMESTAMP |
java.lang.Long |
消息的创建时间。每次消息发生变化时都会更改。 |
MessageHeaders. REPLY_CHANNEL |
java.lang.Object (String or MessageChannel) |
如果未配置明确的输出信道且没有 |
MessageHeaders. ERROR_CHANNEL |
java.lang.Object (String or MessageChannel) |
发送错误的信道。如果值为 |
许多入站和出站适配器实现也会提供或预期某些头,你可以配置其他用户自定义头。这些头的常量可以在存在此类头的模块中找到,例如`AmqpHeaders`、`JmsHeaders`等。
MessageHeaderAccessor
API
从 Spring Framework 4.0 和 Spring Integration 4.0 开始,核心消息抽象已被移至`spring-messaging`模块,并引入了`MessageHeaderAccessor`API,以便在消息实现上提供额外的抽象。所有(核心)特定于 Spring Integration 的消息头常量现在都在`IntegrationMessageHeaderAccessor`类中声明。下表描述了预先定义的消息头:
Header Name | Header Type | Usage |
---|---|---|
IntegrationMessageHeaderAccessor. CORRELATION_ID |
java.lang.Object |
用于关联两个或多个消息。 |
IntegrationMessageHeaderAccessor. SEQUENCE_NUMBER |
java.lang.Integer |
通常是带有 |
IntegrationMessageHeaderAccessor. SEQUENCE_SIZE |
java.lang.Integer |
关联消息组内的消息数。 |
IntegrationMessageHeaderAccessor. EXPIRATION_DATE |
java.lang.Long |
指示消息过期时间。框架不会直接使用它,但可以设置带头信息的富集器,并在配置有 |
IntegrationMessageHeaderAccessor. PRIORITY |
java.lang.Integer |
消息优先级——例如,在 |
IntegrationMessageHeaderAccessor. DUPLICATE_MESSAGE |
java.lang.Boolean |
如果幂等接收器拦截器将消息检测为重复项,则为 True。参见 Idempotent Receiver Enterprise Integration Pattern。 |
IntegrationMessageHeaderAccessor. CLOSEABLE_RESOURCE |
java.io.Closeable |
如果消息与应该在消息处理完成后关闭的 |
IntegrationMessageHeaderAccessor. DELIVERY_ATTEMPT |
java.lang. AtomicInteger |
如果消息驱动通道适配器支持 |
IntegrationMessageHeaderAccessor. ACKNOWLEDGMENT_CALLBACK |
o.s.i.support. Acknowledgment Callback |
如果入站端点支持,则回调接受、拒绝或重新加入一条消息。参见 Deferred Acknowledgment Pollable Message Source和 MQTT Manual Acks。 |
对于其中一些头,在`IntegrationMessageHeaderAccessor`类上提供了类型化的 getter,如下例所示:
IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...
下表描述了在`IntegrationMessageHeaderAccessor`中也出现的头,但通常不适用于用户代码(也就是说,这些头通常用于 Spring Integration 的内部部分,这里包含它们是为了完整性):
Header Name | Header Type | Usage |
---|---|---|
IntegrationMessageHeaderAccessor. SEQUENCE_DETAILS |
java.util. List<List<Object>> |
嵌套关联需要时使用的关联数据堆栈(例如, |
IntegrationMessageHeaderAccessor. ROUTING_SLIP |
java.util. Map<List<Object>, Integer> |
See Routing Slip. |
Message ID Generation
当消息在一个应用程序中转换时,每次它发生变化(例如,由转换器)时,都会分配一个新的消息 ID。消息 ID 是一个`UUID`。从 Spring Integration 3.0 开始,针对 ID 生成所使用的默认策略比先前的`java.util.UUID.randomUUID()`实现更有效。它基于安全的随机种子使用简单的随机数,而不是每次都创建一个安全的随机数。
可以通过在应用程序上下文中声明实现`org.springframework.util.IdGenerator`的 bean 来选择不同的 UUID 生成策略。
一个类加载器中只能使用一个 UUID 生成策略。这意味着,如果两个或更多应用程序上下文在同一个类加载器中运行,那么它们共享同一个策略。如果其中一个上下文更改了策略,那么所有上下文都会使用它。如果同一个类加载器中的两个或多个上下文声明了类型为 org.springframework.util.IdGenerator
的 bean,那么它们都必须是同一类的实例。否则,尝试替换自定义策略的上下文将无法初始化。如果策略相同,但参数化,那么将使用第一个要初始化的上下文中策略。
除了默认策略之外,还提供了两个附加的`IdGenerators`。org.springframework.util.JdkIdGenerator`使用以前的`UUID.randomUUID()`机制。如果你实际上并不需要 UUID,并且一个简单的递增值就足够了,那么你可以使用`o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator
。
Read-only Headers
`MessageHeaders.ID`和`MessageHeaders.TIMESTAMP`是只读头,不能被覆盖。
自版本 4.3.2 起,MessageBuilder`提供 `readOnlyHeaders(String… readOnlyHeaders)`API 来定制不应从上游 `Message`复制的一系列标头。默认情况下,只有 `MessageHeaders.ID`和 `MessageHeaders.TIMESTAMP`为只读。全局 `spring.integration.readOnly.headers`属性(见 Global Properties)用于为框架组件定制 `DefaultMessageBuilderFactory
。当您希望不填入某些现成的标头(例如,ObjectToJsonTransformer`的 `contentType
)时,这将很有用(见 JSON Transformers)。
当你尝试使用`MessageBuilder`构建一个新消息时,这种报头将被忽略,而特定的`INFO`消息将被发送到日志中。
从版本 5.0 起,Messaging Gateway、Header Enricher、Content Enricher和 Header Filter不再允许您在使用 DefaultMessageBuilderFactory`时配置 `MessageHeaders.ID`和 `MessageHeaders.TIMESTAMP`标头名称,并抛出 `BeanInitializationException
。
Header Propagation
当消息由消息生成端点(例如 service activator)处理(并修改)时,通常传入标头会传播到传出消息。其中一个例外是 transformer,当完整消息返回到框架时。在这种情况下,用户代码负责整个传出消息。当转换器仅返回有效负载时,将会传播传入标头。此外,仅当传入标头在传出消息中不存在时才会进行传播,从而允许您根据需要更改标头值。
从 4.3.10 开始,你可以将消息处理器(修改消息并生成输出)配置为不传播特定头。若要配置不想复制的头,请在`MessageProducingMessageHandler`抽象类上调用`setNotPropagatedHeaders()`或`addNotPropagatedHeaders()`方法。
你还可以通过在`META-INF/spring.integration.properties`中将`readOnlyHeaders`属性设置为用逗号分隔的头列表,在全局范围内禁止传播特定的消息头。
从 5.0 版本开始,AbstractMessageProducingHandler
上的 setNotPropagatedHeaders()
实现应用简单的模式(xxx*
、xxx
, *xxx
或 xxx*yyy
)以允许使用公共后缀或前缀筛选标头。更多信息,请参见 link:https://docs.spring.io/spring-integration/api/org/springframework/integration/util/PatternMatchUtils.html[PatternMatchUtils
Javadoc。当其中一个模式是 *
(星号)时,则不会传播标头。所有其他模式都会被忽略。在这种情况下,服务激活器的行为与转换器相同,并且任何必需的标头都必须在从服务方法返回的 Message
中提供。ConsumerEndpointSpec
中为 Java DSL 提供了 notPropagatedHeaders()
选项。它还可用于作为 not-propagated-headers
属性对 <service-activator>
组件的 XML 配置。
Message Implementations
Message
界面的基础实现是 GenericMessage<T>
,它提供了两个构造函数,如下面的清单所示:
new GenericMessage<T>(T payload);
new GenericMessage<T>(T payload, Map<String, Object> headers)
创建 Message
时,会生成一个随机的唯一 ID。接受标头 Map
的构造函数会将提供的标头复制到新创建的 Message
中。
还有一种 Message
的便捷实现,它旨在告知错误状况。该实现以 Throwable
对象作为其负载,如下面的示例所示:
ErrorMessage message = new ErrorMessage(someThrowable);
Throwable t = message.getPayload();
请注意,此实现利用了 GenericMessage
基类是参数化的这一事实。因此,如同在两个示例中所示的那样,在检索 Message
负载 Object
时不需要进行强制转换。
The MessageBuilder
Helper Class
您可能会注意到,Message
接口定义了用于检索其负载和标头的检索方法,但不提供 set 方法。原因是 Message
在其最初创建后就不能进行修改。因此,当 Message
实例发送给多个使用者(例如,通过发布/订阅通道)时,如果其中一个使用者需要使用不同的负载类型发送答复,则它必须创建一个新的 Message
。因此,其他使用者不受这些更改的影响。请记住,多个使用者可以访问相同的负载实例或标头值,而此实例本身是否不可变的决定权留给了您。换句话说,Message
实例的契约与不可修改 Collection
的契约类似,并且 MessageHeaders
映射进一步说明了这一点。尽管 MessageHeaders
类实现了 java.util.Map
,但在 MessageHeaders
实例上调用 put
操作(或“remove”或“clear”)的任何尝试都会导致 UnsupportedOperationException
。
Spring Integration 没有要求创建和填充 Map 以传递到 GenericMessage 构造函数中,而是提供了一种更方便的方法来构建 Message:MessageBuilder
。MessageBuilder
提供了两个工厂方法,用于从现有 Message
或带有负载 Object
的 Message
中创建 Message
实例。从现有 Message
构建时,该 Message
的标头和负载将复制到新 Message
中,如下面的示例所示:
Message<String> message1 = MessageBuilder.withPayload("test")
.setHeader("foo", "bar")
.build();
Message<String> message2 = MessageBuilder.fromMessage(message1).build();
assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));
如果您需要使用新负载创建一个 Message
,但仍然希望从现有 Message
中复制标头,则可以使用其中一种“copy”方法,如下面的示例所示:
Message<String> message3 = MessageBuilder.withPayload("test3")
.copyHeaders(message1.getHeaders())
.build();
Message<String> message4 = MessageBuilder.withPayload("test4")
.setHeader("foo", 123)
.copyHeadersIfAbsent(message1.getHeaders())
.build();
assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));
请注意,copyHeadersIfAbsent
方法不会覆盖现有值。此外,在前面的示例中,您可以看到如何使用 setHeader
设置任何用户定义的标头。最后,还有可用于预定义标头的 set
方法,以及用于设置任何标头的非破坏性方法(MessageHeaders
还定义了预定义标头名的常量)。
您还可以使用 MessageBuilder
设置消息的优先级,如下面的示例所示:
Message<Integer> importantMessage = MessageBuilder.withPayload(99)
.setPriority(5)
.build();
assertEquals(5, importantMessage.getHeaders().getPriority());
Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
.setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
.build();
assertEquals(2, lessImportantMessage.getHeaders().getPriority());
仅在使用 PriorityChannel
(正如下一章中所述)时才考虑 priority
标头。它被定义为 java.lang.Integer
。