Tracing using Spring Cloud Sleuth

当 Spring Cloud Sleuth 存在基于 Spring Cloud Stream Apache Kafka Streams Binder 的应用程序的类路径中时,它的使用者和生产者都会自动借助追踪信息进行检测。然而,为了追踪任何应用程序特定操作,这些操作需要由用户代码明确地执行检测。可以通过在应用程序中注入 Spring Cloud Sleuth 中的 KafkaStreamsTracing bean,然后通过此注入 bean 调用各种 Apache Kafka Streams 操作来完成此操作。以下是使用它的某些示例。

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> clicks(KafkaStreamsTracing kafkaStreamsTracing) {
    return (userClicksStream, userRegionsTable) -> (userClicksStream
            .transformValues(kafkaStreamsTracing.peek("span-1", (key, value) -> LOG.info("key/value: " + key + "/" + value)))
            .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
                            "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
            .transform(kafkaStreamsTracing.map("span-2", (key, value) -> {
                LOG.info("Click Info: " + value.getRegion() + "/" + value.getClicks());
                return new KeyValue<>(value.getRegion(),
                        value.getClicks());
            }))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .reduce(Long::sum, Materialized.as(CLICK_UPDATES))
            .toStream());
}

在上面的示例中,有两个添加显式追踪检测的地方。首先,我们正在记录来自传入 KStream 的键/值信息。记录此信息时,关联的 span 和追踪 ID 也将记录,以便监控系统可以追踪它们并与相同的 span ID 相关联。其次,当我们调用 map 操作时,我们不是直接在 KStream 类上调用它,而是将其包装在 transform 操作中,然后从 KafkaStreamsTracing 调用 map。在这种情况,记录的消息中也会包含 span ID 和追踪 ID。

这里有另一个示例,其中我们对低级别 Transformer API 使用访问各种 Apache Kafka Streams 头。当 spring-cloud-sleuth 存在于类路径中时,所有追踪头也可以这样访问。

@Bean
public Function<KStream<String, String>, KStream<String, String>> process(KafkaStreamsTracing kafkaStreamsTracing) {
    return input -> input.transform(kafkaStreamsTracing.transformer(
            "transformer-1",
            () -> new Transformer<String, String, KeyValue<String, String>>() {
                ProcessorContext context;

                @Override
                public void init(ProcessorContext context) {
                    this.context = context;
                }

                @Override
                public KeyValue<String, String> transform(String key, String value) {
                    LOG.info("Headers: " + this.context.headers());
                    LOG.info("K/V:" + key + "/" + value);
                    // More transformations, business logic execution, etc. go here.
                    return KeyValue.pair(key, value);
                }

                @Override
                public void close() {
                }
            }));
}