TCP Connection Factories

Overview

对于 TCP,底层连接的配置通过使用一个连接工厂提供。提供了两种类型的连接工厂:客户端连接工厂和服务器连接工厂。客户端连接工厂建立出站连接。服务器连接工厂侦听传入连接。

出站通道适配器使用客户端连接工厂,但你也可以向入站通道适配器提供到客户端连接工厂的引用。该适配器接收由出站适配器创建的连接上接收到的任何传入消息。

入站通道适配器或网关使用服务器连接工厂。(实际上,连接工厂不能在没有一个连接工厂的情况下运行)。你还可以向出站适配器提供到服务器连接工厂的引用。然后你可以使用该适配器在同一连接上向传入消息发送答复。

答复消息仅在答复包含连接工厂插入原始消息中的 ip_connectionId 头时才路由到连接。

当我们在输入和输出适配器之间共享连接工厂时,消息关联的程度已执行完毕。共享允许通过 TCP 以异步双向方式通信。默认情况下,只有有效载信息通过 TCP 传输。因此,任何消息关联都必须由下游组件(如聚合器或其他端点)执行。对传输部分标头提供支持从版本 3.0 开始。更多信息,请参阅 TCP Message Correlation

你可以向最多一个每种类型的适配器提供一个连接工厂的引用。

Spring Integration 提供使用 java.net.Socketjava.nio.channel.SocketChannel 的连接工厂。

以下示例显示一个使用 java.net.Socket 连接的简单服务器连接工厂:

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"/>

以下示例显示一个使用 java.nio.channel.SocketChannel 连接的简单服务器连接工厂:

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    using-nio="true"/>

从 Spring Integration 4.2 版本开始,如果将服务器配置为侦听随机端口(通过将端口设置为 0),你可以使用 getPort() 来获取操作系统选择的实际端口。此外,getServerSocketAddress() 可让你获取完整的 SocketAddress。有关更多信息,请参阅 link:https://docs.spring.io/spring-integration/api/org/springframework/integration/ip/tcp/connection/TcpServerConnectionFactory.html[Javadoc for the TcpServerConnectionFactory 接口。

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="1234"
    single-use="true"
    so-timeout="10000"/>

以下示例显示一个使用 java.net.Socket 连接的客户端连接工厂并为每个消息创建一个新的连接:

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="1234"
    single-use="true"
    so-timeout="10000"
    using-nio=true/>

从 5.2 版本开始,客户端连接工厂支持属性 connectTimeout,以秒为单位指定,默认为 60。

Message Demarcation (Serializers and Deserializers)

TCP 是一个流协议。这意味着必须向通过 TCP 传输的数据提供一些结构,以便接收者可以将数据分隔为离散的消息。连接工厂被配置为使用序列化器和反序列化器在消息有效负载和通过 TCP 发送的位之间转换。这是通过分别为入站消息与出站消息提供反序列化器和序列化器来完成的。Spring Integration 提供了许多标准序列化器和反序列化器。

ByteArrayCrlfSerializer* 将一个字节数组转换为一个字节流,后面跟着回车和换行符(\r\n)。这是默认序列化器(和反序列化器),可以用电信作为客户端(例如)。

ByteArraySingleTerminatorSerializer* 将一个字节数组转换为一个字节流,后面跟着一个终止字符(默认是 0x00)。

ByteArrayLfSerializer* 将一个字节数组转换为一个字节流,后面跟着一个换行符(0x0a)。

ByteArrayStxEtxSerializer* 将一个字节数组转换为一个字节流,前面是 STX(0x02),后面是 ETX(0x03)。

ByteArrayLengthHeaderSerializer 将一个字节数组转换为一个字节流,前面是按网络字节顺序(大端)保存的二进制长度。这是一个高效的反序列化器,因为它不需要解析每个字节来查找终止字符序列。它还可以用于包含二进制数据的有效负载。前面的序列化器只支持有效负载中的文本。长度头的默认大小是 4 个字节(一个整数),允许消息最多为(2^31 - 1)字节。但是,对于最多为 255 个字节的消息,length 头可以是单个字节(无符号);对于最多为(2^16 - 1)个字节的消息,可以是不带符号的短整数(2 个字节)。如果你需要头部的任何其他格式,可以将 ByteArrayLengthHeaderSerializer 子类化,并为 readHeaderwriteHeader 方法提供实现。绝对最大数据大小是(2^31 - 1)字节。从 5.2 版本开始,头值除了有效负载之外,还可以包括头部的长度。设置属性 inclusive 以启用该机制(对于生产者和消费者,必须将其设置为相同)。

