Seeking to a Specific Offset

In order to seek, your listener must implement ConsumerSeekAware, which has the following methods:

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onPartitionsRevoked(Collection<TopicPartition> partitions);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

registerSeekCallback is called when the container is started and whenever partitions are assigned. Save a reference to the callback and use it when seeking at some arbitrary time after initialization. If you use the same listener in multiple containers (or in a ConcurrentMessageListenerContainer), store the callback in a ThreadLocal or some other structure keyed by the listener Thread.

When using group management, onPartitionsAssigned is called when partitions are assigned. You can use this method, for example, to set initial offsets for the partitions by calling the callback. You can also use this method to associate this thread’s callback with the assigned partitions (see the example below). To seek from within this method, you must use the callback argument, not the one passed into registerSeekCallback. Starting with version 2.5.5, this method is called even when using manual partition assignment.

onPartitionsRevoked is called when the container is stopped or Kafka revokes assignments. You should discard this thread’s callback and remove any associations to the revoked partitions.

The callback has the following methods:

void seek(String topic, int partition, long offset);

void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction);

void seekToBeginning(String topic, int partition);

void seekToBeginning(Collection<TopicPartition> partitions);

void seekToEnd(String topic, int partition);

void seekToEnd(Collection<TopicPartition> partitions);

void seekRelative(String topic, int partition, long offset, boolean toCurrent);

void seekToTimestamp(String topic, int partition, long timestamp);

void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);

The two different variants of the seek methods provide a way to seek to an arbitrary offset. The method that takes a Function as an argument to compute the offset was added in version 3.2 of the framework. This function provides access to the current offset (the current position returned by the consumer, which is the next offset to be fetched). The user can decide what offset to seek to based on the current offset in the consumer as part of the function definition.
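
As a minimal sketch of the function-based variant, the following plain Java builds a Function<Long, Long> that rewinds a fixed number of records from the current position. The class name SeekFunctionSketch, the helper rewindBy, and the topic name in the comment are hypothetical, and the floor at offset 0 is an assumption of this sketch rather than documented framework behavior.

```java
import java.util.function.Function;

final class SeekFunctionSketch {

    // Hypothetical helper: returns a function that maps the current position
    // to a position `count` records earlier, never going below offset 0.
    static Function<Long, Long> rewindBy(long count) {
        return current -> Math.max(0L, current - count);
    }

    public static void main(String[] args) {
        Function<Long, Long> rewindTen = rewindBy(10);
        System.out.println(rewindTen.apply(42L)); // prints 32
        System.out.println(rewindTen.apply(3L));  // prints 0
        // Inside a listener, the same function would be handed to the callback:
        // callback.seek("someTopic", 0, rewindTen);
    }
}
```

Keeping the computation in a standalone function makes the seek policy easy to unit test without a running consumer.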

seekRelative was added in version 2.3, to perform relative seeks.

  • offset negative and toCurrent false - seek relative to the end of the partition.

  • offset positive and toCurrent false - seek relative to the beginning of the partition.

  • offset negative and toCurrent true - seek relative to the current position (rewind).

  • offset positive and toCurrent true - seek relative to the current position (fast forward).
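
The four cases above can be modeled as simple arithmetic. The sketch below is an illustration only, not the container's implementation: the class SeekRelativeSketch and method resolve are hypothetical, "end" is taken to mean the partition's end offset, an offset of 0 with toCurrent false is treated as relative to the beginning (an assumption), and no range clamping is applied.

```java
final class SeekRelativeSketch {

    // Models the target position for seekRelative(topic, partition, offset, toCurrent).
    // beginning/end/current are the partition's beginning offset, end offset,
    // and the consumer's current position.
    static long resolve(long beginning, long end, long current, long offset, boolean toCurrent) {
        if (toCurrent) {
            // Negative offset rewinds, positive offset fast-forwards.
            return current + offset;
        }
        // Negative offset counts back from the end; otherwise count from the beginning.
        return offset < 0 ? end + offset : beginning + offset;
    }

    public static void main(String[] args) {
        long beginning = 0, end = 100, current = 40;
        System.out.println(resolve(beginning, end, current, -5, false)); // prints 95
        System.out.println(resolve(beginning, end, current, 5, false));  // prints 5
        System.out.println(resolve(beginning, end, current, -5, true));  // prints 35
        System.out.println(resolve(beginning, end, current, 5, true));   // prints 45
    }
}
```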

The seekToTimestamp methods were also added in version 2.3.

When seeking to the same timestamp for multiple partitions in the onIdleContainer or onPartitionsAssigned methods, the second method is preferred because it is more efficient to find the offsets for the timestamps in a single call to the consumer’s offsetsForTimes method. When called from other locations, the container will gather all timestamp seek requests and make one call to offsetsForTimes.
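
To make the timestamp lookup concrete: the Kafka consumer's offsetsForTimes returns, for each partition, the earliest offset whose timestamp is greater than or equal to the requested timestamp. The sketch below models that rule over an in-memory list. The class TimestampLookupSketch and method offsetForTime are hypothetical, and the list is a simplification in which index i stands for the record at offset i.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

final class TimestampLookupSketch {

    // timestamps.get(i) is the timestamp of the record at offset i
    // (simplified model: offsets start at 0 and have no gaps).
    static OptionalLong offsetForTime(List<Long> timestamps, long target) {
        for (int offset = 0; offset < timestamps.size(); offset++) {
            if (timestamps.get(offset) >= target) {
                return OptionalLong.of(offset);
            }
        }
        // No record at or after the target timestamp.
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        List<Long> timestamps = Arrays.asList(100L, 200L, 300L);
        System.out.println(offsetForTime(timestamps, 150L)); // prints OptionalLong[1]
        System.out.println(offsetForTime(timestamps, 400L)); // prints OptionalLong.empty
    }
}
```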

You can also perform seek operations from onIdleContainer() when an idle container is detected. See Detecting Idle and Non-Responsive Consumers for how to enable idle container detection.

The seekToBeginning method that accepts a collection is useful, for example, when processing a compacted topic and you wish to seek to the beginning every time the application is started:

public class MyListener implements ConsumerSeekAware {

    ...

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        callback.seekToBeginning(assignments.keySet());
    }

}

To arbitrarily seek at runtime, use the callback reference from the registerSeekCallback for the appropriate thread.

The following trivial Spring Boot application demonstrates how to use the callback. It sends 10 records to the topic; pressing <Enter> in the console causes all partitions to seek to the beginning.

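import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback;
import org.springframework.stereotype.Component;

@SpringBootApplication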
public class SeekExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SeekExampleApplication.class, args);
    }

    @Bean
    public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send(
                new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
            while (true) {
                System.in.read();
                listener.seekToStart();
            }
        };
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("seekExample", 3, (short) 1);
    }

}

@Component
class Listener implements ConsumerSeekAware {

    private static final Logger logger = LoggerFactory.getLogger(Listener.class);

    private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();

    private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        this.callbackForThread.set(callback);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        partitions.forEach(tp -> this.callbacks.remove(tp));
        this.callbackForThread.remove();
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    }

    @KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
    public void listen(ConsumerRecord<String, String> in) {
        logger.info(in.toString());
    }

    public void seekToStart() {
        this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
    }

}

To make things simpler, version 2.3 added the AbstractConsumerSeekAware class, which keeps track of which callback is to be used for a topic/partition. The following example shows how to seek to the last record processed, in each partition, each time the container goes idle. It also has methods that allow arbitrary external calls to rewind partitions by one record.

public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {

    @KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
    public void listen(String in) {
        ...
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {

        assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
    }

    /**
     * Rewind all partitions one record.
     */
    public void rewindAllOneRecord() {
        getSeekCallbacks()
            .forEach((tp, callback) ->
                callback.seekRelative(tp.topic(), tp.partition(), -1, true));
    }

    /**
     * Rewind one partition one record.
     */
    public void rewindOnePartitionOneRecord(String topic, int partition) {
        getSeekCallbackFor(new TopicPartition(topic, partition))
            .seekRelative(topic, partition, -1, true);
    }

}

Version 2.6 added convenience methods to the abstract class:

  • seekToBeginning() - seeks all assigned partitions to the beginning.

  • seekToEnd() - seeks all assigned partitions to the end.

  • seekToTimestamp(long timestamp) - seeks all assigned partitions to the offset represented by that timestamp.

Example:

public class MyListener extends AbstractConsumerSeekAware {

    @KafkaListener(...)
    void listen(...) {
        ...
    }
}

public class SomeOtherBean {

    MyListener listener;

    ...

    void someMethod() {
        this.listener.seekToTimestamp(System.currentTimeMillis() - 60_000);
    }

}
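
The timestamp in the example above is expressed in epoch milliseconds. As a small sketch, a look-back such as "one minute ago" can be computed with java.time instead of raw arithmetic. The class SeekTimestampSketch and method timestampBefore are hypothetical helpers, not part of the framework.

```java
import java.time.Duration;
import java.time.Instant;

final class SeekTimestampSketch {

    // Returns the epoch-millisecond timestamp that lies `lookBack` before `now`.
    static long timestampBefore(Instant now, Duration lookBack) {
        return now.minus(lookBack).toEpochMilli();
    }

    public static void main(String[] args) {
        long oneMinuteAgo = timestampBefore(Instant.now(), Duration.ofMinutes(1));
        System.out.println(oneMinuteAgo);
        // The value could then be passed on, for example:
        // listener.seekToTimestamp(oneMinuteAgo);
    }
}
```

Passing the current Instant as a parameter keeps the helper deterministic and easy to test.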