Enforcing Consumer Rebalance
Kafka 客户端现在支持触发 enforced rebalance 的选项。从版本 3.1.2
开始,Spring for Apache Kafka 提供了一种选择,即通过消息侦听器容器在 Kafka 使用者上调用该 API。当调用此 API 时,它只是提醒 Kafka 使用者触发强制再平衡;实际再平衡只会作为下一次 poll()
操作的一部分发生。如果已经有一个正在进行的再平衡,则调用强制再平衡就是 NO-OP。调用者必须等到当前再平衡完成之后才能调用另一个再平衡。有关更多详细信息,请参阅 enfroceRebalance
的 javadoc。
Kafka clients now support an option to trigger an enforced rebalance.
Starting with version 3.1.2
, Spring for Apache Kafka provides an option to invoke this API on the Kafka consumer via the message listener container.
When calling this API, it is simply alerting the Kafka consumer to trigger an enforced rebalance; the actual rebalance will only occur as part of the next poll()
operation.
If there is already a rebalance in progress, calling an enforced rebalance is a NO-OP.
The caller must wait for the current rebalance to complete before invoking another one.
See the javadocs for enfroceRebalance
for more details.
以下代码片段展示了使用消息侦听器容器强制执行再平衡的本质。
The following code snippet shows the essence of enforcing a rebalance using the message listener container.
@KafkaListener(id = "my.id", topics = "my-topic")
void listen(ConsumerRecord<String, String> in) {
System.out.println("From KafkaListener: " + in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, Object> template, KafkaListenerEndpointRegistry registry) {
return args -> {
final MessageListenerContainer listenerContainer = registry.getListenerContainer("my.id");
System.out.println("Enforcing a rebalance");
Thread.sleep(5_000);
listenerContainer.enforceRebalance();
Thread.sleep(5_000);
};
}
如上面的代码所示,应用程序使用 KafkaListenerEndpointRegistry
访问消息侦听器容器,然后在其上调用 enforceRebalnce
API。当在侦听器容器上调用 enforceRebalance
时,它会将调用委派给底层的 Kafka 消费者。Kafka 消费者会作为下一个 poll()
操作的一部分触发再平衡。
As the code above shows, the application uses the KafkaListenerEndpointRegistry
to gain access to the message listener container and then calling the enforceRebalnce
API on it.
When calling the enforceRebalance
on the listener container, it delegates the call to the underlying Kafka consumer.
The Kafka consumer will trigger a rebalance as part of the next poll()
operation.