ByteArrayRawSerializer * 将一个字节数组转换为一个字节流,并且不添加任何附加的消息分隔数据。使用此序列化器(和反序列化器),消息的结尾由客户端以有序的方式关闭套接字来指示。当使用此序列化器时,消息接收会挂起,直到客户端关闭套接字或出现超时。超时不会导致消息。当此序列化器正在使用且客户端是 Spring Integration 应用程序时,客户端必须使用配置为 single-use="true" 的连接工厂。这样做会使适配器在发送消息后关闭套接字。序列化器本身不会关闭连接。你应该仅将此序列化器与通道适配器(而不是网关)使用的连接工厂一起使用,并且连接工厂应由入站或出站适配器使用,但不能同时使用这两个适配器。另请参阅此部分后面的 ByteArrayElasticRawDeserializer。但是,从 5.2 版本开始,出站网关有一个新属性 closeStreamAfterSend;这允许使用原始序列化器/反序列化器,因为 EOF 已向服务器发出信号,同时让连接保持打开以接收答复。

在 4.2.2 版本之前,使用非阻塞 I/O(NIO)时,此序列化器将超时(在读取期间)视为文件结尾,并且到目前为止读取的数据将作为消息发出。这是不可靠的,不应用于分隔消息。现在,它将此类条件视为异常。在不太可能以这种方式使用它的情况下,你可以通过将 treatTimeoutAsEndOfMessage 构造器参数设置为 true 来恢复以前的行为。

这些中的每一个都是 AbstractByteArraySerializer 的子类,它同时实现了 org.springframework.core.serializer.Serializerorg.springframework.core.serializer.Deserializer。为了向后兼容,使用 AbstractByteArraySerializer 的任何子类进行序列化的连接也接受一个首先转换为字节数组的 String。这些序列化器和反序列化器中的每一个都将包含相应格式的输入流转换为字节数组有效负载。

为了避免因为表现不佳的客户端(不遵守配置的序列化器协议)而导致内存耗尽,这些序列化器将施加最大消息大小。如果传入消息超出此大小,则抛出异常。默认最大消息大小为 2048 字节。可以通过设置 maxMessageSize 属性来增加它。如果你使用默认序列化器或反序列化器并希望增加最大消息大小,则必须将最大消息大小声明为具有已设置的 maxMessageSize 属性的显式 bean,并配置连接工厂以使用该 bean。

本节前面标记为 * 的类使用中间缓冲区,并将解码数据复制到正确大小的最终缓冲区。从 4.3 版本开始,你可以通过设置 poolSize 属性来配置这些缓冲区,以允许重复使用这些原始缓冲区,而不是为每条消息分配和丢弃它们,这是默认行为。将属性设置为负值将创建一个没有边界的池。如果池有界,还可以设置 poolWaitTimeout 属性(以毫秒为单位),在该属性过期后如果没有可用缓冲区,则会抛出异常。它默认为无穷大。这样的异常会导致套接字关闭。

如果你希望在自定义反序列化器中使用相同的机制,可以扩展 AbstractPooledBufferByteArraySerializer(而不是它的超类 AbstractByteArraySerializer),并实现 doDeserialize() 而非 deserialize()。缓冲区将自动返回到池中。AbstractPooledBufferByteArraySerializer 还提供了一个方便的实用方法:copyToSizedArray()

版本 5.0 添加了 ByteArrayElasticRawDeserializer。它类似于上方 ByteArrayRawSerializer 的反序列化器部分,不同之处在于不需要设置 maxMessageSize。在内部,它使用一个 ByteArrayOutputStream,让缓冲区按需要增长。客户端必须有序关闭套接字,以发出消息结束信号。

只有在受信任的对等方的情况下才应该使用此反序列化器;由于内存不足,它容易受到 DoS 攻击。

MapJsonSerializer 使用 Jackson ObjectMapperMap 和 JSON 之间进行转换。你可以将该序列化器与 MessageConvertingTcpMessageMapperMapMessageConverter 搭配使用,以 JSON 形式传输选定的标头和有效负载。

Jackson ObjectMapper 无法界定流中的消息。因此,MapJsonSerializer 需要委派另一个序列化器或反序列化器来处理消息分界。默认情况下,使用 ByteArrayLfSerializer,从而在电线上产生格式为 <json><LF> 的消息,但你可以配置它改为使用其他消息。(下一个示例显示了如何执行此操作。)

最后的标准序列化器是 org.springframework.core.serializer.DefaultSerializer,你可以使用它以 Java 序列化方式转换可序列化对象。org.springframework.core.serializer.DefaultDeserializer 提供用于包含可序列化对象流的入站反序列化的作用。

如果你不想使用默认序列化器和反序列化器 (ByteArrayCrLfSerializer),则必须在连接工厂上设置 serializerdeserializer 属性。以下示例展示了如何进行设置:

<bean id="javaSerializer"
      class="org.springframework.core.serializer.DefaultSerializer" />
<bean id="javaDeserializer"
      class="org.springframework.core.serializer.DefaultDeserializer" />

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    deserializer="javaDeserializer"
    serializer="javaSerializer"/>

使用 java.net.Socket 连接并在线路上使用 Java 序列化的服务器连接工厂。

有关连接工厂中可用属性的完整详细信息,请参见本节末尾的 the reference

