Context Propagation in Quarkus

传统的阻塞代码使用 ThreadLocal 变量存储上下文对象,以便避免将它们作为参数到处传递。许多 Quarkus 扩展需要这些上下文对象才能正常运行:例如,Quarkus REST (formerly RESTEasy Reactive)ArCTransaction。 如果你编写反应式/异步代码,则必须将工作切分为一段代码块管道,这些管道在“稍后”执行,并且实际上在你定义它们的方法返回之后执行。因此,try/finally 块以及 ThreadLocal 变量将停止工作,因为你的反应式代码在另一个线程中执行,在调用者运行其 finally 块之后。 SmallRye Context Propagation 实现了 MicroProfile Context Propagation,目的是让这些 Quarkus 扩展在反应式/异步设置中正常工作。它通过捕获那些过去保存在线程局部变量中的上下文值并在调用你的代码时将其恢复来工作。

Solution

我们建议您遵循接下来的部分中的说明,按部就班地创建应用程序。然而,您可以直接跳到完成的示例。

克隆 Git 存储库: git clone $${quickstarts-base-url}.git,或下载 $${quickstarts-base-url}/archive/main.zip[存档]。

解决方案位于 context-propagation-quickstart directory 中。

Setting it up

如果你正在使用 Mutinyquarkus-mutiny 扩展),则只需添加 quarkus-smallrye-context-propagation 扩展即可启用上下文传播。

换句话说,将以下依赖项添加到你的构建文件中:

pom.xml
<!-- Quarkus REST extension if not already included -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-rest</artifactId>
</dependency>
<!-- Context Propagation extension -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-context-propagation</artifactId>
</dependency>
build.gradle
// Quarkus REST extension if not already included
implementation("io.quarkus:quarkus-rest")
// Context Propagation extension
implementation("io.quarkus:quarkus-smallrye-context-propagation")

这样,如果你正在使用的话,你将获得 ArC、Quarkus REST 和事务的上下文传播。

Usage example with Mutiny

Mutiny

本节使用 Mutiny 反应式类型。如果你不熟悉 Mutiny,请查看 Mutiny - an intuitive reactive programming library

让我们编写一个 REST 端点,该端点从 Kafka topic 中读取接下来的 3 个项目,使用 Hibernate ORM with Panache 将它们存储在数据库中(全部在同一事务中),然后再将它们返回给客户端,你可以像这样操作:

    // Get the prices stream
    @Inject
    @Channel("prices") Publisher<Double> prices;

    @Transactional
    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Publisher<Double> prices() {
        // get the next three prices from the price stream
        return Multi.createFrom().publisher(prices)
                .select().first(3)
                // The items are received from the event loop, so cannot use Hibernate ORM (classic)
                // Switch to a worker thread, the transaction will be propagated
                .emitOn(Infrastructure.getDefaultExecutor())
                .map(price -> {
                    // store each price before we send them
                    Price priceEntity = new Price();
                    priceEntity.value = price;
                    // here we are all in the same transaction
                    // thanks to context propagation
                    priceEntity.persist();
                    return price;
                    // the transaction is committed once the stream completes
                });
    }

请注意,由于 Mutiny 对上下文传播的支持,这开箱即用。这 3 个项目使用同一事务进行持久性处理,并且该事务在流完成时提交。

Usage example for CompletionStage

如果你使用 CompletionStage,你需要进行手动上下文传播。你可以通过注入一个 `ThreadContext`或将传播所有上下文的 `ManagedExecutor`来实现。例如,这里我们使用 Vert.x Web Client来获取星球大战中的人物列表,然后使用 Hibernate ORM with Panache将它们存储到数据库中(所有都在同一个事务中),最后用 Jackson or JSON-B将它们作为 JSON 返回给客户端:

    @Inject ThreadContext threadContext;
    @Inject ManagedExecutor managedExecutor;
    @Inject Vertx vertx;

    @Transactional
    @GET
    @Path("/people")
    public CompletionStage<List<Person>> people() throws SystemException {
        // Create a REST client to the Star Wars API
        WebClient client = WebClient.create(vertx,
                         new WebClientOptions()
                          .setDefaultHost("swapi.dev")
                          .setDefaultPort(443)
                          .setSsl(true));
        // get the list of Star Wars people, with context capture
        return threadContext.withContextCapture(client.get("/api/people/").send())
                .thenApplyAsync(response -> {
                    JsonObject json = response.bodyAsJsonObject();
                    List<Person> persons = new ArrayList<>(json.getInteger("count"));
                    // Store them in the DB
                    // Note that we're still in the same transaction as the outer method
                    for (Object element : json.getJsonArray("results")) {
                        Person person = new Person();
                        person.name = ((JsonObject) element).getString("name");
                        person.persist();
                        persons.add(person);
                    }
                    return persons;
                }, managedExecutor);
    }

使用 ThreadContext`或 `ManagedExecutor,你可以包装大多数有用的函数类型和 `CompletionStage`以获取传播的上下文。

注入的 `ManagedExecutor`使用 Quarkus 线程池。

Overriding which contexts are propagated

默认情况下,所有可用的上下文都将被传播。但是,你可以通过多种方式来覆盖此行为。

Using configuration

以下配置属性允许你指定传播上下文的默认集:

Configuration property Description Default Value

