Context Propagation in Quarkus
传统的阻塞代码使用 ThreadLocal
变量存储上下文对象,以便避免将它们作为参数到处传递。许多 Quarkus 扩展需要这些上下文对象才能正常运行:例如,Quarkus REST (formerly RESTEasy Reactive)、ArC 和 Transaction。
Traditional blocking code uses ThreadLocal
variables to store contextual objects in order to avoid
passing them as parameters everywhere. Many Quarkus extensions require those contextual objects to operate
properly: Quarkus REST (formerly RESTEasy Reactive), ArC and Transaction
for example.
如果你编写反应式/异步代码,则必须将工作切分为一段代码块管道,这些管道在“稍后”执行,并且实际上在你定义它们的方法返回之后执行。因此,try/finally
块以及 ThreadLocal
变量将停止工作,因为你的反应式代码在另一个线程中执行,在调用者运行其 finally
块之后。
If you write reactive/async code, you have to cut your work into a pipeline of code blocks that get executed
"later", and in practice after the method you defined them in have returned. As such, try/finally
blocks
as well as ThreadLocal
variables stop working, because your reactive code gets executed in another thread,
after the caller ran its finally
block.
SmallRye Context Propagation 实现了 MicroProfile Context Propagation,目的是让这些 Quarkus 扩展在反应式/异步设置中正常工作。它通过捕获那些过去保存在线程局部变量中的上下文值并在调用你的代码时将其恢复来工作。
SmallRye Context Propagation an implementation of MicroProfile Context Propagation was made to make those Quarkus extensions work properly in reactive/async settings. It works by capturing those contextual values that used to be in thread-locals, and restoring them when your code is called.
Solution
我们建议您遵循接下来的部分中的说明,按部就班地创建应用程序。然而,您可以直接跳到完成的示例。
We recommend that you follow the instructions in the next sections and create the application step by step. However, you can go right to the completed example.
克隆 Git 存储库: git clone {quickstarts-clone-url}
,或下载 {quickstarts-archive-url}[存档]。
Clone the Git repository: git clone {quickstarts-clone-url}
, or download an {quickstarts-archive-url}[archive].
解决方案位于 context-propagation-quickstart
directory 中。
The solution is located in the context-propagation-quickstart
directory.
Setting it up
如果你正在使用 Mutiny(quarkus-mutiny
扩展),则只需添加 quarkus-smallrye-context-propagation
扩展即可启用上下文传播。
If you are using Mutiny (the quarkus-mutiny
extension), you just need to add
the quarkus-smallrye-context-propagation
extension to enable context propagation.
换句话说,将以下依赖项添加到你的构建文件中:
In other words, add the following dependencies to your build file:
<!-- Quarkus REST extension if not already included -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest</artifactId>
</dependency>
<!-- Context Propagation extension -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-context-propagation</artifactId>
</dependency>
// Quarkus REST extension if not already included
implementation("io.quarkus:quarkus-rest")
// Context Propagation extension
implementation("io.quarkus:quarkus-smallrye-context-propagation")
这样,如果你正在使用的话,你将获得 ArC、Quarkus REST 和事务的上下文传播。
With this, you will get context propagation for ArC, Quarkus REST and transactions, if you are using them.
Usage example with Mutiny
Mutiny
本节使用 Mutiny 反应式类型。如果你不熟悉 Mutiny,请查看 Mutiny - an intuitive reactive programming library。 This section uses Mutiny reactive types. If you are not familiar with Mutiny, check Mutiny - an intuitive reactive programming library. |
让我们编写一个 REST 端点,该端点从 Kafka topic 中读取接下来的 3 个项目,使用 Hibernate ORM with Panache 将它们存储在数据库中(全部在同一事务中),然后再将它们返回给客户端,你可以像这样操作:
Let’s write a REST endpoint that reads the next 3 items from a Kafka topic, stores them in a database using Hibernate ORM with Panache (all in the same transaction) before returning them to the client, you can do it like this:
// Get the prices stream
@Inject
@Channel("prices") Publisher<Double> prices;
@Transactional
@GET
@Path("/prices")
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Publisher<Double> prices() {
// get the next three prices from the price stream
return Multi.createFrom().publisher(prices)
.select().first(3)
// The items are received from the event loop, so cannot use Hibernate ORM (classic)
// Switch to a worker thread, the transaction will be propagated
.emitOn(Infrastructure.getDefaultExecutor())
.map(price -> {
// store each price before we send them
Price priceEntity = new Price();
priceEntity.value = price;
// here we are all in the same transaction
// thanks to context propagation
priceEntity.persist();
return price;
// the transaction is committed once the stream completes
});
}
请注意,由于 Mutiny 对上下文传播的支持,这开箱即用。这 3 个项目使用同一事务进行持久性处理,并且该事务在流完成时提交。
Notice that thanks to Mutiny support for context propagation, this works out of the box. The 3 items are persisted using the same transaction and this transaction is committed when the stream completes.
Usage example for CompletionStage
如果你使用 CompletionStage
,你需要进行手动上下文传播。你可以通过注入一个 `ThreadContext`或将传播所有上下文的 `ManagedExecutor`来实现。例如,这里我们使用 Vert.x Web Client来获取星球大战中的人物列表,然后使用 Hibernate ORM with Panache将它们存储到数据库中(所有都在同一个事务中),最后用 Jackson or JSON-B将它们作为 JSON 返回给客户端:
If you are using CompletionStage
you need manual context propagation. You can do that by injecting a ThreadContext
or ManagedExecutor
that will propagate every context. For example, here we use the Vert.x Web Client
to get the list of Star Wars people, then store them in the database using
Hibernate ORM with Panache (all in the same transaction) before returning
them to the client as JSON using Jackson or JSON-B:
@Inject ThreadContext threadContext;
@Inject ManagedExecutor managedExecutor;
@Inject Vertx vertx;
@Transactional
@GET
@Path("/people")
public CompletionStage<List<Person>> people() throws SystemException {
// Create a REST client to the Star Wars API
WebClient client = WebClient.create(vertx,
new WebClientOptions()
.setDefaultHost("swapi.dev")
.setDefaultPort(443)
.setSsl(true));
// get the list of Star Wars people, with context capture
return threadContext.withContextCapture(client.get("/api/people/").send())
.thenApplyAsync(response -> {
JsonObject json = response.bodyAsJsonObject();
List<Person> persons = new ArrayList<>(json.getInteger("count"));
// Store them in the DB
// Note that we're still in the same transaction as the outer method
for (Object element : json.getJsonArray("results")) {
Person person = new Person();
person.name = ((JsonObject) element).getString("name");
person.persist();
persons.add(person);
}
return persons;
}, managedExecutor);
}
使用 ThreadContext`或 `ManagedExecutor
,你可以包装大多数有用的函数类型和 `CompletionStage`以获取传播的上下文。
Using ThreadContext
or ManagedExecutor
you can wrap most useful functional types and CompletionStage
in order to get context propagated.
注入的 `ManagedExecutor`使用 Quarkus 线程池。 The injected |
Overriding which contexts are propagated
默认情况下,所有可用的上下文都将被传播。但是,你可以通过多种方式来覆盖此行为。
By default, all available contexts are propagated. However, you can override this behaviour in several ways.
Using configuration
以下配置属性允许你指定传播上下文的默认集:
The following configuration properties allow you to specify the default sets of propagated contexts:
Configuration property | Description | Default Value |
---|---|---|
|
The comma-separated set of propagated contexts |
|
|
The comma-separated set of cleared contexts |
|
|
The comma-separated set of unchanged contexts |
|
以下上下文在 Quarkus 中开箱即用或取决于你是否包含它们的扩展:
The following contexts are available in Quarkus either out of the box, or depending on whether you include their extensions:
Context Name | Name Constant | Description |
---|---|---|
|
Can be used to specify an empty set of contexts, but setting the value to empty works too |
|
|
All the contexts that are not explicitly listed in other sets |
|
|
The JTA transaction context |
|
|
The CDI (ArC) context |
|
|
N/A |
The servlet context |
|
N/A |
The Quarkus REST or RESTEasy Classic context |
|
The current |
Overriding the propagated contexts using annotations
为了让特定的方法覆盖自动上下文传播(例如 Mutiny 使用的),你可以使用 @CurrentThreadContext
注解:
In order for automatic context propagation, such as Mutiny uses, to be overridden in specific methods,
you can use the @CurrentThreadContext
annotation:
// Get the prices stream
@Inject
@Channel("prices") Publisher<Double> prices;
@GET
@Path("/prices")
@RestStreamElementType(MediaType.TEXT_PLAIN)
// Get rid of all context propagation, since we don't need it here
@CurrentThreadContext(propagated = {}, unchanged = ThreadContext.ALL_REMAINING)
public Publisher<Double> prices() {
// get the next three prices from the price stream
return Multi.createFrom().publisher(prices)
.select().first(3);
}
Overriding the propagated contexts using CDI injection
你还可以使用注入点上的 @ThreadContextConfig
注解来注入一个自定义构建的 ThreadContext
:
You can also inject a custom-built ThreadContext
using the @ThreadContextConfig
annotation on your injection point:
// Get the prices stream
@Inject
@Channel("prices") Publisher<Double> prices;
// Get a ThreadContext that doesn't propagate context
@Inject
@ThreadContextConfig(unchanged = ThreadContext.ALL_REMAINING)
SmallRyeThreadContext threadContext;
@GET
@Path("/prices")
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Publisher<Double> prices() {
// Get rid of all context propagation, since we don't need it here
try(CleanAutoCloseable ac = SmallRyeThreadContext.withThreadContext(threadContext)){
// get the next three prices from the price stream
return Multi.createFrom().publisher(prices)
.select().first(3);
}
}
同样,可以使用 @ManagedExecutorConfig
注解来注入 `ManagedExecutor`的已配置实例:
Likewise, there is a similar way to inject a configured instance of ManagedExecutor
using the @ManagedExecutorConfig
annotation:
// Custom ManagedExecutor with different async limit, queue and no propagation
@Inject
@ManagedExecutorConfig(maxAsync = 2, maxQueued = 3, cleared = ThreadContext.ALL_REMAINING)
ManagedExecutor configuredCustomExecutor;
Sharing configured CDI instances of ManagedExecutor and ThreadContext
如果你需要将同一个 ManagedExecutor`或 `ThreadContext`注入到多个地方并共享其容量,你可以使用
@NamedInstance`注解来命名实例。@NamedInstance`是一个 CDI 限定符,因此所有同类型和同名的注入都将共享同一个底层实例。如果你还需要自定义你的实例,可以使用它的一个注入点上的 `@ManagedExecutorConfig
/`ThreadContextConfig`注解来实现:
If you need to inject the same ManagedExecutor
or ThreadContext
into several places and share its capacity, you can name the instance with @NamedInstance
annotation.
@NamedInstance
is a CDI qualifier and all injections of the same type and name will therefore share the same underlying instance.
If you also need to customize your instance, you can do so using @ManagedExecutorConfig
/ThreadContextConfig
annotation on one of its injection points:
// Custom configured ManagedExecutor with name
@Inject
@ManagedExecutorConfig(maxAsync = 2, maxQueued = 3, cleared = ThreadContext.ALL_REMAINING)
@NamedInstance("myExecutor")
ManagedExecutor sharedConfiguredExecutor;
// Since this executor has the same name, it will be the same instance as above
@Inject
@NamedInstance("myExecutor")
ManagedExecutor sameExecutor;
// Custom ThreadContext with a name
@Inject
@ThreadContextConfig(unchanged = ThreadContext.ALL_REMAINING)
@NamedInstance("myContext")
ThreadContext sharedConfiguredThreadContext;
// Given equal value of @NamedInstance, this ThreadContext will be the same as the above one
@Inject
@NamedInstance("myContext")
ThreadContext sameContext;
Context Propagation for CDI
在 CDI 方面,@RequestScoped
、@ApplicationScoped`和 `@Singleton`bean 会被传播并且可以在其他线程中使用。
@Dependent`bean 以及任何自定义作用域的 bean 都不能通过 CDI 上下文传播自动传播。
In terms of CDI, @RequestScoped
, @ApplicationScoped
and @Singleton
beans get propagated and are available in other threads.
@Dependent
beans as well as any custom scoped beans cannot be automatically propagated via CDI Context Propagation.
@ApplicationScoped`和 `@Singleton
bean 始终是活动范围,因此易于处理——只要 CDI 容器正在运行,内容传播任务就可以使用这些 bean。然而 @RequestScoped
bean 是另一个故事。它们仅在短时间内处于活动状态,当手动激活/停用时,可以将其绑定到 HTTP 请求或某些其他请求/任务。在这种情况下,用户必须意识到,一旦原始线程到达请求结束,它将终止上下文,对这些 bean 调用 @PreDestroy
,然后从上下文中清除它们。从其他线程访问这些 bean 的后续尝试可能导致意外行为。因此,建议确保使用上下文传播通过请求范围 bean 的所有任务都以这样的方式执行,即它们不会超出原始请求持续时间。
@ApplicationScoped
and @Singleton
beans are always active scopes and as such are easy to deal with - context propagation tasks can work with those beans so long as the CDI container is running.
However, @RequestScoped
beans are a different story. They are only active for a short period of time which can be bound either to HTTP request or some other request/task when manually activated/deactivated.
In this case user must be aware that once the original thread gets to an end of a request, it will terminate the context, calling @PreDestroy
on those beans and then clearing them from the context.
Subsequent attempts to access those beans from other threads can result in unexpected behaviour.
It is therefore recommended to make sure all tasks using request scoped beans via context propagation are performed in such a manner that they don’t outlive the original request duration.
由于上述描述的行为,建议在使用 CDI 中的上下文传播时避免在 Due to the above described behavior, it is recommended to avoid using |