Thread Safety
当使用并发消息侦听器容器时,所有使用者线程都会调用一个单独的侦听器实例。因此,侦听器需要是线程安全的,最好使用无状态侦听器。如果您无法使侦听器成为线程安全,或者添加同步会显著降低增加并发性的好处,您可以使用以下几种技术之一:
-
将 `n`容器与 `concurrency=1`一起使用具有原型作用域 `MessageListener`bean 使每个容器获得其自己的实例(在使用 `@KafkaListener`时无法使用)。
-
在 `ThreadLocal<?>`实例中保留状态。
-
将单例侦听器委托给 bean, 即在
SimpleThreadScope
(或类似的作用域)中声明 bean。
为了便于清理线程状态(针对上一个列表中的第二项和第三项),从 2.2 版本开始,侦听器容器在每个线程退出时发布一个 ConsumerStoppedEvent
。您可以使用 ApplicationListener
或 @EventListener
方法使用这些事件来移除 ThreadLocal<?>
实例或从作用域中 remove()
线程作用域 bean。请注意,SimpleThreadScope
不会销毁具有销毁接口(如 DisposableBean
)的 bean,因此您应该自己 destroy()
实例。
默认情况下,应用程序上下文的事件多路广播器在调用线程上调用事件侦听器。如果您更改多路广播器以使用异步执行器,线程清理将无效。