Apache Kafka Reference Guide

This reference guide demonstrates how your Quarkus application can utilize Quarkus Messaging to interact with Apache Kafka.

Introduction

Apache Kafka is a popular open-source distributed event streaming platform. It is used commonly for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. Similar to a message queue, or an enterprise messaging platform, it lets you:

  • publish (write) and subscribe to (read) streams of events, called records.

  • store streams of records durably and reliably inside topics.

  • process streams of records as they occur or retrospectively.

And all this functionality is provided in a distributed, highly scalable, elastic, fault-tolerant, and secure manner.

Quarkus Extension for Apache Kafka

Quarkus provides support for Apache Kafka through the SmallRye Reactive Messaging framework. Based on the Eclipse MicroProfile Reactive Messaging specification 2.0, it proposes a flexible programming model bridging CDI and event-driven programming.

This guide provides an in-depth look at Apache Kafka and the SmallRye Reactive Messaging framework. For a quick start, take a look at Getting Started to Quarkus Messaging with Apache Kafka.

You can add the messaging-kafka extension to your project by running the following command in your project base directory:

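The original command snippet is not resolved in this extract; the following is a sketch of the usual Quarkus tooling invocations (Quarkus CLI, Maven wrapper, or Gradle wrapper, assuming they are available in your project):

quarkus extension add messaging-kafka

./mvnw quarkus:add-extension -Dextensions='messaging-kafka'

./gradlew addExtension --extensions='messaging-kafka'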

This will add the following to your build file:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-messaging-kafka")

The extension includes kafka-clients version 3.2.1 as a transitive dependency and is compatible with Kafka brokers version 2.x.

Configuring Smallrye Kafka Connector

Because the SmallRye Reactive Messaging framework supports different messaging backends like Apache Kafka, AMQP, Apache Camel, JMS, MQTT, etc., it employs a generic vocabulary:

  • Applications send and receive messages. A message wraps a payload and can be extended with some metadata. With the Kafka connector, a message corresponds to a Kafka record.

  • Messages transit on channels. Application components connect to channels to publish and consume messages. The Kafka connector maps channels to Kafka topics.

  • Channels are connected to message backends using connectors. Connectors are configured to map incoming messages to a specific channel (consumed by the application) and collect outgoing messages sent to a specific channel. Each connector is dedicated to a specific messaging technology. For example, the connector dealing with Kafka is named smallrye-kafka.

A minimal configuration for the Kafka connector with an incoming channel looks like the following:

%prod.kafka.bootstrap.servers=kafka:9092 1
mp.messaging.incoming.prices.connector=smallrye-kafka 2
1 Configure the broker location for the production profile. You can configure it globally or per channel using the mp.messaging.incoming.$channel.bootstrap.servers property. In dev mode and when running tests, Dev Services for Kafka automatically starts a Kafka broker. When not provided, this property defaults to localhost:9092.
2 Configure the connector to manage the prices channel. By default, the topic name is the same as the channel name. You can configure the topic attribute to override it.
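For example, a minimal sketch overriding the topic for the prices channel (price-updates is a hypothetical topic name):

mp.messaging.incoming.prices.topic=price-updates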

The %prod prefix indicates that the property is only used when the application runs in prod mode (so not in dev or test). Refer to the Profile documentation for further details.

Connector auto-attachment

If you have a single connector on your classpath, you can omit the connector attribute configuration. Quarkus automatically associates orphan channels to the (unique) connector found on the classpath. Orphan channels are outgoing channels without a downstream consumer or incoming channels without an upstream producer.

This auto-attachment can be disabled using:

quarkus.messaging.auto-connector-attachment=false

Receiving messages from Kafka

Continuing from the previous minimal configuration, your Quarkus application can receive the message payload directly:

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}

There are several other ways your application can consume incoming messages:

Message
@Incoming("prices")
public CompletionStage<Void> consume(Message<Double> msg) {
    // access record metadata
    var metadata = msg.getMetadata(IncomingKafkaRecordMetadata.class).orElseThrow();
    // process the message payload.
    double price = msg.getPayload();
    // Acknowledge the incoming message (commit the offset)
    return msg.ack();
}

The Message type lets the consuming method access the incoming message metadata and handle the acknowledgment manually. We’ll explore different acknowledgment strategies in Commit Strategies.

If you want to access the Kafka record objects directly, use:

ConsumerRecord
@Incoming("prices")
public void consume(ConsumerRecord<String, Double> record) {
    String key = record.key(); // Can be `null` if the incoming record has no key
    String value = record.value(); // Can be `null` if the incoming record has no value
    String topic = record.topic();
    int partition = record.partition();
    // ...
}

ConsumerRecord is provided by the underlying Kafka client and can be injected directly into the consumer method. Another, simpler, approach consists in using Record:

Record
@Incoming("prices")
public void consume(Record<String, Double> record) {
    String key = record.key(); // Can be `null` if the incoming record has no key
    String value = record.value(); // Can be `null` if the incoming record has no value
}

Record is a simple wrapper around key and payload of the incoming Kafka record.

@Channel

Alternatively, your application can inject a Multi in your bean and subscribe to its events as shown in the following example:

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Channel;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.jboss.resteasy.reactive.RestStreamElementType;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("prices")
    Multi<Double> prices;

    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<Double> stream() {
        return prices;
    }
}

This is a good example of how to integrate a Kafka consumer with another downstream, in this example exposing it as a Server-Sent Events endpoint.

When consuming messages with @Channel, the application code is responsible for the subscription. In the example above, the Quarkus REST (formerly RESTEasy Reactive) endpoint handles that for you.

The following types can be injected as channels:

@Inject @Channel("prices") Multi<Double> streamOfPayloads;

@Inject @Channel("prices") Multi<Message<Double>> streamOfMessages;

@Inject @Channel("prices") Publisher<Double> publisherOfPayloads;

@Inject @Channel("prices") Publisher<Message<Double>> publisherOfMessages;

As with the previous Message example, if your injected channel receives payloads (Multi<T>), it acknowledges the message automatically and supports multiple subscribers. If your injected channel receives Messages (Multi<Message<T>>), you will be responsible for the acknowledgment and broadcasting. We will explore sending broadcast messages in Broadcasting messages on multiple consumers.

Injecting @Channel("prices") or having @Incoming("prices") does not automatically configure the application to consume messages from Kafka. You need to configure an inbound connector with mp.messaging.incoming.prices…​ or have an @Outgoing("prices") method somewhere in your application (in which case, prices will be an in-memory channel).
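As an illustration of the in-memory case, the following sketch provides an @Outgoing("prices") method so that the injected @Channel("prices") receives data without any connector configuration (the InMemoryPriceGenerator class name and the constant price are hypothetical):

import java.time.Duration;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class InMemoryPriceGenerator {

    @Outgoing("prices")
    public Multi<Double> generate() {
        // Emits a constant price every second; "prices" remains an in-memory channel
        // as long as no connector is configured for it.
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(tick -> 10.0);
    }
}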

Blocking processing

Reactive Messaging invokes your method on an I/O thread. See the Quarkus Reactive Architecture documentation for further details on this topic. But, you often need to combine Reactive Messaging with blocking processing such as database interactions. For this, you need to use the @Blocking annotation indicating that the processing is blocking and should not be run on the caller thread.

For example, the following code illustrates how you can store incoming payloads to a database using Hibernate with Panache:

import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }

}

The complete example is available in the kafka-panache-quickstart directory.

There are 2 @Blocking annotations:

  1. io.smallrye.reactive.messaging.annotations.Blocking

  2. io.smallrye.common.annotation.Blocking

They have the same effect. Thus, you can use both. The first one provides more fine-grained tuning such as the worker pool to use and whether it preserves the order. The second one, used also with other reactive features of Quarkus, uses the default worker pool and preserves the order.
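For example, a sketch using the SmallRye variant to pick a custom worker pool and relax ordering (the pool name "my-pool" is hypothetical and would need to be configured separately):

@Incoming("prices")
@Blocking(value = "my-pool", ordered = false) // run on the "my-pool" worker pool, without preserving order
public void store(double price) {
    // blocking processing, e.g. a database call
}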

Detailed information on the usage of the @Blocking annotation can be found in SmallRye Reactive Messaging – Handling blocking execution.

@RunOnVirtualThread

For running the blocking processing on Java virtual threads, see the Quarkus Virtual Thread support with Reactive Messaging documentation.

@Transactional

If your method is annotated with @Transactional, it will be considered blocking automatically, even if the method is not annotated with @Blocking.

Acknowledgment Strategies

All messages received by a consumer must be acknowledged. In the absence of acknowledgment, the processing is considered in error. If the consumer method receives a Record or a payload, the message will be acked on method return, also known as Strategy.POST_PROCESSING. If the consumer method returns another reactive stream or a CompletionStage, the message will be acked when the downstream message is acked. You can override the default behavior to ack the message on arrival (Strategy.PRE_PROCESSING), or to not ack the message at all (Strategy.NONE), on the consumer method, as in the following example:

@Incoming("prices")
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
public void process(double price) {
    // process price
}

If the consumer method receives a Message, the acknowledgment strategy is Strategy.MANUAL and the consumer method is in charge of acking/nacking the message.

@Incoming("prices")
public CompletionStage<Void> process(Message<Double> msg) {
    // process price
    return msg.ack();
}

As mentioned above, the method can also override the acknowledgment strategy to PRE_PROCESSING or NONE.

Commit Strategies

When a message produced from a Kafka record is acknowledged, the connector invokes a commit strategy. These strategies decide when the consumer offset for a specific topic/partition is committed. Committing an offset indicates that all previous records have been processed. It is also the position where the application would restart the processing after a crash recovery or a restart.

Committing every offset has performance penalties as Kafka offset management can be slow. However, not committing the offset often enough may lead to message duplication if the application crashes between two commits.

The Kafka connector supports three strategies:

  • throttled keeps track of received messages and commits an offset of the latest acked message in sequence (meaning, all previous messages were also acked). This strategy guarantees at-least-once delivery even if the channel performs asynchronous processing. The connector tracks the received records and periodically (period specified by auto.commit.interval.ms, default: 5000 ms) commits the highest consecutive offset. The connector will be marked as unhealthy if a message associated with a record is not acknowledged in throttled.unprocessed-record-max-age.ms (default: 60000 ms). Indeed, this strategy cannot commit the offset as soon as a single record processing fails. If throttled.unprocessed-record-max-age.ms is set to less than or equal to 0, it does not perform any health check verification. Such a setting might lead to running out of memory if there are "poison pill" messages (that are never acked). This strategy is the default if enable.auto.commit is not explicitly set to true.

  • checkpoint allows persisting consumer offsets on a state store, instead of committing them back to the Kafka broker. Using the CheckpointMetadata API, consumer code can persist a processing state with the record offset to mark the progress of a consumer. When the processing continues from a previously persisted offset, it seeks the Kafka consumer to that offset and also restores the persisted state, continuing the stateful processing from where it left off. The checkpoint strategy holds locally the processing state associated with the latest offset, and persists it periodically to the state store (period specified by auto.commit.interval.ms (default: 5000)). The connector will be marked as unhealthy if no processing state is persisted to the state store in checkpoint.unsynced-state-max-age.ms (default: 10000). If checkpoint.unsynced-state-max-age.ms is set to less than or equal to 0, it does not perform any health check verification. For more information, see Stateful processing with Checkpointing

  • latest commits the record offset received by the Kafka consumer as soon as the associated message is acknowledged (if the offset is higher than the previously committed offset). This strategy provides at-least-once delivery if the channel processes the message without performing any asynchronous processing. Specifically, the offset of the most recent acknowledged message will always be committed, even if older messages have not finished being processed. In case of an incident such as a crash, processing would restart after the last commit, leading to older messages never being successfully and fully processed, which would appear as message loss. This strategy should not be used in high load environment, as offset commit is expensive. However, it reduces the risk of duplicates.

  • ignore performs no commit. This strategy is the default strategy when the consumer is explicitly configured with enable.auto.commit to true. It delegates the offset commit to the underlying Kafka client. When enable.auto.commit is true this strategy DOES NOT guarantee at-least-once delivery. SmallRye Reactive Messaging processes records asynchronously, so offsets may be committed for records that have been polled but not yet processed. In case of a failure, only records that were not committed yet will be re-processed.
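The commit strategy is selected per channel with the commit-strategy attribute; a minimal sketch for the prices channel:

mp.messaging.incoming.prices.commit-strategy=throttled
# other possible values: latest, ignore, checkpoint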

The Kafka connector disables the Kafka auto commit when it is not explicitly enabled. This behavior differs from the traditional Kafka consumer. If high throughput is important for you, and you are not limited by the downstream, we recommend either:

  • use the throttled policy,

  • or set enable.auto.commit to true and annotate the consuming method with @Acknowledgment(Acknowledgment.Strategy.NONE).

SmallRye Reactive Messaging enables implementing custom commit strategies. See the SmallRye Reactive Messaging documentation for more information.

Error Handling Strategies

If a message produced from a Kafka record is nacked, a failure strategy is applied. The Kafka connector supports three strategies:

  • fail: fail the application, no more records will be processed (default strategy). The offset of the record that has not been processed correctly is not committed.

  • ignore: the failure is logged, but the processing continues. The offset of the record that has not been processed correctly is committed.

  • dead-letter-queue: the offset of the record that has not been processed correctly is committed, but the record is written to a Kafka dead letter topic.

The strategy is selected using the failure-strategy attribute.

In the case of dead-letter-queue, you can configure the following attributes:

  • dead-letter-queue.topic: the topic to use to write the records not processed correctly, default is dead-letter-topic-$channel, with $channel being the name of the channel.

  • dead-letter-queue.key.serializer: the serializer used to write the record key on the dead letter queue. By default, it deduces the serializer from the key deserializer.

  • dead-letter-queue.value.serializer: the serializer used to write the record value on the dead letter queue. By default, it deduces the serializer from the value deserializer.
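A hedged configuration sketch for the prices channel using the dead letter queue strategy (the dead-prices topic name and the serializer classes are illustrative choices):

mp.messaging.incoming.prices.failure-strategy=dead-letter-queue
mp.messaging.incoming.prices.dead-letter-queue.topic=dead-prices
mp.messaging.incoming.prices.dead-letter-queue.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.incoming.prices.dead-letter-queue.value.serializer=org.apache.kafka.common.serialization.DoubleSerializer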

The record written on the dead letter queue contains a set of additional headers about the original record:

  • dead-letter-reason: the reason of the failure

  • dead-letter-cause: the cause of the failure if any

  • dead-letter-topic: the original topic of the record

  • dead-letter-partition: the original partition of the record (integer mapped to String)

  • dead-letter-offset: the original offset of the record (long mapped to String)

SmallRye Reactive Messaging enables implementing custom failure strategies. See the SmallRye Reactive Messaging documentation for more information.

Retrying processing

You can combine Reactive Messaging with SmallRye Fault Tolerance, and retry processing if it failed:

@Incoming("kafka")
@Retry(delay = 10, maxRetries = 5)
public void consume(String v) {
   // ... retry if this method throws an exception
}

You can configure the delay, the number of retries, the jitter, etc.
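For example, a sketch adding jitter and restricting which exceptions trigger a retry, using members of the MicroProfile Fault Tolerance @Retry annotation (java.io.IOException is an illustrative choice):

@Incoming("kafka")
@Retry(delay = 10, maxRetries = 5, jitter = 50, retryOn = IOException.class)
public void consume(String v) {
   // retried up to 5 times, with a 10 ms delay plus up to 50 ms of random jitter
}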

If your method returns a Uni or CompletionStage, you need to add the @NonBlocking annotation:

@Incoming("kafka")
@Retry(delay = 10, maxRetries = 5)
@NonBlocking
public Uni<String> consume(String v) {
   // ... retry if this method throws an exception or the returned Uni produces a failure
   return Uni.createFrom().item(v);
}

The @NonBlocking annotation is only required with SmallRye Fault Tolerance 5.1.0 and earlier. Starting with SmallRye Fault Tolerance 5.2.0 (available since Quarkus 2.1.0.Final), it is not necessary. See SmallRye Fault Tolerance documentation for more information.

The incoming messages are acknowledged only once the processing completes successfully. So, it commits the offset after the successful processing. If the processing still fails, even after all retries, the message is nacked and the failure strategy is applied.

Handling Deserialization Failures

When a deserialization failure occurs, you can intercept it and provide a failure strategy. To achieve this, you need to create a bean implementing DeserializationFailureHandler<T> interface:

import java.time.Duration;

import jakarta.enterprise.context.ApplicationScoped;

import org.apache.kafka.common.header.Headers;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler;
import io.vertx.core.json.JsonObject; // assuming the Vert.x JsonObject payload type

@ApplicationScoped
@Identifier("failure-retry") // Set the name of the failure handler
public class MyDeserializationFailureHandler
    implements DeserializationFailureHandler<JsonObject> { // Specify the expected type

    @Override
    public JsonObject decorateDeserialization(Uni<JsonObject> deserialization, String topic, boolean isKey,
            String deserializer, byte[] data, Headers headers) {
        return deserialization
                    .onFailure().retry().atMost(3)
                    .await().atMost(Duration.ofMillis(200));
    }
}

To use this failure handler, the bean must be exposed with the @Identifier qualifier and the connector configuration must specify the attribute mp.messaging.incoming.$channel.[key|value]-deserialization-failure-handler (for key or value deserializers).
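For example, a minimal sketch wiring the handler above to the value deserializer of the prices channel:

mp.messaging.incoming.prices.value-deserialization-failure-handler=failure-retry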

The handler is called with the details of the deserialization, including the deserialization action represented as a Uni<T>. On this Uni, failure strategies like retry, providing a fallback value, or applying a timeout can be implemented.

If you don’t configure a deserialization failure handler and a deserialization failure happens, the application is marked unhealthy. You can also ignore the failure, which will log the exception and produce a null value. To enable this behavior, set the mp.messaging.incoming.$channel.fail-on-deserialization-failure attribute to false.

If the fail-on-deserialization-failure attribute is set to false and the failure-strategy attribute is dead-letter-queue, the failed record will be sent to the corresponding dead letter queue topic.

Consumer Groups

In Kafka, a consumer group is a set of consumers which cooperate to consume data from a topic. A topic is divided into a set of partitions. The partitions of a topic are assigned among the consumers in the group, effectively allowing consumption throughput to scale. Note that each partition is assigned to a single consumer from a group. However, a consumer can be assigned multiple partitions if the number of partitions is greater than the number of consumers in the group.

Let’s explore briefly different producer/consumer patterns and how to implement them using Quarkus:

  1. Single consumer thread inside a consumer group

This is the default behavior of an application subscribing to a Kafka topic: each Kafka connector will create a single consumer thread and place it inside a single consumer group. The consumer group id defaults to the application name as set by the quarkus.application.name configuration property. It can also be set using the kafka.group.id property.

image::kafka-one-app-one-consumer.png[]

  2. Multiple consumer threads inside a consumer group

For a given application instance, the number of consumers inside the consumer group can be configured using the mp.messaging.incoming.$channel.concurrency property. The partitions of the subscribed topic will be divided among the consumer threads. Note that if the concurrency value exceeds the number of partitions of the topic, some consumer threads won’t be assigned any partitions.

image::kafka-one-app-two-consumers.png[]
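For example, a minimal sketch starting two consumer threads for the prices channel:

mp.messaging.incoming.prices.concurrency=2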

Deprecation

The concurrency attribute provides a connector-agnostic way for non-blocking concurrent channels and replaces the Kafka connector-specific partitions attribute. The partitions attribute is therefore deprecated and will be removed in future releases.

  3. Multiple consumer applications inside a consumer group

Similar to the previous example, multiple instances of an application can subscribe to a single consumer group, configured via the mp.messaging.incoming.$channel.group.id property, or left to default to the application name. This in turn will divide the partitions of the topic among the application instances.

image::kafka-two-app-one-consumer-group.png[]

  4. Pub/Sub: Multiple consumer groups subscribed to a topic

Lastly, different applications can subscribe independently to the same topics using different consumer group ids. For example, messages published to a topic called orders can be consumed independently by two consumer applications, one with mp.messaging.incoming.orders.group.id=invoicing and the second with mp.messaging.incoming.orders.group.id=shipping. Different consumer groups can thus scale independently according to the message consumption requirements.

image::kafka-two-app-two-consumer-groups.png[]

A common business requirement is to consume and process Kafka records in order. The Kafka broker preserves the order of records inside a partition and not inside a topic. Therefore, it is important to think about how records are partitioned inside a topic. The default partitioner uses the record key hash to compute the partition for a record, or, when the key is not defined, chooses a partition randomly per batch of records.

During normal operation, a Kafka consumer preserves the order of records inside each partition assigned to it. SmallRye Reactive Messaging keeps this order for processing, unless @Blocking(ordered = false) is used (see Blocking processing).

Note that due to consumer rebalances, Kafka consumers only guarantee at-least-once processing of single records, meaning that uncommitted records can be processed again by consumers.

Consumer Rebalance Listener

Inside a consumer group, as new group members arrive and old members leave, the partitions are re-assigned so that each member receives a proportional share of the partitions. This is known as rebalancing the group. To handle offset commit and assigned partitions yourself, you can provide a consumer rebalance listener. To achieve this, implement the io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener interface and expose it as a CDI bean with the @Identifier qualifier. A common use case is to store offsets in a separate data store to implement exactly-once semantics, or to start the processing at a specific offset.

The listener is invoked every time the consumer topic/partition assignment changes. For example, when the application starts, it invokes the partitionsAssigned callback with the initial set of topics/partitions associated with the consumer. If, later, this set changes, it calls the partitionsRevoked and partitionsAssigned callbacks again, so you can implement custom logic.

Note that the rebalance listener methods are called from the Kafka polling thread and will block the caller thread until completion. That’s because the rebalance protocol has synchronization barriers, and using asynchronous code in a rebalance listener may be executed after the synchronization barrier.

When topics/partitions are assigned or revoked from a consumer, it pauses the message delivery and resumes once the rebalance completes.

If the rebalance listener handles offset commit on behalf of the user (using the NONE commit strategy), the rebalance listener must commit the offset synchronously in the partitionsRevoked callback. We also recommend applying the same logic when the application stops.

Unlike the ConsumerRebalanceListener from Apache Kafka, the io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener methods pass the Kafka Consumer and the set of topics/partitions.

In the following example, we set up a consumer that always starts on messages from at most 10 minutes ago (or offset 0). First, we need to provide a bean that implements io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener and is annotated with io.smallrye.common.annotation.Identifier. We then must configure our inbound connector to use this bean.

package inbound;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.TopicPartition;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

@ApplicationScoped
@Identifier("rebalanced-example.rebalancer")
public class KafkaRebalancedConsumerRebalanceListener implements KafkaConsumerRebalanceListener {

    private static final Logger LOGGER = Logger.getLogger(KafkaRebalancedConsumerRebalanceListener.class.getName());

    /**
     * When receiving a list of partitions, will search for the earliest offset within 10 minutes
     * and seek the consumer to it.
     *
     * @param consumer   underlying consumer
     * @param partitions set of assigned topic partitions
     */
    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        long now = System.currentTimeMillis();
        long shouldStartAt = now - 600_000L; // 10 minutes ago

        Map<TopicPartition, Long> request = new HashMap<>();
        for (TopicPartition partition : partitions) {
            LOGGER.info("Assigned " + partition);
            request.put(partition, shouldStartAt);
        }
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(request);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> position : offsets.entrySet()) {
            long target = position.getValue() == null ? 0L : position.getValue().offset();
            LOGGER.info("Seeking position " + target + " for " + position.getKey());
            consumer.seek(position.getKey(), target);
        }
    }

}
package inbound;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class KafkaRebalancedConsumer {

    @Incoming("rebalanced-example")
    @Acknowledgment(Acknowledgment.Strategy.NONE)
    public CompletionStage<Void> consume(Message<ConsumerRecord<Integer, String>> message) {
        // We don't need to ACK messages because in this example,
        // we set offset during consumer rebalance
        return CompletableFuture.completedFuture(null);
    }

}

To configure the inbound connector to use the provided listener, we either set the consumer rebalance listener’s identifier: mp.messaging.incoming.rebalanced-example.consumer-rebalance-listener.name=rebalanced-example.rebalancer

Or have the listener’s name be the same as the group id:

mp.messaging.incoming.rebalanced-example.group.id=rebalanced-example.rebalancer

Setting the consumer rebalance listener’s name takes precedence over using the group id.

Using unique consumer groups

If you want to process all the records from a topic (from its beginning), you need:

  1. to set auto.offset.reset = earliest

  2. to assign your consumer to a consumer group not used by any other application.

Quarkus generates a UUID that changes between two executions (including in dev mode). So, you are sure no other consumer uses it, and you receive a new unique group id every time your application starts.

You can use that generated UUID as the consumer group as follows:

mp.messaging.incoming.your-channel.auto.offset.reset=earliest
mp.messaging.incoming.your-channel.group.id=${quarkus.uuid}

If the group.id attribute is not set, it defaults to the quarkus.application.name configuration property.

Manual topic-partition assignment

The assign-seek channel attribute allows manually assigning topic-partitions to a Kafka incoming channel, and optionally seeking to a specified offset in the partition to start consuming records. If assign-seek is used, the consumer is not dynamically subscribed to topics, but instead is statically assigned the described partitions. With manual topic-partition assignment, rebalancing doesn’t happen and rebalance listeners are therefore never called.

The attribute takes a list of triplets separated by commas: <topic>:<partition>:<offset>.

For example, the configuration

mp.messaging.incoming.data.assign-seek=topic1:0:10, topic2:1:20

assigns the consumer to:

  • Partition 0 of topic 'topic1', setting the initial position at offset 10.

  • Partition 1 of topic 'topic2', setting the initial position at offset 20.

The topic, partition, and offset in each triplet can have the following variations:

  • If the topic is omitted, the configured topic will be used.

  • If the offset is omitted, partitions are assigned to the consumer but won’t be sought to an offset.

  • If offset is 0, it seeks to the beginning of the topic-partition.

  • If offset is -1, it seeks to the end of the topic-partition.

Receiving Kafka Records in Batches

By default, incoming methods receive each Kafka record individually. Under the hood, Kafka consumer clients poll the broker constantly and receive records in batches, presented inside the ConsumerRecords container.

In batch mode, your application can receive all the records returned by the consumer poll in one go.

To achieve this, you need to specify a compatible container type to receive all the data:

@Incoming("prices")
public void consume(List<Double> prices) {
    for (double price : prices) {
        // process price
    }
}

The incoming method can also receive Message<List<Payload>>, Message<ConsumerRecords<Key, Payload>>, and ConsumerRecords<Key, Payload> types. They give access to record details such as offset or timestamp:

@Incoming("prices")
public CompletionStage<Void> consumeMessage(Message<ConsumerRecords<String, Double>> records) {
    for (ConsumerRecord<String, Double> record : records.getPayload()) {
        Double payload = record.value();
        String topic = record.topic();
        // process messages
    }
    // ack will commit the latest offsets (per partition) of the batch.
    return records.ack();
}

Note that the successful processing of the incoming record batch will commit the latest offsets for each partition received inside the batch. The configured commit strategy will be applied for these records only.

Conversely, if the processing throws an exception, all messages are nacked, applying the failure strategy for all the records inside the batch.

Quarkus autodetects batch types for incoming channels and sets batch configuration automatically. You can configure batch mode explicitly with mp.messaging.incoming.$channel.batch property.
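For example, a minimal sketch enabling batch mode explicitly for the prices channel:

mp.messaging.incoming.prices.batch=true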

Stateful processing with Checkpointing

The checkpoint commit strategy is an experimental feature and can change in the future.

The SmallRye Reactive Messaging checkpoint commit strategy allows consumer applications to process messages in a stateful manner, while also respecting Kafka consumer scalability. An incoming channel with the checkpoint commit strategy persists consumer offsets in an external state store, such as a relational database or a key-value store. As a result of processing consumed records, the consumer application can accumulate an internal state for each topic-partition assigned to the Kafka consumer. This local state will be periodically persisted to the state store and will be associated with the offset of the record that produced it.

This strategy does not commit any offsets to the Kafka broker, so when new partitions get assigned to the consumer, i.e. consumer restarts or consumer group instances scale, the consumer resumes the processing from the latest checkpointed offset with its saved state.

The @Incoming channel consumer code can manipulate the processing state through the CheckpointMetadata API. For example, a consumer calculating the moving average of prices received on a Kafka topic would look like the following:

package org.acme;

import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;

@ApplicationScoped
public class MeanCheckpointConsumer {

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<Double> record) {
        // Get the `CheckpointMetadata` from the incoming message
        CheckpointMetadata<AveragePrice> checkpoint = CheckpointMetadata.fromMessage(record);

        // `CheckpointMetadata` allows transforming the processing state
        // Applies the given function, starting from the value `0.0` when no previous state exists
        checkpoint.transform(new AveragePrice(), average -> average.update(record.getPayload()), /* persistOnAck */ true);

        // `persistOnAck` flag set to true, ack will persist the processing state
        // associated with the latest offset (per partition).
        return record.ack();
    }

    static class AveragePrice {
        long count;
        double mean;

        AveragePrice update(double newPrice) {
            mean += ((newPrice - mean) / ++count);
            return this;
        }
    }
}

The transform method applies the transformation function to the current state, producing a changed state and registering it locally for checkpointing. By default, the local state is persisted to the state store periodically, with the period specified by auto.commit.interval.ms (default: 5000). If the persistOnAck flag is given, the latest state is persisted to the state store eagerly on message acknowledgment. The setNext method works similarly, directly setting the latest state.

The checkpoint commit strategy tracks when a processing state was last persisted for each topic-partition. If an outstanding state change cannot be persisted for checkpoint.unsynced-state-max-age.ms (default: 10000), the channel is marked unhealthy.

State stores

State store implementations determine where and how the processing states are persisted. This is configured by the mp.messaging.incoming.[channel-name].checkpoint.state-store property. The serialization of state objects depends on the state store implementation. Instructing the state store how to serialize the state may require configuring the class name of the state object using the mp.messaging.incoming.[channel-name].checkpoint.state-type property.

Quarkus provides the following state store implementations:

  • quarkus-redis: Uses the quarkus-redis-client extension to persist processing states. Jackson is used to serialize processing state in Json. For complex objects it is required to configure the checkpoint.state-type property with the class name of the object. By default, the state store uses the default redis client, but if a named client is to be used, the client name can be specified using the mp.messaging.incoming.[channel-name].checkpoint.quarkus-redis.client-name property. Processing states will be stored in Redis using the key naming scheme [consumer-group-id]:[topic]:[partition].

For example, the configuration of the previous code would be the following:

mp.messaging.incoming.prices.group.id=prices-checkpoint
# ...
mp.messaging.incoming.prices.commit-strategy=checkpoint
mp.messaging.incoming.prices.checkpoint.state-store=quarkus-redis
mp.messaging.incoming.prices.checkpoint.state-type=org.acme.MeanCheckpointConsumer.AveragePrice
# ...
# if using a named redis client
mp.messaging.incoming.prices.checkpoint.quarkus-redis.client-name=my-redis
quarkus.redis.my-redis.hosts=redis://localhost:7000
quarkus.redis.my-redis.password=<redis-pwd>
  • quarkus-hibernate-reactive: Uses the quarkus-hibernate-reactive extension to persist processing states. Processing state objects are required to be a Jakarta Persistence entity and extend the CheckpointEntity class, which handles object identifiers composed of the consumer group id, topic and partition. Therefore, the class name of the entity needs to be configured using the checkpoint.state-type property.

For example, the configuration of the previous code would be the following:

mp.messaging.incoming.prices.group.id=prices-checkpoint
# ...
mp.messaging.incoming.prices.commit-strategy=checkpoint
mp.messaging.incoming.prices.checkpoint.state-store=quarkus-hibernate-reactive
mp.messaging.incoming.prices.checkpoint.state-type=org.acme.AveragePriceEntity

With AveragePriceEntity being a Jakarta Persistence entity extending CheckpointEntity:

package org.acme;

import jakarta.persistence.Entity;

import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntity;

@Entity
public class AveragePriceEntity extends CheckpointEntity {
    public long count;
    public double mean;

    public AveragePriceEntity update(double newPrice) {
        mean += ((newPrice - mean) / ++count);
        return this;
    }
}
  • quarkus-hibernate-orm: Uses the quarkus-hibernate-orm extension to persist processing states. It is similar to the previous state store, but it uses Hibernate ORM instead of Hibernate Reactive.

When configured, it can use a named persistence-unit for the checkpointing state store:

mp.messaging.incoming.prices.commit-strategy=checkpoint
mp.messaging.incoming.prices.checkpoint.state-store=quarkus-hibernate-orm
mp.messaging.incoming.prices.checkpoint.state-type=org.acme.AveragePriceEntity
mp.messaging.incoming.prices.checkpoint.quarkus-hibernate-orm.persistence-unit=prices
# ... Setup "prices" persistence unit
quarkus.datasource."prices".db-kind=postgresql
quarkus.datasource."prices".username=<your username>
quarkus.datasource."prices".password=<your password>
quarkus.datasource."prices".jdbc.url=jdbc:postgresql://localhost:5432/hibernate_orm_test
quarkus.hibernate-orm."prices".datasource=prices
quarkus.hibernate-orm."prices".packages=org.acme

For instructions on how to implement custom state stores, see Implementing State Stores.

Sending messages to Kafka

Configuration for the Kafka connector outgoing channels is similar to that of incoming:

%prod.kafka.bootstrap.servers=kafka:9092 1
mp.messaging.outgoing.prices-out.connector=smallrye-kafka 2
mp.messaging.outgoing.prices-out.topic=prices 3
1 Configure the broker location for the production profile. You can configure it globally or per channel using mp.messaging.outgoing.$channel.bootstrap.servers property. In dev mode and when running tests, Dev Services for Kafka automatically starts a Kafka broker. When not provided, this property defaults to localhost:9092.
2 Configure the connector to manage the prices-out channel.
3 By default, the topic name is the same as the channel name. You can configure the topic attribute to override it.

Inside application configuration, channel names are unique. Therefore, if you’d like to configure an incoming and outgoing channel on the same topic, you will need to name channels differently (like in the examples of this guide, mp.messaging.incoming.prices and mp.messaging.outgoing.prices-out).

Then, your application can generate messages and publish them to the prices-out channel. It can use double payloads as in the following snippet:

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class KafkaPriceProducer {

    private final Random random = new Random();

    @Outgoing("prices-out")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> random.nextDouble());
    }

}

You should not call methods annotated with @Incoming and/or @Outgoing directly from your code. They are invoked by the framework. Having user code invoking them would not have the expected outcome.

Note that the generate method returns a Multi<Double>, which implements the Reactive Streams Publisher interface. This publisher will be used by the framework to generate messages and send them to the configured Kafka topic.

Instead of returning a payload, you can return an io.smallrye.reactive.messaging.kafka.Record to send key/value pairs:

@Outgoing("out")
public Multi<Record<String, Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
        .map(x -> Record.of("my-key", random.nextDouble()));
}

The payload can be wrapped inside org.eclipse.microprofile.reactive.messaging.Message to have more control over the written records:

@Outgoing("generated-price")
public Multi<Message<Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> Message.of(random.nextDouble())
                    .addMetadata(OutgoingKafkaRecordMetadata.<String>builder()
                            .withKey("my-key")
                            .withTopic("my-key-prices")
                            .withHeaders(new RecordHeaders().add("my-header", "value".getBytes()))
                            .build()));
}

OutgoingKafkaRecordMetadata allows setting metadata attributes of the Kafka record, such as key, topic, partition or timestamp. One use case is to dynamically select the destination topic of a message. In this case, instead of configuring the topic inside your application configuration file, you need to use the outgoing metadata to set the name of the topic.

Other than method signatures returning a Reactive Streams Publisher (Multi being an implementation of Publisher), an outgoing method can also return a single message. In this case, the producer uses the method as a generator to create an infinite stream.

@Outgoing("prices-out") T generate(); // T excluding void

@Outgoing("prices-out") Message<T> generate();

@Outgoing("prices-out") Uni<T> generate();

@Outgoing("prices-out") Uni<Message<T>> generate();

@Outgoing("prices-out") CompletionStage<T> generate();

@Outgoing("prices-out") CompletionStage<Message<T>> generate();

Sending messages with Emitter

Sometimes, you need to have an imperative way of sending messages.

For example, if you need to send a message to a stream when receiving a POST request inside a REST endpoint. In this case, you cannot use @Outgoing because your method has parameters.

For this, you can use an Emitter.

import java.util.concurrent.CompletionStage;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.core.MediaType;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("price-create")
    Emitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public void addPrice(Double price) {
        CompletionStage<Void> ack = priceEmitter.send(price);
    }
}

Sending a payload returns a CompletionStage, completed when the message is acked. If the message transmission fails, the CompletionStage is completed exceptionally with the reason of the nack.

The Emitter configuration is done the same way as the other stream configuration used by @Incoming and @Outgoing.

Using the Emitter you are sending messages from your imperative code to reactive messaging. These messages are stored in a queue until they are sent. If the Kafka producer client can’t keep up with messages trying to be sent over to Kafka, this queue can become a memory hog and you may even run out of memory. You can use @OnOverflow to configure back-pressure strategy. It lets you configure the size of the queue (default is 256) and the strategy to apply when the buffer size is reached. Available strategies are DROP, LATEST, FAIL, BUFFER, UNBOUNDED_BUFFER and NONE.
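For example, a sketch configuring a larger bounded buffer on the emitter injection point (the buffer size of 10000 is an arbitrary illustration; @OnOverflow comes from org.eclipse.microprofile.reactive.messaging):

@Inject
@Channel("price-create")
@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 10000)
Emitter<Double> priceEmitter;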

With the Emitter API, you can also encapsulate the outgoing payload inside Message<T>. As with the previous examples, Message lets you handle the ack/nack cases differently.

import java.util.concurrent.CompletableFuture;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;

import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.core.MediaType;

@Path("/prices")
public class PriceResource {

    @Inject @Channel("price-create") Emitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public void addPrice(Double price) {
        priceEmitter.send(Message.of(price)
            .withAck(() -> {
                // Called when the message is acked
                return CompletableFuture.completedFuture(null);
            })
            .withNack(throwable -> {
                // Called when the message is nacked
                return CompletableFuture.completedFuture(null);
            }));
    }
}

如果您偏好使用 Reactive Stream API,那么您可以使用 MutinyEmitter,它将会从 send 方法中返回 Uni<Void>。因此,您可以使用 Mutiny API 处理下游消息和错误。

If you prefer using Reactive Stream APIs, you can use MutinyEmitter that will return Uni<Void> from the send method. You can therefore use Mutiny APIs for handling downstream messages and errors.

import org.eclipse.microprofile.reactive.messaging.Channel;

import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.core.MediaType;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.MutinyEmitter;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("price-create")
    MutinyEmitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public Uni<String> addPrice(Double price) {
        return priceEmitter.send(price)
                .map(x -> "ok")
                .onFailure().recoverWithItem("ko");
    }
}

还可以使用 sendAndAwait 方法以阻塞方式将事件发送至发射器。只有当接收方确认(ack)或否定(nack)该事件时,该方法才会返回。

It is also possible to block on sending the event to the emitter with the sendAndAwait method. It will only return from the method when the event is acked or nacked by the receiver.
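
A minimal sketch of the blocking variant, reusing the MutinyEmitter injection from the example above; the class name is illustrative.

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.smallrye.reactive.messaging.MutinyEmitter;

@ApplicationScoped
public class BlockingPriceSender {

    @Inject
    @Channel("price-create")
    MutinyEmitter<Double> priceEmitter;

    // Blocks the calling thread until the event is acked or nacked;
    // a nack surfaces as a runtime exception.
    public void sendBlocking(double price) {
        priceEmitter.sendAndAwait(price);
    }
}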

Deprecation

io.smallrye.reactive.messaging.annotations.Emitter、io.smallrye.reactive.messaging.annotations.Channel 和 io.smallrye.reactive.messaging.annotations.OnOverflow 类现在已弃用并且被替换为:

The io.smallrye.reactive.messaging.annotations.Emitter, io.smallrye.reactive.messaging.annotations.Channel and io.smallrye.reactive.messaging.annotations.OnOverflow classes are now deprecated and replaced by:

  • org.eclipse.microprofile.reactive.messaging.Emitter

  • org.eclipse.microprofile.reactive.messaging.Channel

  • org.eclipse.microprofile.reactive.messaging.OnOverflow

新的 Emitter.send 方法返回在已生成的消息被确认时完成的 CompletionStage

The new Emitter.send method returns a CompletionStage completed when the produced message is acknowledged.

Deprecation

现已弃用 MutinyEmitter#send(Message msg) 方法,建议改用以下接收 Message 进行发送的方法:

The MutinyEmitter#send(Message msg) method is deprecated in favor of the following methods that receive a Message for emitting (see the sketch after the list):

  • <M extends Message<? extends T>> Uni<Void> sendMessage(M msg)

  • <M extends Message<? extends T>> void sendMessageAndAwait(M msg)

  • <M extends Message<? extends T>> Cancellable sendMessageAndForget(M msg)
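
A minimal sketch using one of the Message-accepting replacements listed above; the channel name and the ack callback follow the earlier examples, and the class name is illustrative.

import java.util.concurrent.CompletableFuture;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.reactive.messaging.MutinyEmitter;

@ApplicationScoped
public class MessagePriceSender {

    @Inject
    @Channel("price-create")
    MutinyEmitter<Double> priceEmitter;

    public void sendMessageBlocking(double price) {
        // Wrap the payload in a Message to attach an ack callback, then block until it is settled.
        priceEmitter.sendMessageAndAwait(Message.of(price)
                .withAck(() -> CompletableFuture.completedFuture(null)));
    }
}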

关于如何使用 Emitter 的更多信息可在 SmallRye Reactive Messaging – Emitters and Channels 中找到。

More information on how to use Emitter can be found in SmallRye Reactive Messaging – Emitters and Channels.

Write Acknowledgement

当 Kafka 代理接收到记录时,其确认可能需要一些时间,具体取决于配置。同样地,它会将无法写入的记录存储在内存中。

When the Kafka broker receives a record, its acknowledgement can take time depending on the configuration. Also, it stores in memory the records that cannot be written.

默认情况下,连接器会等待 Kafka 确认记录从而继续处理(确认已接收的消息)。您可以通过将 waitForWriteCompletion 属性设置为 false 来禁用此功能。

By default, the connector waits for Kafka to acknowledge the record before continuing the processing (acknowledging the received Message). You can disable this by setting the waitForWriteCompletion attribute to false.
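
For example, on a hypothetical prices-out channel (sketch only):

mp.messaging.outgoing.prices-out.waitForWriteCompletion=false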

请注意,acks 属性对记录确认有巨大影响。

Note that the acks attribute has a huge impact on the record acknowledgement.

如果无法写入记录,则将否定消息。

If a record cannot be written, the message is nacked.

Backpressure

Kafka 输出连接器处理反压,监视等待写入 Kafka 代理的飞行中消息数量。飞行中消息的数量使用 max-inflight-messages 属性进行配置,并且默认为 1024。

The Kafka outbound connector handles back-pressure, monitoring the number of in-flight messages waiting to be written to the Kafka broker. The number of in-flight messages is configured using the max-inflight-messages attribute and defaults to 1024.

连接器仅同时发送该数量的消息。其他任何消息都不会被发送,直至代理确认至少一个飞行中消息为止。随后,当代理中的一个飞行中消息被确认时,连接器会向 Kafka 编写新消息。务必相应地配置 Kafka 的 batch.sizelinger.ms

The connector only sends that amount of messages concurrently. No other messages will be sent until at least one in-flight message gets acknowledged by the broker. Then, the connector writes a new message to Kafka when one of the broker’s in-flight messages gets acknowledged. Be sure to configure Kafka’s batch.size and linger.ms accordingly.

您还可以通过将 max-inflight-messages 设置为 0 来移除飞行中消息的限制。但是,请注意,如果请求的数量达到 max.in.flight.requests.per.connection,Kafka 生产者可能会阻塞。

You can also remove the limit of in-flight messages by setting max-inflight-messages to 0. However, note that the Kafka producer may block if the number of requests reaches max.in.flight.requests.per.connection.
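
For example, on a hypothetical prices-out channel (values are illustrative):

mp.messaging.outgoing.prices-out.max-inflight-messages=2048
# or remove the limit altogether
mp.messaging.outgoing.prices-out.max-inflight-messages=0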

Retrying message dispatch

当 Kafka 生产者从服务器接收错误时,如果错误是可恢复的瞬态错误,客户端会重试发送消息批处理。此行为受 retriesretry.backoff.ms 参数控制。此外,SmallRye Reactive Messaging 会根据 retriesdelivery.timeout.ms 参数,在可恢复错误发生时重试各个消息。

When the Kafka producer receives an error from the server, if it is a transient, recoverable error, the client will retry sending the batch of messages. This behavior is controlled by retries and retry.backoff.ms parameters. In addition to this, SmallRye Reactive Messaging will retry individual messages on recoverable errors, depending on the retries and delivery.timeout.ms parameters.

请注意,虽然在可靠系统中进行重试是最佳实践,但 max.in.flight.requests.per.connection 参数默认为 5,这意味着无法保证消息的顺序。如果您的用例必须保证消息顺序,将 max.in.flight.requests.per.connection 设置为 1 将确保一次发送一批消息,但会以限制生产者吞吐量为代价。

Note that while having retries in a reliable system is a best practice, the max.in.flight.requests.per.connection parameter defaults to 5, meaning that the order of the messages is not guaranteed. If the message order is a must for your use case, setting max.in.flight.requests.per.connection to 1 will make sure a single batch of messages is sent at a time, at the expense of limiting the throughput of the producer.
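
These client properties can be passed through the channel configuration. A sketch for a hypothetical prices-out channel; retries and delivery.timeout.ms are shown at their Kafka default values for illustration:

mp.messaging.outgoing.prices-out.retries=2147483647
mp.messaging.outgoing.prices-out.delivery.timeout.ms=120000
# Preserve message ordering at the expense of throughput
mp.messaging.outgoing.prices-out.max.in.flight.requests.per.connection=1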

有关如何在处理错误时应用重试机制,请参阅 [retrying-processing] 部分。

For applying retry mechanism on processing errors, see the section on [retrying-processing].

Handling Serialization Failures

对于 Kafka 生产者客户端序列化失败不可恢复,因此不会重试消息发送。在这些情况下,您可能需要对序列化程序应用故障策略。如需实现这一点,您需要创建一个实现 SerializationFailureHandler<T> 接口的 Bean:

For the Kafka producer client, serialization failures are not recoverable, thus the message dispatch is not retried. In these cases you may need to apply a failure strategy for the serializer. To achieve this, you need to create a bean implementing the SerializationFailureHandler<T> interface:

import org.apache.kafka.common.header.Headers;

import jakarta.enterprise.context.ApplicationScoped;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.SerializationFailureHandler;
import io.vertx.core.json.JsonObject;

@ApplicationScoped
@Identifier("failure-fallback") // Set the name of the failure handler
public class MySerializationFailureHandler
    implements SerializationFailureHandler<JsonObject> { // Specify the expected type

    @Override
    public byte[] decorateSerialization(Uni<byte[]> serialization, String topic, boolean isKey,
        String serializer, Object data, Headers headers) {
        return serialization
                    .onFailure().retry().atMost(3)
                    .await().indefinitely();
    }
}

要使用此故障处理程序,必须使用 @Identifier 限定符公开 Bean,且连接器配置必须指定 mp.messaging.outgoing.$channel.[key|value]-serialization-failure-handler 属性(对于键或值序列化程序)。

To use this failure handler, the bean must be exposed with the @Identifier qualifier and the connector configuration must specify the attribute mp.messaging.outgoing.$channel.[key|value]-serialization-failure-handler (for key or value serializers).
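
For example, to attach the handler defined above to the value serializer of a hypothetical fruit-out channel (sketch only):

mp.messaging.outgoing.fruit-out.value-serialization-failure-handler=failure-fallback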

处理程序会通过序列化详细信息调用,包括表示为 Uni<byte[]> 的操作。请注意,方法必须等待结果,并返回序列化的字节数组。

The handler is called with details of the serialization, including the action represented as Uni<byte[]>. Note that the method must await on the result and return the serialized byte array.

In-memory channels

在某些用例中,方便使用消息模式在同一应用程序内传输消息。当您未将通道连接到如 Kafka 这类消息传递后端时,所有内容都在内存中进行,流通过链接方法创建。每个链仍然是响应式流,并且强制执行背压协议。

In some use cases, it is convenient to use the messaging patterns to transfer messages inside the same application. When you don’t connect a channel to a messaging backend like Kafka, everything happens in-memory, and the streams are created by chaining methods together. Each chain is still a reactive stream and enforces the back-pressure protocol.

框架会验证生产者/消费者链是否完整,这意味着如果应用程序将消息写入内存中通道(使用仅带有 @Outgoing 的方法或 Emitter),它还必须在应用程序内部消费这些消息(使用仅带有 @Incoming 的方法或使用非托管流)。

The framework verifies that the producer/consumer chain is complete, meaning that if the application writes messages into an in-memory channel (using a method with only @Outgoing, or an Emitter), it must also consume the messages from within the application (using a method with only @Incoming or using an unmanaged stream).

Broadcasting messages on multiple consumers

默认情况下,通道可以使用 @Incoming 方法或 @Channel 响应式流链接到单个消费者。在应用程序启动时,将验证通道以形成具有单个消费者和生产者的消费者和生产者链。您可以通过在通道中设置 mp.messaging.$channel.broadcast=true 来覆盖此行为。

By default, a channel can be linked to a single consumer, using an @Incoming method or a @Channel reactive stream. At application startup, channels are verified to form a chain of consumers and producers with a single consumer and producer. You can override this behavior by setting mp.messaging.$channel.broadcast=true on a channel.

对于内存中的通道,可以在 @Outgoing 方法中使用 @Broadcast 注释。例如,

In case of in-memory channels, @Broadcast annotation can be used on the @Outgoing method. For example,

import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.annotations.Broadcast;

@ApplicationScoped
public class MultipleConsumer {

    private final Random random = new Random();

    @Outgoing("in-memory-channel")
    @Broadcast
    double generate() {
        return random.nextDouble();
    }

    @Incoming("in-memory-channel")
    void consumeAndLog(double price) {
        System.out.println(price);
    }

    @Incoming("in-memory-channel")
    @Outgoing("prices2")
    double consumeAndSend(double price) {
        return price;
    }
}

反之,可以通过设置 mp.messaging.incoming.$channel.merge=true 来合并同一通道上的多个生产者。在 @Incoming 方法中,您可以使用 @Merge 注释控制如何合并多个通道。

Reciprocally, multiple producers on the same channel can be merged by setting mp.messaging.incoming.$channel.merge=true. On the @Incoming methods, you can control how multiple channels are merged using the @Merge annotation.
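
A minimal sketch of a consumer merging several producers; the channel name and class name are assumptions, and @Merge comes from io.smallrye.reactive.messaging.annotations:

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.reactive.messaging.annotations.Merge;

@ApplicationScoped
public class MergedConsumer {

    // MERGE interleaves messages from all producers as they arrive.
    @Incoming("prices-all")
    @Merge(Merge.Mode.MERGE)
    void consume(double price) {
        System.out.println(price);
    }
}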

对输出或处理方法重复使用 @Outgoing 注释允许以另一种方式将消息发送到多个输出通道:

Repeating the @Outgoing annotation on outbound or processing methods allows another way of dispatching messages to multiple outgoing channels:

import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class MultipleProducers {

    private final Random random = new Random();

    @Outgoing("generated")
    @Outgoing("generated-2")
    double priceBroadcast() {
        return random.nextDouble();
    }

}

在上一个示例中,生成的 price 将广播到两个输出通道。以下示例使用 Targeted 容器对象有选择性地将消息发送到多个输出通道,容器对象包含键(作为通道名称)和值(作为消息负载)。

In the previous example, the generated price will be broadcast to both outbound channels. The following example selectively sends messages to multiple outgoing channels using the Targeted container object, which holds channel names as keys and message payloads as values.

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.Targeted;

@ApplicationScoped
public class TargetedProducers {

    @Incoming("in")
    @Outgoing("out1")
    @Outgoing("out2")
    @Outgoing("out3")
    public Targeted process(double price) {
        Targeted targeted = Targeted.of("out1", "Price: " + price,
                "out2", "Quote: " + price);
        if (price > 90.0) {
            return targeted.with("out3", price);
        }
        return targeted;
    }

}

请注意,Kafka 序列化程序的自动检测不适用于使用 Targeted 的签名。

Note that the auto-detection for Kafka serializers doesn’t work for signatures using Targeted.

有关使用多输出的更多详细信息,请参阅 SmallRye Reactive Messaging documentation

For more details on using multiple outgoings, please refer to the SmallRye Reactive Messaging documentation.

Kafka Transactions

Kafka 事务支持对多个 Kafka 主题和分区进行原子写。Kafka 连接器提供了 KafkaTransactions 自定义发射器,用于在事务内写入 Kafka 记录。它可以作为常规发射器 @Channel 注入:

Kafka transactions enable atomic writes to multiple Kafka topics and partitions. The Kafka connector provides KafkaTransactions custom emitter for writing Kafka records inside a transaction. It can be injected as a regular emitter @Channel:

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;

@ApplicationScoped
public class KafkaTransactionalProducer {

    @Channel("tx-out-example")
    KafkaTransactions<String> txProducer;

    public Uni<Void> emitInTransaction() {
        return txProducer.withTransaction(emitter -> {
            emitter.send(KafkaRecord.of(1, "a"));
            emitter.send(KafkaRecord.of(2, "b"));
            emitter.send(KafkaRecord.of(3, "c"));
            return Uni.createFrom().voidItem();
        });
    }

}

提供给 withTransaction 方法的函数接收一个 TransactionalEmitter 用于生成记录,并返回一个 Uni 提供事务结果。

The function given to the withTransaction method receives a TransactionalEmitter for producing records, and returns a Uni that provides the result of the transaction.

  • If the processing completes successfully, the producer is flushed and the transaction is committed.

  • If the processing throws an exception, returns a failing Uni, or marks the TransactionalEmitter for abort, the transaction is aborted.

Kafka 事务性生产者需要配置 acks=all 客户端属性和 transactional.id 的唯一 id,这暗示 enable.idempotence=true。当 Quarkus 检测到对传出通道使用 KafkaTransactions 时,它将为通道配置这些属性,并提供 transactional.id 属性的默认值 "${quarkus.application.name}-${channelName}"

Kafka transactional producers require configuring acks=all client property, and a unique id for transactional.id, which implies enable.idempotence=true. When Quarkus detects the use of KafkaTransactions for an outgoing channel it configures these properties on the channel, providing a default value of "${quarkus.application.name}-${channelName}" for transactional.id property.

请注意,为了在生产中使用, transactional.id 在所有应用程序实例中必须是唯一的。

Note that for production use the transactional.id must be unique across all application instances.

虽然普通的消息发射器会支持并发调用 send 方法,并且将传出消息排队写到 Kafka,但 KafkaTransactions 发射器一次只支持一个事务。从调用 withTransaction 到返回的 Uni 导致成功或失败,都会认为一个事务正在进行。当一个事务正在进行时,对 withTransaction 的后续调用,包括在给定函数中嵌套的调用,都将抛出 IllegalStateException

While a normal message emitter would support concurrent calls to send methods and consequently queues outgoing messages to be written to Kafka, a KafkaTransactions emitter only supports one transaction at a time. A transaction is considered in progress from the call to the withTransaction until the returned Uni results in success or failure. While a transaction is in progress, subsequent calls to the withTransaction, including nested ones inside the given function, will throw IllegalStateException.

请注意,在 Reactive Messaging 中,处理方法的执行已经序列化,除非使用 @Blocking(ordered = false)。如果 withTransaction 可以并发调用,例如,从 REST 终结点,则建议限制执行的并发性。这可以使用 Microprofile Fault Tolerance@Bulkhead 注释来完成。

Note that in Reactive Messaging, the execution of processing methods, is already serialized, unless @Blocking(ordered = false) is used. If withTransaction can be called concurrently, for example from a REST endpoint, it is recommended to limit the concurrency of the execution. This can be done using the @Bulkhead annotation from Microprofile Fault Tolerance.
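
A sketch of guarding a transactional emitter with @Bulkhead, assuming a MicroProfile Fault Tolerance implementation (such as the quarkus-smallrye-fault-tolerance extension) is present; the class name and payload handling are illustrative, and the channel is reused from the earlier example.

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.reactive.messaging.Channel;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;

@ApplicationScoped
public class GuardedTransactionalProducer {

    @Channel("tx-out-example")
    KafkaTransactions<String> txProducer;

    // Limit to a single concurrent execution, so withTransaction is never
    // invoked while another transaction is still in progress.
    @Bulkhead(1)
    public Uni<Void> emitInTransaction(String payload) {
        return txProducer.withTransaction(emitter -> {
            emitter.send(KafkaRecord.of(1, payload));
            return Uni.createFrom().voidItem();
        });
    }
}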

可以在 Chaining Kafka Transactions with Hibernate Reactive transactions 中找到一个示例用法。

An example usage can be found in Chaining Kafka Transactions with Hibernate Reactive transactions.

Transaction-aware consumers

如果您只想消费在 Kafka 事务内写入并提交的记录,则需要在传入通道上配置 isolation.level 属性,如下所示:

If you’d like to consume records only written and committed inside a Kafka transaction you need to configure the isolation.level property on the incoming channel as such:

mp.messaging.incoming.prices-in.isolation.level=read_committed

Kafka Request-Reply

Kafka 请求-响应模式允许将请求记录发布到 Kafka 主题,然后等待响应记录来响应初始请求。Kafka 连接器提供 KafkaRequestReply 自定义发射器,它为 Kafka 传出通道的请求-响应模式实现请求方(或客户端):

The Kafka Request-Reply pattern allows to publish a request record to a Kafka topic and then await for a reply record that responds to the initial request. The Kafka connector provides the KafkaRequestReply custom emitter that implements the requestor (or the client) of the request-reply pattern for Kafka outbound channels:

它可以注入为常规发射器 @Channel

It can be injected as a regular emitter @Channel:

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply;

@ApplicationScoped
@Path("/kafka")
public class KafkaRequestReplyEmitter {

    @Channel("request-reply")
    KafkaRequestReply<Integer, String> requestReply;

    @POST
    @Path("/req-rep")
    @Produces(MediaType.TEXT_PLAIN)
    public Uni<String> post(Integer request) {
        return requestReply.request(request);
    }

}

请求方法将记录发布到传出通道的已配置目标主题,并轮询回复主题(默认情况下,目标主题带有 -replies 后缀)以获取回复记录。当收到响应时,用记录值完成返回的 Uni。请求发送操作会生成 correlation id,并设置一个标头(默认值为 REPLY_CORRELATION_ID),它期望在响应记录中将其发回。

The request method publishes the record to the configured target topic of the outgoing channel, and polls a reply topic (by default, the target topic with -replies suffix) for a reply record. When the reply is received the returned Uni is completed with the record value. The request send operation generates a correlation id and sets a header (by default REPLY_CORRELATION_ID), which it expects to be sent back in the reply record.

可以使用 Reactive Messaging 处理器来实现应答器(请参阅 Processing Messages)。

The replier can be implemented using a Reactive Messaging processor (see Processing Messages).
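
A minimal sketch of such a replier processor; the channel names are assumptions and must be configured so that the incoming channel reads the requestor's target topic and the outgoing channel writes to its reply topic. The correlation header handling is described above and in the SmallRye documentation linked below.

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class RequestReplyServer {

    // Consumes request records and produces the corresponding reply records.
    @Incoming("request")
    @Outgoing("reply")
    public String process(Integer request) {
        return "reply-" + request;
    }
}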

有关 Kafka 请求响应功能和高级配置选项的更多信息,请参阅 Smallrye Reactive Messaging Documentation

For more information on Kafka Request Reply feature and advanced configuration options, see the Smallrye Reactive Messaging Documentation.

Processing Messages

流化数据的应用程序通常需要从主题中获取一些事件,对其进行处理,并将结果发布到另一个主题。可以使用 @Incoming@Outgoing 注释轻松实现一个处理程序方法:

Applications streaming data often need to consume some events from a topic, process them and publish the result to a different topic. A processor method can be simply implemented using both the @Incoming and @Outgoing annotations:

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceProcessor {

    private static final double CONVERSION_RATE = 0.88;

    @Incoming("price-in")
    @Outgoing("price-out")
    public double process(double price) {
        return price * CONVERSION_RATE;
    }

}

process 方法的参数是传入消息负载,而返回值将用作传出消息负载。同样支持前面提到的参数和返回类型签名,例如 Message<T>Record<K, V> 等。

The parameter of the process method is the incoming message payload, whereas the return value will be used as the outgoing message payload. Previously mentioned signatures for parameter and return types are also supported, such as Message<T>, Record<K, V>, etc.

您可以通过使用并返回反应流 Multi<T> 类型来应用异步流处理:

You can apply asynchronous stream processing by consuming and returning reactive stream Multi<T> type:

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class PriceProcessor {

    private static final double CONVERSION_RATE = 0.88;

    @Incoming("price-in")
    @Outgoing("price-out")
    public Multi<Double> process(Multi<Integer> prices) {
        return prices.filter(p -> p > 100).map(p -> p * CONVERSION_RATE);
    }

}

Propagating Record Key

处理消息时,可以将传入记录键传播到传出记录。

When processing messages, you can propagate incoming record key to the outgoing record.

启用 mp.messaging.outgoing.$channel.propagate-record-key=true 配置后,记录键传播会生成与传入记录具有相同 key 的传出记录。

Enabled with mp.messaging.outgoing.$channel.propagate-record-key=true configuration, record key propagation produces the outgoing record with the same key as the incoming record.

如果传出记录已包含键,则它不会被传入记录的键覆盖。如果传入记录的键为 null,则将使用 mp.messaging.outgoing.$channel.key 属性。

If the outgoing record already contains a key, it won’t be overridden by the incoming record key. If the incoming record has a null key, the mp.messaging.outgoing.$channel.key property is used.
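
A sketch for the price-out channel used in the processing example above; the fallback key value is illustrative:

mp.messaging.outgoing.price-out.propagate-record-key=true
# Used when the incoming record key is null
mp.messaging.outgoing.price-out.key=default-key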

Exactly-Once Processing

Kafka 事务允许在事务中管理消费者偏移量以及生成的消息。这使得以 consume-transform-produce 模式将消费者与事务性生产者耦合起来成为可能,这也称为 exactly-once processing

Kafka Transactions allows managing consumer offsets inside a transaction, together with produced messages. This enables coupling a consumer with a transactional producer in a consume-transform-produce pattern, also known as exactly-once processing.

KafkaTransactions 自定义发送器提供一种方法,可在事务中对传入的 Kafka 消息应用完全一次处理。

The KafkaTransactions custom emitter provides a way to apply exactly-once processing to an incoming Kafka message inside a transaction.

以下示例在事务中包括一批 Kafka 记录。

The following example includes a batch of Kafka records inside a transaction.

import jakarta.enterprise.context.ApplicationScoped;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;

@ApplicationScoped
public class KafkaExactlyOnceProcessor {

    @Channel("prices-out")
    @OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 500) (3)
    KafkaTransactions<Integer> txProducer;

    @Incoming("prices-in")
    public Uni<Void> emitInTransaction(Message<ConsumerRecords<String, Integer>> batch) { (1)
        return txProducer.withTransactionAndAck(batch, emitter -> { (2)
            for (ConsumerRecord<String, Integer> record : batch.getPayload()) {
                emitter.send(KafkaRecord.of(record.key(), record.value() + 1)); (3)
            }
            return Uni.createFrom().voidItem();
        });
    }

}
1 It is recommended to use exactly-once processing along with the batch consumption mode. While it is possible to use it with a single Kafka message, it’ll have a significant performance impact.
2 The consumed message is passed to the KafkaTransactions#withTransactionAndAck in order to handle the offset commits and message acks.
3 The send method writes records to Kafka inside the transaction, without waiting for a send receipt from the broker. Messages pending to be written to Kafka will be buffered and flushed before committing the transaction. It is therefore recommended to configure the @OnOverflow bufferSize to fit enough messages, for example max.poll.records, the maximum number of records returned in a batch.
  • If the processing completes successfully, before committing the transaction, the topic partition offsets of the given batch message will be committed to the transaction.

  • If the processing needs to abort, after aborting the transaction, the consumer’s position is reset to the last committed offset, effectively resuming the consumption from that offset. If no consumer offset has been committed to a topic-partition, the consumer’s position is reset to the beginning of the topic-partition, even if the offset reset policy is `latest`.

在使用完全一次处理时,已使用消息偏移量提交由事务处理,因此应用程序不应通过其他方式提交偏移量。消费者应具备 enable.auto.commit=false(默认值)并显式设置 commit-strategy=ignore

When using exactly-once processing, consumed message offset commits are handled by the transaction and therefore the application should not commit offsets through other means. The consumer should have enable.auto.commit=false (the default) and explicitly set commit-strategy=ignore:

mp.messaging.incoming.prices-in.commit-strategy=ignore
mp.messaging.incoming.prices-in.failure-strategy=ignore

Error handling for the exactly-once processing

如果事务失败并中止,KafkaTransactions#withTransaction 返回的 Uni 会产生一个失败。应用程序可以选择处理该错误情况,但如果从 @Incoming 方法返回一个失败的 Uni,则传入通道实际上会失败并停止反应式流。

The Uni returned from the KafkaTransactions#withTransaction will yield a failure if the transaction fails and is aborted. The application can choose to handle the error case, but if a failing Uni is returned from the @Incoming method, the incoming channel will effectively fail and stop the reactive stream.

KafkaTransactions#withTransactionAndAck 方法会对消息进行确认(ack)或否定确认(nack),但不会返回失败的 Uni。被否定确认的消息将由传入通道的失败策略处理(参见 Error Handling Strategies)。配置 failure-strategy=ignore 只是将 Kafka 消费者重置为最后提交的偏移量,并从那里恢复消费。

The KafkaTransactions#withTransactionAndAck method acks and nacks the message but will not return a failing Uni. Nacked messages will be handled by the failure strategy of the incoming channel (see Error Handling Strategies). Configuring failure-strategy=ignore simply resets the Kafka consumer to the last committed offsets and resumes the consumption from there.

Accessing Kafka clients directly

在极少数情况下,你可能需要访问基础 Kafka 客户端。KafkaClientService 提供对 ProducerConsumer 的线程安全访问。

In rare cases, you may need to access the underlying Kafka clients. KafkaClientService provides thread-safe access to Producer and Consumer.

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;

import org.apache.kafka.clients.producer.ProducerRecord;

import io.quarkus.runtime.StartupEvent;
import io.smallrye.reactive.messaging.kafka.KafkaClientService;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.KafkaProducer;

@ApplicationScoped
public class PriceSender {

    @Inject
    KafkaClientService clientService;

    void onStartup(@Observes StartupEvent startupEvent) {
        KafkaProducer<String, Double> producer = clientService.getProducer("generated-price");
        producer.runOnSendingThread(client -> client.send(new ProducerRecord<>("prices", 2.4)))
            .await().indefinitely();
    }
}

KafkaClientService 是一个实验性 API,将来可能会发生变更。

The KafkaClientService is an experimental API and can change in the future.

你还可以将 Kafka 配置注入到你的应用程序并直接创建 Kafka 生产者、消费者和管理客户端:

You can also get the Kafka configuration injected to your application and create Kafka producer, consumer and admin clients directly:

import io.smallrye.common.annotation.Identifier;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.KafkaAdminClient;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import java.util.HashMap;
import java.util.Map;

@ApplicationScoped
public class KafkaClients {

    @Inject
    @Identifier("default-kafka-broker")
    Map<String, Object> config;

    @Produces
    AdminClient getAdmin() {
        Map<String, Object> copy = new HashMap<>();
        for (Map.Entry<String, Object> entry : config.entrySet()) {
            if (AdminClientConfig.configNames().contains(entry.getKey())) {
                copy.put(entry.getKey(), entry.getValue());
            }
        }
        return KafkaAdminClient.create(copy);
    }

}

default-kafka-broker 配置映射包含所有应用程序属性,其前缀为 kafka.KAFKA_。有关更多配置选项,请查看 Kafka Configuration Resolution

The default-kafka-broker configuration map contains all application properties prefixed with kafka. or KAFKA_. For more configuration options check out Kafka Configuration Resolution.

JSON serialization

Quarkus 具备处理 JSON Kafka 消息的内置功能。

Quarkus has built-in capabilities to deal with JSON Kafka messages.

设想我们有一个 Fruit 数据类如下:

Imagine we have a Fruit data class as follows:

public class Fruit {

    public String name;
    public int price;

    public Fruit() {
    }

    public Fruit(String name, int price) {
        this.name = name;
        this.price = price;
    }
}

并且我们想用它来接收来自 Kafka 的消息、执行一些价格转换,并将消息发回 Kafka。

And we want to use it to receive messages from Kafka, make some price transformation, and send messages back to Kafka.

import io.smallrye.reactive.messaging.annotations.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;

/**
* A bean consuming data from the "fruit-in" channel and applying some price conversion.
* The result is pushed to the "fruit-out" channel.
*/
@ApplicationScoped
public class FruitProcessor {

    private static final double CONVERSION_RATE = 0.88;

    @Incoming("fruit-in")
    @Outgoing("fruit-out")
    @Broadcast
    public Fruit process(Fruit fruit) {
        fruit.price = (int) (fruit.price * CONVERSION_RATE); // price is an int, so cast the converted value
        return fruit;
    }

}

为此,我们需要使用 Jackson 或 JSON-B 设置 JSON 序列化。

To do this, we will need to set up JSON serialization with Jackson or JSON-B.

在正确配置 JSON 序列化后,你还可以使用 Publisher<Fruit>Emitter<Fruit>

With JSON serialization correctly configured, you can also use Publisher<Fruit> and Emitter<Fruit>.

Serializing via Jackson

Quarkus 对基于 Jackson 的 JSON 序列化和反序列化内置了支持。它还将为您“@3”序列化程序和反序列化程序,因此您不必配置任何内容。如果禁用了生成,您可以按如下所述使用提供的“@1”和“@2”。

Quarkus has built-in support for JSON serialization and deserialization based on Jackson. It will also serialization-generation the serializer and deserializer for you, so you do not have to configure anything. When generation is disabled, you can use the provided ObjectMapperSerializer and ObjectMapperDeserializer as explained below.

存在可用于通过 Jackson 序列化所有数据对象的现有 ObjectMapperSerializer。如果您想使用序列化程序/反序列化程序自动检测,则可以创建一个空子类。

There is an existing ObjectMapperSerializer that can be used to serialize all data objects via Jackson. You may create an empty subclass if you want to use Serializer/deserializer autodetection.

默认情况下,ObjectMapperSerializer 会将 null 序列化为字符串 "null",这可以通过设置 Kafka 配置属性 json.serialize.null-as-null=true 来自定义,该属性会将 null 序列化为 null。在使用压缩主题时这很方便,因为 null 被用作墓碑,用于标记在压缩阶段中要删除的消息。

By default, the ObjectMapperSerializer serializes null as the "null" String; this can be customized by setting the Kafka configuration property json.serialize.null-as-null=true, which will serialize null as null. This is handy when using a compacted topic, as null is used as a tombstone to know which messages to delete during the compaction phase.

相应的反序列化类需要被子类化。所以,让我们创建一个扩展 ObjectMapperDeserializer 的 FruitDeserializer。

The corresponding deserializer class needs to be subclassed. So, let’s create a FruitDeserializer that extends the ObjectMapperDeserializer.

package com.acme.fruit.jackson;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

最后,配置您的通道以使用 Jackson 序列化程序和反序列化程序。

Finally, configure your channels to use the Jackson serializer and deserializer.

# Configure the Kafka source (we read from it)
mp.messaging.incoming.fruit-in.topic=fruit-in
mp.messaging.incoming.fruit-in.value.deserializer=com.acme.fruit.jackson.FruitDeserializer

# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.fruit-out.topic=fruit-out
mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

现在,您的 Kafka 消息将包含 Fruit 数据对象的 Jackson 序列化表示形式。在这种情况下,反序列化程序配置不是必要的,因为序列化程序/反序列化程序自动检测在默认情况下已启用。

Now, your Kafka messages will contain a Jackson serialized representation of your Fruit data object. In this case, the deserializer configuration is not necessary as the Serializer/deserializer autodetection is enabled by default.

如果您想反序列化水果列表,则需要创建一个使用 Jackson TypeReference 表示所用泛型集合的反序列化程序。

If you want to deserialize a list of fruits, you need to create a deserializer with a Jackson TypeReference denoting the generic collection used.

package com.acme.fruit.jackson;

import java.util.List;
import com.fasterxml.jackson.core.type.TypeReference;
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class ListOfFruitDeserializer extends ObjectMapperDeserializer<List<Fruit>> {
    public ListOfFruitDeserializer() {
        super(new TypeReference<List<Fruit>>() {});
    }
}

Serializing via JSON-B

首先,您需要包括 quarkus-jsonb 扩展。

First, you need to include the quarkus-jsonb extension.

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-jsonb</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-jsonb")

存在可用于通过 JSON-B 序列化所有数据对象的现有 JsonbSerializer。如果您想使用序列化程序/反序列化程序自动检测,则可以创建一个空子类。

There is an existing JsonbSerializer that can be used to serialize all data objects via JSON-B. You may create an empty subclass if you want to use Serializer/deserializer autodetection.

默认情况下,JsonbSerializer 会将 null 序列化为字符串 "null",这可以通过设置 Kafka 配置属性 json.serialize.null-as-null=true 来自定义,该属性会将 null 序列化为 null。在使用压缩主题时这很方便,因为 null 被用作墓碑,用于标记在压缩阶段中要删除的消息。

By default, the JsonbSerializer serializes null as the "null" String; this can be customized by setting the Kafka configuration property json.serialize.null-as-null=true, which will serialize null as null. This is handy when using a compacted topic, as null is used as a tombstone to know which messages to delete during the compaction phase.

相应的反序列化类需要被子类化。所以,让我们创建一个扩展泛型 JsonbDeserializer 的 FruitDeserializer。

The corresponding deserializer class needs to be subclassed. So, let’s create a FruitDeserializer that extends the generic JsonbDeserializer.

package com.acme.fruit.jsonb;

import io.quarkus.kafka.client.serialization.JsonbDeserializer;

public class FruitDeserializer extends JsonbDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

最后,配置您的通道以使用 JSON-B 序列化程序和反序列化程序。

Finally, configure your channels to use the JSON-B serializer and deserializer.

# Configure the Kafka source (we read from it)
mp.messaging.incoming.fruit-in.connector=smallrye-kafka
mp.messaging.incoming.fruit-in.topic=fruit-in
mp.messaging.incoming.fruit-in.value.deserializer=com.acme.fruit.jsonb.FruitDeserializer

# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.fruit-out.connector=smallrye-kafka
mp.messaging.outgoing.fruit-out.topic=fruit-out
mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer

现在,您的 Kafka 消息将包含 Fruit 数据对象的 JSON-B 序列化表示形式。

Now, your Kafka messages will contain a JSON-B serialized representation of your Fruit data object.

如果您想反序列化水果列表,则需要创建一个带有 Type 的反序列化程序,用来表示所用的泛型集合。

If you want to deserialize a list of fruits, you need to create a deserializer with a Type denoting the generic collection used.

package com.acme.fruit.jsonb;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import io.quarkus.kafka.client.serialization.JsonbDeserializer;

public class ListOfFruitDeserializer extends JsonbDeserializer<List<Fruit>> {
    public ListOfFruitDeserializer() {
        super(new ArrayList<Fruit>() {}.getClass().getGenericSuperclass());
    }
}

如果您不想为每个数据对象创建一个反序列化程序,则可以使用泛型的 io.vertx.kafka.client.serialization.JsonObjectDeserializer,它将反序列化为 io.vertx.core.json.JsonObject。还可以使用相应的序列化程序:io.vertx.kafka.client.serialization.JsonObjectSerializer。

If you don’t want to create a deserializer for each data object, you can use the generic io.vertx.kafka.client.serialization.JsonObjectDeserializer that will deserialize to a io.vertx.core.json.JsonObject. The corresponding serializer can also be used: io.vertx.kafka.client.serialization.JsonObjectSerializer.
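
For example, to use these generic classes on the fruit channels instead of dedicated (de)serializers (sketch only):

mp.messaging.incoming.fruit-in.value.deserializer=io.vertx.kafka.client.serialization.JsonObjectDeserializer
mp.messaging.outgoing.fruit-out.value.serializer=io.vertx.kafka.client.serialization.JsonObjectSerializer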

Avro Serialization

这在专门指南中进行了描述:Using Apache Kafka with Schema Registry and Avro。

This is described in a dedicated guide: Using Apache Kafka with Schema Registry and Avro.

JSON Schema Serialization

这在专门指南中进行了描述:Using Apache Kafka with Schema Registry and JSON Schema。

This is described in a dedicated guide: Using Apache Kafka with Schema Registry and JSON Schema.

Serializer/deserializer autodetection

在使用带有 Kafka 的 Quarkus 消息传递(io.quarkus:quarkus-messaging-kafka)时,Quarkus 通常可以自动检测正确的序列化程序和反序列化程序类。此自动检测基于 @Incoming 和 @Outgoing 方法的声明以及注入的 @Channel。

When using Quarkus Messaging with Kafka (io.quarkus:quarkus-messaging-kafka), Quarkus can often automatically detect the correct serializer and deserializer class. This autodetection is based on declarations of @Incoming and @Outgoing methods, as well as injected @Channel instances.

例如,如果您声明

For example, if you declare

@Outgoing("generated-price")
public Multi<Integer> generate() {
    ...
}

如果您的配置表明 generated-price 通道使用 smallrye-kafka 连接器,Quarkus 会自动将 value.serializer 设置为 Kafka 的内置 IntegerSerializer

and your configuration indicates that the generated-price channel uses the smallrye-kafka connector, then Quarkus will automatically set the value.serializer to Kafka’s built-in IntegerSerializer.

类似地,如果您声明

Similarly, if you declare

@Incoming("my-kafka-records")
public void consume(Record<Long, byte[]> record) {
    ...
}

而且您的配置表明 my-kafka-records 通道使用 smallrye-kafka 连接器,那么 Quarkus 会自动将 key.deserializer 设置为 Kafka 的内置 LongDeserializer,并将 value.deserializer 设置为 ByteArrayDeserializer

and your configuration indicates that the my-kafka-records channel uses the smallrye-kafka connector, then Quarkus will automatically set the key.deserializer to Kafka’s built-in LongDeserializer, as well as the value.deserializer to ByteArrayDeserializer.

最后,如果您声明

Finally, if you declare

@Inject
@Channel("price-create")
Emitter<Double> priceEmitter;

而且您的配置表明 price-create 通道使用 smallrye-kafka 连接器,那么 Quarkus 会自动将 value.serializer 设置为 Kafka 的内置 DoubleSerializer

and your configuration indicates that the price-create channel uses the smallrye-kafka connector, then Quarkus will automatically set the value.serializer to Kafka’s built-in DoubleSerializer.

序列化程序/反序列化程序自动检测支持的完整类型集为:

The full set of types supported by the serializer/deserializer autodetection is:

  • short and java.lang.Short

  • int and java.lang.Integer

  • long and java.lang.Long

  • float and java.lang.Float

  • double and java.lang.Double

  • byte[]

  • java.lang.String

  • java.util.UUID

  • java.nio.ByteBuffer

  • org.apache.kafka.common.utils.Bytes

  • io.vertx.core.buffer.Buffer

  • io.vertx.core.json.JsonObject

  • io.vertx.core.json.JsonArray

  • classes for which a direct implementation of org.apache.kafka.common.serialization.Serializer<T> / org.apache.kafka.common.serialization.Deserializer<T> is present.

    • the implementation needs to specify the type argument T as the (de-)serialized type.

  • classes generated from Avro schemas, as well as Avro GenericRecord, if Confluent or Apicurio Registry serde is present

    • in case multiple Avro serdes are present, serializer/deserializer must be configured manually for Avro-generated classes, because autodetection is impossible

    • see Using Apache Kafka with Schema Registry and Avro for more information about using Confluent or Apicurio Registry libraries

  • classes for which a subclass of ObjectMapperSerializer / ObjectMapperDeserializer is present, as described in Serializing via Jackson

    • it is technically not needed to subclass ObjectMapperSerializer, but in such case, autodetection isn’t possible

  • classes for which a subclass of JsonbSerializer / JsonbDeserializer is present, as described in Serializing via JSON-B

    • it is technically not needed to subclass JsonbSerializer, but in such case, autodetection isn’t possible

如果序列化程序/反序列化程序通过配置设置,则不会被自动检测替换。

If a serializer/deserializer is set by configuration, it won’t be replaced by the autodetection.

如果您在序列化程序自动检测中遇到任何问题,可以通过设置 quarkus.messaging.kafka.serializer-autodetection.enabled=false 完全关闭该功能。如果您需要执行此操作,请在 Quarkus issue tracker 中提交缺陷,以便我们解决您遇到的问题。

In case you have any issues with serializer autodetection, you can switch it off completely by setting quarkus.messaging.kafka.serializer-autodetection.enabled=false. If you find you need to do this, please file a bug in the Quarkus issue tracker so we can fix whatever problem you have.

JSON Serializer/deserializer generation

Quarkus 会为满足以下条件的通道自动生成序列化程序和反序列化程序:

Quarkus automatically generates serializers and deserializers for channels where:

  1. the serializer/deserializer is not configured

  2. the auto-detection did not find a matching serializer/deserializer

底层使用 Jackson。

It uses Jackson underneath.

可以使用以下方式禁用此生成:

This generation can be disabled using:

quarkus.messaging.kafka.serializer-generation.enabled=false

生成不支持 List<Fruit> 等集合。参考 Serializing via Jackson 为此情况编写自己的序列化器/反序列化器。

Generation does not support collections such as List<Fruit>. Refer to Serializing via Jackson to write your own serializer/deserializer for this case.

Using Schema Registry

Avro 在专门指南中进行了描述: Using Apache Kafka with Schema Registry and Avro。JSON Schema 则在另一篇指南中描述: Using Apache Kafka with Schema Registry and JSON Schema。

This is described in a dedicated guide for Avro: Using Apache Kafka with Schema Registry and Avro. And a different one for JSON Schema: Using Apache Kafka with Schema Registry and JSON Schema.

Health Checks

Quarkus 为 Kafka 提供了多个运行状况检查。这些检查与 quarkus-smallrye-health 扩展一起使用。

Quarkus provides several health checks for Kafka. These checks are used in combination with the quarkus-smallrye-health extension.

Kafka Broker Readiness Check

使用 quarkus-kafka-client 扩展时,可以通过在 application.properties 中将 quarkus.kafka.health.enabled 属性设置为 true 来启用 readiness 运行状况检查。此检查报告与 default Kafka 代理(使用 kafka.bootstrap.servers 配置)交互的状态。它要求与 Kafka 代理建立 admin 连接,并且该检查默认处于禁用状态。如果启用,当您访问应用程序的 /q/health/ready 终结点时,您将获得有关连接验证状态的信息。

When using the quarkus-kafka-client extension, you can enable readiness health check by setting the quarkus.kafka.health.enabled property to true in your application.properties. This check reports the status of the interaction with a default Kafka broker (configured using kafka.bootstrap.servers). It requires an admin connection with the Kafka broker, and it is disabled by default. If enabled, when you access the /q/health/ready endpoint of your application, you will have information about the connection validation status.

Kafka Reactive Messaging Health Checks

使用响应式消息传递和 Kafka 连接器时,每个已配置通道(入站或出站)都提供 startuplivenessreadiness 检查。

When using Reactive Messaging and the Kafka connector, each configured channel (incoming or outgoing) provides startup, liveness and readiness checks.

  • The startup check verifies that the communication with Kafka cluster is established.

  • The liveness check captures any unrecoverable failure happening during the communication with Kafka.

  • The readiness check verifies that the Kafka connector is ready to consume/produce messages to the configured Kafka topics.

对于每个通道,您可以使用以下方式禁用检查:

For each channel, you can disable the checks using:

# Disable both liveness and readiness checks with `health-enabled=false`:

# Incoming channel (receiving records from Kafka)
mp.messaging.incoming.your-channel.health-enabled=false
# Outgoing channel (writing records to Kafka)
mp.messaging.outgoing.your-channel.health-enabled=false

# Disable only the readiness check with `health-readiness-enabled=false`:

mp.messaging.incoming.your-channel.health-readiness-enabled=false
mp.messaging.outgoing.your-channel.health-readiness-enabled=false

您可以使用 mp.messaging.incoming|outgoing.$channel.bootstrap.servers 属性为每个通道配置 bootstrap.servers。默认值为 kafka.bootstrap.servers

You can configure the bootstrap.servers for each channel using mp.messaging.incoming|outgoing.$channel.bootstrap.servers property. Default is kafka.bootstrap.servers.
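
For example (the channel name and broker address are illustrative):

mp.messaging.incoming.your-channel.bootstrap.servers=localhost:9092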

响应式消息传递 startupreadiness 检查提供了两种策略。默认策略验证是否已与代理建立活动连接。此方法不会侵入,因为它基于内置 Kafka 客户端指标。

Reactive Messaging startup and readiness checks offer two strategies. The default strategy verifies that an active connection is established with the broker. This approach is not intrusive as it’s based on built-in Kafka client metrics.

使用 health-topic-verification-enabled=true 属性时,startup 探针会使用 admin client 检查主题列表。而入站通道的 readiness 探针会检查是否至少分配了一个分区用于消费,出站通道的探针会检查生产者使用的主题是否存在于代理中。

Using the health-topic-verification-enabled=true attribute, the startup probe uses an admin client to check for the list of topics, whereas the readiness probe for an incoming channel checks that at least one partition is assigned for consumption, and for an outgoing channel checks that the topic used by the producer exists in the broker.

请注意,要实现这一点,需要 admin connection。您可以使用 health-topic-verification-timeout 配置调整对代理执行的主题验证调用的超时时间。

Note that to achieve this, an admin connection is required. You can adjust the timeout for topic verification calls to the broker using the health-topic-verification-timeout configuration.

Observability

如果 OpenTelemetry extension 存在,则 Kafka 连接器通道与 OpenTelemetry Tracing 协同工作。写入 Kafka 主题的消息会传播当前跟踪范围。对于入站通道,如果消耗的 Kafka 记录包含跟踪信息,则消息处理会继承消息范围作为父代。

If the OpenTelemetry extension is present, then the Kafka connector channels work out-of-the-box with the OpenTelemetry Tracing. Messages written to Kafka topics propagate the current tracing span. On incoming channels, if a consumed Kafka record contains tracing information the message processing inherits the message span as parent.

可以为每个通道显式禁用跟踪:

Tracing can be disabled explicitly per channel:

mp.messaging.incoming.data.tracing-enabled=false

如果 Micrometer extension 存在,则 Kafka 生产者和消费者客户端指标会作为 Micrometer 指标公开。

If the Micrometer extension is present, then Kafka producer and consumer clients metrics are exposed as Micrometer meters.

Channel metrics

每个通道指标也可以被收集并公开为 Micrometer 仪表。每个通道可以通过带有 channel 标记标识的以下度量收集:

Per channel metrics can also be gathered and exposed as Micrometer meters. Following metrics can be gathered per channel, identified with the channel tag:

  • quarkus.messaging.message.count : The number of messages produced or received

  • quarkus.messaging.message.acks : The number of messages processed successfully

  • quarkus.messaging.message.failures : The number of messages processed with failures

  • quarkus.messaging.message.duration : The duration of the message processing.

出于向后兼容性考虑,默认情况下未启用通道指标,可以通过以下方式启用:

For backwards compatibility reasons channel metrics are not enabled by default and can be enabled with:

smallrye.messaging.observation.enabled=true

消息观察(message observation)依赖于拦截消息,因此不支持消费自定义消息类型(如 IncomingKafkaRecord、KafkaRecord、IncomingKafkaRecordBatch 或 KafkaRecordBatch)的通道。

The message observation depends on intercepting messages and therefore doesn’t support channels consuming messages with a custom message type such as IncomingKafkaRecord, KafkaRecord, IncomingKafkaRecordBatch or KafkaRecordBatch.

消息拦截和观察仍然适用于消费通用 Message 类型,或通过 converters 启用的自定义有效负载的通道。

The message interception, and observation, still work with channels consuming the generic Message type, or custom payloads enabled by converters.

Kafka Streams

这将在专门指南中有所描述: Using Apache Kafka Streams.

This is described in a dedicated guide: Using Apache Kafka Streams.

Using Snappy for message compression

outgoing 通道上,你可以通过将 compression.type 属性设置为 snappy 来启用 Snappy 压缩:

On outgoing channels, you can enable Snappy compression by setting the compression.type attribute to snappy:

mp.messaging.outgoing.fruit-out.compression.type=snappy

在 JVM 模式下,它可以开箱即用。然而,要将应用程序编译为本地可执行文件,你需要在 application.properties 中添加 quarkus.kafka.snappy.enabled=true。

In JVM mode, it will work out of the box. However, to compile your application to a native executable, you need to add quarkus.kafka.snappy.enabled=true to your application.properties.

在本地模式下,Snappy 默认处于禁用状态,因为使用 Snappy 需要嵌入一个本地库并在应用程序启动时解压它。

In native mode, Snappy is disabled by default as the use of Snappy requires embedding a native library and unpacking it when the application starts.

Authentication with OAuth

如果你的 Kafka 代理使用 OAuth 作为认证机制,你需要配置 Kafka 消费者来启用这个认证过程。首先,将以下依赖项添加到你的应用程序中:

If your Kafka broker uses OAuth as authentication mechanism, you need to configure the Kafka consumer to enable this authentication process. First, add the following dependency to your application:

pom.xml
<dependency>
    <groupId>io.strimzi</groupId>
    <artifactId>kafka-oauth-client</artifactId>
</dependency>
<!-- if compiling to native you'd need also the following dependency -->
<dependency>
    <groupId>io.strimzi</groupId>
    <artifactId>kafka-oauth-common</artifactId>
</dependency>
build.gradle
implementation("io.strimzi:kafka-oauth-client")
// if compiling to native you'd need also the following dependency
implementation("io.strimzi:kafka-oauth-common")

这个依赖项提供了处理 OAuth 工作流所需的回调处理程序。然后,在 application.properties 中添加:

This dependency provides the callback handler required to handle the OAuth workflow. Then, in the application.properties, add:

mp.messaging.connector.smallrye-kafka.security.protocol=SASL_PLAINTEXT
mp.messaging.connector.smallrye-kafka.sasl.mechanism=OAUTHBEARER
mp.messaging.connector.smallrye-kafka.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.client.id="team-a-client" \
  oauth.client.secret="team-a-client-secret" \
  oauth.token.endpoint.uri="http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/token" ;
mp.messaging.connector.smallrye-kafka.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

quarkus.ssl.native=true

更新 oauth.client.id, oauth.client.secretoauth.token.endpoint.uri 值。

Update the oauth.client.id, oauth.client.secret and oauth.token.endpoint.uri values.

OAuth 认证适用于 JVM 和本地模式。由于在本地模式下默认没有启用 SSL,因此必须添加 quarkus.ssl.native=true 来支持 JaasClientOauthLoginCallbackHandler,它使用 SSL。(请参阅 Using SSL with Native Executables 指南了解更多详细信息。)

OAuth authentication works for both JVM and native modes. Since SSL is not enabled by default in native mode, quarkus.ssl.native=true must be added to support JaasClientOauthLoginCallbackHandler, which uses SSL. (See the Using SSL with Native Executables guide for more details.)

Testing a Kafka application

Testing without a broker

在无需启动 Kafka 代理的情况下测试应用程序非常有用。为此,你可以将 Kafka 连接器管理的通道切换为内存(in-memory)通道。

It can be useful to test the application without having to start a Kafka broker. To achieve this, you can switch the channels managed by the Kafka connector to in-memory.

此方法仅适用于 JVM 测试。它不能用于本地测试(因为它们不支持注入)。

This approach only works for JVM tests. It cannot be used for native tests (because they do not support injection).

假设我们要测试以下处理器应用程序:

Let’s say we want to test the following processor application:

@ApplicationScoped
public class BeverageProcessor {

    @Incoming("orders")
    @Outgoing("beverages")
    Beverage process(Order order) {
        System.out.println("Order received " + order.getProduct());
        Beverage beverage = new Beverage();
        beverage.setBeverage(order.getProduct());
        beverage.setCustomer(order.getCustomer());
        beverage.setOrderId(order.getOrderId());
        beverage.setPreparationState("RECEIVED");
        return beverage;
    }

}

首先,将以下测试依赖项添加到应用程序中:

First, add the following test dependency to your application:

pom.xml
<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-reactive-messaging-in-memory</artifactId>
    <scope>test</scope>
</dependency>
build.gradle
testImplementation("io.smallrye.reactive:smallrye-reactive-messaging-in-memory")

接着,按照以下步骤创建 Quarkus 测试资源:

Then, create a Quarkus Test Resource as follows:

public class KafkaTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager {

    @Override
    public Map<String, String> start() {
        Map<String, String> env = new HashMap<>();
        Map<String, String> props1 = InMemoryConnector.switchIncomingChannelsToInMemory("orders");     (1)
        Map<String, String> props2 = InMemoryConnector.switchOutgoingChannelsToInMemory("beverages");  (2)
        env.putAll(props1);
        env.putAll(props2);
        return env;  (3)
    }

    @Override
    public void stop() {
        InMemoryConnector.clear();  (4)
    }
}
1 Switch the incoming channel orders (expecting messages from Kafka) to in-memory.
2 Switch the outgoing channel beverages (writing messages to Kafka) to in-memory.
3 Builds and returns a Map containing all the properties required to configure the application to use in-memory channels.
4 When the test stops, clear the InMemoryConnector (discard all the received and sent messages)

使用上面创建的测试资源创建一个 Quarkus 测试:

Create a Quarkus Test using the test resource created above:

import static org.awaitility.Awaitility.await;

@QuarkusTest
@WithTestResource(KafkaTestResourceLifecycleManager.class)
class BaristaTest {

    @Inject
    @Connector("smallrye-in-memory")
    InMemoryConnector connector; (1)

    @Test
    void testProcessOrder() {
        InMemorySource<Order> ordersIn = connector.source("orders");     (2)
        InMemorySink<Beverage> beveragesOut = connector.sink("beverages");  (3)

        Order order = new Order();
        order.setProduct("coffee");
        order.setName("Coffee lover");
        order.setOrderId("1234");

        ordersIn.send(order);  (4)

        await().<List<? extends Message<Beverage>>>until(beveragesOut::received, t -> t.size() == 1); (5)

        Beverage queuedBeverage = beveragesOut.received().get(0).getPayload();
        Assertions.assertEquals("RECEIVED", queuedBeverage.getPreparationState());
        Assertions.assertEquals("coffee", queuedBeverage.getBeverage());
        Assertions.assertEquals("Coffee lover", queuedBeverage.getCustomer());
        Assertions.assertEquals("1234", queuedBeverage.getOrderId());
    }

}
1 Inject the in-memory connector in your test class.
2 Retrieve the incoming channel (orders) - the channel must have been switched to in-memory in the test resource.
3 Retrieve the outgoing channel (beverages) - the channel must have been switched to in-memory in the test resource.
4 Use the send method to send a message to the orders channel. The application will process this message and send a message to beverages channel.
5 Use the received method on beverages channel to check the messages produced by the application.

如果 Kafka 消费者基于批处理,你将需要将一批消息发送到该通道(通过手动创建它们)。

If your Kafka consumer is batch based, you will need to send a batch of messages to the channel by creating them manually.

例如:

For instance:

@ApplicationScoped
public class BeverageProcessor {

    @Incoming("orders")
    CompletionStage<Void> process(KafkaRecordBatch<String, Order> orders) {
        System.out.println("Order received " + orders.getPayload().size());
        return orders.ack();
    }
}
import static org.awaitility.Awaitility.await;

@QuarkusTest
@WithTestResource(KafkaTestResourceLifecycleManager.class)
class BaristaTest {

    @Inject
    @Connector("smallrye-in-memory")

    InMemoryConnector connector;

    @Test
    void testProcessOrder() {
        InMemorySource<IncomingKafkaRecordBatch<String, Order>> ordersIn = connector.source("orders");
        var committed = new AtomicBoolean(false);  (1)
        var commitHandler = new KafkaCommitHandler() {
            @Override
            public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record) {
                committed.set(true);  (2)
                return null;
            }
        };
        var failureHandler = new KafkaFailureHandler() {
            @Override
            public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record, Throwable reason, Metadata metadata) {
                return null;
            }
        };

        Order order = new Order();
        order.setProduct("coffee");
        order.setName("Coffee lover");
        order.setOrderId("1234");
        var record = new ConsumerRecord<>("topic", 0, 0, "key", order);
        var records = new ConsumerRecords<>(Map.of(new TopicPartition("topic", 1), List.of(record)));
        var batch = new IncomingKafkaRecordBatch<>(
            records, "kafka", 0, commitHandler, failureHandler, false, false);  (3)

        ordersIn.send(batch);

        await().until(committed::get);  (4)
    }
}
1 Create an AtomicBoolean to track if the batch has been committed.
2 Update committed when the batch is committed.
3 Create a IncomingKafkaRecordBatch with a single record.
4 Wait until the batch is committed.

有了内存通道,我们不必启动 Kafka 代理便可以测试处理消息的应用程序代码。注意,不同的内存通道是独立的,并且将通道连接器切换为内存不会模拟配置为相同 Kafka 主题的通道之间的消息传递。

With in-memory channels we were able to test application code processing messages without starting a Kafka broker. Note that different in-memory channels are independent, and switching channel connector to in-memory does not simulate message delivery between channels configured to the same Kafka topic.

Context propagation with InMemoryConnector

默认情况下,内存通道在调用线程上调度消息,这将是单元测试中的主线程。

By default, in-memory channels dispatch messages on the caller thread, which would be the main thread in unit tests.

quarkus-test-vertx 依赖项提供了 @io.quarkus.test.vertx.RunOnVertxContext 注释,在测试方法中使用该注释时,将在 Vert.x 上下文中执行测试。

The quarkus-test-vertx dependency provides the @io.quarkus.test.vertx.RunOnVertxContext annotation, which when used on a test method, executes the test on a Vert.x context.

但是,大多数其他连接器都处理上下文传播,并在不同的重复 Vert.x 上下文中调度消息。

However, most of the other connectors handle context propagation dispatching messages on separate duplicated Vert.x contexts.

如果您的测试依赖于上下文传播,则可以使用 run-on-vertx-context 属性配置内存连接器通道以在 Vert.x 上下文中调度事件,包括消息和确认。或者,您可以使用 InMemorySource#runOnVertxContext 方法切换此行为。

If your tests are dependent on context propagation, you can configure the in-memory connector channels with the run-on-vertx-context attribute to dispatch events, including messages and acknowledgements, on a Vert.x context. Alternatively, you can switch this behaviour using the InMemorySource#runOnVertxContext method.
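
As an illustration, here is a minimal sketch of a test resource enabling this attribute. It mirrors the KafkaTestResourceLifecycleManager used above; the channel names and the property key derived from the attribute name are assumptions:

import java.util.HashMap;
import java.util.Map;

import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.smallrye.reactive.messaging.memory.InMemoryConnector;

public class KafkaTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager {

    @Override
    public Map<String, String> start() {
        Map<String, String> env = new HashMap<>();
        // Switch the channels to the in-memory connector, as in the previous examples.
        env.putAll(InMemoryConnector.switchIncomingChannelsToInMemory("orders"));
        env.putAll(InMemoryConnector.switchOutgoingChannelsToInMemory("beverages"));
        // Dispatch events for the 'orders' channel on a Vert.x context.
        env.put("mp.messaging.incoming.orders.run-on-vertx-context", "true");
        return env;
    }

    @Override
    public void stop() {
        InMemoryConnector.clear();
    }
}

Alternatively, the InMemorySource returned by connector.source("orders") exposes the runOnVertxContext method to toggle the same behaviour from within the test itself.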

Testing using a Kafka broker

如果您正在使用 Dev Services for Kafka,那么将启动一个 Kafka 代理并在测试期间一直可用,除非在 %test 配置文件中将其禁用。虽然可以使用 Kafka 客户端 API 连接到此代理,但 Kafka Companion Library 提供了一种与 Kafka 代理交互以及在测试中创建使用者、生成器和管理操作的简单方法。

If you are using Dev Services for Kafka, a Kafka broker will be started and available throughout the tests, unless it is disabled in the %test profile. While it is possible to connect to this broker using the Kafka Clients API, the Kafka Companion Library provides an easier way of interacting with a Kafka broker and creating consumer, producer, and admin actions inside tests.

要在测试中使用 KafkaCompanion API,请首先添加以下依赖项:

To use the KafkaCompanion API in tests, start by adding the following dependency:

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-test-kafka-companion</artifactId>
    <scope>test</scope>
</dependency>

其中提供了 io.quarkus.test.kafka.KafkaCompanionResource,它是 io.quarkus.test.common.QuarkusTestResourceLifecycleManager 的实现。

which provides io.quarkus.test.kafka.KafkaCompanionResource - an implementation of io.quarkus.test.common.QuarkusTestResourceLifecycleManager.

然后使用 @WithTestResource 在测试中配置 Kafka Companion,例如:

Then use @WithTestResource to configure the Kafka Companion in tests, for example:

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.UUID;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;

import io.quarkus.test.common.WithTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.kafka.InjectKafkaCompanion;
import io.quarkus.test.kafka.KafkaCompanionResource;
import io.smallrye.reactive.messaging.kafka.companion.ConsumerTask;
import io.smallrye.reactive.messaging.kafka.companion.KafkaCompanion;

@QuarkusTest
@WithTestResource(KafkaCompanionResource.class)
public class OrderProcessorTest {

    @InjectKafkaCompanion (1)
    KafkaCompanion companion;

    @Test
    void testProcessor() {
        companion.produceStrings().usingGenerator(i -> new ProducerRecord<>("orders", UUID.randomUUID().toString()), 10); (2)

        // Expect that the tested application processes orders from 'orders' topic and write to 'orders-processed' topic

        ConsumerTask<String, String> orders = companion.consumeStrings().fromTopics("orders-processed", 10); (3)
        orders.awaitCompletion(); (4)
        assertEquals(10, orders.count());
    }
}
1 @InjectKafkaCompanion injects the KafkaCompanion instance, configured to access the Kafka broker created for tests.
2 Use KafkaCompanion to create a producer task which writes 10 records to the 'orders' topic.
3 Create a consumer task which subscribes to the 'orders-processed' topic and consumes 10 records.
4 Await completion of the consumer task.

如果在测试期间 Kafka Dev 服务可用,KafkaCompanionResource 将使用已创建的 Kafka 代理,否则会使用 Strimzi Test Container 创建一个 Kafka 代理。

If the Kafka Dev Service is available during tests, KafkaCompanionResource uses the created Kafka broker, otherwise it creates a Kafka broker using Strimzi Test Container.

可以使用 @ResourceArg 自定义已创建 Kafka 代理的配置,例如:

The configuration of the created Kafka broker can be customized using @ResourceArg, for example:

@WithTestResource(value = KafkaCompanionResource.class, initArgs = {
        @ResourceArg(name = "strimzi.kafka.image", value = "quay.io/strimzi-test-container/test-container:0.106.0-kafka-3.7.0"), // Image name
        @ResourceArg(name = "kafka.port", value = "9092"), // Fixed port for kafka, by default it will be exposed on a random port
        @ResourceArg(name = "kraft", value = "true"), // Enable Kraft mode
        @ResourceArg(name = "num.partitions", value = "3"), // Other custom broker configurations
})
public class OrderProcessorTest {
    // ...
}

Custom test resource

或者,您可以在测试资源中启动一个 Kafka 代理。以下代码段展示了一个使用 Testcontainers 启动 Kafka 代理的测试资源:

Alternatively, you can start a Kafka broker in a test resource. The following snippet shows a test resource starting a Kafka broker using Testcontainers:

import java.util.Collections;
import java.util.Map;

import org.testcontainers.containers.KafkaContainer;

import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;

public class KafkaResource implements QuarkusTestResourceLifecycleManager {

    private final KafkaContainer kafka = new KafkaContainer();

    @Override
    public Map<String, String> start() {
        kafka.start();
        return Collections.singletonMap("kafka.bootstrap.servers", kafka.getBootstrapServers());  (1)
    }

    @Override
    public void stop() {
        kafka.close();
    }
}
1 Configure the Kafka bootstrap location, so the application connects to this broker.

Dev Services for Kafka

如果存在任何与 Kafka 相关的扩展(例如 quarkus-messaging-kafka),Dev Services for Kafka 会在 dev 模式和运行测试时自动启动一个 Kafka 代理。因此,您无需手动启动代理。应用程序会自动配置。

If any Kafka-related extension is present (e.g. quarkus-messaging-kafka), Dev Services for Kafka automatically starts a Kafka broker in dev mode and when running tests. So, you don’t have to start a broker manually. The application is configured automatically.

由于启动 Kafka 代理可能需要较长时间,因此 Dev Services for Kafka 使用 Redpanda,一个在约 1 秒内启动的与 Kafka 兼容的代理。

Because starting a Kafka broker can take a long time, Dev Services for Kafka uses Redpanda, a Kafka-compatible broker which starts in ~1 second.

Enabling / Disabling Dev Services for Kafka

除非满足以下条件,否则 Dev Services for Kafka 会被自动启用:

Dev Services for Kafka is automatically enabled unless:

  • quarkus.kafka.devservices.enabled is set to false

  • kafka.bootstrap.servers is configured

  • all the Reactive Messaging Kafka channels have the bootstrap.servers attribute set

Dev Services for Kafka 依赖于 Docker 来启动代理。如果您的环境不支持 Docker,您需要手动启动代理,或连接到已运行的代理。您可以使用 kafka.bootstrap.servers 来配置代理地址。

Dev Services for Kafka relies on Docker to start the broker. If your environment does not support Docker, you will need to start the broker manually, or connect to an already running broker. You can configure the broker address using kafka.bootstrap.servers.
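
For example, to point the application at an existing broker (host and port are illustrative):

kafka.bootstrap.servers=localhost:9092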

Shared broker

大多数情况下,您需要在应用程序之间共享代理。Dev Services for Kafka 实现了 service discovery 机制,让多个在 dev 模式下运行的 Quarkus 应用程序共享单个代理。

Most of the time you need to share the broker between applications. Dev Services for Kafka implements a service discovery mechanism for your multiple Quarkus applications running in dev mode to share a single broker.

Dev Services for Kafka 用 quarkus-dev-service-kafka 标签启动容器,用于标识容器。

Dev Services for Kafka starts the container with the quarkus-dev-service-kafka label, which is used to identify the container.

如果您需要多个(共享)代理,您可以配置 quarkus.kafka.devservices.service-name 属性并指示代理名称。它将查找具有相同值的容器,或在找不到容器的情况下启动一个新容器。默认服务名称为 kafka。

If you need multiple (shared) brokers, you can configure the quarkus.kafka.devservices.service-name attribute and indicate the broker name. It looks for a container with the same value, or starts a new one if none can be found. The default service name is kafka.
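
For example (the service name is illustrative):

quarkus.kafka.devservices.service-name=my-kafka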

在 dev 模式下默认启用共享,但在测试模式下禁用共享。您可以使用 quarkus.kafka.devservices.shared=false 禁用共享。

Sharing is enabled by default in dev mode, but disabled in test mode. You can disable the sharing with quarkus.kafka.devservices.shared=false.

Setting the port

默认情况下,Dev Services for Kafka 选择一个随机端口并配置应用程序。您可以通过配置 quarkus.kafka.devservices.port 属性来设置端口。

By default, Dev Services for Kafka picks a random port and configures the application. You can set the port by configuring the quarkus.kafka.devservices.port property.

请注意,Kafka 通告地址会使用所选端口自动配置。

Note that the Kafka advertised address is automatically configured with the chosen port.
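
For example (the port value is illustrative):

quarkus.kafka.devservices.port=19092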

Configuring the image

Dev Services for Kafka 支持 Redpanda、kafka-native 和 Strimzi(Kraft 模式)映像。

Dev Services for Kafka supports Redpanda, kafka-native and Strimzi (in Kraft mode) images.

Redpanda 是一个与 Kafka 兼容的事件流式传输平台。由于它提供快速的启动时间,因此 dev services 默认使用 vectorized/redpanda 的 Redpanda 映像。您可以从 https://hub.docker.com/r/vectorized/redpanda 中选择任何版本。

Redpanda is a Kafka-compatible event streaming platform. Because it provides fast startup times, dev services defaults to Redpanda images from vectorized/redpanda. You can select any version from https://hub.docker.com/r/vectorized/redpanda.

kafka-native 提供使用 Quarkus 和 GraalVM 编译为原生二进制文件的标准 Apache Kafka 发行版映像。虽然仍处于 experimental 阶段,但它提供了非常快的启动时间和较小的空间占用。

kafka-native provides images of the standard Apache Kafka distribution compiled to a native binary using Quarkus and GraalVM. While still experimental, it provides very fast startup times with a small footprint.

可以使用以下配置来设置映像类型:

The image type can be configured using:

quarkus.kafka.devservices.provider=kafka-native

Strimzi 为在 Kubernetes 上运行 Apache Kafka 提供容器映像和 Operator。虽然 Strimzi 针对 Kubernetes 进行了优化,但这些映像在经典容器环境中也能完美运行。Strimzi 容器映像在 JVM 上运行“真正的”Kafka 代理,启动速度较慢。

Strimzi provides container images and Operators for running Apache Kafka on Kubernetes. While Strimzi is optimized for Kubernetes, the images work perfectly in classic container environments. Strimzi container images run "genuine" Kafka broker on JVM, which is slower to start.

quarkus.kafka.devservices.provider=strimzi

对于 Strimzi,您可以从 https://quay.io/repository/strimzi-test-container/test-container?tab=tags 中选择具有 Kraft 支持(2.8.1 及更高版本)的任何 Kafka 版本映像。

For Strimzi, you can select any image with a Kafka version which has Kraft support (2.8.1 and higher) from https://quay.io/repository/strimzi-test-container/test-container?tab=tags.

quarkus.kafka.devservices.image-name=quay.io/strimzi-test-container/test-container:0.106.0-kafka-3.7.0

Configuring Kafka topics

您可以将 Dev Services for Kafka 配置为在代理启动后创建主题。主题使用给定数量的分区和 1 个副本创建。

You can configure the Dev Services for Kafka to create topics once the broker is started. Topics are created with the given number of partitions and 1 replica.

以下示例创建一个名为 `test`且有 3 个分区的主题,以及一个名为 `messages`且有 2 个分区的第二个主题。

The following example creates a topic named test with 3 partitions, and a second topic named messages with 2 partitions.

quarkus.kafka.devservices.topic-partitions.test=3
quarkus.kafka.devservices.topic-partitions.messages=2

如果已经存在具有指定名称的主题,则将跳过创建过程,而不会尝试将现有主题重新分区为其他数量的分区。

If a topic already exists with the given name, the creation is skipped, without trying to re-partition the existing topic to a different number of partitions.

您可以使用 `quarkus.kafka.devservices.topic-partitions-timeout`来配置主题创建中使用的 Kafka 管理客户端调用的超时时间,其默认值为 2 秒。

You can configure the timeout for the Kafka admin client calls used in topic creation with quarkus.kafka.devservices.topic-partitions-timeout; it defaults to 2 seconds.
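
For example, to raise it to 5 seconds (the value is illustrative):

quarkus.kafka.devservices.topic-partitions-timeout=5S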

Transactional and Idempotent producers support

默认情况下,Redpanda 代理配置为启用事务和幂等性功能。您可以使用以下方法禁用它们:

By default, the Redpanda broker is configured to enable transactions and idempotence features. You can disable those using:

quarkus.kafka.devservices.redpanda.transaction-enabled=false

Redpanda 事务不支持完全一次处理。

Redpanda transactions do not support exactly-once processing.

Kafka Dev UI

如果存在任何与 Kafka 相关的扩展(例如 quarkus-messaging-kafka),Quarkus Dev UI 将扩展为包含有 Kafka 代理管理 UI。该管理 UI 会自动连接到为应用程序配置的 Kafka 代理。

If any Kafka-related extension is present (e.g. quarkus-messaging-kafka), the Quarkus Dev UI is extended with a Kafka broker management UI. It is connected automatically to the Kafka broker configured for the application.

使用 Kafka Dev UI,您可以直接管理您的 Kafka 集群并执行任务,例如:

With the Kafka Dev UI, you can directly manage your Kafka cluster and perform tasks, such as:

  • Listing and creating topics

  • Visualizing records

  • Publishing new records

  • Inspecting the list of consumer groups and their consumption lag

Kafka Dev UI 属于 Quarkus Dev UI 的一部分,它仅在开发模式下可用。

Kafka Dev UI is part of the Quarkus Dev UI and is only available in development mode.

Kubernetes Service Bindings

Quarkus Kafka 扩展支持 Service Binding Specification for Kubernetes。您可以通过将 `quarkus-kubernetes-service-binding`扩展添加到您的应用程序中来启用此功能。

The Quarkus Kafka extension supports the Service Binding Specification for Kubernetes. You can enable this by adding the quarkus-kubernetes-service-binding extension to your application.

在适当配置的 Kubernetes 集群中运行时,Kafka 扩展将从集群内可用的服务绑定中提取其 Kafka 代理连接配置,而无需用户配置。

When running in appropriately configured Kubernetes clusters, the Kafka extension will pull its Kafka broker connection configuration from the service binding available inside the cluster, without the need for user configuration.
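
The extension is added like any other Quarkus extension, for example with Maven:

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-kubernetes-service-binding</artifactId>
</dependency>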

Execution model

反应式消息传递在 I/O 线程上调用用户的函数。因此,默认情况下,这些函数不得阻塞。正如 Blocking processing中所述,如果此函数将阻塞调用线程,您需要在函数上添加 `@Blocking`注解。

Reactive Messaging invokes the user’s methods on an I/O thread. Thus, by default, the methods must not block. As described in Blocking processing, you need to add the @Blocking annotation to the method if it will block the caller thread.

请参阅 Quarkus Reactive Architecture documentation以了解有关此主题的更多详细信息。

See the Quarkus Reactive Architecture documentation for further details on this topic.
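
As an illustration, here is a minimal sketch of a blocking consumer (the channel name and the method body are illustrative):

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.common.annotation.Blocking;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Blocking
    public void store(double price) {
        // A blocking call (for example, a JDBC insert) is allowed here,
        // because @Blocking moves the invocation to a worker thread.
    }
}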

Channel Decorators

SmallRye 反应式消息传递支持装饰传入和传出频道,以便实现横切关注点,例如监视、跟踪或消息拦截。有关实现装饰器和消息拦截器的更多信息,请参阅 SmallRye Reactive Messaging documentation

SmallRye Reactive Messaging supports decorating incoming and outgoing channels for implementing cross-cutting concerns such as monitoring, tracing or message interception. For more information on implementing decorators and message interceptors see the SmallRye Reactive Messaging documentation.

Configuration Reference

有关 SmallRye 反应式消息传递配置的更多详细信息,请参见 SmallRye Reactive Messaging - Kafka Connector Documentation

More details about the SmallRye Reactive Messaging configuration can be found in the SmallRye Reactive Messaging - Kafka Connector Documentation.

每个通道都可以通过以下配置来禁用:

Each channel can be disabled via configuration using:

mp.messaging.[incoming|outgoing].[channel].enabled=false

最重要的属性列在下面的表格中:

The most important attributes are listed in the tables below:

Incoming channel configuration (polling from Kafka)

使用以下内容配置下列属性:

The following attributes are configured using:

mp.messaging.incoming.your-channel-name.attribute=value

一些属性具有可以在全局范围内配置的别名:

Some properties have aliases which can be configured globally:

kafka.bootstrap.servers=...

您还可以传递基础 Kafka consumer支持的任何属性。

You can also pass any property supported by the underlying Kafka consumer.

例如,要配置 `max.poll.records`属性,请使用:

For example, to configure the max.poll.records property, use:

mp.messaging.incoming.[channel].max.poll.records=1000

一些消费者客户端属性被配置为合理的默认值:

Some consumer client properties are configured to sensible default values:

如果未设置,则 reconnect.backoff.max.ms 设置为 10000,以避免在断开连接时产生高负载。

If not set, reconnect.backoff.max.ms is set to 10000 to avoid high load on disconnection.

如果未设置,则 key.deserializer 设置为 org.apache.kafka.common.serialization.StringDeserializer。

If not set, key.deserializer is set to org.apache.kafka.common.serialization.StringDeserializer.

消费者 `client.id`根据使用 `mp.messaging.incoming.[channel].partitions`属性创建的客户端数进行配置。

The consumer client.id is configured according to the number of clients to create using the mp.messaging.incoming.[channel].partitions property.

  • If a client.id is provided, it is used as-is or suffixed with client index if partitions property is set.

  • If a client.id is not provided, it is generated as [client-id-prefix][channel-name][-index].

Unresolved directive in kafka.adoc - include::{includes}/smallrye-kafka-incoming.adoc[]

Outgoing channel configuration (writing to Kafka)

使用以下内容配置下列属性:

The following attributes are configured using:

mp.messaging.outgoing.your-channel-name.attribute=value

一些属性具有可以在全局范围内配置的别名:

Some properties have aliases which can be configured globally:

kafka.bootstrap.servers=...

您还可以传递底层 Kafka producer支持的任何属性。

You can also pass any property supported by the underlying Kafka producer.

例如,若要配置 `max.block.ms`属性,请使用:

For example, to configure the max.block.ms property, use:

mp.messaging.outgoing.[channel].max.block.ms=10000

某些生产者客户端属性配置为合理的默认值:

Some producer client properties are configured to sensible default values:

如果未设置,则 reconnect.backoff.max.ms 设置为 10000,以避免在断开连接时产生高负载。

If not set, reconnect.backoff.max.ms is set to 10000 to avoid high load on disconnection.

如果未设置,则 key.serializer 将设置为 org.apache.kafka.common.serialization.StringSerializer。

If not set, key.serializer is set to org.apache.kafka.common.serialization.StringSerializer.

如果未设置,则生产者 client.id 将生成为 [client-id-prefix][channel-name]。

If not set, producer client.id is generated as [client-id-prefix][channel-name].

Unresolved directive in kafka.adoc - include::{includes}/smallrye-kafka-outgoing.adoc[]

Kafka Configuration Resolution

Quarkus 暴露出所有与 Kafka 相关的应用程序属性,这些属性以 `kafka.` 或 `KAFKA_` 为前缀,位于名称为 `default-kafka-broker` 的配置映射中。此配置用于与 Kafka 代理建立连接。

Quarkus exposes all Kafka related application properties, prefixed with kafka. or KAFKA_ inside a configuration map with default-kafka-broker name. This configuration is used to establish the connection with the Kafka broker.

除了此默认配置之外,您还可以使用 `kafka-configuration`属性配置 `Map`生产者的名称:

In addition to this default configuration, you can configure the name of the Map producer using the kafka-configuration attribute:

mp.messaging.incoming.my-channel.connector=smallrye-kafka
mp.messaging.incoming.my-channel.kafka-configuration=my-configuration

在这种情况下,连接器将查找与 `my-configuration` 名称关联的 `Map`。如果未设置 kafka-configuration,则会对与通道名称(即前一示例中的 my-channel)关联的 `Map` 进行可选查找。

In this case, the connector looks for the Map associated with the my-configuration name. If kafka-configuration is not set, an optional lookup for a Map exposed with the channel name (my-channel in the previous example) is done.

import java.util.Map;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import io.quarkus.kafka.client.serialization.ObjectMapperSerializer;
import io.smallrye.common.annotation.Identifier;

@Produces
@ApplicationScoped
@Identifier("my-configuration")
Map<String, Object> outgoing() {
    return Map.ofEntries(
            Map.entry("value.serializer", ObjectMapperSerializer.class.getName())
    );
}

如果设置了 kafka-configuration 且找不到 Map,则部署失败。

If kafka-configuration is set and no Map can be found, the deployment fails.

属性值按以下方式解析:

Attribute values are resolved as follows:

  1. the attribute is set directly on the channel configuration (mp.messaging.incoming.my-channel.attribute=value),

  2. if not set, the connector looks for a Map with the channel name or the configured kafka-configuration (if set) and the value is retrieved from that Map

  3. If the resolved Map does not contain the value, the default Map is used (exposed with the default-kafka-broker name)

Conditionally configure channels

您可以使用特定配置文件配置通道。因此,仅当启用了指定配置文件时才配置通道(并将它们添加到应用程序中)。

You can configure the channels using a specific profile. Thus, the channels are only configured (and added to the application) when the specified profile is enabled.

要实现此目的,您需要做以下操作:

To achieve this, you need:

  1. Prefix the mp.messaging.[incoming|outgoing].$channel entries with %my-profile such as %my-profile.mp.messaging.[incoming|outgoing].$channel.key=value

  2. Use the @IfBuildProfile("my-profile") on the CDI beans containing @Incoming(channel) and @Outgoing(channel) annotations that need only to be enabled when the profile is enabled.

请注意,响应性消息传递会验证图形是否完整。因此,在使用这样的条件配置时,请确保应用程序可与已启用和未启用的配置文件配合使用。

Note that reactive messaging verifies that the graph is complete. So, when using such a conditional configuration, ensure the application works with and without the profile enabled.

请注意,此方法还可以根据配置文件来更改通道配置。

Note that this approach can also be used to change the channel configuration based on a profile.
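
As an illustration, here is a minimal sketch (the kafka profile and the orders channel are illustrative names). The channel is configured only under the kafka profile:

%kafka.mp.messaging.incoming.orders.connector=smallrye-kafka
%kafka.mp.messaging.incoming.orders.topic=orders

The consuming bean is then guarded with @IfBuildProfile, so it is only registered when that profile is enabled:

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.quarkus.arc.profile.IfBuildProfile;

@ApplicationScoped
@IfBuildProfile("kafka")
public class OrderConsumer {

    @Incoming("orders")
    public void consume(String order) {
        // This bean, and therefore the 'orders' channel, is only present
        // when the 'kafka' profile is enabled.
    }
}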

Integrating with Kafka - Common patterns

Writing to Kafka from an HTTP endpoint

若要从 HTTP 终端向 Kafka 发送消息,请在端点中注入 Emitter(或 MutinyEmitter):

To send messages to Kafka from an HTTP endpoint, inject an Emitter (or a MutinyEmitter) in your endpoint:

package org.acme;

import java.util.concurrent.CompletionStage;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@Path("/")
public class ResourceSendingToKafka {

    @Channel("kafka") Emitter<String> emitter;          (1)

    @POST
    @Produces(MediaType.TEXT_PLAIN)
    public CompletionStage<Void> send(String payload) { (2)
        return emitter.send(payload);                   (3)
    }
}
1 Inject an Emitter<String>
2 The HTTP method receives the payload and returns a CompletionStage completed when the message is written to Kafka
3 Send the message to Kafka; the send method returns a CompletionStage

该端点将通过 POST HTTP 请求传递的有效负载发送到发送器。发送器的通道在 application.properties 文件中映射到一个 Kafka 主题:

The endpoint sends the passed payload (from a POST HTTP request) to the emitter. The emitter’s channel is mapped to a Kafka topic in the application.properties file:

mp.messaging.outgoing.kafka.connector=smallrye-kafka
mp.messaging.outgoing.kafka.topic=my-topic

该端点返回一个 CompletionStage,以指示方法的异步特性。emitter.send 方法返回一个 CompletionStage<Void>。当消息写入 Kafka 后,返回的 future 完成。如果写入失败,则返回的 CompletionStage 会异常完成。

The endpoint returns a CompletionStage indicating the asynchronous nature of the method. The emitter.send method returns a CompletionStage<Void>. The returned future is completed when the message has been written to Kafka. If the writing fails, the returned CompletionStage is completed exceptionally.

如果端点未返回 CompletionStage ,则可能会在将消息发送到 Kafka 之前写入 HTTP 响应,因此故障不会报告给用户。

If the endpoint does not return a CompletionStage, the HTTP response may be written before the message is sent to Kafka, and so failures won’t be reported to the user.

如果您需要发送 Kafka 记录,请使用:

If you need to send a Kafka record, use:

package org.acme;

import java.util.concurrent.CompletionStage;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import io.smallrye.reactive.messaging.kafka.Record;

@Path("/")
public class ResourceSendingToKafka {

    @Channel("kafka") Emitter<Record<String,String>> emitter;  (1)


    @POST
    @Produces(MediaType.TEXT_PLAIN)
    public CompletionStage<Void> send(String payload) {
        return emitter.send(Record.of("my-key", payload));    (2)
    }
}
1 Note the usage of an Emitter<Record<K, V>>
2 Create the record using Record.of(k, v)

Persisting Kafka messages with Hibernate with Panache

要将从 Kafka 接收的对象持久化到数据库中,可以使用 Hibernate with Panache。

To persist objects received from Kafka into a database, you can use Hibernate with Panache.

如果您使用 Hibernate Reactive,请查看 Persisting Kafka messages with Hibernate Reactive

If you use Hibernate Reactive, look at Persisting Kafka messages with Hibernate Reactive.

让我们想象一下您接收 Fruit 对象。出于简单起见,我们的 Fruit 类非常简单:

Let’s imagine you receive Fruit objects. For simplicity purposes, our Fruit class is pretty simple:

package org.acme;

import jakarta.persistence.Entity;

import io.quarkus.hibernate.orm.panache.PanacheEntity;

@Entity
public class Fruit extends PanacheEntity {

    public String name;

}

若要使用 Kafka 主题中存储的 Fruit 实例,并将其持久化到数据库,您可以使用以下方法:

To consume Fruit instances stored on a Kafka topic, and persist them into a database, you can use the following approach:

package org.acme;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.common.annotation.Blocking;

@ApplicationScoped
public class FruitConsumer {

    @Incoming("fruits")                                     (1)
    @Transactional                                          (2)
    public void persistFruits(Fruit fruit) {                (3)
        fruit.persist();                                    (4)
    }
}
1 Configuring the incoming channel. This channel reads from Kafka.
2 As we are writing in a database, we must be in a transaction. This annotation starts a new transaction and commits it when the method returns. Quarkus automatically considers the method as blocking. Indeed, writing to a database using classic Hibernate is blocking. So, Quarkus calls the method on a worker thread you can block (and not an I/O thread).
3 The method receives each Fruit. Note that you would need a deserializer to reconstruct the Fruit instances from the Kafka records.
4 Persist the received fruit object.

如 <4> 中所述,你需要一个反序列化程序,它能够从记录创建一个 Fruit。这可以使用 Jackson 反序列化程序来实现:

As mentioned in <4>, you need a deserializer that can create a Fruit from the record. This can be done using a Jackson deserializer:

package org.acme;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

关联的配置如下:

The associated configuration would be:

mp.messaging.incoming.fruits.connector=smallrye-kafka
mp.messaging.incoming.fruits.value.deserializer=org.acme.FruitDeserializer

请查看 Serializing via Jackson 以了解如何将 Jackson 与 Kafka 配合使用的更多详细信息。你还可以使用 Avro。

Check Serializing via Jackson for more detail about the usage of Jackson with Kafka. You can also use Avro.

Persisting Kafka messages with Hibernate Reactive

要将从 Kafka 接收的对象持久化到数据库中,可以使用 Hibernate Reactive with Panache。

To persist objects received from Kafka into a database, you can use Hibernate Reactive with Panache.

让我们想象一下您接收 Fruit 对象。出于简单起见,我们的 Fruit 类非常简单:

Let’s imagine you receive Fruit objects. For simplicity purposes, our Fruit class is pretty simple:

package org.acme;

import jakarta.persistence.Entity;

import io.quarkus.hibernate.reactive.panache.PanacheEntity;  (1)

@Entity
public class Fruit extends PanacheEntity {

    public String name;

}
1 Make sure to use the reactive variant

若要使用 Kafka 主题中存储的 Fruit 实例,并将其持久化到数据库,您可以使用以下方法:

To consume Fruit instances stored on a Kafka topic, and persist them into a database, you can use the following approach:

package org.acme;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.control.ActivateRequestContext;
import jakarta.inject.Inject;

import org.hibernate.reactive.mutiny.Mutiny;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class FruitStore {

    @Inject
    Mutiny.Session session;                    (1)

    @Incoming("in")
    @ActivateRequestContext (2)
    public Uni<Void> consume(Fruit entity) {
        return session.withTransaction(t -> {  (3)
            return entity.persistAndFlush()    (4)
                    .replaceWithVoid();        (5)
        }).onTermination().call(() -> session.close()); (6)
    }

}
1 Inject the Hibernate Reactive Session
2 Hibernate Reactive Session and Panache APIs require an active CDI Request context. @ActivateRequestContext annotation creates a new request context and destroys it when the Uni returned from the method completes. If Panache is not used, Mutiny.SessionFactory can be injected and used similarly without the need of activating the request context or closing the session manually.
3 Requests a new transaction. The transaction completes when the passed action completes.
4 Persist the entity. It returns a Uni<Fruit>.
5 Switch back to a Uni<Void>.
6 Close the session - this closes the connection with the database. The connection can then be recycled.

与 classic Hibernate 不同,你无法使用 @Transactional。相反,我们使用 session.withTransaction 并持久化我们的实体。replaceWithVoid() 用于返回一个 Uni<Void>,而不是 Uni<Fruit>。

Unlike with classic Hibernate, you can’t use @Transactional. Instead, we use session.withTransaction and persist our entity. replaceWithVoid() is used to return a Uni<Void> and not a Uni<Fruit>.

你需要一个反序列化程序,它能够从记录创建一个 Fruit。这可以使用 Jackson 反序列化程序来实现:

You need a deserializer that can create a Fruit from the record. This can be done using a Jackson deserializer:

package org.acme;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

关联的配置如下:

The associated configuration would be:

mp.messaging.incoming.fruits.connector=smallrye-kafka
mp.messaging.incoming.fruits.value.deserializer=org.acme.FruitDeserializer

请查看 Serializing via Jackson 以了解如何将 Jackson 与 Kafka 配合使用的更多详细信息。你还可以使用 Avro。

Check Serializing via Jackson for more detail about the usage of Jackson with Kafka. You can also use Avro.

Writing entities managed by Hibernate to Kafka

我们想象以下过程:

Let’s imagine the following process:

  1. You receive an HTTP request with a payload,

  2. You create a Hibernate entity instance from this payload,

  3. You persist that entity into a database,

  4. You send the entity to a Kafka topic

如果您使用 Hibernate Reactive,请看 Writing entities managed by Hibernate Reactive to Kafka

If you use Hibernate Reactive, look at Writing entities managed by Hibernate Reactive to Kafka.

因为我们要写入数据库,所以必须在事务中运行此方法。然而,向 Kafka 发送实体是异步进行的。该操作返回一个 CompletionStage(如果你使用 MutinyEmitter,则返回一个 Uni),报告操作何时完成。我们必须确保在对象写入完成之前事务仍在运行。否则,您可能会在事务之外访问该对象,这是不允许的。

Because we write to a database, we must run this method in a transaction. Yet, sending the entity to Kafka happens asynchronously. The operation returns a CompletionStage (or a Uni if you use a MutinyEmitter) reporting when the operation completes. We must be sure that the transaction is still running until the object is written. Otherwise, you may access the object outside the transaction, which is not allowed.

要实施该进程,您需要以下方法:

To implement this process, you need the following approach:

package org.acme;

import java.util.concurrent.CompletionStage;

import jakarta.transaction.Transactional;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@Path("/")
public class ResourceSendingToKafka {

    @Channel("kafka") Emitter<Fruit> emitter;

    @POST
    @Path("/fruits")
    @Transactional                                                      (1)
    public CompletionStage<Void> storeAndSendToKafka(Fruit fruit) {     (2)
        fruit.persist();
        return emitter.send(new FruitDto(fruit));                       (3)
    }
}
1 As we are writing to the database, make sure we run inside a transaction
2 The method receives the fruit instance to persist. It returns a CompletionStage which is used for the transaction demarcation. The transaction is committed when the return CompletionStage completes. In our case, it’s when the message is written to Kafka.
3 Wrap the managed entity inside a Data transfer object and send it to Kafka. This makes sure that the managed entity is not impacted by the Kafka serialization.

Writing entities managed by Hibernate Reactive to Kafka

若要发送由 Hibernate Reactive 管理的实体到 Kafka,我们建议使用:

To send to Kafka entities managed by Hibernate Reactive, we recommend using:

  • Quarkus REST to serve HTTP requests

  • A MutinyEmitter to send messages to a channel, so it can be easily integrated with the Mutiny API exposed by Hibernate Reactive or Hibernate Reactive with Panache.

以下示例演示了如何接收有效载荷,使用 Hibernate Reactive with Panache 将其存储在数据库中,并将已持久化的实体发送到 Kafka:

The following example demonstrates how to receive a payload, store it in the database using Hibernate Reactive with Panache, and send the persisted entity to Kafka:

package org.acme;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.MutinyEmitter;

@Path("/")
public class ReactiveGreetingResource {

    @Channel("kafka") MutinyEmitter<Fruit> emitter;     (1)

    @POST
    @Path("/fruits")
    public Uni<Void> sendToKafka(Fruit fruit) {         (2)
        return Panache.withTransaction(() ->            (3)
            fruit.<Fruit>persist()
        )
            .chain(f -> emitter.send(f));               (4)
    }
}
1 Inject a MutinyEmitter which exposes a Mutiny API. It simplifies the integration with the Mutiny API exposed by Hibernate Reactive with Panache.
2 The HTTP method receiving the payload returns a Uni<Void>. The HTTP response is written when the operation completes (the entity is persisted and written to Kafka).
3 We need to write the entity into the database in a transaction.
4 Once the persist operation completes, we send the entity to Kafka. The send method returns a Uni<Void>.

Streaming Kafka topics as server-sent events

将 Kafka 主题作为服务器发送的事件 (SSE) 流是简单的:

Streaming a Kafka topic as server-sent events (SSE) is straightforward:

  1. You inject the channel representing the Kafka topic in your HTTP endpoint

  2. You return that channel as a Publisher or a Multi from the HTTP method

以下代码提供了一个示例:

The following code provides an example:

@Channel("fruits")
Multi<Fruit> fruits;

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<Fruit> stream() {
    return fruits;
}

当没有足够的活动时,一些环境会切断 SSE 连接。解决方法包括定期发送 _ping_消息(或空对象)。

Some environments cut the SSE connection when there is not enough activity. The workaround consists of sending ping messages (or empty objects) periodically.

@Channel("fruits")
Multi<Fruit> fruits;

@Inject
ObjectMapper mapper;

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> stream() {
    return Multi.createBy().merging()
            .streams(
                    fruits.map(this::toJson),
                    emitAPeriodicPing()
            );
}

Multi<String> emitAPeriodicPing() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(10))
            .onItem().transform(x -> "{}");
}

private String toJson(Fruit f) {
    try {
        return mapper.writeValueAsString(f);
    } catch (JsonProcessingException e) {
        throw new RuntimeException(e);
    }
}

除了发送来自 Kafka 的数据之外,还需要定期发送 ping,因此解决方法稍微复杂一些。要实现此目的,我们将来自 Kafka 的流与每 10 秒发出 {} 的定期流合并。

The workaround is a bit more complex: besides sending the fruits coming from Kafka, we need to send pings periodically. To achieve this, we merge the stream coming from Kafka with a periodic stream emitting {} every 10 seconds.

Chaining Kafka Transactions with Hibernate Reactive transactions

通过将 Kafka 事务与 Hibernate Reactive 事务链接,可以将记录发送到 Kafka 事务,执行数据库更新并仅在数据库事务成功时提交 Kafka 事务。

By chaining a Kafka transaction with a Hibernate Reactive transaction you can send records to a Kafka transaction, perform database updates and commit the Kafka transaction only if the database transaction is successful.

以下示例演示:

The following example demonstrates:

  • Receive a payload by serving HTTP requests using Quarkus REST,

  • Limit concurrency of that HTTP endpoint using Smallrye Fault Tolerance,

  • Start a Kafka transaction and send the payload as a Kafka record,

  • Store the payload in the database using Hibernate Reactive with Panache,

  • Commit the Kafka transaction only if the entity is persisted successfully.

package org.acme;

import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.hibernate.reactive.mutiny.Mutiny;

import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;

@Path("/")
public class FruitProducer {

    @Channel("kafka") KafkaTransactions<Fruit> kafkaTx; (1)

    @POST
    @Path("/fruits")
    @Consumes(MediaType.APPLICATION_JSON)
    @Bulkhead(1) (2)
    public Uni<Void> post(Fruit fruit) { (3)
        return kafkaTx.withTransaction(emitter -> { (4)
            emitter.send(fruit); (5)
            return Panache.withTransaction(() -> { (6)
                return fruit.<Fruit>persist(); (7)
            });
        }).replaceWithVoid();
    }
}
1 Inject a KafkaTransactions which exposes a Mutiny API. It allows the integration with the Mutiny API exposed by Hibernate Reactive with Panache.
2 Limit the concurrency of the HTTP endpoint to "1", preventing multiple transactions from starting at the same time.
3 The HTTP method receiving the payload returns a Uni<Void>. The HTTP response is written when the operation completes (the entity is persisted and Kafka transaction is committed).
4 Begin a Kafka transaction.
5 Send the payload to Kafka inside the Kafka transaction.
6 Persist the entity into the database in a Hibernate Reactive transaction.
7 Once the persist operation completes, and there are no errors, the Kafka transaction is committed. The result is discarded, and the completion is returned as the HTTP response.

在前面的示例中,数据库事务(内部)将提交,然后是 Kafka 事务(外部)。如果您希望首先提交 Kafka 事务,然后是数据库事务,则需要按相反的顺序嵌套它们。

In the previous example the database transaction (inner) will commit followed by the Kafka transaction (outer). If you wish to commit the Kafka transaction first and the database transaction second, you need to nest them in the reverse order.

下一个示例演示如何使用 Hibernate Reactive API(不带 Panache):

The next example demonstrates that using the Hibernate Reactive API (without Panache):

import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.hibernate.reactive.mutiny.Mutiny;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;

@Path("/")
public class FruitProducer {

    @Channel("kafka") KafkaTransactions<Fruit> kafkaTx;

    @Inject Mutiny.SessionFactory sf; (1)

    @POST
    @Path("/fruits")
    @Consumes(MediaType.APPLICATION_JSON)
    @Bulkhead(1)
    public Uni<Void> post(Fruit fruit) {
        Context context = Vertx.currentContext(); (2)
        return sf.withTransaction(session -> (3)
                kafkaTx.withTransaction(emitter -> (4)
                        session.persist(fruit).invoke(() -> emitter.send(fruit)) (5)
                ).emitOn(context::runOnContext) (6)
        );
    }
}
1 Inject the Hibernate Reactive SessionFactory.
2 Capture the caller Vert.x context.
3 Begin a Hibernate Reactive transaction.
4 Begin a Kafka transaction.
5 Persist the payload and send the entity to Kafka.
6 The Kafka transaction terminates on the Kafka producer sender thread. We need to switch to the Vert.x context previously captured in order to terminate the Hibernate Reactive transaction on the same context we started it.

Logging

为了减少 Kafka 客户端写入的日志数量,Quarkus 将以下日志类别的级别设置为 WARNING:

To reduce the amount of log written by the Kafka client, Quarkus sets the level of the following log categories to WARNING:

  • org.apache.kafka.clients

  • org.apache.kafka.common.utils

  • org.apache.kafka.common.metrics

可以通过将以下行添加到 application.properties 中来覆盖配置:

You can override the configuration by adding the following lines to the application.properties:

quarkus.log.category."org.apache.kafka.clients".level=INFO
quarkus.log.category."org.apache.kafka.common.utils".level=INFO
quarkus.log.category."org.apache.kafka.common.metrics".level=INFO

Connecting to Managed Kafka clusters

本部分说明如何连接到著名的 Kafka 云服务。

This section explains how to connect to well-known Kafka cloud services.

Azure Event Hub

Azure Event Hub 提供与 Apache Kafka 兼容的端点。

Azure Event Hub provides an endpoint compatible with Apache Kafka.

在 basic 等级中,无法使用用于 Kafka 的 Azure 事件中心。您至少需要 standard 等级才能使用 Kafka。请参阅 Azure Event Hubs Pricing 查看其他选项。

Azure Event Hubs for Kafka is not available in the basic tier. You need at least the standard tier to use Kafka. See Azure Event Hubs Pricing to see the other options.

若要使用带 TLS 的 Kafka 协议连接到 Azure 事件中心,您需要以下配置:

To connect to Azure Event Hub, using the Kafka protocol with TLS, you need the following configuration:

kafka.bootstrap.servers=my-event-hub.servicebus.windows.net:9093 (1)
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ (2)
    username="$ConnectionString" \ (3)
    password="<YOUR.EVENTHUBS.CONNECTION.STRING>"; (4)
1 The port is 9093.
2 You need to use the JAAS PlainLoginModule.
3 The username is the $ConnectionString string.
4 The Event Hub connection string given by Azure.

使用事件中心命名空间的连接字符串替换 <YOUR.EVENTHUBS.CONNECTION.STRING>。有关获取连接字符串的说明,请参阅 Get an Event Hubs connection string。结果应类似于:

Replace <YOUR.EVENTHUBS.CONNECTION.STRING> with the connection string for your Event Hubs namespace. For instructions on getting the connection string, see Get an Event Hubs connection string. The result would be something like:

kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="Endpoint=sb://my-event-hub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

此配置可以是全局的(如上所示),也可以设置在通道配置中:

This configuration can be global (as above), or set in the channel configuration:

mp.messaging.incoming.$channel.bootstrap.servers=my-event-hub.servicebus.windows.net:9093
mp.messaging.incoming.$channel.security.protocol=SASL_SSL
mp.messaging.incoming.$channel.sasl.mechanism=PLAIN
mp.messaging.incoming.$channel.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="Endpoint=sb://my-event-hub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=...";

Red Hat OpenShift Streams for Apache Kafka

Red Hat OpenShift Streams for Apache Kafka 提供托管的 Kafka 代理。首先,按照 Getting started with the rhoas CLI for Red Hat OpenShift Streams for Apache Kafka 中的说明创建您的 Kafka 代理实例。确保复制了与您创建的 ServiceAccount 关联的客户端 ID 和客户端密钥。

Red Hat OpenShift Streams for Apache Kafka provides managed Kafka brokers. First, follow the instructions from Getting started with the rhoas CLI for Red Hat OpenShift Streams for Apache Kafka to create your Kafka broker instance. Make sure you copied the client id and client secret associated with the ServiceAccount you created.

然后,可以将 Quarkus 应用程序配置为如下连接到代理:

Then, you can configure the Quarkus application to connect to the broker as follows:

kafka.bootstrap.servers=<connection url> (1)
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="${KAFKA_USERNAME}" \ (2)
  password="${KAFKA_PASSWORD}"; (3)
1 The connection string, given on the admin console, such as demo-c--bjsv-ldd-cvavkc-a.bf2.kafka.rhcloud.com:443
2 The Kafka username (the client id from the service account)
3 The Kafka password (the client secret from the service account)

通常,这些属性使用 %prod 前缀,以便仅在生产模式下运行时启用它们。

In general, these properties are prefixed using %prod to enable them only when running in production mode.

正如 Getting started with the rhoas CLI for Red Hat OpenShift Streams for Apache Kafka中解释的那样,要使用 Red Hat OpenShift Streams for Apache Kafka,您必须事先创建主题,创建一个 Service Account,并从该服务帐户提供读取和写入主题的权限。认证数据(客户端 ID 和密钥)与服务帐户相关,这意味着您可以实施细粒度权限并限制对主题的访问。

As explained in Getting started with the rhoas CLI for Red Hat OpenShift Streams for Apache Kafka, to use Red Hat OpenShift Streams for Apache Kafka, you must create the topic beforehand, create a Service Account, and provide permissions to read and write to your topic from that service account. The authentication data (client id and secret) relates to the service account, which means you can implement fine-grain permissions and restrict access to the topic.

在使用 Kubernetes 时,建议在 Kubernetes 密钥中设置客户端 ID 和密钥:

When using Kubernetes, it is recommended to set the client id and secret in a Kubernetes secret:

apiVersion: v1
kind: Secret
metadata:
  name: kafka-credentials
stringData:
  KAFKA_USERNAME: "..."
  KAFKA_PASSWORD: "..."

要允许您的 Quarkus 应用程序使用该密钥,请将以下行添加到 `application.properties`文件中:

To allow your Quarkus application to use that secret, add the following line to the application.properties file:

%prod.quarkus.openshift.env.secrets=kafka-credentials

Red Hat OpenShift Service Registry

Red Hat OpenShift Service Registry 提供了完全托管的服务注册表,用于处理 Kafka 模式。

Red Hat OpenShift Service Registry provides a fully managed service registry for handling Kafka schemas.

您可以按照 Getting started with Red Hat OpenShift Service Registry中的说明进行操作,或使用 `rhoas`CLI 创建新的服务注册表实例:

You can follow the instructions from Getting started with Red Hat OpenShift Service Registry, or use the rhoas CLI to create a new service registry instance:

rhoas service-registry create --name my-schema-registry

请务必注意已创建实例的 Registry URL。对于认证,您可以使用之前创建的相同 ServiceAccount。您需要确保它具有访问服务注册表的必要权限。

Make sure to note the Registry URL of the instance created. For authentication, you can use the same ServiceAccount you created previously. You need to make sure that it has the necessary permissions to access the service registry.

例如,可以使用 `rhoas`CLI 将 `MANAGER`角色授予服务帐户:

For example, using the rhoas CLI, you can grant the MANAGER role to the service account:

rhoas service-registry role add --role manager --service-account [SERVICE_ACCOUNT_CLIENT_ID]

然后,您可以将 Quarkus 应用程序配置为连接到模式注册表,如下所示:

Then, you can configure the Quarkus application to connect to the schema registry as follows:

mp.messaging.connector.smallrye-kafka.apicurio.registry.url=${RHOAS_SERVICE_REGISTRY_URL} 1
mp.messaging.connector.smallrye-kafka.apicurio.auth.service.token.endpoint=${RHOAS_OAUTH_TOKEN_ENDPOINT} 2
mp.messaging.connector.smallrye-kafka.apicurio.auth.client.id=${RHOAS_CLIENT_ID} 3
mp.messaging.connector.smallrye-kafka.apicurio.auth.client.secret=${RHOAS_CLIENT_SECRET} 4
1 The service registry URL, given on the admin console, such as https://bu98.serviceregistry.rhcloud.com/t/0e95af2c-6e11-475e-82ee-f13bd782df24/apis/registry/v2
2 The OAuth token endpoint URL, such as https://identity.api.openshift.com/auth/realms/rhoas/protocol/openid-connect/token
3 The client id (from the service account)
4 The client secret (from the service account)

Binding Red Hat OpenShift managed services to Quarkus application using the Service Binding Operator

如果您的 Quarkus 应用程序部署在安装了 Service Binding OperatorOpenShift Application Services操作符的 Kubernetes 或 OpenShift 集群上,则可以使用 Kubernetes Service Binding向应用程序注入访问 Red Hat OpenShift Streams for Apache Kafka 及服务注册表所需的配置。

If your Quarkus application is deployed on a Kubernetes or OpenShift cluster with Service Binding Operator and OpenShift Application Services operators installed, configurations necessary to access Red Hat OpenShift Streams for Apache Kafka and Service Registry can be injected to the application using Kubernetes Service Binding.

为了设置服务绑定,您首先需要将 OpenShift 托管服务连接到您的集群。对于 OpenShift 集群,您可以按照 Connecting a Kafka and Service Registry instance to your OpenShift cluster中的说明进行操作。

In order to set up the Service Binding, you need first to connect OpenShift managed services to your cluster. For an OpenShift cluster you can follow the instructions from Connecting a Kafka and Service Registry instance to your OpenShift cluster.

在将您的集群与 RHOAS Kafka 及服务注册表实例连接后,请确保已向新创建的服务帐户授予必要的权限。

Once you’ve connected your cluster with the RHOAS Kafka and Service Registry instances, make sure you’ve granted necessary permissions to the newly created service account.

然后,您可以使用 Kubernetes Service Binding扩展将 Quarkus 应用程序配置为生成这些服务的 `ServiceBinding`资源:

Then, using the Kubernetes Service Binding extension, you can configure the Quarkus application to generate ServiceBinding resources for those services:

quarkus.kubernetes-service-binding.detect-binding-resources=true

quarkus.kubernetes-service-binding.services.kafka.api-version=rhoas.redhat.com/v1alpha1
quarkus.kubernetes-service-binding.services.kafka.kind=KafkaConnection
quarkus.kubernetes-service-binding.services.kafka.name=my-kafka

quarkus.kubernetes-service-binding.services.serviceregistry.api-version=rhoas.redhat.com/v1alpha1
quarkus.kubernetes-service-binding.services.serviceregistry.kind=ServiceRegistryConnection
quarkus.kubernetes-service-binding.services.serviceregistry.name=my-schema-registry

对于此示例,Quarkus 构建将生成以下 `ServiceBinding`资源:

For this example, the Quarkus build will generate the following ServiceBinding resources:

apiVersion: binding.operators.coreos.com/v1alpha1
kind: ServiceBinding
metadata:
  name: my-app-kafka
spec:
  application:
    group: apps.openshift.io
    name: my-app
    version: v1
    kind: DeploymentConfig
  services:
    - group: rhoas.redhat.com
      version: v1alpha1
      kind: KafkaConnection
      name: my-kafka
  detectBindingResources: true
  bindAsFiles: true
---
apiVersion: binding.operators.coreos.com/v1alpha1
kind: ServiceBinding
metadata:
  name: my-app-serviceregistry
spec:
  application:
    group: apps.openshift.io
    name: my-app
    version: v1
    kind: DeploymentConfig
  services:
    - group: rhoas.redhat.com
      version: v1alpha1
      kind: ServiceRegistryConnection
      name: my-schema-registry
  detectBindingResources: true
  bindAsFiles: true

您可以按照 Deploying to OpenShift部署您的应用程序,包括生成的 `ServiceBinding`资源。用于访问 Kafka 及模式注册表实例的配置属性将在部署时自动注入到应用程序中。

You can follow Deploying to OpenShift to deploy your application, including generated ServiceBinding resources. The configuration properties necessary to access the Kafka and Schema Registry instances will be injected to the application automatically at deployment.

Going further

本指南展示了如何使用 Quarkus 与 Kafka 进行交互。它利用 Quarkus Messaging 构建数据流应用程序。

This guide has shown how you can interact with Kafka using Quarkus. It utilizes Quarkus Messaging to build data streaming applications.

如果您想进一步了解,请查看 SmallRye Reactive Messaging的文档,这是Quarkus中使用的实现。

If you want to go further, check the documentation of SmallRye Reactive Messaging, the implementation used in Quarkus.