TCP Connection Factories

Overview

对于 TCP,底层连接的配置通过使用一个连接工厂提供。提供了两种类型的连接工厂:客户端连接工厂和服务器连接工厂。客户端连接工厂建立出站连接。服务器连接工厂侦听传入连接。

For TCP, the configuration of the underlying connection is provided by using a connection factory. Two types of connection factory are provided: a client connection factory and a server connection factory. Client connection factories establish outgoing connections. Server connection factories listen for incoming connections.

出站通道适配器使用客户端连接工厂,但你也可以向入站通道适配器提供到客户端连接工厂的引用。该适配器接收由出站适配器创建的连接上接收到的任何传入消息。

An outbound channel adapter uses a client connection factory, but you can also provide a reference to a client connection factory to an inbound channel adapter. That adapter receives any incoming messages that are received on connections created by the outbound adapter.

入站通道适配器或网关使用服务器连接工厂。(实际上,连接工厂不能在没有一个连接工厂的情况下运行)。你还可以向出站适配器提供到服务器连接工厂的引用。然后你可以使用该适配器在同一连接上向传入消息发送答复。

An inbound channel adapter or gateway uses a server connection factory. (In fact, the connection factory cannot function without one). You can also provide a reference to a server connection factory to an outbound adapter. You can then use that adapter to send replies to incoming messages on the same connection.

答复消息仅在答复包含连接工厂插入原始消息中的 ip_connectionId 头时才路由到连接。

Reply messages are routed to the connection only if the reply contains the ip_connectionId header that was inserted into the original message by the connection factory.

当我们在输入和输出适配器之间共享连接工厂时,消息关联的程度已执行完毕。共享允许通过 TCP 以异步双向方式通信。默认情况下,只有有效载信息通过 TCP 传输。因此,任何消息关联都必须由下游组件(如聚合器或其他端点)执行。对传输部分标头提供支持从版本 3.0 开始。更多信息,请参阅 TCP Message Correlation

This is the extent of message correlation performed when sharing connection factories between inbound and outbound adapters. Such sharing allows for asynchronous two-way communication over TCP. By default, only payload information is transferred using TCP. Therefore, any message correlation must be performed by downstream components such as aggregators or other endpoints. Support for transferring selected headers was introduced in version 3.0. For more information, see TCP Message Correlation.

你可以向最多一个每种类型的适配器提供一个连接工厂的引用。

You may give a reference to a connection factory to a maximum of one adapter of each type.

Spring Integration 提供使用 java.net.Socketjava.nio.channel.SocketChannel 的连接工厂。

Spring Integration provides connection factories that use java.net.Socket and java.nio.channel.SocketChannel.

以下示例显示一个使用 java.net.Socket 连接的简单服务器连接工厂:

The following example shows a simple server connection factory that uses java.net.Socket connections:

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"/>

以下示例显示一个使用 java.nio.channel.SocketChannel 连接的简单服务器连接工厂:

The following example shows a simple server connection factory that uses java.nio.channel.SocketChannel connections:

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    using-nio="true"/>

从 Spring Integration 4.2 版本开始,如果将服务器配置为侦听随机端口(通过将端口设置为 0),你可以使用 getPort() 来获取操作系统选择的实际端口。此外,getServerSocketAddress() 可让你获取完整的 SocketAddress。有关更多信息,请参阅 link:https://docs.spring.io/spring-integration/api/org/springframework/integration/ip/tcp/connection/TcpServerConnectionFactory.html[Javadoc for the TcpServerConnectionFactory 接口。

Starting with Spring Integration version 4.2, if the server is configured to listen on a random port (by setting the port to 0), you can get the actual port chosen by the OS by using getPort(). Also, getServerSocketAddress() lets you get the complete SocketAddress. See the Javadoc for the TcpServerConnectionFactory interface for more information.

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="1234"
    single-use="true"
    so-timeout="10000"/>

以下示例显示一个使用 java.net.Socket 连接的客户端连接工厂并为每个消息创建一个新的连接:

The following example shows a client connection factory that uses java.net.Socket connections and creates a new connection for each message:

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="1234"
    single-use="true"
    so-timeout="10000"
    using-nio=true/>

从 5.2 版本开始,客户端连接工厂支持属性 connectTimeout,以秒为单位指定,默认为 60。

Starting with version 5.2, the client connection factories support the property connectTimeout, specified in seconds, which defaults to 60.

