Timestamp extractor
借助 Kafka Streams,你可以根据各种时间戳概念控制消费者记录的处理过程。默认情况下,Kafka Streams 会抽取嵌入到消费者记录中的时间戳元数据。你可以通过为每个输入绑定提供不同的 TimestampExtractor
实现来更改此默认行为。如下是对如何进行此操作的一些详细说明。
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, Order>>>> process() {
return orderStream ->
customers ->
products -> orderStream;
}
@Bean
public TimestampExtractor timestampExtractor() {
return new WallclockTimestampExtractor();
}
然后,你可以针对每个消费者绑定设置上述 TimestampExtractor
Bean 名称。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.timestampExtractorBeanName=timestampExtractor"
如果你跳过一个输入消费者绑定以设置自定义时间戳提取器,那么该消费者将使用默认设置。