TCP Message Correlation

IP 端点的一个目标是提供与 Spring Integration 应用程序以外的系统的通信。因此,默认情况下仅发送和接收消息有效负载。从 3.0 开始,你可以使用 JSON、Java 序列化或自定义序列化器和反序列化器传输标头。有关详细信息,请参见 Transferring Headers。框架未提供消息关联(在使用网关除外)或在服务器端协作的通道适配器。Later in this document中,将讨论可用于应用程序的各种关联技术。在大多数情况下,即使消息有效负载包含某些自然关联数据(例如订单号),这也需要特定的应用程序级消息关联。

Gateways

网关自动关联消息。但是,您应当为相对低吞吐量的应用程序使用一个出站网关。当您配置连接工厂以对所有消息对使用单个共享连接时(“single-use="false"”),每次只可处理一条消息。新消息必须等待,直到收到对前一条消息的回复。当为每条新消息配置一个连接工厂以使用新的连接时(“single-use="true"”),此限制并不适用。虽然此设置可提供比共享连接环境更高的吞吐量,但它会因对每对消息打开和关闭一个新连接而产生开销。

因此,对于大容量消息,请考虑使用一对协作的通道适配器。但是,要做到这一点,您需要提供协作逻辑。

Spring Integration 2.2 中引入的另一个解决方案是使用 CachingClientConnectionFactory,它允许使用共享连接池。

Collaborating Outbound and Inbound Channel Adapters

若要实现大容量吞吐量(避免使用网关的缺点,如 mentioned earlier)你可以配置一对协作的出站和入站通道适配器。你还可以使用协作适配器(服务器端或客户端)进行完全异步通信(而不是使用请求-答复语义)。在服务器端,消息关联由适配器自动处理,因为入站适配器添加了允许出站适配器在发送答复消息时确定要使用哪个连接的标头。

在服务器端,你必须填充 ip_connectionId 标头,因为它用于将消息与连接相关联。源自入站适配器中的消息自动设置了标头。如果你希望构建要发送的其他消息,则需要设置标头。你可以从传入消息中获取标头值。

在客户端,应用程序必须提供自己的关联逻辑,如果需要的话。您可以通过多种方式执行此操作。

如果消息有效载荷具有一些自然关联数据(如事务 ID 或订单号),并且您不需要保留任何来自原始出站消息的信息(如答复通道标头),则关联很简单,并且无论如何都将在应用程序级别完成。

如果消息有效载荷具有一些自然关联数据(如事务 ID 或订单号),但您需要保留一些来自原始出站消息的信息(如答复通道标头),则可以保留一份原始出站消息的副本(可能通过使用发布-订阅通道),并使用聚合器重新组合必要的数据。

对于前两种场景中的任何一种,如果有效载荷没有自然关联数据,则可以在出站通道适配器上游提供一个转换器,以使用此类数据增强有效载荷。此类转换器可以将原始有效载荷转换为新对象,该对象同时包含原始有效载荷和部分消息标头。当然,来自标头的实时对象(如答复通道)不能包含在转换后的有效载荷中。

如果您选择这样的策略,则需要确保连接工厂具有合适的成对序列化程序和反序列化程序来处理此类有效负载(例如,使用 Java 序列化的 DefaultSerializerDefaultDeserializer,或自定义序列化程序和反序列化程序)。 TCP Connection Factories 中提到的 ByteArray*Serializer 选项(包括默认 ByteArrayCrLfSerializer)不支持此类有效负载,除非转换后的有效负载是 Stringbyte[]

在 2.2 版本发布之前,当协作通道适配器使用客户端连接工厂时,so-timeout 属性默认为默认答复超时(10 秒)。这意味着,如果入站适配器在这段时间内没有收到任何数据,则套接字将关闭。 此默认行为在真正异步的环境中并不合适,所以它现在默认为无限超时。您可以通过将客户端连接工厂上的 so-timeout 属性设为 10000 毫秒来恢复以前默认行为。

从 5.4 版本开始,多个出站信道适配器和一个 TcpInboundChannelAdapter 可以共享同一个连接工厂。这使得应用程序能够同时支持请求/响应和任意的服务器 → 客户端消息传递。有关更多信息,请参见 TCP Gateways

Transferring Headers

TCP 是流式协议。SerializersDeserializers 在流中划分消息。在 3.0 之前,只能通过 TCP 传输消息有效负载(Stringbyte[])。从 3.0 开始,您可以传输选定的标头以及有效负载。但是,“live” 对象(例如 replyChannel 标头)无法序列化。

要在 TCP 上发送标头信息,需要进行一些额外的配置。

第一步是使用 mapper 属性为 ConnectionFactory 提供一个 MessageConvertingTcpMessageMapper。此映射委托给任何 MessageConverter 实现,以将消息转换为可由配置的 serializerdeserializer 进行序列化和反序列化的某个对象并反过来执行此操作。

Spring Integration 提供了一个 MapMessageConverter,它允许指定添加到 Map 对象的标头列表以及有效载荷。生成的映射有两个条目:payloadheadersheaders 条目本身就是一个 Map,并且包含所选标头。

第二步是提供一个序列化器和一个可以将 Map 转换为某种线缆格式的反序列化器。这可以是一个自定义 SerializerDeserializer,如果您在对等系统不是 Spring Integration 应用程序时通常需要它。

Spring Integration 提供了 MapJsonSerializer`以将 `Map`转换为 JSON 或者从 JSON 转换。它使用 Spring Integration `JsonObjectMapper。如果需要,你可以提供自定义 JsonObjectMapper。默认情况下,序列化器在对象之间插入行尾 (0x0a) 字符。请参见 Javadoc了解详细信息。

JsonObjectMapper 使用类路径上 Jackson 的任何版本。

您还可以使用 DefaultSerializerDefaultDeserializerMap 执行标准 Java 序列化。

以下示例展示了使用 JSON 传输 correlationIdsequenceNumbersequenceSize 标头的连接工厂的配置:

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="12345"
    mapper="mapper"
    serializer="jsonSerializer"
    deserializer="jsonSerializer"/>

<bean id="mapper"
      class="o.sf.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
    <constructor-arg name="messageConverter">
        <bean class="o.sf.integration.support.converter.MapMessageConverter">
            <property name="headerNames">
                <list>
                    <value>correlationId</value>
                    <value>sequenceNumber</value>
                    <value>sequenceSize</value>
                </list>
            </property>
        </bean>
    </constructor-arg>
</bean>

<bean id="jsonSerializer" class="o.sf.integration.ip.tcp.serializer.MapJsonSerializer" />

使用前面配置发送的消息,有效内容为“something”在连接上将显示如下:

{"headers":{"correlationId":"things","sequenceSize":5,"sequenceNumber":1},"payload":"something"}