Message Demarcation (Serializers and Deserializers)

TCP 是一个流协议。这意味着必须向通过 TCP 传输的数据提供一些结构,以便接收者可以将数据分隔为离散的消息。连接工厂被配置为使用序列化器和反序列化器在消息有效负载和通过 TCP 发送的位之间转换。这是通过分别为入站消息与出站消息提供反序列化器和序列化器来完成的。Spring Integration 提供了许多标准序列化器和反序列化器。

TCP is a streaming protocol. This means that some structure has to be provided to data transported over TCP so that the receiver can demarcate the data into discrete messages. Connection factories are configured to use serializers and deserializers to convert between the message payload and the bits that are sent over TCP. This is accomplished by providing a deserializer and a serializer for inbound and outbound messages, respectively. Spring Integration provides a number of standard serializers and deserializers.

ByteArrayCrlfSerializer* 将一个字节数组转换为一个字节流,后面跟着回车和换行符(\r\n)。这是默认序列化器(和反序列化器),可以用电信作为客户端(例如)。

ByteArrayCrlfSerializer* converts a byte array to a stream of bytes followed by carriage return and linefeed characters (\r\n). This is the default serializer (and deserializer) and can be used (for example) with telnet as a client.

ByteArraySingleTerminatorSerializer* 将一个字节数组转换为一个字节流,后面跟着一个终止字符(默认是 0x00)。

The ByteArraySingleTerminatorSerializer* converts a byte array to a stream of bytes followed by a single termination character (the default is 0x00).

ByteArrayLfSerializer* 将一个字节数组转换为一个字节流,后面跟着一个换行符(0x0a)。

The ByteArrayLfSerializer* converts a byte array to a stream of bytes followed by a single linefeed character (0x0a).

ByteArrayStxEtxSerializer* 将一个字节数组转换为一个字节流,前面是 STX(0x02),后面是 ETX(0x03)。

The ByteArrayStxEtxSerializer* converts a byte array to a stream of bytes preceded by an STX (0x02) and followed by an ETX (0x03).

ByteArrayLengthHeaderSerializer 将一个字节数组转换为一个字节流,前面是按网络字节顺序(大端)保存的二进制长度。这是一个高效的反序列化器,因为它不需要解析每个字节来查找终止字符序列。它还可以用于包含二进制数据的有效负载。前面的序列化器只支持有效负载中的文本。长度头的默认大小是 4 个字节(一个整数),允许消息最多为(2^31 - 1)字节。但是,对于最多为 255 个字节的消息,length 头可以是单个字节(无符号);对于最多为(2^16 - 1)个字节的消息,可以是不带符号的短整数(2 个字节)。如果你需要头部的任何其他格式,可以将 ByteArrayLengthHeaderSerializer 子类化,并为 readHeaderwriteHeader 方法提供实现。绝对最大数据大小是(2^31 - 1)字节。从 5.2 版本开始,头值除了有效负载之外,还可以包括头部的长度。设置属性 inclusive 以启用该机制(对于生产者和消费者,必须将其设置为相同)。

The ByteArrayLengthHeaderSerializer converts a byte array to a stream of bytes preceded by a binary length in network byte order (big endian). This an efficient deserializer because it does not have to parse every byte to look for a termination character sequence. It can also be used for payloads that contain binary data. The preceding serializers support only text in the payload. The default size of the length header is four bytes (an Integer), allowing for messages up to (2^31 - 1) bytes. However, the length header can be a single byte (unsigned) for messages up to 255 bytes, or an unsigned short (2 bytes) for messages up to (2^16 - 1) bytes. If you need any other format for the header, you can subclass ByteArrayLengthHeaderSerializer and provide implementations for the readHeader and writeHeader methods. The absolute maximum data size is (2^31 - 1) bytes. Starting with version 5.2, the header value can include the length of the header in addition to the payload. Set the inclusive property to enable that mechanism (it must be set to the same for producers and consumers).