默认情况下,不针对入站数据包执行反向 DNS 查找:在未配置 DNS(例如,Docker 容器)的环境中,这会导致连接延迟。为了将 IP 地址转换为用于消息标头的主机名,可以通过将 lookup-host 属性设置为 true 来覆盖默认行为。

你还可以修改套接字和套接字工厂的属性。更多信息,请参阅 SSL/TLS Support。如其中所述,如果使用或不使用 SSL,都可以进行此类修改。

Custom Serializers and Deserializers

如果你的数据不是受标准反序列化器支持的格式,你可以实现自己的反序列化器;你还可以实现一个自定义序列化器。

若要实现自定义序列化器和反序列化器对,请实现 org.springframework.core.serializer.Deserializerorg.springframework.core.serializer.Serializer 接口。

当反序列化器检测到消息之间有封闭的输入流时,它必须抛出 SoftEndOfStreamException;这是向框架发出的信号,表示关闭是“正常的”。如果消息解码期间流被关闭,则应改为抛出其他某些异常。

从版本 5.2 开始,SoftEndOfStreamException 现在是 RuntimeException,而不是扩展 IOException

TCP Caching Client Connection Factory

noted earlier一样,TCP 套接字可以是“单次使用”(一次请求或响应)或共享。在高流量环境中使用共享套接字与出站网关的配合效果不佳,因为此套接字一次只能处理一个请求或响应。

为提高性能,您可以使用协作信道适配器来取代网关,但这需要进行应用程序级消息关联。有关更多信息,请参见 TCP Message Correlation

Spring Integration 2.2 引入了缓存客户端连接工厂,它使用共享套接字的池,让网关能够使用共享连接池处理多个并发请求。

TCP Failover Client Connection Factory

你可以配置一个支持故障转移到一个或多个其他服务器的 TCP 连接工厂。在发送消息时,工厂会迭代其所有已配置的工厂,直至消息能够被发送,或者找不到连接。最初,将使用已配置列表中的第一个工厂。如果连接随后失败,下一个工厂将成为当前工厂。以下示例展示了如何配置故障转移客户端连接工厂:

<bean id="failCF" class="o.s.i.ip.tcp.connection.FailoverClientConnectionFactory">
    <constructor-arg>
        <list>
            <ref bean="clientFactory1"/>
            <ref bean="clientFactory2"/>
        </list>
    </constructor-arg>
</bean>

使用故障转移连接工厂时,singleUse 属性必须在工厂本身及其配置为使用的工厂列表之间保持一致。

该连接工厂有两个与故障回退相关的属性(与共享连接一起使用时,即 singleUse=false):

  • refreshSharedInterval

  • closeOnRefresh

考虑根据上述配置的以下场景:假设 clientFactory1 无法建立连接,但 clientFactory2 可以。在 refreshSharedInterval 经过后调用 failCF getConnection() 方法时,我们将再次尝试使用 clientFactory1 进行连接;如果成功,到 clientFactory2 的连接将被关闭。如果 closeOnRefreshfalse,“旧”连接将保持打开状态,并且如果第一个工厂再次失败,可能会在将来重复使用。

设置 refreshSharedInterval,仅在经过该时间后尝试重新连接到第一个工厂;将其设置为 Long.MAX_VALUE(默认值),如果你只想在当前连接失败时故障回退到第一个工厂,请执行此操作。

设置 closeOnRefresh,让刷新实际创建新连接后关闭“旧”连接。

如果任何代理工厂是 CachingClientConnectionFactory,则这些属性不适用,因为连接缓存是在那里处理的;在这种情况下,系统始终会查阅连接工厂列表以获取连接。

从版本 5.3 开始,这些默认值变为 Long.MAX_VALUEtrue,因此当当前连接失败时,该工厂仅尝试进行故障回退。要恢复到以前版本中的默认行为,请将其设置为 0false

另请参见 Testing Connections

TCP Thread Affinity Connection Factory

Spring Integration 版本 5.0 引入了此连接工厂。它将连接绑定到调用线程,并且每次该线程发送消息时都会重复使用相同的连接。这将一直持续到连接被关闭(由服务器或网络)为止,或者直到线程调用 releaseConnection() 方法为止。连接本身由另一个客户端工厂实现提供,该实现必须配置为提供非共享(一次性)连接,以便每个线程都能获得连接。

以下示例显示如何配置 TCP 线程关联连接工厂:

@Bean
public TcpNetClientConnectionFactory cf() {
    TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory("localhost",
            Integer.parseInt(System.getProperty(PORT)));
    cf.setSingleUse(true);
    return cf;
}

@Bean
public ThreadAffinityClientConnectionFactory tacf() {
    return new ThreadAffinityClientConnectionFactory(cf());
}

@Bean
@ServiceActivator(inputChannel = "out")
public TcpOutboundGateway outGate() {
    TcpOutboundGateway outGate = new TcpOutboundGateway();
    outGate.setConnectionFactory(tacf());
    outGate.setReplyChannelName("toString");
    return outGate;
}