Observability
Spring 通过 Micrometer 提供对 Observability 的支持,它在应用程序中定义了一个 Observation concept that enables both Metrics and Traces。
Spring provides support for Observability via Micrometer which defines an Observation concept that enables both Metrics and Traces in applications.
Spring Cloud Stream 通过提供一个 ObservationFunctionAroundWrapper
,在 Spring Cloud Function 的级别集成这种支持,该 ObservationFunctionAroundWrapper
会包装函数,以便即时处理观测值。
Spring cloud Stream integrates such support at the level of Spring Cloud Function by providing amongst several abstractions an ObservationFunctionAroundWrapper
, which wraps function to handle observations out of the box.
必需依赖项
Required dependencies
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core-micrometer</artifactId>
</dependency>
以及一个可用的跟踪器桥接器。例如, Zipkin Brave
and one of the available tracer bridges. For example Zipkin Brave
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency>
Imperative Functions
命令式函数用观察包装器 ObservationFunctionAroundWrapper
包装,该包装器提供处理与观察注册表交互所需的必要基础设施。这样的交互在每次调用函数时发生,这意味着观察实际上附加到函数的每次调用(即每个消息一个观察)。换句话说,对于命令式函数,如果存在前面提到的必需依赖项,可观察性将开始工作。
Imperative functions are wrapped with the observation wrapper ObservationFunctionAroundWrapper
which provides necessary infrastructure to handle the interaction with the Observation registry.
Such interactions happen per each invocation of the function which effectively means that observation is attached to each invocation of the
function (i.e., single observation per message).
In other words for imperative functions if the required dependencies mentioned earlier are present, observability will just work.
Reactive Functions
响应函数本质上不同于命令式函数,因此不会用 ObservationFunctionAroundWrapper
包装。
Reactive functions are inherently different then imperative functions and as such are not wrapped with ObservationFunctionAroundWrapper
.
命令式函数 是一个消息处理函数,每次有消息时由框架调用,类似于您的典型事件处理程序,其中对于 N 条消息,此类函数将被调用 N 次。这允许我们包装这样的函数,使用 错误处理、重试 和当然 可观察性 等其他功能对其进行装饰。
Imperative function is a message handler function and invoked by the framework each time there is a message, sort of your typical event handler where for N messages there will be N invocations of such function. That allows us to wrap such function to decorate it with additional functionality such as error handling, retries, and of course observability.
Reactive function 是初始化函数。它的职责是将用户提供的流处理代码(Flux)与绑定器提供的源流和目标流连接起来。它仅在应用程序启动期间调用一次。一旦流代码与源流/目标流连接,我们就无法查看或控制实际的流处理。它掌握在反应式 API 中。反应式函数还带来了一个附加变量。鉴于函数允许您查看整个流链(不仅仅是一个事件),那么默认观测单元应该是什么?流链中的单个项目?项目范围?如果一段时间后没有消息会怎样?等等。我们想强调的是,对于反应式函数,我们不能假设任何事情。(有关反应式函数与命令式函数之间差异的更多信息,请参见 Reactive Functions)。
Reactive function is initialization function. Its job is to connect user provided stream processing code (Flux) with source and target stream provided by the binder. It is invoked only once during the startup of the application. Once stream code is connected with source/target stream we have no visibility nor control of the actual stream processing. It’s in the hands of reactive API. Reactive function also brings an additional variable. Given the fact that the function gives you a visibility to the entire stream chain (not just a single event), what should be the default unit of observation? A single item in the stream chain? A range of items? What if there are no messages after some time elapsed? etc. . . What we wanted is to emphasise that with reactive functions we can’t assume anything. (For more information about the differences between reactive and imperative functions please see Reactive Functions).
所以,就像 重试 和 错误处理 一样,您需要手动处理观察。
So, just like with retries and error handling you need to handle observation manually.
值得庆幸的是,您可以通过使用 tap
操作利用响应 API 段来轻松地做到这一点,同时提供 ObservationRegistry
实例。这样的片段定义了一个观察单元,它可以是单一项目或范围,无论您想在流中观察什么。
Thankfully you can do it easily by tapping into a segment of your stream using the tap
operation of reactive API while providing an instance of ObservationRegistry
. Such segment defines a unit of observation, which could be a single item in the flux or a range or whatever else you may want to observe within the stream.
@SpringBootApplication
public class DemoStreamApplication {
Logger logger = LoggerFactory.getLogger(DemoStreamApplication.class);
public static void main(String[] args) {
Hooks.enableAutomaticContextPropagation();
SpringApplication.run(DemoStreamApplication.class, args);
}
@Bean
public Function<Flux<String>, Flux<String>> uppercase(ObservationRegistry registry) {
return flux -> flux.flatMap(item -> {
return Mono.just(item)
.map(value -> value.toUpperCase())
.doOnNext(v -> logger.info(v))
.tap(Micrometer.observation(registry));
});
}
}
上面的示例模拟了将 Observation 附加到单个消息处理(即命令式函数),因为在这种情况下,观测单元从此处开始:Mono.just(..) 且最后一个操作将 ObservationRegistry
附加到订阅者。
The above example emulates attaching an Observation to a single message processing (i.e., imperative function), since in this case the unit of observation begins with Mono.just(..) and the last operation attaches the ObservationRegistry
to the subscriber.
如果已经将观察附加到订阅者,则将使用它为 tap
上游的链/段创建一个子观察,但是如我们已经陈述的,默认情况下,框架不会将任何观察附加到您返回的流链。
If there is an observation already attached to the subscriber, it will be used to create a child Observation for the chain/segment upstream of tap
, however as we already stated, by default, the framework does not attach any Observation to the stream chains you return.