Duplicated context, context locals, asynchronous processing and propagation

当使用传统的、阻塞的和同步的框架时,每个请求的处理都在一个专用线程中执行。所以,同一个线程用于整个处理过程。您知道这个线程在处理完成之前不会用于执行任何其他任务。如果您需要在处理过程中传播数据,例如安全主体或跟踪 ID,可以使用 ThreadLocals。传播的数据会在处理完成后清除。 在使用反应式和异步执行模型时,您不能使用相同机制。为了避免使用多个进程线程,以及减少资源使用(还提高了应用程序的并发性),同一个线程可用于处理多个并发处理。因此,不能使用 ThreadLocals 因为值会在不同的并发处理之间泄露。 Vert.x duplicated context 是一个提供了相同类型的传播(但是用于异步处理)的构造函数。它也可用于同步代码。 本文档解释了重复的上下文,如何检索一个,如何使用它,以及它是如何沿着(异步)处理传播的。

The reactive model

本节并未解释反应式模型。有关更多详细信息,请参阅 the Quarkus Reactive Architecture

从本质上讲,Quarkus 使用的是一个反应式引擎。这个引擎提供了处理现代的、容器化的和云原生的应用程序所需的效率和并发能力。

例如,当您使用 Quarkus REST(以前的 RESTEasy Reactive)或 gRPC 时,Quarkus 可以再 I/O 线程上调用您的业务逻辑。这些线程被命名为 event loops 并实现了 multi-reactor pattern

当使用命令式模型时,Quarkus 会将一个工作线程关联到每个处理单元(如一个 HTTP 请求或一个 gRPC 调用)。该线程会专门用于该特定处理,直至处理完成。因此,您可以使用 Thread Locals 在处理期间传播数据,并且在当前处理完成之前,没有其他处理单元会使用该线程。

借助反应式模型,代码会在事件循环线程上运行。这些事件循环执行多个并发处理单元。例如,同一个事件循环可以处理多个并发 HTTP 请求。下图展示了此反应式执行模型:

reactive continuation

您必须 NEVER 阻塞这些事件循环。如果您这样做,整个模型都会崩溃。因此,当 HTTP 请求的处理需要执行一个 I/O 操作(例如,调用外部服务)时,它会:

  1. schedules the operation,

  2. 传递一个延续(I/O 完成时要调用的代码),

  3. releases the thread.

然后,该线程可以处理其他并发请求。当计划的操作完成后,它会执行传递的延续 on the same event loop

该模型尤其高效(低线程数)和高性能(避免上下文切换)。然而,它需要不同的开发模型,并且不能使用 Thread Locals,因为并发请求将看到相同的值。事实上,它们均由同一线程(事件循环)处理。

MicroProfile Context Propagation 规范解决了该问题。它在每次切换到另一个处理单元时保存并还原存储在局部线程中的值。然而,该模型开销较大。上下文局部变量(也称为 duplicated context)是实现此目的的另一种方式,并且需要的机制较少。

Context and duplicated context

在 Quarkus 中,执行响应式代码时,会在 Context 中运行,代表执行线程(事件循环或工作线程)。

@GET
@NonBlocking // Force the usage of the event loop
@Path("/hello1")
public String hello1() {
   Context context = Vertx.currentContext();
   return "Hello, you are running on context: %s and on thread %s".formatted(context, Thread.currentThread());  1
}

@GET
@Path("/hello2")
public String hello2() { // Called on a worker thread (because it has a blocking signature)
   Context context = Vertx.currentContext();
   return "Hello, you are running on context: %s and on thread %s".formatted(context, Thread.currentThread()); 2
}
1 Produces: Hello 1, you are running on context: io.vertx.core.impl.DuplicatedContext@5dc42d4f and on thread Thread[vert.x-eventloop-thread-1,5,main] - 即在事件循环中调用。
2 Produces: Hello 2, you are running on context: io.vertx.core.impl.DuplicatedContext@41781ccb and on thread Thread[executor-thread-1,5,main] - 即在工作线程中调用。

利用这种 Context 对象,可以在同一上下文中调度操作。上下文可用于在同一事件循环中执行继续(因为上下文附加到事件循环中),如上图所述。例如,当需要调用异步内容时,捕获当前上下文,当响应到达时,将在该上下文中调用继续:

public Uni<String> invoke() {
   Context context = Vertx.currentContext();
   return invokeRemoteService()
       .emitOn(runnable -> context.runOnContext(runnable)); 1
}
1 在同一上下文中发出结果。

大多数 Quarkus 客户端会自动执行此操作,在适当的上下文中调用继续。

上下文分为两个级别:

  • 表示事件循环的 root 上下文,因此不能用于传播数据,而不会使其在并发处理之间泄漏

  • 基于 root 上下文,但不共享且表示处理单元的重复上下文

因此,每个处理单元都关联有一个重复上下文。重复上下文仍然与 root 上下文关联,且使用重复上下文调度的操作会在关联的 root 上下文中运行它们。但是,与 root 上下文不同的是,它们不会在处理单元之间共享。然而,同一处理单元的继续使用相同的重复上下文。因此,在之前的代码段中,继续不仅在同一事件循环中调用,还可以在同一重复上下文中调用(假设捕获的上下文是一个重复上下文,稍后会详细阐述)。