mp.context.ThreadContext.propagated

传播上下文的逗号分隔集

Remaining(所有非显式列出的上下文)

mp.context.ThreadContext.cleared

清除上下文的逗号分隔集

None(无上下文),除非传播集和清除集中都不包含 Remaining,在这种情况下,默认值为 Remaining(所有非显式列出的上下文)

mp.context.ThreadContext.unchanged

未更改上下文的逗号分隔集

None (no context)

以下上下文在 Quarkus 中开箱即用或取决于你是否包含它们的扩展:

Context Name Name Constant Description

None

ThreadContext.NONE

可用于指定一个空上下文集,但将值设置为空也是可行的

Remaining

ThreadContext.ALL_REMAINING

在其他集中未显式列出的所有上下文

Transaction

ThreadContext.TRANSACTION

The JTA transaction context

CDI

ThreadContext.CDI

The CDI (ArC) context

Servlet

N/A

The servlet context

Jakarta REST

N/A

Quarkus REST 或 RESTEasy Classic 上下文

Application

ThreadContext.APPLICATION

The current ThreadContextClassLoader

Overriding the propagated contexts using annotations

为了让特定的方法覆盖自动上下文传播(例如 Mutiny 使用的),你可以使用 @CurrentThreadContext注解:

    // Get the prices stream
    @Inject
    @Channel("prices") Publisher<Double> prices;

    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    // Get rid of all context propagation, since we don't need it here
    @CurrentThreadContext(propagated = {}, unchanged = ThreadContext.ALL_REMAINING)
    public Publisher<Double> prices() {
        // get the next three prices from the price stream
        return Multi.createFrom().publisher(prices)
                .select().first(3);
    }

Overriding the propagated contexts using CDI injection

你还可以使用注入点上的 @ThreadContextConfig注解来注入一个自定义构建的 ThreadContext

    // Get the prices stream
    @Inject
    @Channel("prices") Publisher<Double> prices;
    // Get a ThreadContext that doesn't propagate context
    @Inject
    @ThreadContextConfig(unchanged = ThreadContext.ALL_REMAINING)
    SmallRyeThreadContext threadContext;

    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Publisher<Double> prices() {
        // Get rid of all context propagation, since we don't need it here
        try(CleanAutoCloseable ac = SmallRyeThreadContext.withThreadContext(threadContext)){
            // get the next three prices from the price stream
            return Multi.createFrom().publisher(prices)
                    .select().first(3);
        }
    }

同样,可以使用 @ManagedExecutorConfig注解来注入 `ManagedExecutor`的已配置实例:

    // Custom ManagedExecutor with different async limit, queue and no propagation
    @Inject
    @ManagedExecutorConfig(maxAsync = 2, maxQueued = 3, cleared = ThreadContext.ALL_REMAINING)
    ManagedExecutor configuredCustomExecutor;

Sharing configured CDI instances of ManagedExecutor and ThreadContext

如果你需要将同一个 ManagedExecutor`或 `ThreadContext`注入到多个地方并共享其容量,你可以使用 @NamedInstance`注解来命名实例。@NamedInstance`是一个 CDI 限定符,因此所有同类型和同名的注入都将共享同一个底层实例。如果你还需要自定义你的实例,可以使用它的一个注入点上的 `@ManagedExecutorConfig/`ThreadContextConfig`注解来实现:

    // Custom configured ManagedExecutor with name
    @Inject
    @ManagedExecutorConfig(maxAsync = 2, maxQueued = 3, cleared = ThreadContext.ALL_REMAINING)
    @NamedInstance("myExecutor")
    ManagedExecutor sharedConfiguredExecutor;

    // Since this executor has the same name, it will be the same instance as above
    @Inject
    @NamedInstance("myExecutor")
    ManagedExecutor sameExecutor;

    // Custom ThreadContext with a name
    @Inject
    @ThreadContextConfig(unchanged = ThreadContext.ALL_REMAINING)
    @NamedInstance("myContext")
    ThreadContext sharedConfiguredThreadContext;

    // Given equal value of @NamedInstance, this ThreadContext will be the same as the above one
    @Inject
    @NamedInstance("myContext")
    ThreadContext sameContext;

Context Propagation for CDI

在 CDI 方面,@RequestScoped@ApplicationScoped`和 `@Singleton`bean 会被传播并且可以在其他线程中使用。@Dependent`bean 以及任何自定义作用域的 bean 都不能通过 CDI 上下文传播自动传播。

@ApplicationScoped`和 `@Singleton bean 始终是活动范围,因此易于处理——只要 CDI 容器正在运行,内容传播任务就可以使用这些 bean。然而 @RequestScoped bean 是另一个故事。它们仅在短时间内处于活动状态,当手动激活/停用时,可以将其绑定到 HTTP 请求或某些其他请求/任务。在这种情况下,用户必须意识到,一旦原始线程到达请求结束,它将终止上下文,对这些 bean 调用 @PreDestroy,然后从上下文中清除它们。从其他线程访问这些 bean 的后续尝试可能导致意外行为。因此,建议确保使用上下文传播通过请求范围 bean 的所有任务都以这样的方式执行,即它们不会超出原始请求持续时间。

由于上述描述的行为,建议在使用 CDI 中的上下文传播时避免在 @RequestScoped bean 上使用 @PreDestroy