Thread Safety

当使用并发消息侦听器容器时,所有使用者线程都会调用一个单独的侦听器实例。因此,侦听器需要是线程安全的,最好使用无状态侦听器。如果您无法使侦听器成为线程安全,或者添加同步会显著降低增加并发性的好处,您可以使用以下几种技术之一:

When using a concurrent message listener container, a single listener instance is invoked on all consumer threads. Listeners, therefore, need to be thread-safe, and it is preferable to use stateless listeners. If it is not possible to make your listener thread-safe or adding synchronization would significantly reduce the benefit of adding concurrency, you can use one of a few techniques:

  • Use n containers with concurrency=1 with a prototype scoped MessageListener bean so that each container gets its own instance (this is not possible when using @KafkaListener).

  • Keep the state in ThreadLocal<?> instances.

  • Have the singleton listener delegate to a bean that is declared in SimpleThreadScope (or a similar scope).

为了便于清理线程状态(针对上一个列表中的第二项和第三项),从 2.2 版本开始,侦听器容器在每个线程退出时发布一个 ConsumerStoppedEvent。您可以使用 ApplicationListener@EventListener 方法使用这些事件来移除 ThreadLocal<?> 实例或从作用域中 remove() 线程作用域 bean。请注意,SimpleThreadScope 不会销毁具有销毁接口(如 DisposableBean)的 bean,因此您应该自己 destroy() 实例。

To facilitate cleaning up thread state (for the second and third items in the preceding list), starting with version 2.2, the listener container publishes a ConsumerStoppedEvent when each thread exits. You can consume these events with an ApplicationListener or @EventListener method to remove ThreadLocal<?> instances or remove() thread-scoped beans from the scope. Note that SimpleThreadScope does not destroy beans that have a destruction interface (such as DisposableBean), so you should destroy() the instance yourself.

默认情况下,应用程序上下文的事件多路广播器在调用线程上调用事件侦听器。如果您更改多路广播器以使用异步执行器,线程清理将无效。

By default, the application context’s event multicaster invokes event listeners on the calling thread. If you change the multicaster to use an async executor, thread cleanup is not effective.