Seeking to a Specific Offset
为了搜索,你的侦听器必须实现 ConsumerSeekAware
,该接口有以下方法:
In order to seek, your listener must implement ConsumerSeekAware
, which has the following methods:
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
void onPartitionsRevoked(Collection<TopicPartition> partitions)
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
当容器启动以及分配分区时,会调用 registerSeekCallback
。在初始化后的某个任意时间搜索时,您应该使用此回调。你应该保存对回调的引用。如果你在多个容器(或 ConcurrentMessageListenerContainer
) 中使用相同的侦听器,你应该将回调存储在 ThreadLocal
或其他以侦听器 Thread
为键的结构中。
The registerSeekCallback
is called when the container is started and whenever partitions are assigned.
You should use this callback when seeking at some arbitrary time after initialization.
You should save a reference to the callback.
If you use the same listener in multiple containers (or in a ConcurrentMessageListenerContainer
), you should store the callback in a ThreadLocal
or some other structure keyed by the listener Thread
.
使用组管理时,在分配分区时调用 onPartitionsAssigned
。您可以使用此方法,例如,通过调用回调来设置分区的初始偏移量。您还可以使用此方法将此线程的回调与分配的分区相关联(请参阅以下示例)。您必须使用回调参数,而不是传递到 registerSeekCallback
的参数。从版本 2.5.5 开始,即使使用 manual partition assignment,也会调用此方法。
When using group management, onPartitionsAssigned
is called when partitions are assigned.
You can use this method, for example, for setting initial offsets for the partitions, by calling the callback.
You can also use this method to associate this thread’s callback with the assigned partitions (see the example below).
You must use the callback argument, not the one passed into registerSeekCallback
.
Starting with version 2.5.5, this method is called, even when using manual partition assignment.
容器停止或 Kafka 取消分配时将调用 onPartitionsRevoked
。您应丢弃该线程的回调并删除所有与已取消分配的分区分派的关联。
onPartitionsRevoked
is called when the container is stopped or Kafka revokes assignments.
You should discard this thread’s callback and remove any associations to the revoked partitions.
回调具有以下方法:
The callback has the following methods:
void seek(String topic, int partition, long offset);
void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction);
void seekToBeginning(String topic, int partition);
void seekToBeginning(Collection<TopicPartitions> partitions);
void seekToEnd(String topic, int partition);
void seekToEnd(Collection<TopicPartitions> partitions);
void seekRelative(String topic, int partition, long offset, boolean toCurrent);
void seekToTimestamp(String topic, int partition, long timestamp);
void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);
seek
方法的两个不同变体提供了一种寻求任意偏移量的方法。将函数作为参数以计算偏移量的 seek
方法已添加到框架的 3.2 版中。此函数提供对当前偏移量的访问(消费者返回的当前位置,即要获取的下一个偏移量)。用户可以决定根据函数定义中消费者中的当前偏移量,寻求哪个偏移量。
The two different variants of the seek
methods provide a way to seek to an arbitrary offset.
The method that takes a Function
as an argument to compute the offset was added in version 3.2 of the framework.
This function provides access to the current offset (the current position returned by the consumer, which is the next offset to be fetched).
The user can decide what offset to seek to based on the current offset in the consumer as part of the function definition.
在 2.3 版中添加了 seekRelative
,以执行相对寻址。
seekRelative
was added in version 2.3, to perform relative seeks.
-
offset
negative andtoCurrent
false
- seek relative to the end of the partition. -
offset
positive andtoCurrent
false
- seek relative to the beginning of the partition. -
offset
negative andtoCurrent
true
- seek relative to the current position (rewind). -
offset
positive andtoCurrent
true
- seek relative to the current position (fast forward).
在 2.3 版中也添加了 seekToTimestamp
方法。
The seekToTimestamp
methods were also added in version 2.3.
在 |
When seeking to the same timestamp for multiple partitions in the |
当检测到空闲容器时,您还可以从 onIdleContainer()
执行查找操作。有关如何启用空闲容器检测,请参阅 Detecting Idle and Non-Responsive Consumers。
You can also perform seek operations from onIdleContainer()
when an idle container is detected.
See Detecting Idle and Non-Responsive Consumers for how to enable idle container detection.
接受集合的 |
The |
public class MyListener implements ConsumerSeekAware {
...
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
callback.seekToBeginning(assignments.keySet());
}
}
要在运行时进行任意寻址,请使用 registerSeekCallback
的回调引用,用于适当的线程。
To arbitrarily seek at runtime, use the callback reference from the registerSeekCallback
for the appropriate thread.
这是一个简单的 Spring Boot 应用程序,演示了如何使用回调;它向主题发送 10 条记录;在控制台中按 <Enter>
将导致所有分区寻求到起始位置。
Here is a trivial Spring Boot application that demonstrates how to use the callback; it sends 10 records to the topic; hitting <Enter>
in the console causes all partitions to seek to the beginning.
@SpringBootApplication
public class SeekExampleApplication {
public static void main(String[] args) {
SpringApplication.run(SeekExampleApplication.class, args);
}
@Bean
public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
return args -> {
IntStream.range(0, 10).forEach(i -> template.send(
new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
while (true) {
System.in.read();
listener.seekToStart();
}
};
}
@Bean
public NewTopic topic() {
return new NewTopic("seekExample", 3, (short) 1);
}
}
@Component
class Listener implements ConsumerSeekAware {
private static final Logger logger = LoggerFactory.getLogger(Listener.class);
private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();
private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
this.callbackForThread.set(callback);
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
partitions.forEach(tp -> this.callbacks.remove(tp));
this.callbackForThread.remove();
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
@KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
public void listen(ConsumerRecord<String, String> in) {
logger.info(in.toString());
}
public void seekToStart() {
this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
}
}
为了简化操作,版本 2.3 添加了 AbstractConsumerSeekAware
类,该类跟踪对主题/分区使用哪个回调。以下示例演示了如何在每次容器空闲时,对每个分区寻求到处理的最后一条记录。它还提供了一些方法,允许任意外部调用将分区倒带一条记录。
To make things simpler, version 2.3 added the AbstractConsumerSeekAware
class, which keeps track of which callback is to be used for a topic/partition.
The following example shows how to seek to the last record processed, in each partition, each time the container goes idle.
It also has methods that allow arbitrary external calls to rewind partitions by one record.
public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {
@KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
public void listen(String in) {
...
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments,
ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind all partitions one record.
*/
public void rewindAllOneRecord() {
getSeekCallbacks()
.forEach((tp, callback) ->
callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind one partition one record.
*/
public void rewindOnePartitionOneRecord(String topic, int partition) {
getSeekCallbackFor(new TopicPartition(topic, partition))
.seekRelative(topic, partition, -1, true);
}
}
2.6 版为抽象类添加了以下便捷方法:
Version 2.6 added convenience methods to the abstract class:
-
seekToBeginning()
- seeks all assigned partitions to the beginning. -
seekToEnd()
- seeks all assigned partitions to the end. -
seekToTimestamp(long timestamp)
- seeks all assigned partitions to the offset represented by that timestamp.
示例:
Example:
public class MyListener extends AbstractConsumerSeekAware {
@KafkaListener(...)
void listn(...) {
...
}
}
public class SomeOtherBean {
MyListener listener;
...
void someMethod() {
this.listener.seekToTimestamp(System.currentTimeMillis() - 60_000);
}
}