ByteArrayRawSerializer * 将一个字节数组转换为一个字节流,并且不添加任何附加的消息分隔数据。使用此序列化器(和反序列化器),消息的结尾由客户端以有序的方式关闭套接字来指示。当使用此序列化器时,消息接收会挂起,直到客户端关闭套接字或出现超时。超时不会导致消息。当此序列化器正在使用且客户端是 Spring Integration 应用程序时,客户端必须使用配置为 single-use="true" 的连接工厂。这样做会使适配器在发送消息后关闭套接字。序列化器本身不会关闭连接。你应该仅将此序列化器与通道适配器(而不是网关)使用的连接工厂一起使用,并且连接工厂应由入站或出站适配器使用,但不能同时使用这两个适配器。另请参阅此部分后面的 ByteArrayElasticRawDeserializer。但是,从 5.2 版本开始,出站网关有一个新属性 closeStreamAfterSend;这允许使用原始序列化器/反序列化器,因为 EOF 已向服务器发出信号,同时让连接保持打开以接收答复。

The ByteArrayRawSerializer*, converts a byte array to a stream of bytes and adds no additional message demarcation data. With this serializer (and deserializer), the end of a message is indicated by the client closing the socket in an orderly fashion. When using this serializer, message reception hangs until the client closes the socket or a timeout occurs. A timeout does not result in a message. When this serializer is being used and the client is a Spring Integration application, the client must use a connection factory that is configured with single-use="true". Doing so causes the adapter to close the socket after sending the message. The serializer does not, by itself, close the connection. You should use this serializer only with the connection factories used by channel adapters (not gateways), and the connection factories should be used by either an inbound or outbound adapter but not both. See also ByteArrayElasticRawDeserializer, later in this section. However, since version 5.2, the outbound gateway has a new property closeStreamAfterSend; this allows the use of raw serializers/deserializers because the EOF is signaled to the server, while leaving the connection open to receive the reply.

在 4.2.2 版本之前,使用非阻塞 I/O(NIO)时,此序列化器将超时(在读取期间)视为文件结尾,并且到目前为止读取的数据将作为消息发出。这是不可靠的,不应用于分隔消息。现在,它将此类条件视为异常。在不太可能以这种方式使用它的情况下,你可以通过将 treatTimeoutAsEndOfMessage 构造器参数设置为 true 来恢复以前的行为。

Before version 4.2.2, when using non-blocking I/O (NIO), this serializer treated a timeout (during read) as an end of file, and the data read so far was emitted as a message. This is unreliable and should not be used to delimit messages. It now treats such conditions as an exception. In the unlikely event that you use it this way, you can restore the previous behavior by setting the treatTimeoutAsEndOfMessage constructor argument to true.

这些中的每一个都是 AbstractByteArraySerializer 的子类,它同时实现了 org.springframework.core.serializer.Serializerorg.springframework.core.serializer.Deserializer。为了向后兼容,使用 AbstractByteArraySerializer 的任何子类进行序列化的连接也接受一个首先转换为字节数组的 String。这些序列化器和反序列化器中的每一个都将包含相应格式的输入流转换为字节数组有效负载。

Each of these is a subclass of AbstractByteArraySerializer, which implements both org.springframework.core.serializer.Serializer and org.springframework.core.serializer.Deserializer. For backwards compatibility, connections that use any subclass of AbstractByteArraySerializer for serialization also accept a String that is first converted to a byte array. Each of these serializers and deserializers converts an input stream that contains the corresponding format to a byte array payload.

为了避免因为表现不佳的客户端(不遵守配置的序列化器协议)而导致内存耗尽,这些序列化器将施加最大消息大小。如果传入消息超出此大小,则抛出异常。默认最大消息大小为 2048 字节。可以通过设置 maxMessageSize 属性来增加它。如果你使用默认序列化器或反序列化器并希望增加最大消息大小,则必须将最大消息大小声明为具有已设置的 maxMessageSize 属性的显式 bean,并配置连接工厂以使用该 bean。

To avoid memory exhaustion due to a badly behaved client (one that does not adhere to the protocol of the configured serializer), these serializers impose a maximum message size. If an incoming message exceeds this size, an exception is thrown. The default maximum message size is 2048 bytes. You can increase it by setting the maxMessageSize property. If you use the default serializer or deserializer and wish to increase the maximum message size, you must declare the maximum message size as an explicit bean with the maxMessageSize property set and configure the connection factory to use that bean.

本节前面标记为 * 的类使用中间缓冲区,并将解码数据复制到正确大小的最终缓冲区。从 4.3 版本开始,你可以通过设置 poolSize 属性来配置这些缓冲区,以允许重复使用这些原始缓冲区,而不是为每条消息分配和丢弃它们,这是默认行为。将属性设置为负值将创建一个没有边界的池。如果池有界,还可以设置 poolWaitTimeout 属性(以毫秒为单位),在该属性过期后如果没有可用缓冲区,则会抛出异常。它默认为无穷大。这样的异常会导致套接字关闭。

