TCP Message Correlation
IP 端点的一个目标是提供与 Spring Integration 应用程序以外的系统的通信。因此,默认情况下仅发送和接收消息有效负载。从 3.0 开始,你可以使用 JSON、Java 序列化或自定义序列化器和反序列化器传输标头。有关详细信息,请参见 Transferring Headers。框架未提供消息关联(在使用网关除外)或在服务器端协作的通道适配器。Later in this document中,将讨论可用于应用程序的各种关联技术。在大多数情况下,即使消息有效负载包含某些自然关联数据(例如订单号),这也需要特定的应用程序级消息关联。
One goal of the IP endpoints is to provide communication with systems other than Spring Integration applications. For this reason, only message payloads are sent and received by default. Since 3.0, you can transfer headers by using JSON, Java serialization, or custom serializers and deserializers. See Transferring Headers for more information. No message correlation is provided by the framework (except when using the gateways) or collaborating channel adapters on the server side. Later in this document, we discuss the various correlation techniques available to applications. In most cases, this requires specific application-level correlation of messages, even when message payloads contain some natural correlation data (such as an order number).
Gateways
网关自动关联消息。但是,您应当为相对低吞吐量的应用程序使用一个出站网关。当您配置连接工厂以对所有消息对使用单个共享连接时(“single-use="false"”),每次只可处理一条消息。新消息必须等待,直到收到对前一条消息的回复。当为每条新消息配置一个连接工厂以使用新的连接时(“single-use="true"”),此限制并不适用。虽然此设置可提供比共享连接环境更高的吞吐量,但它会因对每对消息打开和关闭一个新连接而产生开销。
Gateways automatically correlate messages. However, you should use an outbound gateway for relatively low-volume applications. When you configure the connection factory to use a single shared connection for all message pairs ('single-use="false"'), only one message can be processed at a time. A new message has to wait until the reply to the previous message has been received. When a connection factory is configured for each new message to use a new connection ('single-use="true"'), this restriction does not apply. While this setting can give higher throughput than a shared connection environment, it comes with the overhead of opening and closing a new connection for each message pair.
因此,对于大容量消息,请考虑使用一对协作的通道适配器。但是,要做到这一点,您需要提供协作逻辑。
Therefore, for high-volume messages, consider using a collaborating pair of channel adapters. However, to do so, you need to provide collaboration logic.
Spring Integration 2.2 中引入的另一个解决方案是使用 CachingClientConnectionFactory
,它允许使用共享连接池。
Another solution, introduced in Spring Integration 2.2, is to use a CachingClientConnectionFactory
, which allows the use of a pool of shared connections.
Collaborating Outbound and Inbound Channel Adapters
若要实现大容量吞吐量(避免使用网关的缺点,如 mentioned earlier)你可以配置一对协作的出站和入站通道适配器。你还可以使用协作适配器(服务器端或客户端)进行完全异步通信(而不是使用请求-答复语义)。在服务器端,消息关联由适配器自动处理,因为入站适配器添加了允许出站适配器在发送答复消息时确定要使用哪个连接的标头。
To achieve high-volume throughput (avoiding the pitfalls of using gateways, as mentioned earlier) you can configure a pair of collaborating outbound and inbound channel adapters. You can also use collaborating adapters (server-side or client-side) for totally asynchronous communication (rather than with request-reply semantics). On the server side, message correlation is automatically handled by the adapters, because the inbound adapter adds a header that allows the outbound adapter to determine which connection to use when sending the reply message.
在服务器端,你必须填充 |
On the server side, you must populate the |
在客户端,应用程序必须提供自己的关联逻辑,如果需要的话。您可以通过多种方式执行此操作。
On the client side, the application must provide its own correlation logic, if needed. You can do so in a number of ways.
如果消息有效载荷具有一些自然关联数据(如事务 ID 或订单号),并且您不需要保留任何来自原始出站消息的信息(如答复通道标头),则关联很简单,并且无论如何都将在应用程序级别完成。
If the message payload has some natural correlation data (such as a transaction ID or an order number) and you have no need to retain any information (such as a reply channel header) from the original outbound message, the correlation is simple and would be done at the application level in any case.
如果消息有效载荷具有一些自然关联数据(如事务 ID 或订单号),但您需要保留一些来自原始出站消息的信息(如答复通道标头),则可以保留一份原始出站消息的副本(可能通过使用发布-订阅通道),并使用聚合器重新组合必要的数据。
If the message payload has some natural correlation data (such as a transaction ID or an order number), but you need to retain some information (such as a reply channel header) from the original outbound message, you can retain a copy of the original outbound message (perhaps by using a publish-subscribe channel) and use an aggregator to recombine the necessary data.
对于前两种场景中的任何一种,如果有效载荷没有自然关联数据,则可以在出站通道适配器上游提供一个转换器,以使用此类数据增强有效载荷。此类转换器可以将原始有效载荷转换为新对象,该对象同时包含原始有效载荷和部分消息标头。当然,来自标头的实时对象(如答复通道)不能包含在转换后的有效载荷中。
For either of the previous two scenarios, if the payload has no natural correlation data, you can provide a transformer upstream of the outbound channel adapter to enhance the payload with such data. Such a transformer may transform the original payload to a new object that contains both the original payload and some subset of the message headers. Of course, live objects (such as reply channels) from the headers cannot be included in the transformed payload.
如果您选择这样的策略,则需要确保连接工厂具有合适的成对序列化程序和反序列化程序来处理此类有效负载(例如,使用 Java 序列化的 DefaultSerializer
和 DefaultDeserializer
,或自定义序列化程序和反序列化程序)。 TCP Connection Factories 中提到的 ByteArray*Serializer
选项(包括默认 ByteArrayCrLfSerializer
)不支持此类有效负载,除非转换后的有效负载是 String
或 byte[]
。
If you choose such a strategy, you need to ensure the connection factory has an appropriate serializer-deserializer pair to handle such a payload (such as DefaultSerializer
and DefaultDeserializer
, which use java serialization, or a custom serializer and deserializer).
The ByteArray*Serializer
options mentioned in TCP Connection Factories, including the default ByteArrayCrLfSerializer
, do not support such payloads unless the transformed payload is a String
or byte[]
.
在 2.2 版本发布之前,当协作通道适配器使用客户端连接工厂时, Before the 2.2 release, when collaborating channel adapters used a client connection factory, the 此默认行为在真正异步的环境中并不合适,所以它现在默认为无限超时。您可以通过将客户端连接工厂上的 This default behavior was not appropriate in a truly asynchronous environment, so it now defaults to an infinite timeout.
You can reinstate the previous default behavior by setting the |
从 5.4 版本开始,多个出站信道适配器和一个 TcpInboundChannelAdapter
可以共享同一个连接工厂。这使得应用程序能够同时支持请求/响应和任意的服务器 → 客户端消息传递。有关更多信息,请参见 TCP Gateways。
Starting with version 5.4, multiple outbound channel adapters and one TcpInboundChannelAdapter
can share the same connection factory.
This allows an application to support both request/reply and arbitrary server → client messaging.
See TCP Gateways for more information.
Transferring Headers
TCP 是流式协议。Serializers
和 Deserializers
在流中划分消息。在 3.0 之前,只能通过 TCP 传输消息有效负载(String
或 byte[]
)。从 3.0 开始,您可以传输选定的标头以及有效负载。但是,“live” 对象(例如 replyChannel
标头)无法序列化。
TCP is a streaming protocol.
Serializers
and Deserializers
demarcate messages within the stream.
Prior to 3.0, only message payloads (String
or byte[]
) could be transferred over TCP.
Beginning with 3.0, you can transfer selected headers as well as the payload.
However, “live” objects, such as the replyChannel
header, cannot be serialized.
要在 TCP 上发送标头信息,需要进行一些额外的配置。
Sending header information over TCP requires some additional configuration.
第一步是使用 mapper
属性为 ConnectionFactory
提供一个 MessageConvertingTcpMessageMapper
。此映射委托给任何 MessageConverter
实现,以将消息转换为可由配置的 serializer
和 deserializer
进行序列化和反序列化的某个对象并反过来执行此操作。
The first step is to provide the ConnectionFactory
with a MessageConvertingTcpMessageMapper
that uses the mapper
attribute.
This mapper delegates to any MessageConverter
implementation to convert the message to and from some object that can be serialized and deserialized by the configured serializer
and deserializer
.
Spring Integration 提供了一个 MapMessageConverter
,它允许指定添加到 Map
对象的标头列表以及有效载荷。生成的映射有两个条目:payload
和 headers
。headers
条目本身就是一个 Map
,并且包含所选标头。
Spring Integration provides a MapMessageConverter
, which allows the specification of a list of headers that are added to a Map
object, along with the payload.
The generated Map has two entries: payload
and headers
.
The headers
entry is itself a Map
and contains the selected headers.
第二步是提供一个序列化器和一个可以将 Map
转换为某种线缆格式的反序列化器。这可以是一个自定义 Serializer
或 Deserializer
,如果您在对等系统不是 Spring Integration 应用程序时通常需要它。
The second step is to provide a serializer and a deserializer that can convert between a Map
and some wire format.
This can be a custom Serializer
or Deserializer
, which you typically need if the peer system is not a Spring Integration application.
Spring Integration 提供了 MapJsonSerializer`以将 `Map`转换为 JSON 或者从 JSON 转换。它使用 Spring Integration `JsonObjectMapper
。如果需要,你可以提供自定义 JsonObjectMapper
。默认情况下,序列化器在对象之间插入行尾 (0x0a
) 字符。请参见 Javadoc了解详细信息。
Spring Integration provides a MapJsonSerializer
to convert a Map
to and from JSON.
It uses a Spring Integration JsonObjectMapper
.
You can provide a custom JsonObjectMapper
if needed.
By default, the serializer inserts a linefeed (0x0a
) character between objects.
See the Javadoc for more information.
|
The |
您还可以使用 DefaultSerializer
和 DefaultDeserializer
对 Map
执行标准 Java 序列化。
You can also use standard Java serialization of the Map
, by using the DefaultSerializer
and DefaultDeserializer
.
以下示例展示了使用 JSON 传输 correlationId
、sequenceNumber
和 sequenceSize
标头的连接工厂的配置:
The following example shows the configuration of a connection factory that transfers the correlationId
, sequenceNumber
, and sequenceSize
headers by using JSON:
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="12345"
mapper="mapper"
serializer="jsonSerializer"
deserializer="jsonSerializer"/>
<bean id="mapper"
class="o.sf.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
<constructor-arg name="messageConverter">
<bean class="o.sf.integration.support.converter.MapMessageConverter">
<property name="headerNames">
<list>
<value>correlationId</value>
<value>sequenceNumber</value>
<value>sequenceSize</value>
</list>
</property>
</bean>
</constructor-arg>
</bean>
<bean id="jsonSerializer" class="o.sf.integration.ip.tcp.serializer.MapJsonSerializer" />
使用前面配置发送的消息,有效内容为“something”在连接上将显示如下:
A message sent with the preceding configuration, with a payload of 'something' would appear on the wire as follows:
{"headers":{"correlationId":"things","sequenceSize":5,"sequenceNumber":1},"payload":"something"}