Listener Concurrency

SimpleMessageListenerContainer

默认情况下,监听器容器启动单个使用者,该使用者从队列接收消息。

By default, the listener container starts a single consumer that receives messages from the queues.

在检查上一段中的表格时,您可以看到许多控制并发性的属性和特性。最简单的是 concurrentConsumers,它创建了同时处理消息的消费者(固定)数量。

When examining the table in the previous section, you can see a number of properties and attributes that control concurrency. The simplest is concurrentConsumers, which creates that (fixed) number of consumers that concurrently process messages.

在 1.3.0 版本之前,这是唯一可用的设置,并且必须停止并重新启动容器来更改设置。

Prior to version 1.3.0, this was the only setting available and the container had to be stopped and started again to change the setting.

自 1.3.0 版本以来,您现在可以动态调整 concurrentConsumers 属性。如果在容器运行时更改它,则会根据需要添加或删除使用者以调整到新设置。

Since version 1.3.0, you can now dynamically adjust the concurrentConsumers property. If it is changed while the container is running, consumers are added or removed as necessary to adjust to the new setting.

此外,添加了一个名为 maxConcurrentConsumers 的新属性,并且容器根据工作负载动态调整并发性。这与四个附加属性一起使用:consecutiveActiveTriggerstartConsumerMinIntervalconsecutiveIdleTriggerstopConsumerMinInterval。在默认设置下,增加使用者的算法如下工作:

In addition, a new property called maxConcurrentConsumers has been added and the container dynamically adjusts the concurrency based on workload. This works in conjunction with four additional properties: consecutiveActiveTrigger, startConsumerMinInterval, consecutiveIdleTrigger, and stopConsumerMinInterval. With the default settings, the algorithm to increase consumers works as follows:

如果尚未达到 maxConcurrentConsumers 且现有使用者在连续十个周期内处于活动状态并且自上一个使用者启动后至少经过 10 秒,则启动新的使用者。如果使用者在 batchSize * receiveTimeout 毫秒内收到至少一条消息,则认为它是处于活动状态的。

If the maxConcurrentConsumers has not been reached and an existing consumer is active for ten consecutive cycles AND at least 10 seconds has elapsed since the last consumer was started, a new consumer is started. A consumer is considered active if it received at least one message in batchSize * receiveTimeout milliseconds.

在默认设置下,减少使用者的算法如下工作:

With the default settings, the algorithm to decrease consumers works as follows:

如果运行的使用者多于 concurrentConsumers 并且使用者检测到连续十个超时(空闲)且上一个使用者在至少 60 秒前已停止,则停止一个使用者。超时取决于 receiveTimeoutbatchSize 属性。如果使用者在 batchSize * receiveTimeout 毫秒内收不到任何消息,则认为它是空闲的。因此,在默认超时(一秒)和 batchSize 为四的情况下,在 40 秒空闲时间后考虑停止使用者(四个超时对应一个空闲检测)。

If there are more than concurrentConsumers running and a consumer detects ten consecutive timeouts (idle) AND the last consumer was stopped at least 60 seconds ago, a consumer is stopped. The timeout depends on the receiveTimeout and the batchSize properties. A consumer is considered idle if it receives no messages in batchSize * receiveTimeout milliseconds. So, with the default timeout (one second) and a batchSize of four, stopping a consumer is considered after 40 seconds of idle time (four timeouts correspond to one idle detection).

实际上,只有当整个容器闲置一段时间后才能停止消费者。这是因为代理在其所有活动消费者之间共享其工作。

Practically, consumers can be stopped only if the whole container is idle for some time. This is because the broker shares its work across all the active consumers.

每个使用者使用单个信道,而不管配置的队列数。

Each consumer uses a single channel, regardless of the number of configured queues.

从版本 2.0 开始,可以使用 concurrency 属性设置 concurrentConsumersmaxConcurrentConsumers 属性,例如“2-4”。

Starting with version 2.0, the concurrentConsumers and maxConcurrentConsumers properties can be set with the concurrency property — for example, 2-4.

Using DirectMessageListenerContainer

使用此容器,并发性基于已配置的队列和 consumersPerQueue。每个队列的每个使用者使用单独的信道,并且并发性由兔子客户端库控制。在编写本文时,它默认使用 DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2 线程池。

With this container, concurrency is based on the configured queues and consumersPerQueue. Each consumer for each queue uses a separate channel, and the concurrency is controlled by the rabbit client library. By default, at the time of writing, it uses a pool of DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2 threads.

您可以配置一个 taskExecutor 以提供所需的并发性。

You can configure a taskExecutor to provide the required maximum concurrency.