A5.微服务分布式日志与Open Telemetry融合实践

前言

这是一个基于Java 21 的 六边形架构与领域驱动设计的一个通用项目,并且结合现有的最新版本技术架构实现了 领域驱动设计模式和六边形架构模式组件定义. 并且结合微服务,介绍了领域层,领域事件,资源库,分布式锁,序列化,安全认证,日志等,并提供了实现功能. 并且我会以日常发布文章和更新代码的形式来完善它.

简介

在多实例及其微服务的分布式环境下,我们需要利用日志来分析应用程序运行时的问题,但是当日志跨多个应用后,很难识别出哪些日志是属于哪个请求的。 最直接的解决方案就是创建一个唯一标识(traceId), 将该标识写入日志中,便于后续分析。

同时现在已经有了一个开源项目Open Telemetry,它提供了一套统一的分布式追踪方案,可以很方便的将traceId写入日志中。 由此我们也需要了解两个技术点, 然后来解决问题。

  • Logback MDC

  • Open Telemetry

Logback MDC

MDC(Mapped Diagnostic Contexts)映射诊断上下文,该特征是logback提供的一种方便在多线程条件下的记录日志的功能. 我们在使用的时候只需要知道: 在多线程环境中,可以为每个线程添加特定的上下文信息即可. 同时我也提供了一个简单的用例,来使用MDC记录日志。该模块在项目的 common-distributed-traceid-with-spring 目录下。

因为整个架构在是在六边形架构中,所有的入口请求都是在主适配器,所以我们只需要在主适配器中调用前添加traceId到MDC, 然后在接下来的业务处理及方法调用中打印日志,就可以显示该traceId。

1.我们使用Spring AOP 来监听,并填充 traceId到MDC中。

package com.iokays.common.traceid;

import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;

import java.util.UUID;

import static io.vavr.API.println;


@Aspect
@Component
public class DriverAdapterWithMDCAspect {

    private static final String TRACE_ID = TRACE.ID;

    /**
     * Spring 前置通知 实现traceId的设置
     */
    @Before("@within(com.iokays.common.core.adapter.DriverAdapter)")
    public void before(final JoinPoint joinPoint) {
        println("Before method execution: " + joinPoint.getSignature().getName());
        // 从 MDC 中获取 traceId
        final String traceId = MDC.get(TRACE_ID);
        if (StringUtils.isEmpty(traceId)) {
            // 设置 traceId 到 DriverAdapter
            MDC.put(TRACE_ID, UUID.randomUUID().toString());
        }
    }

}

2.配置日志,并将 MDC 中的 traceId 输出到日志中。 很简单,使用 %X{} 就能得到 traceId值。

<?xml version="1.0" encoding="UTF-8"?>
<!--        scan : 当此属性设置为true时,配置文件如果发生改变,将会被重新加载,默认值为true-->
<configuration scan="false" scanPeriod="60 seconds" debug="ture">
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>trace-id:%X{trace-id} %d %p - %m%n</pattern>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
    </root>

</configuration>

以上就是一个很简单的traceId配置,同时我们也可以支持Junit测试和多线程的处理。

3.Junit 支持

package com.iokays.common.traceid;

import org.junit.jupiter.api.BeforeEach;
import org.slf4j.MDC;

import java.util.UUID;

public interface JunitMDCSupport {

    @BeforeEach
    default void setMDC() {
        MDC.put(TRACE.ID, UUID.randomUUID().toString());
    }


}

4.因为不管怎么来处理,都不支持并行流的日志打印,但是我们可以利用基于线程池的方式来处理多线程。并且将线程池交给Spring来管理。然后类似包装器模式覆盖ThreadPoolTaskExecutor的执行方法,并将MDC信息传入线程中.

package com.iokays.common.traceid;

import org.apache.commons.collections4.MapUtils;
import org.slf4j.MDC;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;

/**
 * 在日志配置文件中<pattern>节点中以%X{trace-id}取出,比如:<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level [uniqid-%X{trace-id}] %logger{50}-%line - %m%n</pattern>
 */
public class ThreadPoolMDCExecutor extends ThreadPoolTaskExecutor {

    private static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
        return () -> {
            if (MapUtils.isEmpty(context)) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                return callable.call();
            } finally {
                // 清除子线程的,避免内存溢出,就和ThreadLocal.remove()一个原因
                MDC.clear();
            }
        };
    }

    private static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
        return () -> {
            if (MapUtils.isEmpty(context)) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                runnable.run();
            } finally {
                MDC.clear();
            }
        };
    }

    @Override
    public void execute(Runnable task) {
        super.execute(wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return super.submit(wrap(task, MDC.getCopyOfContextMap()));
    }
}

5.最后注入到Spring容器, 并使用。

package com.iokays.sample.traceid;

import com.iokays.common.traceid.ThreadPoolMDCExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * 线程池配置。把ThreadPoolTaskExecutor换成ThreadPoolMdcExecutor
 **/
@Configuration
public class ThreadPoolConfig {

