Threading and Asynchronous Consumers

异步使用者涉及许多不同的线程。

A number of different threads are involved with asynchronous consumers.

将从 SimpleMessageListenerContainer 中配置的 TaskExecutor 中的线程用于在 RabbitMQ Client 传送新消息时调用 MessageListener。如果未配置,则使用 SimpleAsyncTaskExecutor。如果您使用的是池化执行器,则需要确保池大小足以处理已配置的并发性。通过 DirectMessageListenerContainerMessageListener 直接在 RabbitMQ Client 线程上调用。在这种情况下,taskExecutor 用于监视使用者的任务。

Threads from the TaskExecutor configured in the SimpleMessageListenerContainer are used to invoke the MessageListener when a new message is delivered by RabbitMQ Client. If not configured, a SimpleAsyncTaskExecutor is used. If you use a pooled executor, you need to ensure the pool size is sufficient to handle the configured concurrency. With the DirectMessageListenerContainer, the MessageListener is invoked directly on a RabbitMQ Client thread. In this case, the taskExecutor is used for the task that monitors the consumers.

在对侦听器调用的线程使用默认 SimpleAsyncTaskExecutor 时,侦听器容器 beanName 将用于 threadNamePrefix。这对于日志分析很有用。我们通常建议始终在日志记录追加程序配置中包含线程名称。当通过容器上的 taskExecutor 属性具体提供 TaskExecutor 时,它将按原样使用,而不会进行修改。建议使用类似的技术为自定义 TaskExecutor bean 定义创建的线程命名,以帮助标识日志消息中的线程。

When using the default SimpleAsyncTaskExecutor, for the threads the listener is invoked on, the listener container beanName is used in the threadNamePrefix. This is useful for log analysis. We generally recommend always including the thread name in the logging appender configuration. When a TaskExecutor is specifically provided through the taskExecutor property on the container, it is used as is, without modification. It is recommended that you use a similar technique to name the threads created by a custom TaskExecutor bean definition, to aid with thread identification in log messages.

在创建连接时,在 CachingConnectionFactory 中配置的 Executor 被传送到 RabbitMQ Client,并且其线程用于将新消息传递到监听器容器。如果未配置此项,客户端将使用内部线程池执行器,其池大小(在撰写本文时)为每个连接的 Runtime.getRuntime().availableProcessors() * 2。

The Executor configured in the CachingConnectionFactory is passed into the RabbitMQ Client when creating the connection, and its threads are used to deliver new messages to the listener container. If this is not configured, the client uses an internal thread pool executor with (at the time of writing) a pool size of Runtime.getRuntime().availableProcessors() * 2 for each connection.

如果您有大量工厂或正在使用 CacheMode.CONNECTION,则您可能希望考虑使用一个共享的 ThreadPoolTaskExecutor,其中包含满足您的工作负载的足够的线程。

If you have a large number of factories or are using CacheMode.CONNECTION, you may wish to consider using a shared ThreadPoolTaskExecutor with enough threads to satisfy your workload.

使用 DirectMessageListenerContainer 时,您需要确保连接工厂配置了具有足够线程的任务执行程序,以支持您的所需并发性,这些线程分布在使用该工厂的所有侦听器容器中。默认池大小(在撰写时)为 Runtime.getRuntime().availableProcessors() * 2

With the DirectMessageListenerContainer, you need to ensure that the connection factory is configured with a task executor that has sufficient threads to support your desired concurrency across all listener containers that use that factory. The default pool size (at the time of writing) is Runtime.getRuntime().availableProcessors() * 2.

RabbitMQ client 使用 ThreadFactory 创建用于低级 I/O(套接字)操作的线程。要修改此工厂,您需要配置底层的 RabbitMQ ConnectionFactory,如 Configuring the Underlying Client Connection Factory 中所述。

The RabbitMQ client uses a ThreadFactory to create threads for low-level I/O (socket) operations. To modify this factory, you need to configure the underlying RabbitMQ ConnectionFactory, as discussed in Configuring the Underlying Client Connection Factory.