Pausing and Resuming Listener Containers
2.1.3 版在侦听器容器中添加了 pause()
和 resume()
方法。此前,你可以在 ConsumerAwareMessageListener
中暂停一个使用者,并通过侦听 ListenerContainerIdleEvent
来恢复它,该事件提供了对 Consumer
对象的访问。虽然你可以通过使用事件侦听器来暂停处于空闲容器中的使用者,但在某些情况下,这不是线程安全的,因为无法保证事件侦听器在使用者线程上被调用。要安全地暂停和恢复使用者,你应该在侦听器容器上使用 pause
和 resume
方法。pause()
恰好在下一次 poll()
之前生效;resume()
恰好在当前 poll()
返回之后生效。当一个容器暂停时,它会继续 poll()
该使用者,如果正在使用组管理,就避免重新平衡,但它不会检索出任何记录。有关更多信息,请参阅 Kafka 文档。
Version 2.1.3 added pause()
and resume()
methods to listener containers.
Previously, you could pause a consumer within a ConsumerAwareMessageListener
and resume it by listening for a ListenerContainerIdleEvent
, which provides access to the Consumer
object.
While you could pause a consumer in an idle container by using an event listener, in some cases, this was not thread-safe, since there is no guarantee that the event listener is invoked on the consumer thread.
To safely pause and resume consumers, you should use the pause
and resume
methods on the listener containers.
A pause()
takes effect just before the next poll()
; a resume()
takes effect just after the current poll()
returns.
When a container is paused, it continues to poll()
the consumer, avoiding a rebalance if group management is being used, but it does not retrieve any records.
See the Kafka documentation for more information.
从 2.1.5 版开始,你可以调用 isPauseRequested()
来查看是否已调用 pause()
。但是,使用者可能尚未实际暂停。isConsumerPaused()
在所有 Consumer
实例实际暂停时返回 true。
Starting with version 2.1.5, you can call isPauseRequested()
to see if pause()
has been called.
However, the consumers might not have actually paused yet.
isConsumerPaused()
returns true if all Consumer
instances have actually paused.
此外(自 2.1.5 版起),ConsumerPausedEvent
和 ConsumerResumedEvent
实例会以容器作为 source
属性和涉及到的 TopicPartition
实例作为 partitions
属性来发布。
In addition(also since 2.1.5), ConsumerPausedEvent
and ConsumerResumedEvent
instances are published with the container as the source
property and the TopicPartition
instances involved in the partitions
property.
从版本 2.9 开始,一个新容器属性 pauseImmediate
,当设置为 true 时,会导致在处理当前记录之后产生暂停效果。默认情况下,在处理先前轮询中的所有记录之后产生暂停效果。请参阅 pauseImmediate。
Starting with version 2.9, a new container property pauseImmediate
, when set to true, causes the pause to take effect after the current record is processed.
By default, the pause takes effect when all the records from the previous poll have been processed.
See pauseImmediate.
以下简单的 Spring Boot 应用程序通过使用容器注册表来演示如何引用 @KafkaListener
方法的容器并暂停或恢复其使用者以及接收相应的事件:
The following simple Spring Boot application demonstrates by using the container registry to get a reference to a @KafkaListener
method’s container and pausing or resuming its consumers as well as receiving the corresponding events:
@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
@Override
public void onApplicationEvent(KafkaEvent event) {
System.out.println(event);
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
KafkaTemplate<String, String> template) {
return args -> {
template.send("pause.resume.topic", "thing1");
Thread.sleep(10_000);
System.out.println("pausing");
registry.getListenerContainer("pause.resume").pause();
Thread.sleep(10_000);
template.send("pause.resume.topic", "thing2");
Thread.sleep(10_000);
System.out.println("resuming");
registry.getListenerContainer("pause.resume").resume();
Thread.sleep(10_000);
};
}
@KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("pause.resume.topic")
.partitions(2)
.replicas(1)
.build();
}
}
以下清单展示了前述示例的结果:
The following listing shows the results of the preceding example:
partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2