Quarkus Messaging Extensions
事件驱动的消息系统已成为大多数现代应用程序的基础,让您可以构建消息驱动的微服务或复杂的数据流管道。
Event-driven messaging systems have become the backbone of most modern applications, enabling the building of message-driven microservices or complex data streaming pipelines.
Quarkus 提供了一套全面的消息扩展套件,旨在毫不费力地与领先的消息技术同步。这使开发人员能够集中精力构建核心应用程序逻辑,不必深入研究各个 API 和消息基础架构的复杂性。
Quarkus offers a comprehensive suite of messaging extensions designed to synchronize with leading messaging technologies effortlessly. This empowers developers to concentrate on crafting the core application logic, liberating them from the necessity to delve into the complexities of individual APIs and messaging infrastructures. image::messaging-quarkus.png[]
本页重点介绍所有消息扩展的通用功能和开发模型。
This page focuses on common features and the development model for all messaging extensions.
其中一些扩展在 Quarkus core 存储库中维护:
Some of these extensions are maintained in the core Quarkus repository:
-
Messaging: The core extension defines the basic concepts and APIs to develop messaging applications
-
Messaging - MQTT Connector
一些扩展由社区提供并维护:
Some extensions are contributed and maintained by the community:
其他连接器(如 @"25" 或 @"26")无法从相同级别的集成中受益,并且需要更多的手动配置才能设置。
Other connectors, such as the JMS Connector or the Google PubSub Connector, do not benefit from the same level of integration and require more manual configuration to set up.
另一方面,一些与消息相关的扩展提出了供应商特定的低级别集成。此页面涵盖的支持级别不涉及这些低级别扩展。以下是一份此类扩展的非详尽列表:
On the other hand, some messaging-related extensions propose low-level provider-specific integrations. The level of support covered on this page DOES NOT involve these low-level extensions. A non-exhaustive list of this kind of extension are the following:
Quarkus Messaging Development Model
Quarkus 通过建立一个用于发布、消费和处理消息的统一模型来简化消息驱动的应用程序开发,而不管底层代理技术使用的是消息队列还是事件流。Quarkus 消息扩展基于 MicroProfile 响应式消息传递规范构建,可确保与这些技术的无缝集成。重要的是,精通响应式编程并非利用这些功能的先决条件。
Quarkus simplifies message-driven application development by establishing a uniform model for publishing, consuming, and processing messages, regardless of whether the underlying broker technology uses message queuing or event streaming. Built upon the MicroProfile Reactive Messaging specification, Quarkus Messaging extensions ensure seamless integration with these technologies. Importantly, proficiency in reactive programming is NOT a prerequisite for leveraging these capabilities.
响应式消息传递规范为实现事件驱动和消息驱动的应用程序定义了一个基于 CDI 的编程模型。使用一小部分注释,CDI Bean 成为实现与消息代理交互的模块。这些交互通过 channels 发生,应用程序组件在此读取和写入消息。
The Reactive Messaging specification defines a CDI-based programming model for implementing event-driven and message-driven applications. Using a small set of annotations, CDI beans become building blocks for implementing interactions with message brokers. These interactions happen through channels where application components read and write messages.
Channels 由一个唯一名称识别,并使用一组注释进行声明。
Channels are identified by a unique name and declared using a set of annotations.
@Incoming
and @Outgoing
annotations
@Incoming
和 @Outgoing
方法注释定义了 channels,允许从消息代理消费消息并向其生成消息:
@Incoming
and @Outgoing
method annotations define channels allowing to consume messages from and produce messages to the message broker:
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
@ApplicationScoped
public class MessageProcessingBean {
@Incoming("source")
@Outgoing("sink")
public String process(String consumedPayload) {
// Process the incoming message payload and return an updated payload
return consumedPayload.toUpperCase();
}
}
可以使用 @Outgoing
单独用于方法以生成消息:
@Outgoing
can be used by itself on a method to generate messages:
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
@ApplicationScoped
public class MessageGeneratorBean {
@Outgoing("sink")
public Multi<String> generate() {
return Multi.createFrom().items("a", "b", "c");
}
}
可以使用 @Incoming
单独用于消费消息:
@Incoming
can be used by itself to consume messages:
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
@ApplicationScoped
public class MessageProcessingBean {
@Incoming("source")
public void process(String consumedPayload) {
// process the payload
consumedPayload.toUpperCase();
}
}
请注意,不应直接从代码中调用注释为 @Incoming
和/或 @Outgoing
的方法。它们由框架调用。让用户代码调用它们不会产生预期的结果。
Note that you should not call methods annotated with @Incoming
and/or @Outgoing
directly from your code.
They are invoked by the framework.
Having user code invoking them would not have the expected outcome.
可以在 SmallRye Reactive Messaging – Supported signatures 中阅读有关受支持方法签名的更多信息。
You can read more on supported method signatures in the SmallRye Reactive Messaging – Supported signatures.
Emitters and @Channel
annotation
应用程序通常需要将消息传递与应用程序的其他部分结合起来,例如从 HTTP 端点生成消息,或将消耗的消息流式传输为响应。
An application often needs to combine messaging with other parts of the application, ex. produce messages from HTTP endpoints, or stream consumed messages as a response.
要将消息从命令式代码发送到特定通道,需要注入由 @Channel
注释识别的 Emitter
对象:
To send messages from imperative code to a specific channel, you need to inject an Emitter
object identified by the @Channel
annotation:
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
@ApplicationScoped
@Path("/")
public class MyImperativeBean {
@Channel("prices")
Emitter<Double> emitter;
@GET
@Path("/send")
public CompletionStage<Void> send(double d) {
return emitter.send(d);
}
}
使用 @Channel
注释,可以指示将有效负载或消息发送到的通道。Emitter
允许缓冲发送到通道的消息。
The @Channel
annotation lets you indicate to which channel you will send your payloads or messages.
The Emitter
allows buffering messages sent to the channel.
为了获得更多控制,使用 Mutiny API,可以使用 MutinyEmitter
发射器接口:
For more control, using Mutiny APIs, you can use the MutinyEmitter
emitter interface:
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.MutinyEmitter;
@ApplicationScoped
@Path("/")
public class MyImperativeBean {
@Channel("prices")
MutinyEmitter<Double> emitter;
@GET
@Path("/send")
public void send(double d) {
emitter.sendAndAwait(d);
}
}
@Channel
注释还可用于注入来自传入通道的消息流:
The @Channel
annotation can also be used to inject the stream of messages from an incoming channel:
import org.eclipse.microprofile.reactive.messaging.Channel;
@ApplicationScoped
@Path("/")
public class SseResource {
@Channel("prices")
Multi<Double> prices;
@GET
@Path("/prices")
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Multi<Double> stream() {
return prices;
}
}
在使用 @Channel
消费消息时,应用程序代码负责订阅流。在上面的示例中,Quarkus REST(以前称为 RESTEasy Reactive)端点会为您处理它。
When consuming messages with @Channel
, the application code is responsible for subscribing to the stream.
In the example above, the Quarkus REST (formerly RESTEasy Reactive) endpoint handles that for you.
可以在 SmallRye Reactive Messaging – Emitter and Channels 文档中阅读有关发射器和通道的更多信息。
You can read more on the emitters and channels in the SmallRye Reactive Messaging – Emitter and Channels documentation.
Messages and Metadata
Message
是有效负载周围的一个信封。在上面的示例中,只使用了有效负载,但是每个有效负载都在 Quarkus Messaging 中内部环绕一个 Message
。
A Message
is an envelope around a payload.
In the examples above only payloads were used, but every payload is wrapped around a Message
internally in Quarkus Messaging.
Message<T>
接口将类型为 <T>
的有效负载与 Metadata
关联起来,Metadata
是用于确认 (ack) 和否定确认 (nack) 的一组任意对象和异步操作。
The Message<T>
interface associates a payload of type <T>
with Metadata
,
a set of arbitrary objects and asynchronous actions for acknowledgement (ack) and negative acknowledgement (nack).
import org.eclipse.microprofile.reactive.messaging.Message;
@Incoming("source")
@Outgoing("sink")
public Message<String> process(Message<String> consumed) {
// Access the metadata
MyMetadata my = consumed.getMetadata(MyMetadata.class).get();
// Process the incoming message and return an updated message
return consumed.withPayload(consumed.getPayload().toUpperCase());
}
当消息的处理或接收成功时,会确认它返回代理。消息之间的确认被链接,这意味着在处理消息时,发出消息的确认将触发传入消息的确认。在大多数情况下,ack 和 nack 是为您管理的,连接器允许您为每个通道配置不同的策略。因此,通常无需直接与 Message
接口进行交互。只有高级用例才需要直接处理消息。
A message is acknowledged back to the broker when its processing or reception has been successful.
Acknowledgements between messages are chained, meaning that when processing a message,
the acknowledgement of an outgoing message triggers the acknowledgement of incoming message(s).
In most cases, acks and nacks are managed for you and connectors allow you to configure different strategies per channel.
So, you usually don’t need to interact with the Message
interface directly.
Only advanced use cases require dealing with the Message directly.
另一方面,访问 Metadata
在许多情况下可能是实用的。连接器将特定元数据对象添加到消息,以访问消息头、属性和其它连接器特定信息。您无需与 Message
接口交互即可访问连接器特定元数据。在有效负荷参数后,您可以简单地将元数据对象插入为方法参数:
Accessing the Metadata
, on the other hand, can be practical in many cases.
Connectors add specific metadata objects to the message to give access to the message headers, properties, and other connector-specific information.
You do not need to interact with the Message
interface to access connector-specific metadata.
You can simply inject the metadata object as a method parameter after the payload parameter:
import org.eclipse.microprofile.reactive.messaging.Metadata;
@Incoming("source")
@Outgoing("sink")
public String process(String payload, MyMetadata my) {
// Access the metadata
Map<String, Object> props = my.getProperties();
// Process the payload and return an updated payload
return payload.toUpperCase();
}
取决于连接器,可以在处理方法中使用的有效负荷类型不同。您可以实现自定义 MessageConverter
将有效负荷转换为应用程序接受的类型。
Depending on the connector, payload types available to consume in processing methods differ.
You can implement a custom MessageConverter
to transform the payload to a type that is accepted by your application.
Channel configuration
可以使用 mp.messaging.incoming.<channel-name>
和 mp.messaging.outgoing.<channel-name>
配置属性配置信道属性。
Channel attributes can be configured using the mp.messaging.incoming.<channel-name>
and mp.messaging.outgoing.<channel-name>
configuration properties.
例如,要配置 Kafka 连接器使用自定义反序列化器从 my-topic
主题使用消息:
For example, to configure the Kafka connector to consume messages from the my-topic
topic with a custom deserializer:
mp.messaging.incoming.source.connector=smallrye-kafka
mp.messaging.incoming.source.topic=my-topic
mp.messaging.incoming.source.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.source.auto.offset.reset=earliest
connector
属性是所有信道必需的,并指定要使用的连接器。如果您类路径上有一个单独的连接器,则可以忽略此配置,因为 Quarkus 将自动选择连接器。
The connector
attribute is required for all channels and specifies the connector to use.
You can omit this configuration if you have a single connector on your classpath, as Quarkus will automatically select the connector.
可以使用连接器名称配置全局信道属性:
Global channel attributes can be configured using the connector name:
mp.messaging.connector.smallrye-kafka.bootstrap.servers=localhost:9092
连接器特定属性在连接器文档中列出。
Connector-specific attributes are listed in connector documentation.
Channel wiring and Messaging patterns
在启动时,Quarkus 分析已声明的信道以将它们连线起来,并验证所有的信道均已连接。具体来说,每个信道都会创建一个消息的 reactive stream,该消息连接到另一信道的消息的 reactive stream。在遵守反应式流协议下,会在信道之间执行反压机制,从而能够控制应用程序资源使用,并且不会过度提交和使系统的一部分过载。
At startup time, Quarkus analyzes declared channels to wire them together and verify that all channels are connected. Concretely, each channel creates a reactive stream of messages connected to another channel’s reactive stream of messages. Adhering to the reactive stream protocol, the back-pressure mechanism is enforced between channels, allowing to control application resource usage and not over-commit and overloading part of the system.
另一方面,在运行时通过编程方式创建新信道是不可行的。但是,有很多模式可用于实现大部分(如果不是全部的话)消息传递和集成用例:
On the flip side it is NOT possible to create new channels programmatically at runtime. There are, however, many patterns that let you implement most, if not all, messaging and integration use cases:
一些消息传递技术允许消费者订阅一组主题或队列,并允许生产者基于消息将消息发送到特定主题。如果您确定需要在运行时动态地配置和创建客户端,则应考虑直接使用底层客户端。
Some messaging technologies allow consumers to subscribe to a set of topics or queues, and producers to send messages to a specific topic on message basis. If you are sure you need to configure and create clients dynamically at runtime, you should consider using the low-level clients directly.
Internal Channels
在某些用例中,使用消息传递模式在同一应用程序内传输消息非常方便。当您不将信道连接到消息传递后端(即连接器)时,所有操作都在应用程序内部发生,并且流是通过将方法链接在一起的方式创建的。每个链条仍然是反应流,并且强制执行反压协议。
In some use cases, it is convenient to use messaging patterns to transfer messages inside the same application. When you don’t connect a channel to a messaging backend, i.e. a connector, everything happens internally to the application, and the streams are created by chaining methods together. Each chain is still a reactive stream and enforces the back-pressure protocol.
该框架验证生产者/消费者链是否完整,这意味着如果应用程序向内部信道写入消息(使用只有 @Outgoing
的方法或 Emitter
),则它还必须从应用程序内部使用消息(使用只有 @Incoming
的方法或使用非托管流)。
The framework verifies that the producer/consumer chain is complete,
meaning that if the application writes messages into an internal channel (using a method with only @Outgoing
, or an Emitter
),
it must also consume the messages from within the application (using a method with only @Incoming
or using an unmanaged stream).
Enable/Disable channels
默认情况下,所有已定义信道均已启用,但是可以使用配置禁用信道:
All defined channels are enabled by default, but it is possible to disable a channel with the configuration:
mp.messaging.incoming.my-channel.enabled=false
这不可以在 Quarkus 构建配置文件中与之一起使用,以基于某些构建时间条件(如目标环境)启用/禁用信道。禁用信道时您需要确保两件事:
This can be used alongside Quarkus build profiles to enable/disable channels based on some build-time condition, such as the the target environment. You need to make sure of two things when disabling a channel:
-
the disabled channel usage is located in a bean that can be filtered out at build time,
-
that without the channel, the remaining channels still work correctly.
@ApplicationScoped
@IfBuildProfile("my-profile")
public class MyProfileBean {
@Outgoing("my-channel")
public Multi<String> generate() {
return Multi.createFrom().items("a", "b", "c");
}
}
Multiple Outgoings and @Broadcast
默认情况下,在信道中传输的消息仅发送到单个消费者。拥有多个消费者被视为一个错误,并且会在部署时报告。
By default, messages transmitted in a channel are only dispatched to a single consumer. Having multiple consumers is considered an error and is reported at deployment time.
@Broadcast
注释会更改此行为,并指示在信道中传输的消息发送到所有消费者。必须将 @Broadcast
与 @Outgoing
注释一起使用:
The @Broadcast
annotation changes this behavior and indicates that messages transiting in the channel are dispatched to all the consumers.
@Broadcast
must be used with the @Outgoing
annotation:
import org.eclipse.microprofile.reactive.messaging.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
@Incoming("in")
@Outgoing("out")
@Broadcast
public int increment(int i) {
return i + 1;
}
@Incoming("out")
public void consume1(int i) {
//...
}
@Incoming("out")
public void consume2(int i) {
//...
}
与 @Broadcast
类似,您可以在同一个方法上多次使用 @Outgoing
注释,以表明该方法向多个信道生成消息:
Similarly to @Broadcast
, you can use @Outgoing
annotation multiple times on the same method to indicate that the method produces messages to multiple channels:
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
@Incoming("in")
@Outgoing("out1")
@Outgoing("out2")
public String process(String s) {
// send messages from channel in to both channels out1 and out2
return s.toUpperCase();
}
使用多输出有助于实现扇出模式,其中单条信息可由多条目标通道处理。
Using Multiple Outgoings can be useful for implementing fan-out patterns, in which a single message is processed by multiple target channels.
您可以通过从处理方法返回 Targeted
来选择性地将消息分发至多输出:
You can selectively dispatch messages to multiple outgoings by returning Targeted
from the processing method:
@Incoming("in")
@Outgoing("out1")
@Outgoing("out2")
@Outgoing("out3")
public Targeted process(double price) {
// send messages from channel-in to both channel-out1 and channel-out2
Targeted targeted = Targeted.of("out1", "Price: " + price, "out2", "Quote: " + price);
if (price > 90.0) {
return targeted.with("out3", price);
}
return targeted;
}
Multiple Incomings and @Merge
默认情况下,单生产者可在通道中传送消息。拥有多生产者被认为是错误的,并会在部署时报告。@Merge
注解会更改此行为并指示一个通道可以有多个生产者。@Merge
必须与 @Incoming
注解一起使用:
By default, a single producer can transmit messages in a channel.
Having multiple producers is considered erroneous and is reported at deployment time.
The @Merge
annotation changes this behavior and indicates that a channel can have multiple producers.
@Merge
must be used with the @Incoming
annotation:
@Incoming("in1")
@Outgoing("out")
public int increment(int i) {
return i + 1;
}
@Incoming("in2")
@Outgoing("out")
public int multiply(int i) {
return i * 2;
}
@Incoming("out")
@Merge
public void getAll(int i) {
//...
}
与 @Merge
类似,您可以在同一方法上多次使用 @Incoming
注解来指示方法从多通道使用消息:
Similarly to @Merge
, you can use @Incoming
annotation multiple times on the same method to indicate that the method consumes messages from multiple channels:
@Incoming("in1")
@Incoming("in2")
public String process(String s) {
// get messages from channel-1 and channel-2
return s.toUpperCase();
}
Stream Processing
在一些高级场景中,您可以直接操作消息流,而不是每个单独消息。
In some advanced scenarios, you can manipulate directly the stream of messages instead of each individual message.
在传入和传出签名中使用 Mutiny APIs 来处理消息流:
Using Mutiny APIs in incoming and outgoing signatures allow you to process the stream of messages:
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
@ApplicationScoped
public class StreamProcessor {
@Incoming("source")
@Outgoing("sink")
public Multi<String> process(Multi<String> in) {
return in.map(String::toUpperCase);
}
}
Execution Model
Quarkus Messaging 基于 Quarkus 的 reactive engine 并利用 Eclipse Vert.x 分发要处理的消息。它支持三种执行模式:
Quarkus Messaging sits on top of the reactive engine of Quarkus and leverages Eclipse Vert.x to dispatch messages for processing. It supports three execution modes:
-
Event-loop, where messages are dispatched on the Vert.x I/O thread. Remember that you should not perform blocking operations on the event loop.
-
Worker-threads, where messages are dispatched on a worker thread pool.
-
Virtual-threads, where messages are dispatched on a virtual thread (requires Java 21+). As virtual threads are not pooled, a new virtual thread is created for each message. Please refer to the dedicated Quarkus Virtual Thread support guide for more information.
根据方法签名,Quarkus 选择默认执行模式。如果方法签名为 synchronous,消息会在 worker threads 上分发;否则,它默认为 event-loop:
Quarkus chooses the default execution mode based on the method signature. If the method signature is synchronous, messages are dispatched on worker threads otherwise it defaults to event-loop:
Method signature | Default execution mode |
---|---|
@Incoming("source") void process(String payload) |
Worker-threads |
@Incoming("source") Uni<Void> process(String payload) |
Event-loop |
@Incoming("source") CompletionStage<Void> process(Message<String> message) |
Event-loop |
@Incoming("source") @Outgoing("sink") Multi<R> process(Multi<T> in) |
Stream-processing methods are executed at startup, then each message is dispatched on event loop. |
可以使用注解对执行模型实施细粒度控制:
Fine-grained control over the execution model is possible using annotations:
-
@Blocking
will force the method to be executed on a worker thread pool. The default pool of worker threads is shared between all channels. Using@Blocking("my-custom-pool")
you can configure channels with a custom thread pool. The configuration propertysmallrye.messaging.worker.my-custom-pool.max-concurrency
specifies the maximum number of threads in the pool. You can read more on the blocking processing in SmallRye Reactive Messaging documentation. -
@NonBlocking
will force the method to be executed on the event-loop thread. -
@RunOnVirtualThread
will force the method to be executed on a virtual thread. To leverage the lightweight nature of virtual threads, the default maximum concurrency for methods annotated with@RunOnVirtualThread
is 1024. This can be changed by setting thesmallrye.messaging.worker.<virtual-thread>.max-concurrency
configuration property or using together with the@Blocking("my-custom-pool")
annotation.
@Transactional
注解的存在意味着阻塞执行。
The presence of @Transactional
annotation implies blocking execution.
在消息传递应用程序中,已生成和已使用消息构成了有序的事件流,或者是由代理(在主题或队列内)执行的,或者是由应用程序中的接收和发送顺序执行的。为保持此顺序,Quarkus Messaging 会默认按顺序分发消息。您可以使用 @Blocking(ordered = false)
或 @RunOnVirtualThread
注解覆盖此行为。
In messaging applications, produced and consumed messages constitute an ordered stream of events,
either enforced by the broker (inside a topic or a queue)
or by the order of reception and emission in the application.
To preserve this order, Quarkus Messaging dispatches messages sequentially by default.
You can override this behavior by using @Blocking(ordered = false)
or @RunOnVirtualThread
annotation.
Incoming Channel Concurrency
一些连接器支持配置传入通道的并发性级别。
Some connectors support configuring the concurrency level of incoming channels.
mp.messaging.incoming.my-channel.concurrency=4
这会建立四个后台的传入通道副本,并将它们连接到同一处理方法。根据代理技术,这可以通过并发处理多个消息来提高应用程序的吞吐量,同时仍保留在不同副本中接收到的消息的部分顺序。这种情况例如存在于 Kafka 中,其中多个使用者可以消耗不同的主题分区。
This creates four copies of the incoming channel under the hood, wiring them to the same processing method. Depending on the broker technology, this can be useful to increase the application’s throughput by processing multiple messages concurrently while still preserving the partial order of messages received in different copies. This is the case, for example, for Kafka, where multiple consumers can consume different topic partitions.
Health Checks
Quarkus Messaging 扩展与 Smallrye Health 扩展一起为每个通道提供运行状况检查支持。startup、_readiness_和 _liveness_检查的实现取决于连接器。一些连接器允许配置运行状况检查行为或完全禁用它们或按通道禁用。
Together with the Smallrye Health extension, Quarkus Messaging extensions provide health check support per channel. The implementation of startup, readiness and liveness checks depends on the connector. Some connectors allow configuring the health check behavior or disabling them completely or per channel.
可以使用 quarkus.messaging.health.<channel-name>.enabled`或按运行状况检查类型(例如 `quarkus.messaging.health.<channel-name>.liveness.enabled
)禁用通道运行状况检查。
Channel health checks can be disabled using quarkus.messaging.health.<channel-name>.enabled
or per health check type,
ex. quarkus.messaging.health.<channel-name>.liveness.enabled
.
将 `quarkus.messaging.health.enabled`配置属性设置为 `false`将完全禁用消息传递运行状况检查。
Setting the quarkus.messaging.health.enabled
configuration property to false
completely disables the messaging health checks.
Observability
Micrometer Metrics
Quarkus Messaging 扩展提供了简单但有用的指标来监视消息传递系统的运行状况。Micrometer extension公开了这些指标。
Quarkus Messaging extensions provide simple but useful metrics to monitor the health of the messaging system. The Micrometer extension exposes these metrics.
可以按通道收集以下指标,这些指标通过 `channel`标记标识:
The following metrics can be gathered per channel, identified with the channel
tag:
-
quarkus.messaging.message.count
: The number of messages produced or received -
quarkus.messaging.message.acks
: The number of messages processed successfully -
quarkus.messaging.message.failures
: The number of messages processed with failures -
quarkus.messaging.message.duration
: The duration of the message processing
出于向后兼容性的原因,默认情况下未启用通道指标,可以使用 `smallrye.messaging.observation.enabled=true`启用。
For backwards compatibility reasons, channel metrics are not enabled by default and can be enabled with: smallrye.messaging.observation.enabled=true
.
OpenTelemetry Tracing
一些 Quarkus Messaging 连接器开箱即用地与 OpenTelemetry Tracing 集成。当存在 OpenTelemetry extension时,传出消息会传播当前跟踪跨度。在传入通道上,如果收到的消息包含跟踪信息,消息处理会将消息跨度继承为父级。
Some Quarkus Messaging connectors integrate out-of-the-box with OpenTelemetry Tracing. When the OpenTelemetry extension is present, outgoing messages propagate the current tracing span. On incoming channels, if a received message contains tracing information, the message processing inherits the message span as parent.
可以使用以下配置禁用特定通道的跟踪:
You can disable tracing for a specific channel using the following configuration:
mp.messaging.incoming.data.tracing-enabled=false
Testing
Testing with Dev Services
大多数 Quarkus Messaging 扩展都提供 Dev Service 以简化应用程序的开发和测试。Dev Service 创建了一个代理实例,该实例配置为开箱即用地与 Quarkus Messaging 扩展一起使用。
Most Quarkus Messaging extensions provide a Dev Service to simplify the development and testing of applications. The Dev Service creates a broker instance configured to work out-of-the-box with the Quarkus Messaging extension.
在测试期间,Quarkus 创建一个单独的代理实例以在其上运行测试。
During testing Quarkus creates a separate brok er instance to run the tests against it.
可以在 Dev Services指南中阅读更多有关 Dev Service 的信息,包括平台扩展提供的 Dev Service 列表。
You can read more about Dev Services in the Dev Services guide, including a list of Dev Services provided by platform extensions.
Testing with InMemoryConnector
在不启动代理的情况下测试应用程序可能会很有用。要实现此目的,可以将连接器管理的通道_switch_到_in-memory_。
It can be useful to test the application without starting a broker. To achieve this, you can switch the channels managed by a connector to in-memory.
此方法仅适用于 JVM 测试。它不能用于本地测试(因为它们不支持注入)。
This approach only works for JVM tests. It cannot be used for native tests (because they do not support injection).
假设我们要测试以下示例应用程序:
Let’s say we want to test the following sample application:
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class MyMessagingApplication {
@Inject
@Channel("words-out")
Emitter<String> emitter;
public void sendMessage(String out) {
emitter.send(out);
}
@Incoming("words-in")
@Outgoing("uppercase")
public Message<String> toUpperCase(Message<String> message) {
return message.withPayload(message.getPayload().toUpperCase());
}
}
首先,将以下测试依赖项添加到应用程序中:
First, add the following test dependency to your application:
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-in-memory</artifactId>
<scope>test</scope>
</dependency>
testImplementation("io.smallrye.reactive:smallrye-reactive-messaging-in-memory")
接着,按照以下步骤创建 Quarkus 测试资源:
Then, create a Quarkus Test Resource as follows:
public class InMemoryConnectorLifecycleManager implements QuarkusTestResourceLifecycleManager {
@Override
public Map<String, String> start() {
Map<String, String> env = new HashMap<>();
Map<String, String> props1 = InMemoryConnector.switchIncomingChannelsToInMemory("words-in"); (1)
Map<String, String> props2 = InMemoryConnector.switchOutgoingChannelsToInMemory("uppercase"); (2)
Map<String, String> props3 = InMemoryConnector.switchOutgoingChannelsToInMemory("words-out"); (3)
env.putAll(props1);
env.putAll(props2);
env.putAll(props3);
return env; (4)
}
@Override
public void stop() {
InMemoryConnector.clear(); (5)
}
}
1 | Switch the incoming channel words-in (consumed messages) to in-memory. |
2 | Switch the outgoing channel words-out (produced messages) to in-memory. |
3 | Switch the outgoing channel uppercase (processed messages) to in-memory. |
4 | Builds and returns a Map containing all the properties required to configure the application to use in-memory channels. |
5 | When the test stops, clear the InMemoryConnector (discard all the received and sent messages) |
使用上面创建的测试资源创建一个`@QuarkusTest`:
Create a @QuarkusTest
using the test resource created above:
import io.quarkus.test.common.WithTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.reactive.messaging.memory.InMemoryConnector;
import io.smallrye.reactive.messaging.memory.InMemorySink;
import io.smallrye.reactive.messaging.memory.InMemorySource;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.junit.jupiter.api.Test;
import jakarta.inject.Inject;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.awaitility.Awaitility.await;
@QuarkusTest
@WithTestResource(InMemoryConnectorLifecycleManager.class)
class MyMessagingApplicationTest {
@Inject
@Connector("smallrye-in-memory")
InMemoryConnector connector; (1)
@Inject
MyMessagingApplication app;
@Test
void test() {
InMemorySink<String> wordsOut = connector.sink("words-out"); (2)
InMemorySource<String> wordsIn = connector.source("words-in"); (3)
InMemorySink<String> uppercaseOut = connector.sink("uppercase"); (4)
app.sendMessage("Hello"); (5)
assertEquals("Hello", wordsOut.received().get(0).getPayload()); (6)
wordsIn.send("Bonjour"); (7)
await().untilAsserted(() -> assertEquals("BONJOUR", uppercaseOut.received().get(0).getPayload())); (8)
}
}
1 | Inject the in-memory connector in your test class, using the @Connector or @Any qualifier. |
2 | Retrieve the outgoing channel (words-out ) - the channel must have been switched to in-memory in the test resource. |
3 | Retrieve the incoming channel (words-in ) |
4 | Retrieve the outgoing channel (uppercase ) |
5 | Use the injected application bean to call sendMessage method to send a message using the emitter with the channel words-out . |
6 | Use the received method on words-out in-memory channel to check the message produced by the application. |
7 | Use the send mwthod on words-in in-memory channel to send a message.
The application will process this message and send a message to uppercase channel. |
8 | Use the received method on uppercase channel to check the messages produced by the application. |
内存连接器仅用于测试目的。使用内存连接器时需要考虑一些注意事项:
In-memory connector is solely intended for testing purposes. There are some caveats to consider when using the in-memory connector:
-
The in-memory connector only transmits objects (payloads or configured messages) sent using the
InMemorySource#send
method. Messages received by the application methods won’t contain connector-specific metadata. -
By default, in-memory channels dispatch messages on the caller thread of the
InMemorySource#send
method, which would be the main thread in unit tests. However, most of the other connectors handle context propagation dispatching messages on separate duplicated Vert.x contexts.
quarkus-test-vertx
依赖项提供 @io.quarkus.test.vertx.RunOnVertxContext
注释,当用于测试方法时,该注释会在 Vert.x 上下文中执行测试。
The quarkus-test-vertx
dependency provides the @io.quarkus.test.vertx.RunOnVertxContext
annotation, which when used on a test method, executes the test on a Vert.x context.
如果测试依赖于上下文传播,则可以使用 run-on-vertx-context
属性配置内存连接器通道,从而在 Vert.x 上下文中发送事件(包括消息和确认)。或者,你可以使用 InMemorySource#runOnVertxContext
方法切换此行为。
If your tests are dependent on context propagation, you can configure the in-memory connector channels with run-on-vertx-context
attribute to dispatch events, including messages and acknowledgements, on a Vert.x context.
Alternatively you can switch this behaviour using the InMemorySource#runOnVertxContext
method.
Going further
本指南展示了 Quarkus 消息传递扩展的一般原则。
This guide shows the general principles of Quarkus Messaging extensions.
如果你想更进一步,可以查看 SmallRye Reactive Messaging 文档,其中为每个这些概念和其他概念提供了深入的文档。
If you want to go further, you can check the SmallRye Reactive Messaging documentation, which has in-depth documentation for each of these concepts and more.