Quarkus Messaging Extensions
事件驱动的消息系统已成为大多数现代应用程序的基础,让您可以构建消息驱动的微服务或复杂的数据流管道。 Quarkus 提供了一套全面的消息扩展套件,旨在毫不费力地与领先的消息技术同步。这使开发人员能够集中精力构建核心应用程序逻辑,不必深入研究各个 API 和消息基础架构的复杂性。 image::messaging-quarkus.png[] 本页重点介绍所有消息扩展的通用功能和开发模型。 其中一些扩展在 Quarkus core 存储库中维护:
-
@"21":核心扩展定义了开发消息应用程序的基本概念和 API
-
@"23"
-
Messaging - MQTT Connector
一些扩展由社区提供并维护:
-
@"24"
-
AWS SQS Connector
其他连接器(如 @"25" 或 @"26")无法从相同级别的集成中受益,并且需要更多的手动配置才能设置。 另一方面,一些与消息相关的扩展提出了供应商特定的低级别集成。此页面涵盖的支持级别不涉及这些低级别扩展。以下是一份此类扩展的非详尽列表:
Quarkus Messaging Development Model
Quarkus 通过建立一个用于发布、消费和处理消息的统一模型来简化消息驱动的应用程序开发,而不管底层代理技术使用的是消息队列还是事件流。Quarkus 消息扩展基于 MicroProfile 响应式消息传递规范构建,可确保与这些技术的无缝集成。重要的是,精通响应式编程并非利用这些功能的先决条件。
响应式消息传递规范为实现事件驱动和消息驱动的应用程序定义了一个基于 CDI 的编程模型。使用一小部分注释,CDI Bean 成为实现与消息代理交互的模块。这些交互通过 channels 发生,应用程序组件在此读取和写入消息。
Channels 由一个唯一名称识别,并使用一组注释进行声明。
@Incoming
and @Outgoing
annotations
@Incoming
和 @Outgoing
方法注释定义了 channels,允许从消息代理消费消息并向其生成消息:
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
@ApplicationScoped
public class MessageProcessingBean {
@Incoming("source")
@Outgoing("sink")
public String process(String consumedPayload) {
// Process the incoming message payload and return an updated payload
return consumedPayload.toUpperCase();
}
}
可以使用 @Outgoing
单独用于方法以生成消息:
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
@ApplicationScoped
public class MessageGeneratorBean {
@Outgoing("sink")
public Multi<String> generate() {
return Multi.createFrom().items("a", "b", "c");
}
}
可以使用 @Incoming
单独用于消费消息:
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
@ApplicationScoped
public class MessageProcessingBean {
@Incoming("source")
public void process(String consumedPayload) {
// process the payload
consumedPayload.toUpperCase();
}
}
请注意,不应直接从代码中调用注释为 @Incoming
和/或 @Outgoing
的方法。它们由框架调用。让用户代码调用它们不会产生预期的结果。
可以在 SmallRye Reactive Messaging – Supported signatures 中阅读有关受支持方法签名的更多信息。
Emitters and @Channel
annotation
应用程序通常需要将消息传递与应用程序的其他部分结合起来,例如从 HTTP 端点生成消息,或将消耗的消息流式传输为响应。
要将消息从命令式代码发送到特定通道,需要注入由 @Channel
注释识别的 Emitter
对象:
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
@ApplicationScoped
@Path("/")
public class MyImperativeBean {
@Channel("prices")
Emitter<Double> emitter;
@GET
@Path("/send")
public CompletionStage<Void> send(double d) {
return emitter.send(d);
}
}
使用 @Channel
注释,可以指示将有效负载或消息发送到的通道。Emitter
允许缓冲发送到通道的消息。
为了获得更多控制,使用 Mutiny API,可以使用 MutinyEmitter
发射器接口:
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.MutinyEmitter;
@ApplicationScoped
@Path("/")
public class MyImperativeBean {
@Channel("prices")
MutinyEmitter<Double> emitter;
@GET
@Path("/send")
public void send(double d) {
emitter.sendAndAwait(d);
}
}
@Channel
注释还可用于注入来自传入通道的消息流:
import org.eclipse.microprofile.reactive.messaging.Channel;
@ApplicationScoped
@Path("/")
public class SseResource {
@Channel("prices")
Multi<Double> prices;
@GET
@Path("/prices")
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Multi<Double> stream() {
return prices;
}
}
在使用 @Channel
消费消息时,应用程序代码负责订阅流。在上面的示例中,Quarkus REST(以前称为 RESTEasy Reactive)端点会为您处理它。
可以在 SmallRye Reactive Messaging – Emitter and Channels 文档中阅读有关发射器和通道的更多信息。
Messages and Metadata
Message
是有效负载周围的一个信封。在上面的示例中,只使用了有效负载,但是每个有效负载都在 Quarkus Messaging 中内部环绕一个 Message
。
Message<T>
接口将类型为 <T>
的有效负载与 Metadata
关联起来,Metadata
是用于确认 (ack) 和否定确认 (nack) 的一组任意对象和异步操作。
import org.eclipse.microprofile.reactive.messaging.Message;
@Incoming("source")
@Outgoing("sink")
public Message<String> process(Message<String> consumed) {
// Access the metadata
MyMetadata my = consumed.getMetadata(MyMetadata.class).get();
// Process the incoming message and return an updated message
return consumed.withPayload(consumed.getPayload().toUpperCase());
}
当消息的处理或接收成功时,会确认它返回代理。消息之间的确认被链接,这意味着在处理消息时,发出消息的确认将触发传入消息的确认。在大多数情况下,ack 和 nack 是为您管理的,连接器允许您为每个通道配置不同的策略。因此,通常无需直接与 Message
接口进行交互。只有高级用例才需要直接处理消息。
另一方面,访问 Metadata
在许多情况下可能是实用的。连接器将特定元数据对象添加到消息,以访问消息头、属性和其它连接器特定信息。您无需与 Message
接口交互即可访问连接器特定元数据。在有效负荷参数后,您可以简单地将元数据对象插入为方法参数:
import org.eclipse.microprofile.reactive.messaging.Metadata;
@Incoming("source")
@Outgoing("sink")
public String process(String payload, MyMetadata my) {
// Access the metadata
Map<String, Object> props = my.getProperties();
// Process the payload and return an updated payload
return payload.toUpperCase();
}
取决于连接器,可以在处理方法中使用的有效负荷类型不同。您可以实现自定义 MessageConverter
将有效负荷转换为应用程序接受的类型。
Channel configuration
可以使用 mp.messaging.incoming.<channel-name>
和 mp.messaging.outgoing.<channel-name>
配置属性配置信道属性。
例如,要配置 Kafka 连接器使用自定义反序列化器从 my-topic
主题使用消息:
mp.messaging.incoming.source.connector=smallrye-kafka
mp.messaging.incoming.source.topic=my-topic
mp.messaging.incoming.source.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.source.auto.offset.reset=earliest
connector
属性是所有信道必需的,并指定要使用的连接器。如果您类路径上有一个单独的连接器,则可以忽略此配置,因为 Quarkus 将自动选择连接器。
可以使用连接器名称配置全局信道属性:
mp.messaging.connector.smallrye-kafka.bootstrap.servers=localhost:9092
连接器特定属性在连接器文档中列出。
Channel wiring and Messaging patterns
在启动时,Quarkus 分析已声明的信道以将它们连线起来,并验证所有的信道均已连接。具体来说,每个信道都会创建一个消息的 reactive stream,该消息连接到另一信道的消息的 reactive stream。在遵守反应式流协议下,会在信道之间执行反压机制,从而能够控制应用程序资源使用,并且不会过度提交和使系统的一部分过载。
另一方面,在运行时通过编程方式创建新信道是不可行的。但是,有很多模式可用于实现大部分(如果不是全部的话)消息传递和集成用例:
一些消息传递技术允许消费者订阅一组主题或队列,并允许生产者基于消息将消息发送到特定主题。如果您确定需要在运行时动态地配置和创建客户端,则应考虑直接使用底层客户端。
Internal Channels
在某些用例中,使用消息传递模式在同一应用程序内传输消息非常方便。当您不将信道连接到消息传递后端(即连接器)时,所有操作都在应用程序内部发生,并且流是通过将方法链接在一起的方式创建的。每个链条仍然是反应流,并且强制执行反压协议。
该框架验证生产者/消费者链是否完整,这意味着如果应用程序向内部信道写入消息(使用只有 @Outgoing
的方法或 Emitter
),则它还必须从应用程序内部使用消息(使用只有 @Incoming
的方法或使用非托管流)。
Enable/Disable channels
默认情况下,所有已定义信道均已启用,但是可以使用配置禁用信道:
mp.messaging.incoming.my-channel.enabled=false
这不可以在 Quarkus 构建配置文件中与之一起使用,以基于某些构建时间条件(如目标环境)启用/禁用信道。禁用信道时您需要确保两件事:
-
禁用的信道用法位于构建时间可过滤掉的 Bean 中,
-
没有该信道,其余信道仍然能够正常工作。
@ApplicationScoped
@IfBuildProfile("my-profile")
public class MyProfileBean {
@Outgoing("my-channel")
public Multi<String> generate() {
return Multi.createFrom().items("a", "b", "c");
}
}
Multiple Outgoings and @Broadcast
默认情况下,在信道中传输的消息仅发送到单个消费者。拥有多个消费者被视为一个错误,并且会在部署时报告。
@Broadcast
注释会更改此行为,并指示在信道中传输的消息发送到所有消费者。必须将 @Broadcast
与 @Outgoing
注释一起使用:
import org.eclipse.microprofile.reactive.messaging.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
@Incoming("in")
@Outgoing("out")
@Broadcast
public int increment(int i) {
return i + 1;
}
@Incoming("out")
public void consume1(int i) {
//...
}
@Incoming("out")
public void consume2(int i) {
//...
}
与 @Broadcast
类似,您可以在同一个方法上多次使用 @Outgoing
注释,以表明该方法向多个信道生成消息:
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
@Incoming("in")
@Outgoing("out1")
@Outgoing("out2")
public String process(String s) {
// send messages from channel in to both channels out1 and out2
return s.toUpperCase();
}
使用多输出有助于实现扇出模式,其中单条信息可由多条目标通道处理。
您可以通过从处理方法返回 Targeted
来选择性地将消息分发至多输出:
@Incoming("in")
@Outgoing("out1")
@Outgoing("out2")
@Outgoing("out3")
public Targeted process(double price) {
// send messages from channel-in to both channel-out1 and channel-out2
Targeted targeted = Targeted.of("out1", "Price: " + price, "out2", "Quote: " + price);
if (price > 90.0) {
return targeted.with("out3", price);
}
return targeted;
}
Multiple Incomings and @Merge
默认情况下,单生产者可在通道中传送消息。拥有多生产者被认为是错误的,并会在部署时报告。@Merge
注解会更改此行为并指示一个通道可以有多个生产者。@Merge
必须与 @Incoming
注解一起使用:
@Incoming("in1")
@Outgoing("out")
public int increment(int i) {
return i + 1;
}
@Incoming("in2")
@Outgoing("out")
public int multiply(int i) {
return i * 2;
}
@Incoming("out")
@Merge
public void getAll(int i) {
//...
}
与 @Merge
类似,您可以在同一方法上多次使用 @Incoming
注解来指示方法从多通道使用消息:
@Incoming("in1")
@Incoming("in2")
public String process(String s) {
// get messages from channel-1 and channel-2
return s.toUpperCase();
}
Stream Processing
在一些高级场景中,您可以直接操作消息流,而不是每个单独消息。
在传入和传出签名中使用 Mutiny APIs 来处理消息流:
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
@ApplicationScoped
public class StreamProcessor {
@Incoming("source")
@Outgoing("sink")
public Multi<String> process(Multi<String> in) {
return in.map(String::toUpperCase);
}
}
Execution Model
Quarkus Messaging 基于 Quarkus 的 reactive engine 并利用 Eclipse Vert.x 分发要处理的消息。它支持三种执行模式:
-
Event-loop,在此模式中,消息会在 Vert.x I/O 线程上分发。请记住,您不应在事件循环上执行阻塞操作。
-
Worker-threads,在此模式中,消息会在工作线程池上分发。
-
Virtual-threads,在此模式中,消息会在虚拟线程上分发(需要 Java 21+)。由于虚拟线程未合并,因此为每条消息创建一个新的虚拟线程。请参阅专用的 Quarkus Virtual Thread support 指南以了解更多信息。
根据方法签名,Quarkus 选择默认执行模式。如果方法签名为 synchronous,消息会在 worker threads 上分发;否则,它默认为 event-loop:
Method signature | Default execution mode |
---|---|
@Incoming("source") void process(String payload) |
Worker-threads |
@Incoming("source") Uni<Void> process(String payload) |
Event-loop |
@Incoming("source") CompletionStage<Void> process(Message<String> message) |
Event-loop |
@Incoming("source") @Outgoing("sink") Multi<R> process(Multi<T> in) |
流处理方法在启动时执行,然后每个消息会分发到事件循环。 |
可以使用注解对执行模型实施细粒度控制:
-
@Blocking
将强制方法在工作线程池上执行。工作线程的默认池在所有通道间共享。使用@Blocking("my-custom-pool")
,您可以使用自定义线程池配置通道。配置属性smallrye.messaging.worker.my-custom-pool.max-concurrency
指定池中的最大线程数。您可以在 SmallRye Reactive Messaging documentation 中阅读有关阻塞处理的更多信息。 -
@NonBlocking
将强制方法在事件循环线程上执行。 -
@RunOnVirtualThread
将强制方法在虚拟线程上执行。为利用虚拟线程的轻量特性,带@RunOnVirtualThread
注解的方法的默认最大并发性为 1024。可以通过设置smallrye.messaging.worker.<virtual-thread>.max-concurrency
配置属性或与@Blocking("my-custom-pool")
注解一起使用来更改此项设置。
@Transactional
注解的存在意味着阻塞执行。
在消息传递应用程序中,已生成和已使用消息构成了有序的事件流,或者是由代理(在主题或队列内)执行的,或者是由应用程序中的接收和发送顺序执行的。为保持此顺序,Quarkus Messaging 会默认按顺序分发消息。您可以使用 @Blocking(ordered = false)
或 @RunOnVirtualThread
注解覆盖此行为。
Health Checks
Quarkus Messaging 扩展与 Smallrye Health 扩展一起为每个通道提供运行状况检查支持。startup、_readiness_和 _liveness_检查的实现取决于连接器。一些连接器允许配置运行状况检查行为或完全禁用它们或按通道禁用。
可以使用 quarkus.messaging.health.<channel-name>.enabled`或按运行状况检查类型(例如 `quarkus.messaging.health.<channel-name>.liveness.enabled
)禁用通道运行状况检查。
将 `quarkus.messaging.health.enabled`配置属性设置为 `false`将完全禁用消息传递运行状况检查。
Observability
Micrometer Metrics
Quarkus Messaging 扩展提供了简单但有用的指标来监视消息传递系统的运行状况。Micrometer extension公开了这些指标。
可以按通道收集以下指标,这些指标通过 `channel`标记标识:
-
quarkus.messaging.message.count
: 已生产或接收的消息数 -
quarkus.messaging.message.acks
: 成功处理的消息数 -
quarkus.messaging.message.failures
: 带有故障消息处理的数量 -
quarkus.messaging.message.duration
:消息处理持续时间
出于向后兼容性的原因,默认情况下未启用通道指标,可以使用 `smallrye.messaging.observation.enabled=true`启用。
OpenTelemetry Tracing
一些 Quarkus Messaging 连接器开箱即用地与 OpenTelemetry Tracing 集成。当存在 OpenTelemetry extension时,传出消息会传播当前跟踪跨度。在传入通道上,如果收到的消息包含跟踪信息,消息处理会将消息跨度继承为父级。
可以使用以下配置禁用特定通道的跟踪:
mp.messaging.incoming.data.tracing-enabled=false
Testing
Testing with Dev Services
大多数 Quarkus Messaging 扩展都提供 Dev Service 以简化应用程序的开发和测试。Dev Service 创建了一个代理实例,该实例配置为开箱即用地与 Quarkus Messaging 扩展一起使用。
在测试期间,Quarkus 创建一个单独的代理实例以在其上运行测试。
可以在 Dev Services指南中阅读更多有关 Dev Service 的信息,包括平台扩展提供的 Dev Service 列表。
Testing with InMemoryConnector
在不启动代理的情况下测试应用程序可能会很有用。要实现此目的,可以将连接器管理的通道_switch_到_in-memory_。
此方法仅适用于 JVM 测试。它不能用于本地测试(因为它们不支持注入)。
假设我们要测试以下示例应用程序:
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class MyMessagingApplication {
@Inject
@Channel("words-out")
Emitter<String> emitter;
public void sendMessage(String out) {
emitter.send(out);
}
@Incoming("words-in")
@Outgoing("uppercase")
public Message<String> toUpperCase(Message<String> message) {
return message.withPayload(message.getPayload().toUpperCase());
}
}
首先,将以下测试依赖项添加到应用程序中:
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-in-memory</artifactId>
<scope>test</scope>
</dependency>
testImplementation("io.smallrye.reactive:smallrye-reactive-messaging-in-memory")
接着,按照以下步骤创建 Quarkus 测试资源:
public class InMemoryConnectorLifecycleManager implements QuarkusTestResourceLifecycleManager {
@Override
public Map<String, String> start() {
Map<String, String> env = new HashMap<>();
Map<String, String> props1 = InMemoryConnector.switchIncomingChannelsToInMemory("words-in"); (1)
Map<String, String> props2 = InMemoryConnector.switchOutgoingChannelsToInMemory("uppercase"); (2)
Map<String, String> props3 = InMemoryConnector.switchOutgoingChannelsToInMemory("words-out"); (3)
env.putAll(props1);
env.putAll(props2);
env.putAll(props3);
return env; (4)
}
@Override
public void stop() {
InMemoryConnector.clear(); (5)
}
}
1 | 将传入通道`words-in`(接收的消息)切换到内存中。 |
2 | 将传出通道`words-out`(发送的消息)切换到内存中。 |
3 | 将传出通道`uppercase`(处理的消息)切换到内存中。 |
4 | 构建并返回一个 Map ,其中包含使用内存内通道配置应用程序所需的所有属性。 |
5 | 当测试停止时,清除 InMemoryConnector (丢弃所有已接收和已发送的消息) |
使用上面创建的测试资源创建一个`@QuarkusTest`:
import io.quarkus.test.common.WithTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.reactive.messaging.memory.InMemoryConnector;
import io.smallrye.reactive.messaging.memory.InMemorySink;
import io.smallrye.reactive.messaging.memory.InMemorySource;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.junit.jupiter.api.Test;
import jakarta.inject.Inject;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.awaitility.Awaitility.await;
@QuarkusTest
@WithTestResource(InMemoryConnectorLifecycleManager.class)
class MyMessagingApplicationTest {
@Inject
@Connector("smallrye-in-memory")
InMemoryConnector connector; (1)
@Inject
MyMessagingApplication app;
@Test
void test() {
InMemorySink<String> wordsOut = connector.sink("words-out"); (2)
InMemorySource<String> wordsIn = connector.source("words-in"); (3)
InMemorySink<String> uppercaseOut = connector.sink("uppercase"); (4)
app.sendMessage("Hello"); (5)
assertEquals("Hello", wordsOut.received().get(0).getPayload()); (6)
wordsIn.send("Bonjour"); (7)
await().untilAsserted(() -> assertEquals("BONJOUR", uppercaseOut.received().get(0).getPayload())); (8)
}
}
1 | 在测试类中使用 @Connector 或 @Any 限定符注入内存连接器。 |
2 | 检索出站通道 (words-out ) - 必须在测试资源中将通道切换到内存中。 |
3 | 检索入站通道 (words-in ) |
4 | 检索出站通道 (uppercase ) |
5 | 使用注入的应用程序 bean 调用 sendMessage 方法以通过通道 words-out 使用发信器发送消息。 |
6 | 在 words-out 内存通道上使用 received 方法检查应用程序生成的消息。 |
7 | 在 words-in 内存通道上使用 send 方法发送消息。应用程序将处理此消息并向 uppercase 通道发送消息。 |
8 | 在 uppercase 通道上使用 received 方法检查应用程序生成的消息。 |
内存连接器仅用于测试目的。使用内存连接器时需要考虑一些注意事项:
-
内存连接器仅传输使用
InMemorySource#send
方法发送的对象(有效负载或已配置的消息)。应用程序方法接收的消息不包含连接器特定元数据。 -
默认情况下,内存通道在
InMemorySource#send
方法的调用者线程上发送消息,在单元测试中这将是主线程。然而,大多数其他连接器会处理在单独复制的 Vert.x 上下文中发送消息的上下文传播。
quarkus-test-vertx
依赖项提供 @io.quarkus.test.vertx.RunOnVertxContext
注释,当用于测试方法时,该注释会在 Vert.x 上下文中执行测试。
如果测试依赖于上下文传播,则可以使用 run-on-vertx-context
属性配置内存连接器通道,从而在 Vert.x 上下文中发送事件(包括消息和确认)。或者,你可以使用 InMemorySource#runOnVertxContext
方法切换此行为。
Going further
本指南展示了 Quarkus 消息传递扩展的一般原则。
如果你想更进一步,可以查看 SmallRye Reactive Messaging 文档,其中为每个这些概念和其他概念提供了深入的文档。