Observability
Spring 通过 Micrometer 提供对 Observability 的支持,它在应用程序中定义了一个 Observation concept that enables both Metrics and Traces。
Spring Cloud Stream 通过提供一个 ObservationFunctionAroundWrapper
,在 Spring Cloud Function 的级别集成这种支持,该 ObservationFunctionAroundWrapper
会包装函数,以便即时处理观测值。
必需依赖项
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core-micrometer</artifactId>
</dependency>
以及一个可用的跟踪器桥接器。例如, Zipkin Brave
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency>
Imperative Functions
命令式函数用观察包装器 ObservationFunctionAroundWrapper
包装,该包装器提供处理与观察注册表交互所需的必要基础设施。这样的交互在每次调用函数时发生,这意味着观察实际上附加到函数的每次调用(即每个消息一个观察)。换句话说,对于命令式函数,如果存在前面提到的必需依赖项,可观察性将开始工作。
Reactive Functions
响应函数本质上不同于命令式函数,因此不会用 ObservationFunctionAroundWrapper
包装。
命令式函数 是一个消息处理函数,每次有消息时由框架调用,类似于您的典型事件处理程序,其中对于 N 条消息,此类函数将被调用 N 次。这允许我们包装这样的函数,使用 错误处理、重试 和当然 可观察性 等其他功能对其进行装饰。
Reactive function 是初始化函数。它的职责是将用户提供的流处理代码(Flux)与绑定器提供的源流和目标流连接起来。它仅在应用程序启动期间调用一次。一旦流代码与源流/目标流连接,我们就无法查看或控制实际的流处理。它掌握在反应式 API 中。反应式函数还带来了一个附加变量。鉴于函数允许您查看整个流链(不仅仅是一个事件),那么默认观测单元应该是什么?流链中的单个项目?项目范围?如果一段时间后没有消息会怎样?等等。我们想强调的是,对于反应式函数,我们不能假设任何事情。(有关反应式函数与命令式函数之间差异的更多信息,请参见 Reactive Functions)。
所以,就像 重试 和 错误处理 一样,您需要手动处理观察。
值得庆幸的是,您可以通过使用 tap
操作利用响应 API 段来轻松地做到这一点,同时提供 ObservationRegistry
实例。这样的片段定义了一个观察单元,它可以是单一项目或范围,无论您想在流中观察什么。
@SpringBootApplication
public class DemoStreamApplication {
Logger logger = LoggerFactory.getLogger(DemoStreamApplication.class);
public static void main(String[] args) {
Hooks.enableAutomaticContextPropagation();
SpringApplication.run(DemoStreamApplication.class, args);
}
@Bean
public Function<Flux<String>, Flux<String>> uppercase(ObservationRegistry registry) {
return flux -> flux.flatMap(item -> {
return Mono.just(item)
.map(value -> value.toUpperCase())
.doOnNext(v -> logger.info(v))
.tap(Micrometer.observation(registry));
});
}
}
上面的示例模拟了将 Observation 附加到单个消息处理(即命令式函数),因为在这种情况下,观测单元从此处开始:Mono.just(..) 且最后一个操作将 ObservationRegistry
附加到订阅者。
如果已经将观察附加到订阅者,则将使用它为 tap
上游的链/段创建一个子观察,但是如我们已经陈述的,默认情况下,框架不会将任何观察附加到您返回的流链。