The classes marked with * earlier in this section use an intermediate buffer and copy the decoded data to a final buffer of the correct size. Starting with version 4.3, you can configure these buffers by setting a poolSize property to let these raw buffers be reused instead of being allocated and discarded for each message, which is the default behavior. Setting the property to a negative value creates a pool that has no bounds. If the pool is bounded, you can also set the poolWaitTimeout property (in milliseconds), after which an exception is thrown if no buffer becomes available. It defaults to infinity. Such an exception causes the socket to be closed.

如果你希望在自定义反序列化器中使用相同的机制,可以扩展 AbstractPooledBufferByteArraySerializer(而不是它的超类 AbstractByteArraySerializer),并实现 doDeserialize() 而非 deserialize()。缓冲区将自动返回到池中。AbstractPooledBufferByteArraySerializer 还提供了一个方便的实用方法:copyToSizedArray()

If you wish to use the same mechanism in custom deserializers, you can extend AbstractPooledBufferByteArraySerializer (instead of its super class, AbstractByteArraySerializer) and implement doDeserialize() instead of deserialize(). The buffer is automatically returned to the pool. AbstractPooledBufferByteArraySerializer also provides a convenient utility method: copyToSizedArray().

版本 5.0 添加了 ByteArrayElasticRawDeserializer。它类似于上方 ByteArrayRawSerializer 的反序列化器部分,不同之处在于不需要设置 maxMessageSize。在内部,它使用一个 ByteArrayOutputStream,让缓冲区按需要增长。客户端必须有序关闭套接字,以发出消息结束信号。

Version 5.0 added the ByteArrayElasticRawDeserializer. This is similar to the deserializer side of ByteArrayRawSerializer above, except that it is not necessary to set a maxMessageSize. Internally, it uses a ByteArrayOutputStream that lets the buffer grow as needed. The client must close the socket in an orderly manner to signal end of message.

只有在受信任的对等方的情况下才应该使用此反序列化器;由于内存不足,它容易受到 DoS 攻击。

This deserializer should only be used when the peer is trusted; it is susceptible to a DoS attach due to out of memory conditions.

MapJsonSerializer 使用 Jackson ObjectMapperMap 和 JSON 之间进行转换。你可以将该序列化器与 MessageConvertingTcpMessageMapperMapMessageConverter 搭配使用,以 JSON 形式传输选定的标头和有效负载。

The MapJsonSerializer uses a Jackson ObjectMapper to convert between a Map and JSON. You can use this serializer in conjunction with a MessageConvertingTcpMessageMapper and a MapMessageConverter to transfer selected headers and the payload in JSON.

Jackson ObjectMapper 无法界定流中的消息。因此,MapJsonSerializer 需要委派另一个序列化器或反序列化器来处理消息分界。默认情况下,使用 ByteArrayLfSerializer,从而在电线上产生格式为 <json><LF> 的消息,但你可以配置它改为使用其他消息。(下一个示例显示了如何执行此操作。)

The Jackson ObjectMapper cannot demarcate messages in the stream. Therefore, the MapJsonSerializer needs to delegate to another serializer or deserializer to handle message demarcation. By default, a ByteArrayLfSerializer is used, resulting in messages with a format of <json><LF> on the wire, but you can configure it to use others instead. (The next example shows how to do so.)

最后的标准序列化器是 org.springframework.core.serializer.DefaultSerializer,你可以使用它以 Java 序列化方式转换可序列化对象。org.springframework.core.serializer.DefaultDeserializer 提供用于包含可序列化对象流的入站反序列化的作用。

The final standard serializer is org.springframework.core.serializer.DefaultSerializer, which you can use to convert serializable objects with Java serialization. org.springframework.core.serializer.DefaultDeserializer is provided for inbound deserialization of streams that contain serializable objects.

如果你不想使用默认序列化器和反序列化器 (ByteArrayCrLfSerializer),则必须在连接工厂上设置 serializerdeserializer 属性。以下示例展示了如何进行设置:

If you do not wish to use the default serializer and deserializer (ByteArrayCrLfSerializer), you must set the serializer and deserializer attributes on the connection factory. The following example shows how to do so:

<bean id="javaSerializer"
      class="org.springframework.core.serializer.DefaultSerializer" />
