Context Propagation in Quarkus

传统的阻塞代码使用 ThreadLocal 变量存储上下文对象,以便避免将它们作为参数到处传递。许多 Quarkus 扩展需要这些上下文对象才能正常运行:例如,Quarkus REST (formerly RESTEasy Reactive)ArCTransaction

Traditional blocking code uses ThreadLocal variables to store contextual objects in order to avoid passing them as parameters everywhere. Many Quarkus extensions require those contextual objects to operate properly: Quarkus REST (formerly RESTEasy Reactive), ArC and Transaction for example.

如果你编写反应式/异步代码,则必须将工作切分为一段代码块管道,这些管道在“稍后”执行,并且实际上在你定义它们的方法返回之后执行。因此,try/finally 块以及 ThreadLocal 变量将停止工作,因为你的反应式代码在另一个线程中执行,在调用者运行其 finally 块之后。

If you write reactive/async code, you have to cut your work into a pipeline of code blocks that get executed "later", and in practice after the method you defined them in have returned. As such, try/finally blocks as well as ThreadLocal variables stop working, because your reactive code gets executed in another thread, after the caller ran its finally block.

SmallRye Context Propagation 实现了 MicroProfile Context Propagation,目的是让这些 Quarkus 扩展在反应式/异步设置中正常工作。它通过捕获那些过去保存在线程局部变量中的上下文值并在调用你的代码时将其恢复来工作。

SmallRye Context Propagation an implementation of MicroProfile Context Propagation was made to make those Quarkus extensions work properly in reactive/async settings. It works by capturing those contextual values that used to be in thread-locals, and restoring them when your code is called.

Solution

我们建议您遵循接下来的部分中的说明,按部就班地创建应用程序。然而,您可以直接跳到完成的示例。

We recommend that you follow the instructions in the next sections and create the application step by step. However, you can go right to the completed example.

克隆 Git 存储库: git clone {quickstarts-clone-url},或下载 {quickstarts-archive-url}[存档]。

Clone the Git repository: git clone {quickstarts-clone-url}, or download an {quickstarts-archive-url}[archive].

解决方案位于 context-propagation-quickstart directory 中。

The solution is located in the context-propagation-quickstart directory.

Setting it up

如果你正在使用 Mutinyquarkus-mutiny 扩展),则只需添加 quarkus-smallrye-context-propagation 扩展即可启用上下文传播。

If you are using Mutiny (the quarkus-mutiny extension), you just need to add the quarkus-smallrye-context-propagation extension to enable context propagation.

换句话说,将以下依赖项添加到你的构建文件中:

In other words, add the following dependencies to your build file:

pom.xml
<!-- Quarkus REST extension if not already included -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-rest</artifactId>
</dependency>
<!-- Context Propagation extension -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-context-propagation</artifactId>
</dependency>
build.gradle
// Quarkus REST extension if not already included
implementation("io.quarkus:quarkus-rest")
// Context Propagation extension
implementation("io.quarkus:quarkus-smallrye-context-propagation")

这样,如果你正在使用的话,你将获得 ArC、Quarkus REST 和事务的上下文传播。

With this, you will get context propagation for ArC, Quarkus REST and transactions, if you are using them.

Usage example with Mutiny

Mutiny

本节使用 Mutiny 反应式类型。如果你不熟悉 Mutiny,请查看 Mutiny - an intuitive reactive programming library

This section uses Mutiny reactive types. If you are not familiar with Mutiny, check Mutiny - an intuitive reactive programming library.

让我们编写一个 REST 端点,该端点从 Kafka topic 中读取接下来的 3 个项目,使用 Hibernate ORM with Panache 将它们存储在数据库中(全部在同一事务中),然后再将它们返回给客户端,你可以像这样操作:

Let’s write a REST endpoint that reads the next 3 items from a Kafka topic, stores them in a database using Hibernate ORM with Panache (all in the same transaction) before returning them to the client, you can do it like this:

    // Get the prices stream
    @Inject
    @Channel("prices") Publisher<Double> prices;

    @Transactional
    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Publisher<Double> prices() {
        // get the next three prices from the price stream
        return Multi.createFrom().publisher(prices)
                .select().first(3)
                // The items are received from the event loop, so cannot use Hibernate ORM (classic)
                // Switch to a worker thread, the transaction will be propagated
                .emitOn(Infrastructure.getDefaultExecutor())
                .map(price -> {
                    // store each price before we send them
                    Price priceEntity = new Price();
                    priceEntity.value = price;
                    // here we are all in the same transaction
                    // thanks to context propagation
                    priceEntity.persist();
                    return price;
                    // the transaction is committed once the stream completes
                });
    }

请注意,由于 Mutiny 对上下文传播的支持,这开箱即用。这 3 个项目使用同一事务进行持久性处理,并且该事务在流完成时提交。