duplicated context continuation

Context local data

在重复上下文中执行时,代码可以存储数据,而不与其它的并发处理共享数据。因此,可以存储、检索和删除局部数据。继续在同一重复上下文中调用,将可以访问该数据:

import io.smallrye.common.vertx.ContextLocals;

AtomicInteger counter = new AtomicInteger();

public Uni<String> invoke() {
   Context context = Vertx.currentContext();

   ContextLocals.put("message", "hello");
   ContextLocals.put("id", counter.incrementAndGet());

   return invokeRemoteService()
       .emitOn(runnable -> context.runOnContext(runnable))
       .map(res -> {
           // Can still access the context local data
           // `get(...)` returns an Optional
           String msg = ContextLocals.<String>get("message").orElseThrow();
           Integer id = ContextLocals.<Integer>get("id").orElseThrow();
           return "%s - %s - %d".formatted(res, msg, id);
       });
}

之前的代码段使用了 io.smallrye.common.vertx.ContextLocals,简化了对局部数据的访问。还可以使用 Vertx.currentContext().getLocal("key") 访问它们。

上下文局部数据提供了一种在响应式执行期间传播对象的高效方法。跟踪元数据、指标和会话可以安全地存储和检索。

Context locals restrictions

然而,这种特性只能与重复上下文一起使用。如上所述,这对代码是透明的。重复上下文是一个上下文,因此它们公开相同的 API。

在 Quarkus 中,对局部数据的访问仅限于重复上下文。如果尝试从 root 上下文中访问局部数据,它将抛出 UnsupportedOperationException。它防止访问在不同的处理单元之间共享的数据。

java.lang.UnsupportedOperationException: Access to Context.putLocal(), Context.getLocal() and Context.removeLocal() are forbidden from a 'root' context  as it can leak data between unrelated processing. Make sure the method runs on a 'duplicated' (local) Context.

Safe context

可以将上下文标记为 safe。它用于其它扩展集成以帮助识别哪些上下文被隔离,并且保证仅由一个线程访问。Hibernate Reactive 使用此特性来检查当前上下文是否安全地存储当前打开的会话,以保护用户免于错误地交错多个响应式操作,这些操作可能会无意中共享同一个会话。

Vert.x web 将为每个 http web 请求创建一个新的重复上下文;Quarkus REST 将将此类上下文标记为安全。当它们设置一个安全的新上下文时,其它的扩展应该遵循类似的模式,以保证顺序使用、非并发访问以及针对当前响应式链的范围,以便于不必显式传递一个“上下文”对象。

在其他情况下,对当前上下文标记为不安全,而不是明确地标记,可能是很有用的;例如,如果需要对现有上下文进行跨多个工作人员共享以并行处理一些操作:通过适当标记和取消标记,同一个上下文可以包含它安全所在的跨度,后跟它不安全所在的跨度。

要将上下文标记为安全,你可以:

  1. Use the io.quarkus.vertx.SafeVertxContext annotation

  2. Use the io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle class

通过使用`io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle`类,当前上下文可以明确标记为`safe`,或者可以明确标记为`unsafe`;还有第三种状态,它任何新上下文的默认状态:unmarked。默认情况下,将任何未标记的上下文视为`unsafe`,除非系统属性`io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.UNRESTRICTED_BY_DEFAULT`被设置为`true`;

SafeVertxContext`注解将当前复制的上下文标记为安全,并在上下文为 `unmarked`或已被标记为 `safe`时调用被注解的方法。如果上下文被标记为 `unsafe,你可以使用 force=true`参数强制将其标记为 `safe。但是,必须小心地使用这个可能性。

`@SafeVertxContext`注解是一个 CDI 拦截器绑定注解。因此,它只适用于 CDI bean 和非私有方法。

Extensions supporting duplicated contexts

一般来说,Quarkus 在复制上下文中调用响应式代码。所以它可以安全地访问本地数据。在以下情况下会出现这种情况:

  • Quarkus REST

  • gRPC

  • Reactive Routes

  • Vert.x Event Bus @ConsumeEvent

  • REST Client

  • Reactive Messaging (Kafka, AMQP)

  • Funqy

  • Quarkus scheduler and Quartz

  • Redis 客户端(用于 pub/sub 命令)

  • GraphQL

Distinguish between root and duplicated contexts

你可以使用以下内容区分根上下文和复制上下文:

boolean isDuplicated = VertxContext.isDuplicatedContext(context);

此代码使用`io.smallrye.common.vertx.VertxContext`帮助类。

Duplicated contexts and mapped diagnostic context (MDC)

使用记录器时,MDC(添加到日志消息的上下文数据)存储在可用的复制上下文中。查看logging reference guide以了解更多详情。

CDI request scope

在 Quarkus 中,CDI 请求范围存储在复制上下文中,这意味着它会随着请求的响应式处理而自动传播。

Reactive Messaging

Kafka 和 AMQP 连接器为每条消息创建一个复制上下文,实现了一个_message context_。此消息上下文用于完成消息处理,因此可以用来传播数据。

有关更多信息,请参阅 Message Context文档。

OpenTelemetry

OpenTelemetry 扩展将跟踪存储在复制上下文中,即使在使用响应式和异步代码时也能确保传播。