Seeking to a Specific Offset
为了搜索,你的侦听器必须实现 ConsumerSeekAware
,该接口有以下方法:
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
void onPartitionsRevoked(Collection<TopicPartition> partitions)
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
当容器启动以及分配分区时,会调用 registerSeekCallback
。在初始化后的某个任意时间搜索时,您应该使用此回调。你应该保存对回调的引用。如果你在多个容器(或 ConcurrentMessageListenerContainer
) 中使用相同的侦听器,你应该将回调存储在 ThreadLocal
或其他以侦听器 Thread
为键的结构中。
使用组管理时,在分配分区时调用 onPartitionsAssigned
。您可以使用此方法,例如,通过调用回调来设置分区的初始偏移量。您还可以使用此方法将此线程的回调与分配的分区相关联(请参阅以下示例)。您必须使用回调参数,而不是传递到 registerSeekCallback
的参数。从版本 2.5.5 开始,即使使用 manual partition assignment,也会调用此方法。
容器停止或 Kafka 取消分配时将调用 onPartitionsRevoked
。您应丢弃该线程的回调并删除所有与已取消分配的分区分派的关联。
回调具有以下方法:
void seek(String topic, int partition, long offset);
void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction);
void seekToBeginning(String topic, int partition);
void seekToBeginning(Collection<TopicPartitions> partitions);
void seekToEnd(String topic, int partition);
void seekToEnd(Collection<TopicPartitions> partitions);
void seekRelative(String topic, int partition, long offset, boolean toCurrent);
void seekToTimestamp(String topic, int partition, long timestamp);
void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);
seek
方法的两个不同变体提供了一种寻求任意偏移量的方法。将函数作为参数以计算偏移量的 seek
方法已添加到框架的 3.2 版中。此函数提供对当前偏移量的访问(消费者返回的当前位置,即要获取的下一个偏移量)。用户可以决定根据函数定义中消费者中的当前偏移量,寻求哪个偏移量。
在 2.3 版中添加了 seekRelative
,以执行相对寻址。
-
offset
为负,toCurrent
false
- 相对于分区的末尾进行查找。 -
offset
为正,toCurrent
false
- 相对于分区开头进行查找。 -
offset
为负,toCurrent
true
- 相对于当前位置进行查找(回退)。 -
offset
为正,toCurrent
true
- 相对于当前位置进行查找(快进)。
在 2.3 版中也添加了 seekToTimestamp
方法。
在 |
当检测到空闲容器时,您还可以从 onIdleContainer()
执行查找操作。有关如何启用空闲容器检测,请参阅 Detecting Idle and Non-Responsive Consumers。
接受集合的 |
public class MyListener implements ConsumerSeekAware {
...
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
callback.seekToBeginning(assignments.keySet());
}
}
要在运行时进行任意寻址,请使用 registerSeekCallback
的回调引用,用于适当的线程。
这是一个简单的 Spring Boot 应用程序,演示了如何使用回调;它向主题发送 10 条记录;在控制台中按 <Enter>
将导致所有分区寻求到起始位置。
@SpringBootApplication
public class SeekExampleApplication {
public static void main(String[] args) {
SpringApplication.run(SeekExampleApplication.class, args);
}
@Bean
public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
return args -> {
IntStream.range(0, 10).forEach(i -> template.send(
new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
while (true) {
System.in.read();
listener.seekToStart();
}
};
}
@Bean
public NewTopic topic() {
return new NewTopic("seekExample", 3, (short) 1);
}
}
@Component
class Listener implements ConsumerSeekAware {
private static final Logger logger = LoggerFactory.getLogger(Listener.class);
private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();
private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
this.callbackForThread.set(callback);
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
partitions.forEach(tp -> this.callbacks.remove(tp));
this.callbackForThread.remove();
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
@KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
public void listen(ConsumerRecord<String, String> in) {
logger.info(in.toString());
}
public void seekToStart() {
this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
}
}
为了简化操作,版本 2.3 添加了 AbstractConsumerSeekAware
类,该类跟踪对主题/分区使用哪个回调。以下示例演示了如何在每次容器空闲时,对每个分区寻求到处理的最后一条记录。它还提供了一些方法,允许任意外部调用将分区倒带一条记录。
public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {
@KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
public void listen(String in) {
...
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments,
ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind all partitions one record.
*/
public void rewindAllOneRecord() {
getSeekCallbacks()
.forEach((tp, callback) ->
callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind one partition one record.
*/
public void rewindOnePartitionOneRecord(String topic, int partition) {
getSeekCallbackFor(new TopicPartition(topic, partition))
.seekRelative(topic, partition, -1, true);
}
}
2.6 版为抽象类添加了以下便捷方法:
-
seekToBeginning()
- 将所有分配的分区都查找至开头。 -
seekToEnd()
- 将所有分配的分区都查找至末尾。 -
seekToTimestamp(long timestamp)
- 将所有分配的分区都查找至该时间戳表示的偏移量。
示例:
public class MyListener extends AbstractConsumerSeekAware {
@KafkaListener(...)
void listn(...) {
...
}
}
public class SomeOtherBean {
MyListener listener;
...
void someMethod() {
this.listener.seekToTimestamp(System.currentTimeMillis() - 60_000);
}
}