Listener Concurrency

SimpleMessageListenerContainer

默认情况下,监听器容器启动单个使用者,该使用者从队列接收消息。

在检查上一段中的表格时,您可以看到许多控制并发性的属性和特性。最简单的是 concurrentConsumers,它创建了同时处理消息的消费者(固定)数量。

在 1.3.0 版本之前,这是唯一可用的设置,并且必须停止并重新启动容器来更改设置。

自 1.3.0 版本以来,您现在可以动态调整 concurrentConsumers 属性。如果在容器运行时更改它,则会根据需要添加或删除使用者以调整到新设置。

此外,添加了一个名为 maxConcurrentConsumers 的新属性,并且容器根据工作负载动态调整并发性。这与四个附加属性一起使用:consecutiveActiveTriggerstartConsumerMinIntervalconsecutiveIdleTriggerstopConsumerMinInterval。在默认设置下,增加使用者的算法如下工作:

如果尚未达到 maxConcurrentConsumers 且现有使用者在连续十个周期内处于活动状态并且自上一个使用者启动后至少经过 10 秒,则启动新的使用者。如果使用者在 batchSize * receiveTimeout 毫秒内收到至少一条消息,则认为它是处于活动状态的。

在默认设置下,减少使用者的算法如下工作:

如果运行的使用者多于 concurrentConsumers 并且使用者检测到连续十个超时(空闲)且上一个使用者在至少 60 秒前已停止,则停止一个使用者。超时取决于 receiveTimeoutbatchSize 属性。如果使用者在 batchSize * receiveTimeout 毫秒内收不到任何消息,则认为它是空闲的。因此,在默认超时(一秒)和 batchSize 为四的情况下,在 40 秒空闲时间后考虑停止使用者(四个超时对应一个空闲检测)。

实际上,只有当整个容器闲置一段时间后才能停止消费者。这是因为代理在其所有活动消费者之间共享其工作。

每个使用者使用单个信道,而不管配置的队列数。

从版本 2.0 开始,可以使用 concurrency 属性设置 concurrentConsumersmaxConcurrentConsumers 属性,例如“2-4”。

Using DirectMessageListenerContainer

使用此容器,并发性基于已配置的队列和 consumersPerQueue。每个队列的每个使用者使用单独的信道,并且并发性由兔子客户端库控制。在编写本文时,它默认使用 DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2 线程池。

您可以配置一个 taskExecutor 以提供所需的并发性。