Rebalancing Listeners
ContainerProperties
有一个名为 consumerRebalanceListener
的属性,它采用 Kafka 客户端的 ConsumerRebalanceListener
接口的一个实现。如果未提供此属性,容器会配置一个记录侦听器,以 INFO
级别记录再平衡事件。该框架还会添加一个子接口 ConsumerAwareRebalanceListener
。以下清单显示了 ConsumerAwareRebalanceListener
接口定义:
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}
请注意,在撤销分区时有两个回调。第一个会立即调用。第二个在提交任何待定偏移量后调用。如果您希望在某些外部存储库中维护偏移量,这将非常有用,如下例所示:
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// acknowledge any pending Acknowledgments (if using manual acks)
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// ...
store(consumer.position(partition));
// ...
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// ...
consumer.seek(partition, offsetTracker.getOffset() + 1);
// ...
}
});
从版本 2.4 开始,添加了一个新的方法 onPartitionsLost()
(类似于 ConsumerRebalanceLister
中具有相同名称的方法)。ConsumerRebalanceLister
中的默认实现简单地调用 onPartionsRevoked
。ConsumerAwareRebalanceListener
中的默认实现不执行任何操作。当为侦听器容器提供自定义侦听器(任一类型)时,重要的是,您的实现不从 onPartitionsLost
中调用 onPartitionsRevoked
。如果您实现 ConsumerRebalanceListener
,则应覆盖默认方法。这是因为侦听器容器将在调用对您的实现的方法后,从其对 onPartitionsLost
的实现中调用它自己的 onPartitionsRevoked
。如果您实现委托给默认行为,则每当 Consumer
在容器的侦听器上调用该方法时,onPartitionsRevoked
将被调用两次。