Timestamp extractor
借助 Kafka Streams,你可以根据各种时间戳概念控制消费者记录的处理过程。默认情况下,Kafka Streams 会抽取嵌入到消费者记录中的时间戳元数据。你可以通过为每个输入绑定提供不同的 TimestampExtractor
实现来更改此默认行为。如下是对如何进行此操作的一些详细说明。
Kafka Streams allows you to control the processing of the consumer records based on various notions of timestamp.
By default, Kafka Streams extracts the timestamp metadata embedded in the consumer record.
You can change this default behavior by providing a different TimestampExtractor
implementation per input binding.
Here are some details on how that can be done.
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, Order>>>> process() {
return orderStream ->
customers ->
products -> orderStream;
}
@Bean
public TimestampExtractor timestampExtractor() {
return new WallclockTimestampExtractor();
}
然后,你可以针对每个消费者绑定设置上述 TimestampExtractor
Bean 名称。
Then you set the above TimestampExtractor
bean name per consumer binding.
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.timestampExtractorBeanName=timestampExtractor"
如果你跳过一个输入消费者绑定以设置自定义时间戳提取器,那么该消费者将使用默认设置。
If you skip an input consumer binding for setting a custom timestamp extractor, that consumer will use the default settings.