Tracing using Spring Cloud Sleuth
当 Spring Cloud Sleuth 存在基于 Spring Cloud Stream Apache Kafka Streams Binder 的应用程序的类路径中时,它的使用者和生产者都会自动借助追踪信息进行检测。然而,为了追踪任何应用程序特定操作,这些操作需要由用户代码明确地执行检测。可以通过在应用程序中注入 Spring Cloud Sleuth 中的 KafkaStreamsTracing
bean,然后通过此注入 bean 调用各种 Apache Kafka Streams 操作来完成此操作。以下是使用它的某些示例。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> clicks(KafkaStreamsTracing kafkaStreamsTracing) {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.transformValues(kafkaStreamsTracing.peek("span-1", (key, value) -> LOG.info("key/value: " + key + "/" + value)))
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.transform(kafkaStreamsTracing.map("span-2", (key, value) -> {
LOG.info("Click Info: " + value.getRegion() + "/" + value.getClicks());
return new KeyValue<>(value.getRegion(),
value.getClicks());
}))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum, Materialized.as(CLICK_UPDATES))
.toStream());
}
在上面的示例中,有两个添加显式追踪检测的地方。首先,我们正在记录来自传入 KStream
的键/值信息。记录此信息时,关联的 span 和追踪 ID 也将记录,以便监控系统可以追踪它们并与相同的 span ID 相关联。其次,当我们调用 map
操作时,我们不是直接在 KStream
类上调用它,而是将其包装在 transform
操作中,然后从 KafkaStreamsTracing
调用 map
。在这种情况,记录的消息中也会包含 span ID 和追踪 ID。
这里有另一个示例,其中我们对低级别 Transformer API 使用访问各种 Apache Kafka Streams 头。当 spring-cloud-sleuth
存在于类路径中时,所有追踪头也可以这样访问。
@Bean
public Function<KStream<String, String>, KStream<String, String>> process(KafkaStreamsTracing kafkaStreamsTracing) {
return input -> input.transform(kafkaStreamsTracing.transformer(
"transformer-1",
() -> new Transformer<String, String, KeyValue<String, String>>() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<String, String> transform(String key, String value) {
LOG.info("Headers: " + this.context.headers());
LOG.info("K/V:" + key + "/" + value);
// More transformations, business logic execution, etc. go here.
return KeyValue.pair(key, value);
}
@Override
public void close() {
}
}));
}