About Non-blocking I/O (NIO)

Using NIO (see using-nio in IP Configuration Attributes) avoids dedicating a thread to read from each socket. For a small number of sockets, you are likely to find that not using NIO, together with an asynchronous hand-off (such as to a QueueChannel), performs as well as or better than using NIO.
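
For illustration, here is a minimal sketch of that arrangement (the port, bean names, and queue capacity are arbitrary choices, not values from the framework): a non-NIO server factory whose inbound adapter hands each message off to a QueueChannel, so the per-socket reading thread is released as soon as the message is queued.

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter;
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;

@Configuration
public class NonNioConfig {

    @Bean
    public TcpNetServerConnectionFactory serverFactory() {
        return new TcpNetServerConnectionFactory(1234); // blocking I/O: one reader thread per socket
    }

    @Bean
    public QueueChannel fromTcp() {
        return new QueueChannel(100); // the asynchronous hand-off point
    }

    @Bean
    public TcpReceivingChannelAdapter inboundAdapter(TcpNetServerConnectionFactory serverFactory,
            QueueChannel fromTcp) {
        TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
        adapter.setConnectionFactory(serverFactory);
        adapter.setOutputChannel(fromTcp); // reader thread returns once the message is queued
        return adapter;
    }
}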

You should consider using NIO when handling a large number of connections. However, the use of NIO has some other ramifications. A pool of threads (in the task executor) is shared across all the sockets. Each incoming message is assembled and sent to the configured channel as a separate unit of work on a thread selected from that pool. Two sequential messages arriving at the same socket might be processed by different threads. This means that the order in which the messages are sent to the channel is indeterminate. Strict ordering of the messages arriving at the socket is not maintained.

For some applications, this is not an issue. For others, it is a problem. If you require strict ordering, consider setting using-nio to false and using an asynchronous hand-off.

Alternatively, you can insert a resequencer downstream of the inbound endpoint to return the messages to their proper sequence. If you set apply-sequence to true on the connection factory, messages arriving at a TCP connection have sequenceNumber and correlationId headers set. The resequencer uses these headers to return the messages to their proper sequence.
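
A minimal sketch of that arrangement, assuming Java configuration with the Java DSL (the port and the tcpIn and ordered channel names are illustrative, not part of the framework):

import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.ip.tcp.connection.TcpNioServerConnectionFactory;

@Bean
public TcpNioServerConnectionFactory nioServerFactory() {
    TcpNioServerConnectionFactory factory = new TcpNioServerConnectionFactory(1234);
    factory.setApplySequence(true); // populates the sequenceNumber and correlationId headers
    return factory;
}

@Bean
public IntegrationFlow resequencingFlow() {
    return IntegrationFlows.from("tcpIn")
            // release partial sequences: a TCP connection has no defined sequence
            // size, so messages are released as soon as the next number arrives
            .resequence(r -> r.releasePartialSequences(true))
            .channel("ordered")
            .get();
}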

Starting with version 5.1.4, priority is given to accepting new connections over reading from existing connections. This should, generally, have little impact unless you have a very high rate of new incoming connections. If you wish to revert to the previous behavior of giving reads priority, set the multiAccept property on the TcpNioServerConnectionFactory to false.
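
In Java configuration, that might look like the following sketch (the port and bean name are illustrative):

import org.springframework.context.annotation.Bean;
import org.springframework.integration.ip.tcp.connection.TcpNioServerConnectionFactory;

@Bean
public TcpNioServerConnectionFactory readFirstServerFactory() {
    TcpNioServerConnectionFactory factory = new TcpNioServerConnectionFactory(1234);
    factory.setMultiAccept(false); // restore the pre-5.1.4 behavior: reads take priority over accepts
    return factory;
}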

Pool Size

The pool-size attribute is no longer used. Previously, it specified the size of the default thread pool when a task-executor was not specified. It was also used to set the connection backlog on server sockets. The first function is no longer needed (see the next paragraph). The second function is replaced by the backlog attribute.
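
For example, a sketch of setting the backlog in Java configuration (the port and the value 50 are illustrative):

import org.springframework.context.annotation.Bean;
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;

@Bean
public TcpNetServerConnectionFactory serverFactoryWithBacklog() {
    TcpNetServerConnectionFactory factory = new TcpNetServerConnectionFactory(1234);
    factory.setBacklog(50); // maximum number of pending (not yet accepted) connections
    return factory;
}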

Previously, when using a fixed thread pool task executor (which was the default) with NIO, it was possible to get a deadlock, and processing would stop. The problem occurred when a buffer was full, a thread reading from the socket was trying to add more data to the buffer, and no threads were available to make space in the buffer. This occurred only with very small pool sizes, but it was possible under extreme conditions. Since version 2.2, two changes have eliminated this problem. First, the default task executor is a cached thread pool executor. Second, deadlock detection logic has been added such that, if thread starvation occurs, an exception is thrown instead of a deadlock, thus releasing the deadlocked resources.

Now that the default task executor is unbounded, an out-of-memory condition might occur under a high rate of incoming messages if message processing takes extended time. If your application exhibits this type of behavior, you should use a pooled task executor with an appropriate pool size, but see the next section.
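
The following sketch shows one way to bound the executor (the pool and queue sizes are illustrative; review the caveats in the next section before choosing a rejection policy):

import org.springframework.context.annotation.Bean;
import org.springframework.integration.ip.tcp.connection.TcpNioServerConnectionFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Bean
public ThreadPoolTaskExecutor tcpExecutor() {
    ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
    exec.setCorePoolSize(5);
    exec.setMaxPoolSize(20);    // bounds the number of threads
    exec.setQueueCapacity(100); // bounds the queued work, and therefore memory
    exec.setThreadNamePrefix("tcp-");
    return exec;
}

@Bean
public TcpNioServerConnectionFactory boundedServerFactory(ThreadPoolTaskExecutor tcpExecutor) {
    TcpNioServerConnectionFactory factory = new TcpNioServerConnectionFactory(1234);
    factory.setTaskExecutor(tcpExecutor); // replaces the unbounded default executor
    return factory;
}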

Thread Pool Task Executor with CALLER_RUNS Policy

You should keep in mind some important considerations when you use a fixed thread pool with the CallerRunsPolicy (CALLER_RUNS when using the <task/> namespace) and a small queue capacity.

The following does not apply if you do not use a fixed thread pool.

With NIO connections, there are three distinct task types. The I/O selector processing is performed on one dedicated thread (detecting events, accepting new connections, and dispatching the I/O read operations to other threads by using the task executor). When an I/O reader thread (to which the read operation is dispatched) reads data, it hands off to another thread to assemble the incoming message. Large messages can take several reads to complete. These “assembler” threads can block while waiting for data. When a new read event occurs, the reader determines if this socket already has an assembler and, if not, runs a new one. When the assembly process is complete, the assembler thread is returned to the pool.

This can cause a deadlock when the pool is exhausted, the CALLER_RUNS rejection policy is in use, and the task queue is full. When the pool is empty and there is no room in the queue, the I/O selector thread receives an OP_READ event and dispatches the read by using the executor. The queue is full, so the selector thread itself starts the read process. Now it detects that there is no assembler for this socket and, before it does the read, fires off an assembler. Again, the queue is full, and the selector thread becomes the assembler. The assembler is now blocked, waiting for the data to be read, which never happens. The connection factory is now deadlocked because the selector thread cannot handle new events.

To avoid this deadlock, we must prevent the selector (or reader) threads from performing the assembly task. We want to use separate pools for the I/O and assembly operations.

The framework provides a CompositeExecutor, which allows the configuration of two distinct executors: one for performing I/O operations and one for message assembly. In this environment, an I/O thread can never become an assembler thread, and the deadlock cannot occur.

In addition, the task executors should be configured to use an AbortPolicy (ABORT when using <task>). When an I/O task cannot be completed, it is deferred for a short time and continually retried until it can be completed and have an assembler allocated. By default, the delay is 100ms, but you can change it by setting the readDelay property on the connection factory (read-delay when configuring with the XML namespace).

The following three examples show how to configure the composite executor (first with Java configuration, then with the <task/> namespace, and finally with plain Spring bean XML):

import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;

import org.springframework.context.annotation.Bean;
import org.springframework.integration.util.CompositeExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Bean
public CompositeExecutor compositeExecutor() {
    // Pool for the I/O (read) operations
    ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor();
    ioExec.setCorePoolSize(4);
    ioExec.setMaxPoolSize(10);
    ioExec.setQueueCapacity(0);
    ioExec.setThreadNamePrefix("io-");
    ioExec.setRejectedExecutionHandler(new AbortPolicy());
    ioExec.initialize();
    // Pool for message assembly
    ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor();
    assemblerExec.setCorePoolSize(4);
    assemblerExec.setMaxPoolSize(10);
    assemblerExec.setQueueCapacity(0);
    assemblerExec.setThreadNamePrefix("assembler-");
    assemblerExec.setRejectedExecutionHandler(new AbortPolicy());
    assemblerExec.initialize();
    return new CompositeExecutor(ioExec, assemblerExec);
}

<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
    <constructor-arg ref="io"/>
    <constructor-arg ref="assembler"/>
</bean>

<task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
    <constructor-arg>
        <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="threadNamePrefix" value="io-" />
            <property name="corePoolSize" value="4" />
            <property name="maxPoolSize" value="8" />
            <property name="queueCapacity" value="0" />
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
            </property>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="threadNamePrefix" value="assembler-" />
            <property name="corePoolSize" value="4" />
            <property name="maxPoolSize" value="10" />
            <property name="queueCapacity" value="0" />
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
            </property>
        </bean>
    </constructor-arg>
</bean>
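
Finally, a sketch (with illustrative bean names and values) of wiring the composite executor into an NIO server connection factory and adjusting the read delay:

import org.springframework.context.annotation.Bean;
import org.springframework.integration.ip.tcp.connection.TcpNioServerConnectionFactory;
import org.springframework.integration.util.CompositeExecutor;

@Bean
public TcpNioServerConnectionFactory nioServer(CompositeExecutor myTaskExecutor) {
    TcpNioServerConnectionFactory factory = new TcpNioServerConnectionFactory(1234);
    factory.setTaskExecutor(myTaskExecutor); // I/O tasks and assembly tasks use separate pools
    factory.setReadDelay(200); // retry interval (ms) for deferred I/O tasks; the default is 100
    return factory;
}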