    /**
     * 业务用到的线程池
     *
     * @return
     */
    @Bean(name = "threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        final var executor = new ThreadPoolMDCExecutor();
        executor.setCorePoolSize(10); // 核心线程数
        executor.setMaxPoolSize(20); // 最大线程数
        executor.setQueueCapacity(500); // 队列大小
        executor.setKeepAliveSeconds(60); // 线程空闲时的存活时间
        executor.setThreadNamePrefix("MyThreadPoolTaskExecutor-"); // 线程名称前缀
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略
        executor.initialize(); // 初始化
        return executor;
    }
}
package com.iokays.sample.traceid;

import jakarta.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Service;

@Service
public class TraceIdService {

    private static final Logger log = LoggerFactory.getLogger(TraceIdServiceTest.class);

    @Resource
    private TaskExecutor taskExecutor;

    void run() {
        log.info("main.info");
        taskExecutor.execute(() -> log.info("taskExecutor.execute.info"));
    }


}

6.一个简单基于MDC的日志跟踪就处理完了, 其日志打印如下:

trace-id:0fbc5795-2038-43d1-95c0-cc94a601abcd 10:38:47,654 INFO - main.info
trace-id:0fbc5795-2038-43d1-95c0-cc94a601abcd 10:38:47,664 INFO - taskExecutor.execute.info

Open Telemetry

OpenTelemetry (OTel) 是一个开源可观测性框架,可允许开发团队以统一的 单一格式生成、处理和传输 遥测数据。

我们先了解下 OpenTelemetry 相关术语表

  • API(应用程序编程接口):定义数据类型和操作,以生成遥测数据并确定遥测数据的关联性。API 软件包包括插桩所用的横切公共接口。

  • SDK(软件开发工具包):由 OpenTelemetry 项目提供的 API 实施。在应用程序内,SDK 由应用程序负责人进行安装和管理。

  • 分布式跟踪:分布式跟踪能够允许您跟踪完整执行路径并找出造成问题的代码。

  • Jaeger:Jaeger 是一个开源的分布式跟踪工具,IT 团队会基于微服务架构使用 Jaeger 来对应用程序进行监测和故障排查。

  • 可观测性:可观测性为在复杂环境内运行的应用程序的行为提供细粒度的洞察和背景,让团队使用遥测数据理解其应用程序、服务和基础架构的表现,并能实时地和基于历史数据跟踪问题并进行响应。

  • 跟踪:跟踪表示某个请求在分布式系统内的完整路径。OpenTelemetry 中的跟踪是由其跨度进行定义的。跟踪能够帮助团队理解请求在不同服务和组件中的端到端历程和行为。

  • 指标:指标是在一段时间内测量的数值。指标包括诸如时间戳、事件名称以及事件值等属性。日志:日志是在特定时间点系统中发生的离散事件的文本记录。每次执行代码块时都会生成日志,而且日志通常都会包含时间戳。

open telemetry

并且OpenTelemetry提供了一整套用于生成分布式追踪、度量和日志的观测数据的标准和工具。并且这些标准和工具为开发者提供了一种统一、灵活和可扩展的方式来监控和管理我们的系统。对此,我们使用OpenTelemetry来处理分布式traceId。该部分的代码在项目的: common-distributed-traceid-with-open-telemetry 模块.

1.因为我们使用的SpringBoot项目,其依赖关系如下:

import org.springframework.boot.gradle.plugin.SpringBootPlugin

apply(plugin = "org.springframework.boot")
apply(plugin = "io.spring.dependency-management")

dependencies {
    implementation(platform(SpringBootPlugin.BOM_COORDINATES))
    implementation(platform("io.opentelemetry.instrumentation:opentelemetry-instrumentation-bom:2.9.0"))
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("io.opentelemetry.instrumentation:opentelemetry-spring-boot-starter")
    implementation("io.opentelemetry.instrumentation:opentelemetry-logback-mdc-1.0:2.9.0-alpha")
    implementation("io.opentelemetry.contrib:opentelemetry-samplers:1.33.0-alpha")

    testImplementation("org.springframework.boot:spring-boot-starter-test")
}

2.然后我们做一个最简单的配置

package com.iokays.common.traceid;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class OpenTelemetryConfig {

    @Bean
    public OpenTelemetry openTelemetry() {

        final var serviceNameResource = Resource.create(Attributes.of(AttributeKey.stringKey("service.name"), "app-test"));

        final var tracerProvider = SdkTracerProvider.builder()
                .setResource(Resource.getDefault().merge(serviceNameResource))
                .build();
        return OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build();
    }

}

3.最后启动SpringBoot项目,然后使用浏览器访问: http://localhost:8080/ping 就可以看到traceId的结果。

未完待续…​

微服务分布式日志与Open Telemetry融合实践开篇就讲完了,基于Open Telemetry的分布式追踪的实战后续会陆续添加,下篇将会介绍分布式ID与雪花算法(Snowflake)融合实践。