Tracing using Spring Cloud Sleuth
When Spring Cloud Sleuth is on the classpath of a Spring Cloud Stream Kafka Streams binder-based application, both its consumers and producers are automatically instrumented with tracing information. However, to trace any application-specific operations, those need to be explicitly instrumented by user code. This can be done by injecting the KafkaStreamsTracing bean from Spring Cloud Sleuth into the application and then invoking the various Kafka Streams operations through this injected bean. Here are some examples of using it.
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> clicks(KafkaStreamsTracing kafkaStreamsTracing) {
    return (userClicksStream, userRegionsTable) -> userClicksStream
            .transformValues(kafkaStreamsTracing.peek("span-1", (key, value) -> LOG.info("key/value: " + key + "/" + value)))
            .leftJoin(userRegionsTable,
                    (clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
            .transform(kafkaStreamsTracing.map("span-2", (key, value) -> {
                LOG.info("Click Info: " + value.getRegion() + "/" + value.getClicks());
                return new KeyValue<>(value.getRegion(), value.getClicks());
            }))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .reduce(Long::sum, Materialized.as(CLICK_UPDATES))
            .toStream();
}
In the example above, there are two places where we add explicit tracing instrumentation. First, we log the key/value information from the incoming KStream. When this information is logged, the associated span and trace IDs are logged as well, so that a monitoring system can track and correlate them by span ID. Second, when we call a map operation, instead of calling it directly on the KStream, we wrap it inside a transform operation and call map from KafkaStreamsTracing. In this case, too, the logged message contains the span ID and trace ID.
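Both the automatic instrumentation and the KafkaStreamsTracing bean used above are only available when Spring Cloud Sleuth is on the classpath. A typical way to bring it in is the Sleuth starter; the version is assumed here to be managed by the Spring Cloud BOM in the project's dependency management:

```xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
```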
Here is another example, where we use the low-level Transformer API to access the various Kafka Streams headers. When spring-cloud-sleuth is on the classpath, all the tracing headers can also be accessed this way.
@Bean
public Function<KStream<String, String>, KStream<String, String>> process(KafkaStreamsTracing kafkaStreamsTracing) {
    return input -> input.transform(kafkaStreamsTracing.transformer(
            "transformer-1",
            () -> new Transformer<String, String, KeyValue<String, String>>() {

                ProcessorContext context;

                @Override
                public void init(ProcessorContext context) {
                    this.context = context;
                }

                @Override
                public KeyValue<String, String> transform(String key, String value) {
                    LOG.info("Headers: " + this.context.headers());
                    LOG.info("K/V:" + key + "/" + value);
                    // More transformations, business logic execution, etc. go here.
                    return KeyValue.pair(key, value);
                }

                @Override
                public void close() {
                }
            }));
}
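Among the headers logged above is Sleuth's propagation header. Assuming the default Brave/B3 single-value propagation format (an assumption about the tracing configuration, not something mandated by the binder), the header value packs the trace ID, span ID, and sampling flag into one dash-delimited string. A minimal plain-Java sketch of pulling those pieces apart, using a hypothetical example value:

```java
public class B3HeaderSketch {

    // Split a B3 single-format header value, "traceId-spanId-samplingState",
    // into its components. Real values would come from the record headers
    // seen in the Transformer above; this value is a made-up example.
    public static String[] parse(String b3Value) {
        return b3Value.split("-");
    }

    public static void main(String[] args) {
        String[] parts = parse("80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1");
        System.out.println("traceId = " + parts[0]);
        System.out.println("spanId  = " + parts[1]);
        System.out.println("sampled = " + parts[2]);
    }
}
```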