Threading and Asynchronous Consumers

异步使用者涉及许多不同的线程。

将从 SimpleMessageListenerContainer 中配置的 TaskExecutor 中的线程用于在 RabbitMQ Client 传送新消息时调用 MessageListener。如果未配置,则使用 SimpleAsyncTaskExecutor。如果您使用的是池化执行器,则需要确保池大小足以处理已配置的并发性。通过 DirectMessageListenerContainerMessageListener 直接在 RabbitMQ Client 线程上调用。在这种情况下,taskExecutor 用于监视使用者的任务。

在对侦听器调用的线程使用默认 SimpleAsyncTaskExecutor 时,侦听器容器 beanName 将用于 threadNamePrefix。这对于日志分析很有用。我们通常建议始终在日志记录追加程序配置中包含线程名称。当通过容器上的 taskExecutor 属性具体提供 TaskExecutor 时,它将按原样使用,而不会进行修改。建议使用类似的技术为自定义 TaskExecutor bean 定义创建的线程命名,以帮助标识日志消息中的线程。

在创建连接时,在 CachingConnectionFactory 中配置的 Executor 被传送到 RabbitMQ Client,并且其线程用于将新消息传递到监听器容器。如果未配置此项,客户端将使用内部线程池执行器,其池大小(在撰写本文时)为每个连接的 Runtime.getRuntime().availableProcessors() * 2。

如果您有大量工厂或正在使用 CacheMode.CONNECTION,则您可能希望考虑使用一个共享的 ThreadPoolTaskExecutor,其中包含满足您的工作负载的足够的线程。

使用 DirectMessageListenerContainer 时,您需要确保连接工厂配置了具有足够线程的任务执行程序,以支持您的所需并发性,这些线程分布在使用该工厂的所有侦听器容器中。默认池大小(在撰写时)为 Runtime.getRuntime().availableProcessors() * 2

RabbitMQ client 使用 ThreadFactory 创建用于低级 I/O(套接字)操作的线程。要修改此工厂,您需要配置底层的 RabbitMQ ConnectionFactory,如 Configuring the Underlying Client Connection Factory 中所述。