About Non-blocking I/O (NIO)

使用 NIO(请参见 IP Configuration Attributes中的 using-nio)避免对来自每个套接字的读取专门指定一个线程。对于少数套接字,你可能会发现不使用 NIO,并结合使用异步移交(例如移交至 QueueChannel),执行效果与使用 NIO 一样好,甚至更好。 处理大量连接时,你应该考虑使用 NIO。但是,使用 NIO 会产生一些其他影响。一组线程(在任务执行程序中)在所有套接字之间共享。每个传入消息都会被组装起来并作为一个单独的工作单元发送到配置的通道,并在从该池中选择的线程上执行。同一套接字上接收到的两个顺序消息可能会由不同的线程处理。这意味着发送到通道的消息的顺序是不确定的。套接字上接收到的消息的严格顺序不会得到保持。 对于某些应用程序,这不是问题。对于其他应用程序,这是一个问题。如果你需要严格的顺序,请考虑将 using-nio 设置为 false 并使用异步交接。 或者,你可以在 inbound 端点下游插入一个重新排序器,以将消息返回到正确的顺序。如果在连接工厂上将 apply-sequence 设置为 true,则到达 TCP 连接的消息将具有 sequenceNumbercorrelationId 头。重新排序器使用这些头将消息返回到正确的顺序。

从 5.1.4 版本开始,优先接受新连接,而不是从现有连接中读取数据。除非你有大量新传入连接,否则这通常影响很小。如果你希望恢复到优先读取的旧行为,请在 TcpNioServerConnectionFactory 上设置 multiAccept 属性为 false

Pool Size

池大小属性不再使用。以前,它指定在未指定任务执行程序时的默认线程池大小。它还用于在服务器套接字上设定连接积压。第一个功能不再需要(请参阅下一段)。第二个功能已被 backlog 属性取代。

以前,在将固定线程池任务执行程序(它为默认值)与 NIO 一起使用时,可能会出现死锁,并且处理会停止。当缓冲区已满、从套接字读取的线程正在尝试向缓冲区添加更多数据,而没有可用的线程在缓冲区中腾出空间时,就会发生此问题。这仅发生在池大小非常小的情况下,但它在极端条件下是可能的。自 2.2 版以来,两项更改已消除了此问题。首先,默认任务执行程序是缓存线程池执行程序。其次,已经添加了死锁检测逻辑,如果发生线程饥饿,则抛出异常而不是死锁,从而释放死锁的资源。

现在,由于默认任务执行器没有边界,可能会出现一条消息处理占用时间长的消息传入频率较高时内存不足的情况。如果你的应用程序表现出此种行为,你应该使用一个配置了合适池大小的池化任务执行器,但请参阅 the next section

Thread Pool Task Executor with CALLER_RUNS Policy

使用具有 CallerRunsPolicy(在使用 <task/> 名称空间时为 CALLER_RUNS)的固定线程池并且队列容量较小的时候,你应该记住一些重要的注意事项。

如果您不使用固定线程池,以下内容不适用。

对于 NIO 连接,有三种不同的任务类型。输入/输出选择器处理是在一个专用的线程上执行的(检测事件、接收新连接,并使用任务执行程序将输入/输出读取操作派发到其他线程)。当输入/输出读取线程(已向其派发读取操作)读取数据时,它会交给另一个线程来组装传入消息。大型消息可能需要多次读取才能完成。这些“组装程序”线程在等待数据时可能会阻塞。当出现新的读取事件时,读取器确定此套接字是否已存在组装程序,如果不存在,则运行一个新的组装程序。当组装流程完成后,组装程序线程将返回到池中。

当池已耗尽、使用 CALLER_RUNS 拒绝策略且任务队列已满时,这可能会导致死锁。当池为空并且队列中没有空间时,输入/输出选择器线程接收 OP_READ 事件并使用执行程序派发读取操作。队列已满,所以选择器本身启动读取进程。现在它检测到此套接字没有组装程序,并且在执行读取操作之前启动一个组装程序。队列再次已满,并且选择器线程成为组装程序。现在组装程序被阻塞,等待正在读取的数据,但数据永远不会发生。由于选择器线程无法处理新事件,连接工厂现在处于死锁状态。

为了避免此死锁,我们必须避免选择器(或读取器)线程执行组装任务。我们要对输入/输出和组装操作使用单独的池。

该框架提供了一个 CompositeExecutor,它允许配置两个不同的执行器:一个用于执行 IO 操作,另一个用于消息装配。在此环境中,IO 线程永不会成为装配器线程,且不会出现死锁。

此外,任务执行器应配置为使用 AbortPolicy(使用 <task> 时为“ABORT”)。当无法完成 I/O 任务时,它将延迟一小段时间并持续重试,直到它可以完成并分配一个装配器。默认延迟为 100 毫秒,但可以通过在连接工厂上设置 readDelay 属性来更改该延迟(使用 XML 命名空间配置时的 “read-delay”)。

以下三个示例演示如何配置复合执行器:

@Bean
private CompositeExecutor compositeExecutor() {
    ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor();
    ioExec.setCorePoolSize(4);
    ioExec.setMaxPoolSize(10);
    ioExec.setQueueCapacity(0);
    ioExec.setThreadNamePrefix("io-");
    ioExec.setRejectedExecutionHandler(new AbortPolicy());
    ioExec.initialize();
    ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor();
    assemblerExec.setCorePoolSize(4);
    assemblerExec.setMaxPoolSize(10);
    assemblerExec.setQueueCapacity(0);
    assemblerExec.setThreadNamePrefix("assembler-");
    assemblerExec.setRejectedExecutionHandler(new AbortPolicy());
    assemblerExec.initialize();
    return new CompositeExecutor(ioExec, assemblerExec);
}
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
    <constructor-arg ref="io"/>
    <constructor-arg ref="assembler"/>
</bean>

<task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
    <constructor-arg>
        <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="threadNamePrefix" value="io-" />
            <property name="corePoolSize" value="4" />
            <property name="maxPoolSize" value="8" />
            <property name="queueCapacity" value="0" />
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
            </property>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="threadNamePrefix" value="assembler-" />
            <property name="corePoolSize" value="4" />
            <property name="maxPoolSize" value="10" />
            <property name="queueCapacity" value="0" />
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
            </property>
        </bean>
    </constructor-arg>
</bean>