Rebalancing Listeners

ContainerProperties 有一个名为 consumerRebalanceListener 的属性,它采用 Kafka 客户端的 ConsumerRebalanceListener 接口的一个实现。如果未提供此属性,容器会配置一个记录侦听器,以 INFO 级别记录再平衡事件。该框架还会添加一个子接口 ConsumerAwareRebalanceListener。以下清单显示了 ConsumerAwareRebalanceListener 接口定义:

ContainerProperties has a property called consumerRebalanceListener, which takes an implementation of the Kafka client’s ConsumerRebalanceListener interface. If this property is not provided, the container configures a logging listener that logs rebalance events at the INFO level. The framework also adds a sub-interface ConsumerAwareRebalanceListener. The following listing shows the ConsumerAwareRebalanceListener interface definition:

public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {

    void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

}

请注意,在撤销分区时有两个回调。第一个会立即调用。第二个在提交任何待定偏移量后调用。如果您希望在某些外部存储库中维护偏移量,这将非常有用,如下例所示:

Notice that there are two callbacks when partitions are revoked. The first is called immediately. The second is called after any pending offsets are committed. This is useful if you wish to maintain offsets in some external repository, as the following example shows:

containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

    @Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // acknowledge any pending Acknowledgments (if using manual acks)
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // ...
        store(consumer.position(partition));
        // ...
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // ...
        consumer.seek(partition, offsetTracker.getOffset() + 1);
        // ...
    }
});

从版本 2.4 开始,添加了一个新的方法 onPartitionsLost()(类似于 ConsumerRebalanceLister 中具有相同名称的方法)。ConsumerRebalanceLister 中的默认实现简单地调用 onPartionsRevokedConsumerAwareRebalanceListener 中的默认实现不执行任何操作。当为侦听器容器提供自定义侦听器(任一类型)时,重要的是,您的实现不从 onPartitionsLost 中调用 onPartitionsRevoked。如果您实现 ConsumerRebalanceListener,则应覆盖默认方法。这是因为侦听器容器将在调用对您的实现的方法后,从其对 onPartitionsLost 的实现中调用它自己的 onPartitionsRevoked。如果您实现委托给默认行为,则每当 Consumer 在容器的侦听器上调用该方法时,onPartitionsRevoked 将被调用两次。

Starting with version 2.4, a new method onPartitionsLost() has been added (similar to a method with the same name in ConsumerRebalanceLister). The default implementation on ConsumerRebalanceLister simply calls onPartionsRevoked. The default implementation on ConsumerAwareRebalanceListener does nothing. When supplying the listener container with a custom listener (of either type), it is important that your implementation does not call onPartitionsRevoked from onPartitionsLost. If you implement ConsumerRebalanceListener you should override the default method. This is because the listener container will call its own onPartitionsRevoked from its implementation of onPartitionsLost after calling the method on your implementation. If you implementation delegates to the default behavior, onPartitionsRevoked will be called twice each time the Consumer calls that method on the container’s listener.