Notice that thanks to Mutiny support for context propagation, this works out of the box. The 3 items are persisted using the same transaction and this transaction is committed when the stream completes.

Usage example for CompletionStage

如果你使用 CompletionStage,你需要进行手动上下文传播。你可以通过注入一个 `ThreadContext`或将传播所有上下文的 `ManagedExecutor`来实现。例如,这里我们使用 Vert.x Web Client来获取星球大战中的人物列表,然后使用 Hibernate ORM with Panache将它们存储到数据库中(所有都在同一个事务中),最后用 Jackson or JSON-B将它们作为 JSON 返回给客户端:

If you are using CompletionStage you need manual context propagation. You can do that by injecting a ThreadContext or ManagedExecutor that will propagate every context. For example, here we use the Vert.x Web Client to get the list of Star Wars people, then store them in the database using Hibernate ORM with Panache (all in the same transaction) before returning them to the client as JSON using Jackson or JSON-B:

    @Inject ThreadContext threadContext;
    @Inject ManagedExecutor managedExecutor;
    @Inject Vertx vertx;

    @Transactional
    @GET
    @Path("/people")
    public CompletionStage<List<Person>> people() throws SystemException {
        // Create a REST client to the Star Wars API
        WebClient client = WebClient.create(vertx,
                         new WebClientOptions()
                          .setDefaultHost("swapi.dev")
                          .setDefaultPort(443)
                          .setSsl(true));
        // get the list of Star Wars people, with context capture
        return threadContext.withContextCapture(client.get("/api/people/").send())
                .thenApplyAsync(response -> {
                    JsonObject json = response.bodyAsJsonObject();
                    List<Person> persons = new ArrayList<>(json.getInteger("count"));
                    // Store them in the DB
                    // Note that we're still in the same transaction as the outer method
                    for (Object element : json.getJsonArray("results")) {
                        Person person = new Person();
                        person.name = ((JsonObject) element).getString("name");
                        person.persist();
                        persons.add(person);
                    }
                    return persons;
                }, managedExecutor);
    }

使用 ThreadContext`或 `ManagedExecutor,你可以包装大多数有用的函数类型和 `CompletionStage`以获取传播的上下文。

Using ThreadContext or ManagedExecutor you can wrap most useful functional types and CompletionStage in order to get context propagated.

注入的 `ManagedExecutor`使用 Quarkus 线程池。

The injected ManagedExecutor uses the Quarkus thread pool.

Overriding which contexts are propagated

默认情况下,所有可用的上下文都将被传播。但是,你可以通过多种方式来覆盖此行为。

By default, all available contexts are propagated. However, you can override this behaviour in several ways.

Using configuration

以下配置属性允许你指定传播上下文的默认集:

The following configuration properties allow you to specify the default sets of propagated contexts:

Configuration property Description Default Value

mp.context.ThreadContext.propagated

The comma-separated set of propagated contexts

Remaining (all non-explicitly list contexts)

mp.context.ThreadContext.cleared

The comma-separated set of cleared contexts

None (no context), unless neither the propagated nor cleared sets contain Remaining, in which case the default is Remaining (all non-explicitly listed contexts)

mp.context.ThreadContext.unchanged

The comma-separated set of unchanged contexts

None (no context)

以下上下文在 Quarkus 中开箱即用或取决于你是否包含它们的扩展:

The following contexts are available in Quarkus either out of the box, or depending on whether you include their extensions:

Context Name Name Constant Description

None

ThreadContext.NONE

Can be used to specify an empty set of contexts, but setting the value to empty works too

Remaining

ThreadContext.ALL_REMAINING

All the contexts that are not explicitly listed in other sets

Transaction

ThreadContext.TRANSACTION

The JTA transaction context

CDI

ThreadContext.CDI

The CDI (ArC) context

Servlet

N/A

The servlet context

Jakarta REST

N/A

The Quarkus REST or RESTEasy Classic context

Application

ThreadContext.APPLICATION

The current ThreadContextClassLoader

Overriding the propagated contexts using annotations

为了让特定的方法覆盖自动上下文传播(例如 Mutiny 使用的),你可以使用 @CurrentThreadContext注解:

In order for automatic context propagation, such as Mutiny uses, to be overridden in specific methods, you can use the @CurrentThreadContext annotation:

    // Get the prices stream
    @Inject
    @Channel("prices") Publisher<Double> prices;

    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    // Get rid of all context propagation, since we don't need it here
    @CurrentThreadContext(propagated = {}, unchanged = ThreadContext.ALL_REMAINING)
    public Publisher<Double> prices() {
        // get the next three prices from the price stream
        return Multi.createFrom().publisher(prices)
                .select().first(3);
    }

Overriding the propagated contexts using CDI injection

你还可以使用注入点上的 @ThreadContextConfig注解来注入一个自定义构建的 ThreadContext