<bean id="javaDeserializer"
      class="org.springframework.core.serializer.DefaultDeserializer" />

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    deserializer="javaDeserializer"
    serializer="javaSerializer"/>

使用 java.net.Socket 连接并在线路上使用 Java 序列化的服务器连接工厂。

A server connection factory that uses java.net.Socket connections and uses Java serialization on the wire.

有关连接工厂中可用属性的完整详细信息,请参见本节末尾的 the reference

For full details of the attributes available on connection factories, see the reference at the end of this section.

默认情况下,不针对入站数据包执行反向 DNS 查找:在未配置 DNS(例如,Docker 容器)的环境中,这会导致连接延迟。为了将 IP 地址转换为用于消息标头的主机名,可以通过将 lookup-host 属性设置为 true 来覆盖默认行为。

By default, reverse DNS lookups are not performed on inbound packets: in environments where DNS is not configured (e.g. Docker containers), this can cause connection delays. To convert IP addresses to host names for use in message headers, the default behavior can be overridden by setting the lookup-host attribute to true.

你还可以修改套接字和套接字工厂的属性。更多信息,请参阅 SSL/TLS Support。如其中所述,如果使用或不使用 SSL,都可以进行此类修改。

You can also modify the attributes of sockets and socket factories. See SSL/TLS Support for more information. As noted there, such modifications are possible if SSL is being used, or not.

Custom Serializers and Deserializers

如果你的数据不是受标准反序列化器支持的格式,你可以实现自己的反序列化器;你还可以实现一个自定义序列化器。

If your data is not in a format supported by one of the standard deserializers, you can implement your own; you can also implement a custom serializer.

若要实现自定义序列化器和反序列化器对,请实现 org.springframework.core.serializer.Deserializerorg.springframework.core.serializer.Serializer 接口。

To implement a custom serializer and deserializer pair, implement the org.springframework.core.serializer.Deserializer and org.springframework.core.serializer.Serializer interfaces.

当反序列化器检测到消息之间有封闭的输入流时,它必须抛出 SoftEndOfStreamException;这是向框架发出的信号,表示关闭是“正常的”。如果消息解码期间流被关闭,则应改为抛出其他某些异常。

When the deserializer detects a closed input stream between messages, it must throw a SoftEndOfStreamException; this is a signal to the framework to indicate that the close was "normal". If the stream is closed while decoding a message, some other exception should be thrown instead.

从版本 5.2 开始,SoftEndOfStreamException 现在是 RuntimeException,而不是扩展 IOException

Starting with version 5.2, SoftEndOfStreamException is now a RuntimeException instead of extending IOException.

TCP Caching Client Connection Factory

noted earlier一样,TCP 套接字可以是“单次使用”(一次请求或响应)或共享。在高流量环境中使用共享套接字与出站网关的配合效果不佳,因为此套接字一次只能处理一个请求或响应。

As noted earlier, TCP sockets can be 'single-use' (one request or response) or shared. Shared sockets do not perform well with outbound gateways in high-volume environments, because the socket can only process one request or response at a time.

为提高性能,您可以使用协作信道适配器来取代网关,但这需要进行应用程序级消息关联。有关更多信息,请参见 TCP Message Correlation

To improve performance, you can use collaborating channel adapters instead of gateways, but that requires application-level message correlation. See TCP Message Correlation for more information.

Spring Integration 2.2 引入了缓存客户端连接工厂,它使用共享套接字的池,让网关能够使用共享连接池处理多个并发请求。

Spring Integration 2.2 introduced a caching client connection factory, which uses a pool of shared sockets, letting a gateway process multiple concurrent requests with a pool of shared connections.

TCP Failover Client Connection Factory

你可以配置一个支持故障转移到一个或多个其他服务器的 TCP 连接工厂。在发送消息时,工厂会迭代其所有已配置的工厂,直至消息能够被发送,或者找不到连接。最初,将使用已配置列表中的第一个工厂。如果连接随后失败,下一个工厂将成为当前工厂。以下示例展示了如何配置故障转移客户端连接工厂:

You can configure a TCP connection factory that supports failover to one or more other servers. When sending a message, the factory iterates over all its configured factories until either the message can be sent or no connection can be found. Initially, the first factory in the configured list is used. If a connection subsequently fails, the next factory becomes the current factory. The following example shows how to configure a failover client connection factory:

