Enforcing Consumer Rebalance

Kafka 客户端现在支持触发 enforced rebalance 的选项。从版本 3.1.2 开始,Spring for Apache Kafka 提供了一种选择,即通过消息侦听器容器在 Kafka 使用者上调用该 API。当调用此 API 时,它只是提醒 Kafka 使用者触发强制再平衡;实际再平衡只会作为下一次 poll() 操作的一部分发生。如果已经有一个正在进行的再平衡,则调用强制再平衡就是 NO-OP。调用者必须等到当前再平衡完成之后才能调用另一个再平衡。有关更多详细信息,请参阅 enfroceRebalance 的 javadoc。

以下代码片段展示了使用消息侦听器容器强制执行再平衡的本质。

@KafkaListener(id = "my.id", topics = "my-topic")
void listen(ConsumerRecord<String, String> in) {
    System.out.println("From KafkaListener: " + in);
}

@Bean
public ApplicationRunner runner(KafkaTemplate<String, Object> template, KafkaListenerEndpointRegistry registry) {
    return args -> {
        final MessageListenerContainer listenerContainer = registry.getListenerContainer("my.id");
        System.out.println("Enforcing a rebalance");
        Thread.sleep(5_000);
        listenerContainer.enforceRebalance();
        Thread.sleep(5_000);
    };
}

如上面的代码所示,应用程序使用 KafkaListenerEndpointRegistry 访问消息侦听器容器,然后在其上调用 enforceRebalnce API。当在侦听器容器上调用 enforceRebalance 时,它会将调用委派给底层的 Kafka 消费者。Kafka 消费者会作为下一个 poll() 操作的一部分触发再平衡。