Thread Safety

当使用并发消息侦听器容器时,所有使用者线程都会调用一个单独的侦听器实例。因此,侦听器需要是线程安全的,最好使用无状态侦听器。如果您无法使侦听器成为线程安全,或者添加同步会显著降低增加并发性的好处,您可以使用以下几种技术之一:

  • 将 `n`容器与 `concurrency=1`一起使用具有原型作用域 `MessageListener`bean 使每个容器获得其自己的实例(在使用 `@KafkaListener`时无法使用)。

  • 在 `ThreadLocal<?>`实例中保留状态。

  • 将单例侦听器委托给 bean, 即在 SimpleThreadScope(或类似的作用域)中声明 bean。

为了便于清理线程状态(针对上一个列表中的第二项和第三项),从 2.2 版本开始,侦听器容器在每个线程退出时发布一个 ConsumerStoppedEvent。您可以使用 ApplicationListener@EventListener 方法使用这些事件来移除 ThreadLocal<?> 实例或从作用域中 remove() 线程作用域 bean。请注意,SimpleThreadScope 不会销毁具有销毁接口(如 DisposableBean)的 bean,因此您应该自己 destroy() 实例。

默认情况下,应用程序上下文的事件多路广播器在调用线程上调用事件侦听器。如果您更改多路广播器以使用异步执行器,线程清理将无效。