Rebalancing Listeners
ContainerProperties
有一个名为 consumerRebalanceListener
的属性,它采用 Kafka 客户端的 ConsumerRebalanceListener
接口的一个实现。如果未提供此属性,容器会配置一个记录侦听器,以 INFO
级别记录再平衡事件。该框架还会添加一个子接口 ConsumerAwareRebalanceListener
。以下清单显示了 ConsumerAwareRebalanceListener
接口定义:
ContainerProperties
has a property called consumerRebalanceListener
, which takes an implementation of the Kafka client’s ConsumerRebalanceListener
interface.
If this property is not provided, the container configures a logging listener that logs rebalance events at the INFO
level.
The framework also adds a sub-interface ConsumerAwareRebalanceListener
.
The following listing shows the ConsumerAwareRebalanceListener
interface definition:
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}
请注意,在撤销分区时有两个回调。第一个会立即调用。第二个在提交任何待定偏移量后调用。如果您希望在某些外部存储库中维护偏移量,这将非常有用,如下例所示:
Notice that there are two callbacks when partitions are revoked. The first is called immediately. The second is called after any pending offsets are committed. This is useful if you wish to maintain offsets in some external repository, as the following example shows:
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// acknowledge any pending Acknowledgments (if using manual acks)
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// ...
store(consumer.position(partition));
// ...
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// ...
consumer.seek(partition, offsetTracker.getOffset() + 1);
// ...
}
});
从版本 2.4 开始,添加了一个新的方法 onPartitionsLost()
(类似于 ConsumerRebalanceLister
中具有相同名称的方法)。ConsumerRebalanceLister
中的默认实现简单地调用 onPartionsRevoked
。ConsumerAwareRebalanceListener
中的默认实现不执行任何操作。当为侦听器容器提供自定义侦听器(任一类型)时,重要的是,您的实现不从 onPartitionsLost
中调用 onPartitionsRevoked
。如果您实现 ConsumerRebalanceListener
,则应覆盖默认方法。这是因为侦听器容器将在调用对您的实现的方法后,从其对 onPartitionsLost
的实现中调用它自己的 onPartitionsRevoked
。如果您实现委托给默认行为,则每当 Consumer
在容器的侦听器上调用该方法时,onPartitionsRevoked
将被调用两次。
Starting with version 2.4, a new method onPartitionsLost()
has been added (similar to a method with the same name in ConsumerRebalanceLister
).
The default implementation on ConsumerRebalanceLister
simply calls onPartionsRevoked
.
The default implementation on ConsumerAwareRebalanceListener
does nothing.
When supplying the listener container with a custom listener (of either type), it is important that your implementation does not call onPartitionsRevoked
from onPartitionsLost
.
If you implement ConsumerRebalanceListener
you should override the default method.
This is because the listener container will call its own onPartitionsRevoked
from its implementation of onPartitionsLost
after calling the method on your implementation.
If you implementation delegates to the default behavior, onPartitionsRevoked
will be called twice each time the Consumer
calls that method on the container’s listener.