<bean id="failCF" class="o.s.i.ip.tcp.connection.FailoverClientConnectionFactory">
    <constructor-arg>
        <list>
            <ref bean="clientFactory1"/>
            <ref bean="clientFactory2"/>
        </list>
    </constructor-arg>
</bean>

使用故障转移连接工厂时,singleUse 属性必须在工厂本身及其配置为使用的工厂列表之间保持一致。

When using the failover connection factory, the singleUse property must be consistent between the factory itself and the list of factories it is configured to use.

该连接工厂有两个与故障回退相关的属性(与共享连接一起使用时,即 singleUse=false):

The connection factory has two properties related to failing back, when used with a shared connection (singleUse=false):

  • refreshSharedInterval

  • closeOnRefresh

考虑根据上述配置的以下场景:假设 clientFactory1 无法建立连接,但 clientFactory2 可以。在 refreshSharedInterval 经过后调用 failCF getConnection() 方法时,我们将再次尝试使用 clientFactory1 进行连接;如果成功,到 clientFactory2 的连接将被关闭。如果 closeOnRefreshfalse,“旧”连接将保持打开状态,并且如果第一个工厂再次失败,可能会在将来重复使用。

Consider the following scenario based on the above configuration: Let’s say clientFactory1 cannot establish a connection but clientFactory2 can. When the failCF getConnection() method is called after the refreshSharedInterval has passed, we will again attempt to connect using clientFactory1; if successful, the connection to clientFactory2 will be closed. If closeOnRefresh is false, the "old" connection will remain open and may be reused in future if the first factory fails once more.

设置 refreshSharedInterval,仅在经过该时间后尝试重新连接到第一个工厂;将其设置为 Long.MAX_VALUE(默认值),如果你只想在当前连接失败时故障回退到第一个工厂,请执行此操作。

Set refreshSharedInterval to only attempt to reconnect with the first factory after that time has expired; setting it to Long.MAX_VALUE (default) if you only want to fail back to the first factory when the current connection fails.

设置 closeOnRefresh,让刷新实际创建新连接后关闭“旧”连接。

Set closeOnRefresh to close the "old" connection after a refresh actually creates a new connection.

如果任何代理工厂是 CachingClientConnectionFactory,则这些属性不适用,因为连接缓存是在那里处理的;在这种情况下,系统始终会查阅连接工厂列表以获取连接。

These properties do not apply if any of the delegate factories is a CachingClientConnectionFactory because the connection caching is handled there; in that case the list of connection factories will always be consulted to get a connection.

从版本 5.3 开始,这些默认值变为 Long.MAX_VALUEtrue,因此当当前连接失败时,该工厂仅尝试进行故障回退。要恢复到以前版本中的默认行为,请将其设置为 0false

Starting with version 5.3, these default to Long.MAX_VALUE and true so the factory only attempts to fail back when the current connection fails. To revert to the default behavior of previous versions, set them to 0 and false.

另请参见 Testing Connections

TCP Thread Affinity Connection Factory

Spring Integration 版本 5.0 引入了此连接工厂。它将连接绑定到调用线程,并且每次该线程发送消息时都会重复使用相同的连接。这将一直持续到连接被关闭(由服务器或网络)为止,或者直到线程调用 releaseConnection() 方法为止。连接本身由另一个客户端工厂实现提供,该实现必须配置为提供非共享(一次性)连接,以便每个线程都能获得连接。

Spring Integration version 5.0 introduced this connection factory. It binds a connection to the calling thread, and the same connection is reused each time that thread sends a message. This continues until the connection is closed (by the server or the network) or until the thread calls the releaseConnection() method. The connections themselves are provided by another client factory implementation, which must be configured to provide non-shared (single-use) connections so that each thread gets a connection.

以下示例显示如何配置 TCP 线程关联连接工厂:

The following example shows how to configure a TCP thread affinity connection factory:

@Bean
public TcpNetClientConnectionFactory cf() {
    TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory("localhost",
            Integer.parseInt(System.getProperty(PORT)));
    cf.setSingleUse(true);
    return cf;
}

@Bean
public ThreadAffinityClientConnectionFactory tacf() {
    return new ThreadAffinityClientConnectionFactory(cf());
}

@Bean
@ServiceActivator(inputChannel = "out")
public TcpOutboundGateway outGate() {
    TcpOutboundGateway outGate = new TcpOutboundGateway();
    outGate.setConnectionFactory(tacf());
    outGate.setReplyChannelName("toString");
    return outGate;
}