Pausing and Resuming Listener Containers

2.1.3 版在侦听器容器中添加了 pause()resume() 方法。此前,你可以在 ConsumerAwareMessageListener 中暂停一个使用者,并通过侦听 ListenerContainerIdleEvent 来恢复它,该事件提供了对 Consumer 对象的访问。虽然你可以通过使用事件侦听器来暂停处于空闲容器中的使用者,但在某些情况下,这不是线程安全的,因为无法保证事件侦听器在使用者线程上被调用。要安全地暂停和恢复使用者,你应该在侦听器容器上使用 pauseresume 方法。pause() 恰好在下一次 poll() 之前生效;resume() 恰好在当前 poll() 返回之后生效。当一个容器暂停时,它会继续 poll() 该使用者,如果正在使用组管理,就避免重新平衡,但它不会检索出任何记录。有关更多信息,请参阅 Kafka 文档。

从 2.1.5 版开始,你可以调用 isPauseRequested() 来查看是否已调用 pause()。但是,使用者可能尚未实际暂停。isConsumerPaused() 在所有 Consumer 实例实际暂停时返回 true。

此外(自 2.1.5 版起),ConsumerPausedEventConsumerResumedEvent 实例会以容器作为 source 属性和涉及到的 TopicPartition 实例作为 partitions 属性来发布。

从版本 2.9 开始,一个新容器属性 pauseImmediate,当设置为 true 时,会导致在处理当前记录之后产生暂停效果。默认情况下,在处理先前轮询中的所有记录之后产生暂停效果。请参阅 pauseImmediate

以下简单的 Spring Boot 应用程序通过使用容器注册表来演示如何引用 @KafkaListener 方法的容器并暂停或恢复其使用者以及接收相应的事件:

@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args).close();
    }

    @Override
    public void onApplicationEvent(KafkaEvent event) {
        System.out.println(event);
    }

    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
            KafkaTemplate<String, String> template) {
        return args -> {
            template.send("pause.resume.topic", "thing1");
            Thread.sleep(10_000);
            System.out.println("pausing");
            registry.getListenerContainer("pause.resume").pause();
            Thread.sleep(10_000);
            template.send("pause.resume.topic", "thing2");
            Thread.sleep(10_000);
            System.out.println("resuming");
            registry.getListenerContainer("pause.resume").resume();
            Thread.sleep(10_000);
        };
    }

    @KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("pause.resume.topic")
            .partitions(2)
            .replicas(1)
            .build();
    }

}

以下清单展示了前述示例的结果:

partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2