Pausing and Resuming Listener Containers

2.1.3 版在侦听器容器中添加了 pause()resume() 方法。此前,你可以在 ConsumerAwareMessageListener 中暂停一个使用者,并通过侦听 ListenerContainerIdleEvent 来恢复它,该事件提供了对 Consumer 对象的访问。虽然你可以通过使用事件侦听器来暂停处于空闲容器中的使用者,但在某些情况下,这不是线程安全的,因为无法保证事件侦听器在使用者线程上被调用。要安全地暂停和恢复使用者,你应该在侦听器容器上使用 pauseresume 方法。pause() 恰好在下一次 poll() 之前生效;resume() 恰好在当前 poll() 返回之后生效。当一个容器暂停时,它会继续 poll() 该使用者,如果正在使用组管理,就避免重新平衡,但它不会检索出任何记录。有关更多信息,请参阅 Kafka 文档。

Version 2.1.3 added pause() and resume() methods to listener containers. Previously, you could pause a consumer within a ConsumerAwareMessageListener and resume it by listening for a ListenerContainerIdleEvent, which provides access to the Consumer object. While you could pause a consumer in an idle container by using an event listener, in some cases, this was not thread-safe, since there is no guarantee that the event listener is invoked on the consumer thread. To safely pause and resume consumers, you should use the pause and resume methods on the listener containers. A pause() takes effect just before the next poll(); a resume() takes effect just after the current poll() returns. When a container is paused, it continues to poll() the consumer, avoiding a rebalance if group management is being used, but it does not retrieve any records. See the Kafka documentation for more information.

从 2.1.5 版开始,你可以调用 isPauseRequested() 来查看是否已调用 pause()。但是,使用者可能尚未实际暂停。isConsumerPaused() 在所有 Consumer 实例实际暂停时返回 true。

Starting with version 2.1.5, you can call isPauseRequested() to see if pause() has been called. However, the consumers might not have actually paused yet. isConsumerPaused() returns true if all Consumer instances have actually paused.

此外(自 2.1.5 版起),ConsumerPausedEventConsumerResumedEvent 实例会以容器作为 source 属性和涉及到的 TopicPartition 实例作为 partitions 属性来发布。

In addition(also since 2.1.5), ConsumerPausedEvent and ConsumerResumedEvent instances are published with the container as the source property and the TopicPartition instances involved in the partitions property.

从版本 2.9 开始,一个新容器属性 pauseImmediate,当设置为 true 时,会导致在处理当前记录之后产生暂停效果。默认情况下,在处理先前轮询中的所有记录之后产生暂停效果。请参阅 pauseImmediate

Starting with version 2.9, a new container property pauseImmediate, when set to true, causes the pause to take effect after the current record is processed. By default, the pause takes effect when all the records from the previous poll have been processed. See pauseImmediate.

以下简单的 Spring Boot 应用程序通过使用容器注册表来演示如何引用 @KafkaListener 方法的容器并暂停或恢复其使用者以及接收相应的事件:

The following simple Spring Boot application demonstrates by using the container registry to get a reference to a @KafkaListener method’s container and pausing or resuming its consumers as well as receiving the corresponding events:

@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args).close();
    }

    @Override
    public void onApplicationEvent(KafkaEvent event) {
        System.out.println(event);
    }

    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
            KafkaTemplate<String, String> template) {
        return args -> {
            template.send("pause.resume.topic", "thing1");
            Thread.sleep(10_000);
            System.out.println("pausing");
            registry.getListenerContainer("pause.resume").pause();
            Thread.sleep(10_000);
            template.send("pause.resume.topic", "thing2");
            Thread.sleep(10_000);
            System.out.println("resuming");
            registry.getListenerContainer("pause.resume").resume();
            Thread.sleep(10_000);
        };
    }

    @KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("pause.resume.topic")
            .partitions(2)
            .replicas(1)
            .build();
    }

}

以下清单展示了前述示例的结果:

The following listing shows the results of the preceding example:

partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2