You can also inject a custom-built ThreadContext using the @ThreadContextConfig annotation on your injection point:

    // Get the prices stream
    @Inject
    @Channel("prices") Publisher<Double> prices;
    // Get a ThreadContext that doesn't propagate context
    @Inject
    @ThreadContextConfig(unchanged = ThreadContext.ALL_REMAINING)
    SmallRyeThreadContext threadContext;

    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Publisher<Double> prices() {
        // Get rid of all context propagation, since we don't need it here
        try(CleanAutoCloseable ac = SmallRyeThreadContext.withThreadContext(threadContext)){
            // get the next three prices from the price stream
            return Multi.createFrom().publisher(prices)
                    .select().first(3);
        }
    }

同样,可以使用 @ManagedExecutorConfig注解来注入 `ManagedExecutor`的已配置实例:

Likewise, there is a similar way to inject a configured instance of ManagedExecutor using the @ManagedExecutorConfig annotation:

    // Custom ManagedExecutor with different async limit, queue and no propagation
    @Inject
    @ManagedExecutorConfig(maxAsync = 2, maxQueued = 3, cleared = ThreadContext.ALL_REMAINING)
    ManagedExecutor configuredCustomExecutor;

Sharing configured CDI instances of ManagedExecutor and ThreadContext

如果你需要将同一个 ManagedExecutor`或 `ThreadContext`注入到多个地方并共享其容量,你可以使用 @NamedInstance`注解来命名实例。@NamedInstance`是一个 CDI 限定符,因此所有同类型和同名的注入都将共享同一个底层实例。如果你还需要自定义你的实例,可以使用它的一个注入点上的 `@ManagedExecutorConfig/`ThreadContextConfig`注解来实现:

If you need to inject the same ManagedExecutor or ThreadContext into several places and share its capacity, you can name the instance with @NamedInstance annotation. @NamedInstance is a CDI qualifier and all injections of the same type and name will therefore share the same underlying instance. If you also need to customize your instance, you can do so using @ManagedExecutorConfig/ThreadContextConfig annotation on one of its injection points:

    // Custom configured ManagedExecutor with name
    @Inject
    @ManagedExecutorConfig(maxAsync = 2, maxQueued = 3, cleared = ThreadContext.ALL_REMAINING)
    @NamedInstance("myExecutor")
    ManagedExecutor sharedConfiguredExecutor;

    // Since this executor has the same name, it will be the same instance as above
    @Inject
    @NamedInstance("myExecutor")
    ManagedExecutor sameExecutor;

    // Custom ThreadContext with a name
    @Inject
    @ThreadContextConfig(unchanged = ThreadContext.ALL_REMAINING)
    @NamedInstance("myContext")
    ThreadContext sharedConfiguredThreadContext;

    // Given equal value of @NamedInstance, this ThreadContext will be the same as the above one
    @Inject
    @NamedInstance("myContext")
    ThreadContext sameContext;

Context Propagation for CDI

在 CDI 方面,@RequestScoped@ApplicationScoped`和 `@Singleton`bean 会被传播并且可以在其他线程中使用。@Dependent`bean 以及任何自定义作用域的 bean 都不能通过 CDI 上下文传播自动传播。

In terms of CDI, @RequestScoped, @ApplicationScoped and @Singleton beans get propagated and are available in other threads. @Dependent beans as well as any custom scoped beans cannot be automatically propagated via CDI Context Propagation.

@ApplicationScoped`和 `@Singleton bean 始终是活动范围,因此易于处理——只要 CDI 容器正在运行,内容传播任务就可以使用这些 bean。然而 @RequestScoped bean 是另一个故事。它们仅在短时间内处于活动状态,当手动激活/停用时,可以将其绑定到 HTTP 请求或某些其他请求/任务。在这种情况下,用户必须意识到,一旦原始线程到达请求结束,它将终止上下文,对这些 bean 调用 @PreDestroy,然后从上下文中清除它们。从其他线程访问这些 bean 的后续尝试可能导致意外行为。因此,建议确保使用上下文传播通过请求范围 bean 的所有任务都以这样的方式执行,即它们不会超出原始请求持续时间。

@ApplicationScoped and @Singleton beans are always active scopes and as such are easy to deal with - context propagation tasks can work with those beans so long as the CDI container is running. However, @RequestScoped beans are a different story. They are only active for a short period of time which can be bound either to HTTP request or some other request/task when manually activated/deactivated. In this case user must be aware that once the original thread gets to an end of a request, it will terminate the context, calling @PreDestroy on those beans and then clearing them from the context. Subsequent attempts to access those beans from other threads can result in unexpected behaviour. It is therefore recommended to make sure all tasks using request scoped beans via context propagation are performed in such a manner that they don’t outlive the original request duration.

由于上述描述的行为,建议在使用 CDI 中的上下文传播时避免在 @RequestScoped bean 上使用 @PreDestroy

Due to the above described behavior, it is recommended to avoid using @PreDestroy on @RequestScoped beans when working with Context Propagation in CDI.