Apache Kafka Reference Guide

本参考指南演示了你的 Quarkus 应用程序如何利用 Quarkus Messaging 与 Apache Kafka 交互。

Table of Contents

Introduction

Apache Kafka 是一个流行的开源分布式事件流平台。它通常用于高性能数据管道、流分析、数据集成和关键任务应用程序。类似于消息队列或企业消息平台,它使你可以:

  • publish (写入)和 subscribe 至(读取)事件流,称为 records.

  • store 稳定且可靠地存储在 topics 中的记录流。

  • process 即时或追溯记录流。

而且,所有这些功能都以分布式、高度可扩展、弹性、容错和安全的方式提供。

Quarkus Extension for Apache Kafka

Quarkus 通过 SmallRye Reactive Messaging 框架提供对 Apache Kafka 的支持。基于 Eclipse MicroProfile Reactive Messaging 规范 2.0,它提出了一个灵活的编程模型,桥接 CDI 和事件驱动。

本指南深入探讨了 Apache Kafka 和 SmallRye Reactive Messaging 框架。如需快速入门,请查看 Getting Started to Quarkus Messaging with Apache Kafka.

你可通过在项目基本目录中运行以下命令,将 messaging-kafka 扩展添加到项目中:

CLI
quarkus extension add {add-extension-extensions}
Maven
./mvnw quarkus:add-extension -Dextensions='{add-extension-extensions}'
Gradle
./gradlew addExtension --extensions='{add-extension-extensions}'

这会将以下内容添加到构建文件中:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-messaging-kafka")

该扩展包含 kafka-clients 版本 3.2.1 作为可传递依赖项,并且与 Kafka 代理版本 2.x 兼容。

Configuring Smallrye Kafka Connector

由于 Smallrye Reactive Messaging 框架支持不同的消息传递后端,如 Apache Kafka、AMQP、Apache Camel、JMS、MQTT 等,所以它采用通用词汇:

  • 应用程序发送和接收 messages. 消息包装 payload ,并可以使用某些 metadata 进行扩展。使用 Kafka 连接器, message 对应于 Kafka record.

  • 消息在 channels 上传输。应用程序组件连接到通道以发布和使用消息。Kafka 连接器将 channels 映射到 Kafka topics.

  • 通道使用 connectors 连接到消息后端。连接器配置为将传入消息映射到特定通道(由应用程序使用)并收集发送到特定通道的传出消息。每个连接器都专注于特定的消息技术。例如,处理 Kafka 的连接器命名为 smallrye-kafka

带传入通道的 Kafka 连接器的最小配置如下所示:

%prod.kafka.bootstrap.servers=kafka:9092 1
mp.messaging.incoming.prices.connector=smallrye-kafka 2
1 为生产环境配置代理位置。可以使用 mp.messaging.incoming.$channel.bootstrap.servers 属性在全局或每个通道配置此设置。在开发模式下和运行测试时,Dev Services for Kafka 会自动启动 Kafka 代理。未提供此属性时,此属性默认为 localhost:9092
2 配置连接器以管理价格通道。默认情况下,主题名称与通道名称相同。您可以配置主题属性来覆盖此名称。

%prod 前缀表示该属性仅在应用程序在生产模式下运行时使用(因此在开发或测试模式下不使用)。请参阅 Profile documentation 了解详细信息。

Connector auto-attachment

如果类路径中只有一个连接器,则可以省略 connector 属性配置。Quarkus 自动将 orphan 通道与类路径上找到的(唯一的)连接器关联起来。Orphans 通道是无下游使用者的传出通道或无上游生产者的传入通道。 可以使用以下方法禁用此自动附加功能:

quarkus.messaging.auto-connector-attachment=false

Receiving messages from Kafka

从上一个最小配置继续,Quarkus 应用程序可以直接接收消息负载:

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}

应用程序可以通过多种其他方法使用传入消息:

Message
@Incoming("prices")
public CompletionStage<Void> consume(Message<Double> msg) {
    // access record metadata
    var metadata = msg.getMetadata(IncomingKafkaRecordMetadata.class).orElseThrow();
    // process the message payload.
    double price = msg.getPayload();
    // Acknowledge the incoming message (commit the offset)
    return msg.ack();
}

Message 类型允许使用的方法访问传入消息元数据并手动处理确认。我们将在 Commit Strategies 中探讨不同的确认策略。

如果您想要直接访问 Kafka 记录对象,请使用:

ConsumerRecord
@Incoming("prices")
public void consume(ConsumerRecord<String, Double> record) {
    String key = record.key(); // Can be `null` if the incoming record has no key
    String value = record.value(); // Can be `null` if the incoming record has no value
    String topic = record.topic();
    int partition = record.partition();
    // ...
}

ConsumerRecord 由底层 Kafka 客户端提供,并且可以直接注入到使用者的方法中。另一种更简单的方法是使用 Record

Record
@Incoming("prices")
public void consume(Record<String, Double> record) {
    String key = record.key(); // Can be `null` if the incoming record has no key
    String value = record.value(); // Can be `null` if the incoming record has no value
}

Record 是围绕传入 Kafka 记录的密钥和负载的简单封装。

@Channel

或者,应用程序可以在 bean 中注入 Multi 并订阅其事件,如下例所示:

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Channel;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.jboss.resteasy.reactive.RestStreamElementType;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("prices")
    Multi<Double> prices;

    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<Double> stream() {
        return prices;
    }
}

这是一个如何将 Kafka 使用者与另一个下游集成的良好示例,在本例中将其公开作为服务器端发送的事件端点。

使用 @Channel 使用消息时,应用程序代码负责订阅。在上面的示例中,Quarkus REST 端点(以前的 RESTEasy Reactive)会为您处理此事。

可以将以下类型注入为通道:

@Inject @Channel("prices") Multi<Double> streamOfPayloads;

@Inject @Channel("prices") Multi<Message<Double>> streamOfMessages;

@Inject @Channel("prices") Publisher<Double> publisherOfPayloads;

@Inject @Channel("prices") Publisher<Message<Double>> publisherOfMessages;

与之前 Message 示例类似,如果注入的通道接收负载 (Multi<T>),它将自动确认消息,并支持多个使用者。如果您注入的通道接收消息 (Multi<Message<T>>),您将负责确认和广播。我们将在 Broadcasting messages on multiple consumers 中探讨如何发送广播消息。

注入 @Channel("prices") 或拥有 @Incoming("prices") 不会自动配置应用程序从 Kafka 使用消息。你需要使用 mp.messaging.incoming.prices…​ 配置一个入站连接器,或者在应用程序中的某个地方拥有 @Outgoing("prices") 方法(在这种情况下,prices 将是一个内存内通道)。

Blocking processing

响应式消息传递在 I/O 线程中调用你的方法。有关此主题的更多详细信息,请查看 Quarkus Reactive Architecture documentation。但是,你通常需要将响应式消息传递与数据库交互等阻塞式处理结合使用。为此,你需要使用 @Blocking 注解,指示处理为 blocking 且不应该在调用者线程上运行。

例如,以下代码说明了如何使用带有 Panache 的 Hibernate 将传入有效负载存储到数据库:

import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }

}

完整的示例可在 kafka-panache-quickstart directory 中获得。

有 2 个 @Blocking 注释:

  1. io.smallrye.reactive.messaging.annotations.Blocking

  2. io.smallrye.common.annotation.Blocking

它们具有相同的效果。因此,可以同时使用。第一个提供更精细的调整,例如要使用的工作器池以及是否保留顺序。第二个也与 Quarkus 的其他响应式特性结合使用,使用默认工作器池并保留顺序。 可以在 SmallRye Reactive Messaging – Handling blocking execution 中找到关于 @Blocking 注释的用法详细信息。

@RunOnVirtualThread

有关在 Java {@s7} 上运行阻塞处理,请参阅 {@s8}。

@Transactional

如果用 {@s9} 对您的方法进行了注解,则它将自动被视为`{@s11}`,即使没有用 {@s10} 对方法进行注解。

Acknowledgment Strategies

使用者接收的所有消息都必须确认。如果没有确认,则处理被视为错误。如果使用者方法接收 {@s12} 或有效载荷,则消息将在方法返回时确认,也称为 {@s13}。如果使用者方法返回另一个响应式流或`{@s14}`,则消息将在下游消息确认时确认。您可以覆盖缺省行为,以便在使用者方法中确认到达消息 ({@s15}),或者根本不确认消息 ({@s16}),如下例所示:

@Incoming("prices")
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
public void process(double price) {
    // process price
}

如果使用者方法接收 {@s17},则确认策略为 {@s18},并且使用者方法负责确认/拒绝确认该消息。

@Incoming("prices")
public CompletionStage<Void> process(Message<Double> msg) {
    // process price
    return msg.ack();
}

如上所述,该方法还可以将确认策略覆盖为 {@s19} 或 {@s20}。

Commit Strategies

从一个 Kafka 记录生成的消息确认后,连接器将调用一个提交策略。这些策略决定某个主题/分区的使用者的偏移量何时提交。提交偏移量表明所有以前的记录都已处理。这也是应用程序在崩溃恢复或重新启动后重新启动处理的位置。

提交每个偏移量都会有性能损失,因为 Kafka 偏移量管理可能很慢。但是,如果在两个提交之间应用程序崩溃,则不经常提交偏移量可能会导致消息重复。

Kafka 连接器支持三种策略:

  • {@s21} 跟踪接收到的消息并按顺序提交最后一个已确认消息的偏移量(这意味着,所有先前的消息也已确认)。即使通道执行异步处理,此策略也能保证至少一次传递。连接器跟踪接收到的记录,并定期(由 {@s22} 指定的周期,默认值:5000 毫秒)提交最高的连续偏移量。如果一个记录关联的消息在 {@s23} (默认值:60000 毫秒)内未确认,该连接器将被标记为不正常。实际上,此策略无法在单个记录处理失败时立即提交偏移量。如果 {@s24} 设置为小于或等于 {@s25},则它不会执行任何运行状况检查验证。如果存在“毒丸”消息(永远不会确认),此类设置可能会导致内存不足。如果未将 {@s26} 明确设置为真,则此策略是默认值。

  • {@s28} 允许在 {@s27} 上保留使用者偏移量,而不是将偏移量提交回 Kafka 代理。使用 {@s29} API,使用者代码可以保留带有记录偏移量的 {@s33} 为使用者的进度做标记。当处理从先前保留的偏移量继续进行时,它会将 Kafka 使用者查找该偏移量,并恢复保留的状态,从中断处继续有状态处理。检查点策略在本地保存与最新偏移量关联的处理状态,并定期将其保留到状态存储(由 {@s30} (默认值:5000)指定的周期)。如果在 {@s31} (默认值:10000)内没有处理状态保留到状态存储,则将该连接器标记为不正常。如果将 {@s32} 设置为小于或等于 0,则不会执行任何运行状况检查验证。有关更多信息,请参见 {@s34}

  • {@s35} 在确认关联消息时立即提交 Kafka 使用者接收的记录偏移量(如果偏移量高于以前提交的偏移量)。如果通道在不执行任何异步处理的情况下处理消息,则此策略提供至少一次传递。具体来说,将始终提交最近确认的消息的偏移量,即使尚未完成对旧消息的处理。发生崩溃之类的事件时,处理将在最后一次提交后重新启动,这会导致旧消息永远不会成功完全处理,这似乎是消息丢失。此策略不应在高负载环境中使用,因为提交偏移量很昂贵。但是,它降低了重复的风险。

  • ignore 不执行提交。当使用 enable.auto.commit 显式将使用者配置为 true 时,此策略是默认策略。它将偏移提交委派给底层的 Kafka 客户端。当 enable.auto.committrue 时,此策略 DOES NOT 保证至少一次传递。SmallRye Reactive Messaging 异步处理记录,因此已轮询但尚未处理的记录可能已提交偏移。发生故障时,仅尚未提交的记录将被重新处理。

Kafka 连接器在未显式启用时禁用 Kafka 自动提交。此行为不同于传统的 Kafka 使用者。如果您注重高吞吐量且不受下游限制,我们建议您:

  • use the throttled policy,

  • 或将 enable.auto.commit 设置为 true,并使用 @Acknowledgment(Acknowledgment.Strategy.NONE) 注释使用此方法。

Smallrye Reactive Messaging 允许实现自定义提交策略。有关更多信息,请参阅 SmallRye Reactive Messaging documentation

Error Handling Strategies

如果从 Kafka 记录产生的消息被否定确认,将应用故障策略。Kafka 连接器支持三种策略:

  • fail:使应用程序出现故障,将不会处理更多记录(默认策略)。尚未正确处理的记录的偏移将不会提交。

  • ignore:记录故障,但处理继续。尚未正确处理的记录的偏移已提交。

  • dead-letter-queue:尚未正确处理的记录的偏移已提交,但记录已写入 Kafka 死信主题。

使用 failure-strategy 属性选择策略。

dead-letter-queue 的情况下,您可以配置以下属性:

  • dead-letter-queue.topic:用于写入未正确处理的记录的主题,默认值为 dead-letter-topic-$channel,其中 $channel 为通道名称。

  • dead-letter-queue.key.serializer:用于在死信队列上写入记录键的序列化器。默认情况下,它从键反序列化器推断序列化器。

  • dead-letter-queue.value.serializer:用于在死信队列上写入记录值的序列化器。默认情况下,它从值反序列化器推断序列化器。

死信队列上写入的记录包含一组有关原始记录的其他标头:

  • dead-letter-reason:故障原因

  • dead-letter-cause:如有,则为故障原因

  • dead-letter-topic:记录的原始主题

  • dead-letter-partition:记录的原始分区(整型映射到字符串)

  • dead-letter-offset: 记录的原始偏移量(映射至字符串的 long)

Smallrye Reactive Messaging 可以实现自定义故障策略。请参见 SmallRye Reactive Messaging documentation 了解更多信息。

Retrying processing

你可以将 Reactive Messaging 与 SmallRye Fault Tolerance 相结合,并在处理失败后重试:

@Incoming("kafka")
@Retry(delay = 10, maxRetries = 5)
public void consume(String v) {
   // ... retry if this method throws an exception
}

你可以配置延迟、重试次数、抖动等。

如果你的方法返回 Uni`或 `CompletionStage,则需要添加 `@NonBlocking`注解:

@Incoming("kafka")
@Retry(delay = 10, maxRetries = 5)
@NonBlocking
public Uni<String> consume(String v) {
   // ... retry if this method throws an exception or the returned Uni produce a failure
}

只有在 SmallRye 容错机制 5.1.0 及更早版本中才需要 `@NonBlocking`注解。从 SmallRye 容错机制 5.2.0 版(自 Quarkus 2.1.0.Final 起可用)开始,此注解已不再需要。请参见 SmallRye Fault Tolerance documentation 了解更多信息。

传入消息仅在处理成功后才被确认。因此,它将在成功处理后提交偏移量。如果处理仍然失败(即使在所有重试之后),消息将 nacked 且应用故障策略。

Handling Deserialization Failures

当发生反序列化故障时,你可以进行拦截并提供故障策略。要实现此目的,你需要创建一个实现 DeserializationFailureHandler<T> 接口的 Bean:

@ApplicationScoped
@Identifier("failure-retry") // Set the name of the failure handler
public class MyDeserializationFailureHandler
    implements DeserializationFailureHandler<JsonObject> { // Specify the expected type

    @Override
    public JsonObject decorateDeserialization(Uni<JsonObject> deserialization, String topic, boolean isKey,
            String deserializer, byte[] data, Headers headers) {
        return deserialization
                    .onFailure().retry().atMost(3)
                    .await().atMost(Duration.ofMillis(200));
    }
}

要使用此故障处理程序,该 Bean 必须使用 @Identifier`限定符公开,且连接器配置必须指定属性 `mp.messaging.incoming.$channel.[key|value]-deserialization-failure-handler(对于键或值反序列化器)。

处理程序使用反序列化的详细信息进行调用,包括表示为 `Uni<T>`的动作。在反序列化 `Uni`故障中,可以实施诸如重试、提供回退值或应用超时的故障策略。

如果你没有配置反序列化故障处理程序,而发生了反序列化故障,则此应用程序被标记为不正常。你也可以忽略故障,这将记录异常并生成一个 null 值。要启用此行为,请将 mp.messaging.incoming.$channel.fail-on-deserialization-failure 属性设为 false

如果 fail-on-deserialization-failure 属性设为 false`且 `failure-strategy 属性为 dead-letter-queue,则失败记录将发送到相应的 *dead letter queue*主题。

Consumer Groups

在 Kafka 中,消费者组是一组共同合作以从主题消费数据的消费者。主题分为一组分区。主题的分区在组中的消费者之间分配,从而有效地扩展了消费吞吐量。请注意,每个分区都分配给组中单个消费者。但是,如果分区数大于组中的消费者数,则可以将多个分区分配给一个消费者。

我们简单探索一下不同的生产者/消费者模式以及如何使用 Quarkus 来实现这些模式:

  1. *Single consumer thread inside a consumer group*这是订阅 Kafka 主题的应用程序的默认行为:每个 Kafka 连接器都将创建一个消费者线程并将其放在单个消费者组中。消费者组 ID 默认为应用程序名称,由 quarkus.application.name 配置属性设置。还可使用 kafka.group.id 属性进行设置。 image::kafka-one-app-one-consumer.png[]

  2. *Multiple consumer threads inside a consumer group*对于给定的应用程序实例,可以使用 mp.messaging.incoming.$channel.concurrency 属性配置消费者组中的消费者数量。订阅主题的分区将在消费者线程之间进行划分。请注意,如果 concurrency 值超过主题的分区数,则不会将任何分区分配给某些消费者线程。 image::kafka-one-app-two-consumers.png[]

Deprecation

concurrency attribute为非阻塞并发通道提供了一种与连接器无关的方式,并替代了 Kafka 连接器特定的 partitions 属性。因此,partitions 属性已弃用,且将在未来版本中删除。

  1. *Multiple consumer applications inside a consumer group*类似于前面的示例,某个应用程序的多实例可以订阅一个消费者组,在 mp.messaging.incoming.$channel.group.id 属性中配置该组或将其保留为应用程序名称。这反过来会将主题的分区分配给各个应用程序实例。 image::kafka-two-app-one-consumer-group.png[]

  2. Pub/Sub: Multiple consumer groups subscribed to a topic*最后,使用不同的 *consumer group ids,不同的应用程序可以独立地订阅相同的主题。例如,发布到称为 orders 的主题的消息可以在两个使用者应用程序上独立地消费,一个使用 mp.messaging.incoming.orders.group.id=invoicing,另一个使用 mp.messaging.incoming.orders.group.id=shipping。因此,不同的消费者组可以根据消息消费要求独立地进行扩展。 image::kafka-two-app-two-consumer-groups.png[]

一个常见的业务要求是按顺序使用和处理 Kafka 记录。Kafka 代理在分区内保留记录的顺序,而不是在主题内。因此,考虑如何在主题内分区记录非常重要。默认分区器使用记录键哈希计算一个记录的分区,或者在未定义键时,为每批记录或随机选择一个分区。 在正常操作期间,Kafka 使用者会在分配给该使用者的每个分区内保留记录的顺序。Smallrye Reactive Messaging 保留了此处理顺序,除非使用了 @Blocking(ordered = false)(请参阅 Blocking processing)。 请注意,由于使用者重新平衡,Kafka 使用者只能保证单个记录的一次或多次处理,这意味着未提交的记录 can 可以由使用者再次处理。

Consumer Rebalance Listener

在消费者组内,随着新组成员的加入和旧成员的退出,分区会重新分配,以便每个成员接收分区的分摊份额。这称为重新平衡组。若要自行处理偏移提交和分配的分区,可以提供消费者重新平衡侦听器。为此,请实现 io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener 接口并将其作为带有 @Idenfier 限定符的 CDI Bean 公开。一个常见的用例是在单独的数据存储中存储偏移以实现完全一次语义,或在特定偏移处开始处理。

每次使用者主题/分区分配发生更改时,就会调用侦听器。例如,当应用程序启动时,它会使用与使用者关联的初始主题/分区集调用 partitionsAssigned 回调。如果稍后此集合发生更改,它会再次调用 partitionsRevokedpartitionsAssigned 回调,以便可以实现自定义逻辑。

请注意,重新平衡侦听器方法是从 Kafka 轮询线程调用的,并且在完成之前 will 会阻塞调用者线程。这是因为重新平衡协议有同步障碍,并且在重新平衡侦听器中使用异步代码可能在同步障碍后执行。

从使用者处分配或撤销主题/分区时,它会暂停消息传递并在重新平衡完成后恢复。

如果重新平衡侦听器代表用户处理偏移提交(使用 NONE 提交策略),则重新平衡侦听器必须在 partitionsRevoked 回调中同步提交偏移。我们还建议在应用程序停止时应用相同的逻辑。

与 Apache Kafka 中的 ConsumerRebalanceListener 不同, io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener 方法会传递 Kafka 使用者和主题/分区集合。

在以下示例中,我们设置了一个使用者,该使用者始终从最多 10 分钟前的消息(或偏移 0)开始。首先,我们需要提供一个实现 io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener 并标注为 io.smallrye.common.annotation.Identifier 的 Bean。然后,我们必须将入站连接器配置为使用此 Bean。

package inbound;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.TopicPartition;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

@ApplicationScoped
@Identifier("rebalanced-example.rebalancer")
public class KafkaRebalancedConsumerRebalanceListener implements KafkaConsumerRebalanceListener {

    private static final Logger LOGGER = Logger.getLogger(KafkaRebalancedConsumerRebalanceListener.class.getName());

    /**
     * When receiving a list of partitions, will search for the earliest offset within 10 minutes
     * and seek the consumer to it.
     *
     * @param consumer   underlying consumer
     * @param partitions set of assigned topic partitions
     */
    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        long now = System.currentTimeMillis();
        long shouldStartAt = now - 600_000L; //10 minute ago

        Map<TopicPartition, Long> request = new HashMap<>();
        for (TopicPartition partition : partitions) {
            LOGGER.info("Assigned " + partition);
            request.put(partition, shouldStartAt);
        }
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(request);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> position : offsets.entrySet()) {
            long target = position.getValue() == null ? 0L : position.getValue().offset();
            LOGGER.info("Seeking position " + target + " for " + position.getKey());
            consumer.seek(position.getKey(), target);
        }
    }

}
package inbound;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class KafkaRebalancedConsumer {

    @Incoming("rebalanced-example")
    @Acknowledgment(Acknowledgment.Strategy.NONE)
    public CompletionStage<Void> consume(Message<ConsumerRecord<Integer, String>> message) {
        // We don't need to ACK messages because in this example,
        // we set offset during consumer rebalance
        return CompletableFuture.completedFuture(null);
    }

}

若要将入站连接器配置为使用提供的侦听器,可以设置消费者重新平衡侦听器的标识符: mp.messaging.incoming.rebalanced-example.consumer-rebalance-listener.name=rebalanced-example.rebalancer

或者让侦听器的名称与组 ID 相同:

mp.messaging.incoming.rebalanced-example.group.id=rebalanced-example.rebalancer

设置消费者重新平衡侦听器的名称优先于使用组 ID。

Using unique consumer groups

如果您要处理主题中的所有记录(从其开头开始),您需要:

  1. 设置 auto.offset.reset = earliest

  2. 将您的消费者分配给未被其他任何应用程序使用的消费者组。

Quarkus 生成了两个执行之间更改的 UUID(包括在开发模式中)。因此,您确保没有其他消费者使用它,并且每次应用程序启动时,您都会收到新的唯一组 ID。

您可以使用生成的 UUID 作为消费者组,如下所示:

mp.messaging.incoming.your-channel.auto.offset.reset=earliest
mp.messaging.incoming.your-channel.group.id=${quarkus.uuid}

如果未设置 group.id 属性,它将默认 quarkus.application.name 配置属性。

Manual topic-partition assignment

assign-seek 通道属性允许将主题分区手动分配到 Kafka 输入通道,并可以选择在分区中寻求特定偏移量以开始使用记录。如果使用 assign-seek 则消费者不会动态订阅主题,而是会静态分配所描述的分区。在手动主题分区重新平衡中不会发生,因此不会调用重新平衡侦听器。

该属性采用一个逗号分隔的三元组列表: <topic>:<partition>:<offset>

例如,配置

mp.messaging.incoming.data.assign-seek=topic1:0:10, topic2:1:20

将消费者分配给:

  • 主题 'topic1' 的分区 0,将初始位置设置为偏移量 10。

  • 主题 'topic2' 的分区 1,将初始位置设置为偏移量 20。

每个三元组中的主题、分区和偏移量可以有以下变化:

  • 如果省略主题,将使用已配置的主题。

  • 如果省略偏移量,则将分区分配给消费者,但不会将其定位到偏移量。

  • 如果偏移量为 0,则它将定位到主题分区的开头。

  • 如果偏移量为 -1,则它将定位到主题分区的末尾。

Receiving Kafka Records in Batches

默认情况下,传入方法逐个接收每个 Kafka 记录。本质上,Kafka 消费者客户端不停地轮询代理并接收批量记录,这些记录显示在 ConsumerRecords 容器内。

batch 模式下,您的应用程序可以一次性接收消费者 poll 返回的所有记录。

要实现此目标,您需要指定一个兼容容器类型来接收所有数据:

@Incoming("prices")
public void consume(List<Double> prices) {
    for (double price : prices) {
        // process price
    }
}

传入方法还可以接收 Message<List<Payload>>、`Message<ConsumerRecords<Key, Payload>>`和 `ConsumerRecords<Key, Payload>`类型。它们提供对偏移量或时间戳等记录详细信息的访问:

@Incoming("prices")
public CompletionStage<Void> consumeMessage(Message<ConsumerRecords<String, Double>> records) {
    for (ConsumerRecord<String, Double> record : records.getPayload()) {
        String payload = record.getPayload();
        String topic = record.getTopic();
        // process messages
    }
    // ack will commit the latest offsets (per partition) of the batch.
    return records.ack();
}

请注意,成功处理传入记录批处理将会提交批处理内接收到的每个分区的最新偏移量。将仅对这些记录应用已配置的提交策略。

相反,如果处理抛出异常,则所有消息为 nacked,对批处理内的所有记录应用故障策略。

Quarkus 会自动检测传入通道的批处理类型并自动设置批处理配置。您可以使用 `mp.messaging.incoming.$channel.batch`属性显式配置批处理模式。

Stateful processing with Checkpointing

`checkpoint`提交策略是一项实验性功能,将来可能会更改。

Smallrye Reactive Messaging `checkpoint`提交策略允许消费者应用程序以有状态方式处理消息,同时还遵循 Kafka 消费者可扩展性。具有 `checkpoint`提交策略的传入通道在外部 state store上持久化消费者偏移量,例如关系数据库或键值存储。作为处理已消费记录的结果,消费者应用程序可以为分配给 Kafka 消费者每个主题分区累积内部状态。此本地状态将定期持久化到状态存储,并将与产生它的记录的偏移量关联。

此策略不会向 Kafka 代理提交任何偏移量,因此当新分区被分配给消费者(即消费者重新启动或消费者组实例扩展)时,消费者从具有其保存状态的最新 _checkpointed_偏移量恢复处理。

`@Incoming`通道消费者代码可以通过 `CheckpointMetadata`API 操作处理状态。例如,计算在 Kafka 主题上接收的价格移动平均值的消费者如下所示:

package org.acme;

import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;

@ApplicationScoped
public class MeanCheckpointConsumer {

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<Double> record) {
        // Get the `CheckpointMetadata` from the incoming message
        CheckpointMetadata<AveragePrice> checkpoint = CheckpointMetadata.fromMessage(record);

        // `CheckpointMetadata` allows transforming the processing state
        // Applies the given function, starting from the value `0.0` when no previous state exists
        checkpoint.transform(new AveragePrice(), average -> average.update(record.getPayload()), /* persistOnAck */ true);

        // `persistOnAck` flag set to true, ack will persist the processing state
        // associated with the latest offset (per partition).
        return record.ack();
    }

    static class AveragePrice {
        long count;
        double mean;

        AveragePrice update(double newPrice) {
            mean += ((newPrice - mean) / ++count);
            return this;
        }
    }
}

`transform`方法将转换函数应用于当前状态,产生更改后的状态并在本地注册以进行检查点。默认情况下,本地状态将定期持久化到状态存储,该时间段由 `auto.commit.interval.ms`指定(默认值:5000)。如果给定 `persistOnAck`标志,则在消息确认时将最新状态急切地持久化到状态存储。`setNext`方法类似地直接设置最新状态。

检查点提交策略跟踪处理状态上次为每个主题分区持久化的时刻。如果未完成的状态更改无法持久化 checkpoint.unsynced-state-max-age.ms(默认值:10000),则通道将标记为不正常。

State stores

状态存储实现决定了处理状态在何处以及如何持久化。这是由 `mp.messaging.incoming.[channel-name].checkpoint.state-store`属性配置的。状态对象的序列化取决于状态存储实现。为了指导状态存储进行序列化,可能需要使用 `mp.messaging.incoming.[channel-name].checkpoint.state-type`属性配置状态对象类的名称。

Quarkus 提供以下状态存储实现:

  • quarkus-redis:使用 quarkus-redis-client扩展来持久化处理状态。Jackson 用于在 JSON 中序列化处理状态。对于复杂对象,需要使用对象的类名配置 `checkpoint.state-type`属性。默认情况下,状态存储使用默认 redis 客户端,但如果要使用 named client,则可以使用 `mp.messaging.incoming.[channel-name].checkpoint.quarkus-redis.client-name`属性指定客户端名称。处理状态将使用密钥命名方案 `[consumer-group-id]:[topic]:[partition]`存储在 Redis 中。

例如,下面是前一个代码的配置内容:

mp.messaging.incoming.prices.group.id=prices-checkpoint
# ...
mp.messaging.incoming.prices.commit-strategy=checkpoint
mp.messaging.incoming.prices.checkpoint.state-store=quarkus-redis
mp.messaging.incoming.prices.checkpoint.state-type=org.acme.MeanCheckpointConsumer.AveragePrice
# ...
# if using a named redis client
mp.messaging.incoming.prices.checkpoint.quarkus-redis.client-name=my-redis
quarkus.redis.my-redis.hosts=redis://localhost:7000
quarkus.redis.my-redis.password=<redis-pwd>
  • quarkus-hibernate-reactive:使用 quarkus-hibernate-reactive扩展来持久化处理状态。处理状态对象必须是 Jakarta Persistence 实体,并且必须扩展 `CheckpointEntity`类,此类处理对象标识符由消费者组 ID、主题和分区组成。因此,需要使用 `checkpoint.state-type`属性配置实体的类名。

例如,下面是前一个代码的配置内容:

mp.messaging.incoming.prices.group.id=prices-checkpoint
# ...
mp.messaging.incoming.prices.commit-strategy=checkpoint
mp.messaging.incoming.prices.checkpoint.state-store=quarkus-hibernate-reactive
mp.messaging.incoming.prices.checkpoint.state-type=org.acme.AveragePriceEntity

其中 `AveragePriceEntity`是扩展 `CheckpointEntity`的 Jakarta Persistence 实体:

package org.acme;

import jakarta.persistence.Entity;

import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntity;

@Entity
public class AveragePriceEntity extends CheckpointEntity {
    public long count;
    public double mean;

    public AveragePriceEntity update(double newPrice) {
        mean += ((newPrice - mean) / ++count);
        return this;
    }
}
  • quarkus-hibernate-orm:使用 quarkus-hibernate-orm扩展来持久化处理状态。它类似于前面的状态存储,但它使用 Hibernate ORM 而不是 Hibernate Reactive。

配置后,它可以为检查点状态存储使用命名的 persistence-unit

mp.messaging.incoming.prices.commit-strategy=checkpoint
mp.messaging.incoming.prices.checkpoint.state-store=quarkus-hibernate-orm
mp.messaging.incoming.prices.checkpoint.state-type=org.acme.AveragePriceEntity
mp.messaging.incoming.prices.checkpoint.quarkus-hibernate-orm.persistence-unit=prices
# ... Setup "prices" persistence unit
quarkus.datasource."prices".db-kind=postgresql
quarkus.datasource."prices".username=<your username>
quarkus.datasource."prices".password=<your password>
quarkus.datasource."prices".jdbc.url=jdbc:postgresql://localhost:5432/hibernate_orm_test
quarkus.hibernate-orm."prices".datasource=prices
quarkus.hibernate-orm."prices".packages=org.acme

有关如何实现自定义状态存储的说明,请参阅 Implementing State Stores

Sending messages to Kafka

Kafka 连接器传出渠道的配置类似于传入渠道:

%prod.kafka.bootstrap.servers=kafka:9092 1
mp.messaging.outgoing.prices-out.connector=smallrye-kafka 2
mp.messaging.outgoing.prices-out.topic=prices 3
1 为此,请使用 mp.messaging.outgoing.$channel.bootstrap.servers`属性配置生产配置文件的代理位置。您可以在全局或按渠道配置它。在开发模式和运行测试时,Dev Services for Kafka会自动启动 Kafka 代理。当未提供此属性时,此属性默认为 `localhost:9092
2 配置连接器,以管理 `prices-out`渠道。
3 默认情况下,主题名称与渠道名称相同。您可以配置主题属性以覆盖它。

在应用程序配置中,渠道名称是唯一名称。因此,如果您希望在相同主题上配置传出和传入渠道,您需要对渠道使用不同的名称(例如本指南中的示例,mp.messaging.incoming.prices`和 `mp.messaging.outgoing.prices-out)。

然后,您的应用程序可以生成消息,并将其发布到 `prices-out`渠道。它可以使用 `double`有效负载,如下面的代码段所示:

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class KafkaPriceProducer {

    private final Random random = new Random();

    @Outgoing("prices-out")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> random.nextDouble());
    }

}

您不得直接从代码中调用用 `@Incoming`和/或 `@Outgoing`注释的方法。它们是由框架调用的。让用户代码调用它们无法达到预期的结果。

请注意,generate`方法返回了 `Multi<Double>,它会实现 Reactive Streams `Publisher`接口。此发布者将由框架使用,以生成消息并将它们发送到已配置的 Kafka 主题。

您不必返回有效负载,可以返回 io.smallrye.reactive.messaging.kafka.Record,以发送键值对:

@Outgoing("out")
public Multi<Record<String, Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
        .map(x -> Record.of("my-key", random.nextDouble()));
}

有效负载可以封装在 `org.eclipse.microprofile.reactive.messaging.Message`中,以更好地控制写入记录:

@Outgoing("generated-price")
public Multi<Message<Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> Message.of(random.nextDouble())
                    .addMetadata(OutgoingKafkaRecordMetadata.<String>builder()
                            .withKey("my-key")
                            .withTopic("my-key-prices")
                            .withHeaders(new RecordHeaders().add("my-header", "value".getBytes()))
                            .build()));
}

OutgoingKafkaRecordMetadata`允许设置 Kafka 记录的元数据属性,如 `keytopicpartition`或 `timestamp。一个用例是动态选择消息的目标主题。在这种情况下,您需要使用传出元数据设置主题的名称,而不是在应用程序配置文件中配置主题。

除了返回 Reactive Stream `Publisher`的方法签名(`Multi`是 `Publisher`的实现),传出方法还可以返回单个消息。在这种情况下,生产者将使用此方法作为生成器,以创建无限流。

@Outgoing("prices-out") T generate(); // T excluding void

@Outgoing("prices-out") Message<T> generate();

@Outgoing("prices-out") Uni<T> generate();

@Outgoing("prices-out") Uni<Message<T>> generate();

@Outgoing("prices-out") CompletionStage<T> generate();

@Outgoing("prices-out") CompletionStage<Message<T>> generate();

Sending messages with Emitter

有时,您需要一种直接发送消息的方式。

例如,如果您需要在 REST 端点中接收 POST 请求时,向流发送消息。在这种情况下,您无法使用 @Outgoing,因为您的方法具有参数。

为此,您可以使用 Emitter

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.core.MediaType;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("price-create")
    Emitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public void addPrice(Double price) {
        CompletionStage<Void> ack = priceEmitter.send(price);
    }
}

发送有效负载会返回 CompletionStage,该值在确认消息时完成。如果消息传输失败,则 `CompletionStage`将根据未确认的原因进行异常完成。

`Emitter`配置与 `@Incoming`和 `@Outgoing`使用的其他流配置相同。

使用 Emitter`时,您会将消息从直接代码发送到反应式消息传递。这些消息存储在队列中,直至它们被发送。如果 Kafka 生产者客户端无法跟上尝试发送到 Kafka 的消息,此队列就会占用内存,您甚至可能耗尽内存。您可以使用 `@OnOverflow`来配置反压策略。它允许您配置队列的大小(默认为 256),以及在达到缓冲区大小时要应用的策略。可用的策略为 `DROPLATESTFAILBUFFERUNBOUNDED_BUFFER`和 `NONE

利用 Emitter API,您还可以在 Message<T> 内封装传出有效负载。与之前的示例一样,Message 可让您以不同的方式处理 ack/nack 案例。

import java.util.concurrent.CompletableFuture;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.core.MediaType;

@Path("/prices")
public class PriceResource {

    @Inject @Channel("price-create") Emitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public void addPrice(Double price) {
        priceEmitter.send(Message.of(price)
            .withAck(() -> {
                // Called when the message is acked
                return CompletableFuture.completedFuture(null);
            })
            .withNack(throwable -> {
                // Called when the message is nacked
                return CompletableFuture.completedFuture(null);
            }));
    }
}

如果您偏好使用 Reactive Stream API,那么您可以使用 MutinyEmitter,它将会从 send 方法中返回 Uni<Void>。因此,您可以使用 Mutiny API 处理下游消息和错误。

import org.eclipse.microprofile.reactive.messaging.Channel;

import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.core.MediaType;

import io.smallrye.reactive.messaging.MutinyEmitter;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("price-create")
    MutinyEmitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public Uni<String> addPrice(Double price) {
        return quoteRequestEmitter.send(price)
                .map(x -> "ok")
                .onFailure().recoverWithItem("ko");
    }
}

使用 sendAndAwait 方法时,还可以阻塞以将事件发送至发射器。它只会当接收器确认或否定事件时才从该方法返回。

Deprecation

io.smallrye.reactive.messaging.annotations.Emitterio.smallrye.reactive.messaging.annotations.Channelio.smallrye.reactive.messaging.annotations.OnOverflow 类现在已弃用并且被替换为:

  • org.eclipse.microprofile.reactive.messaging.Emitter

  • org.eclipse.microprofile.reactive.messaging.Channel

  • org.eclipse.microprofile.reactive.messaging.OnOverflow

新的 Emitter.send 方法返回在已生成的消息被确认时完成的 CompletionStage

Deprecation

现弃用 MutinyEmitter#send(Message msg) 方法,建议采用接收 Message 来发放的以下方法:

  • <M extends Message<? extends T>> Uni<Void> sendMessage(M msg)

  • <M extends Message<? extends T>> void sendMessageAndAwait(M msg)

  • <M extends Message<? extends T>> Cancellable sendMessageAndForget(M msg)

关于如何使用 Emitter 的更多信息可在 SmallRye Reactive Messaging – Emitters and Channels 中找到。

Write Acknowledgement

当 Kafka 代理接收记录时,取决于配置,其确认可能会花费时间。同样地,它会将无法写入的记录存储在内存中。

默认情况下,连接器会等待 Kafka 确认记录从而继续处理(确认已接收的消息)。您可以通过将 waitForWriteCompletion 属性设置为 false 来禁用此功能。

请注意,acks 属性对记录确认有巨大影响。

如果无法写入记录,则将否定消息。

Backpressure

Kafka 输出连接器处理反压,监视等待写入 Kafka 代理的飞行中消息数量。飞行中消息的数量使用 max-inflight-messages 属性进行配置,并且默认为 1024。

连接器仅同时发送该数量的消息。其他任何消息都不会被发送,直至代理确认至少一个飞行中消息为止。随后,当代理中的一个飞行中消息被确认时,连接器会向 Kafka 编写新消息。务必相应地配置 Kafka 的 batch.sizelinger.ms

您还可以通过将 max-inflight-messages 设置为 0 来移除飞行中消息的限制。但是,请注意,如果请求的数量达到 max.in.flight.requests.per.connection,Kafka 生产者可能会阻塞。

Retrying message dispatch

当 Kafka 生产者从服务器接收错误时,如果错误是可恢复的瞬态错误,客户端会重试发送消息批处理。此行为受 retriesretry.backoff.ms 参数控制。此外,SmallRye Reactive Messaging 会根据 retriesdelivery.timeout.ms 参数,在可恢复错误发生时重试各个消息。

请注意,虽然在可靠系统中进行重试是最佳实践,但 max.in.flight.requests.per.connection 参数默认为 5,这意味着无法保证消息的顺序。如果您的用例必须保证消息顺序,将 max.in.flight.requests.per.connection 设置为 1 将确保一次发送一批消息,但会以限制生产者吞吐量为代价。

有关如何在处理错误时应用重试机制,请参阅 [retrying-processing] 部分。

Handling Serialization Failures

对于 Kafka 生产者客户端序列化失败不可恢复,因此不会重试消息发送。在这些情况下,您可能需要对序列化程序应用故障策略。如需实现这一点,您需要创建一个实现 SerializationFailureHandler<T> 接口的 Bean:

@ApplicationScoped
@Identifier("failure-fallback") // Set the name of the failure handler
public class MySerializationFailureHandler
    implements SerializationFailureHandler<JsonObject> { // Specify the expected type

    @Override
    public byte[] decorateSerialization(Uni<byte[]> serialization, String topic, boolean isKey,
        String serializer, Object data, Headers headers) {
        return serialization
                    .onFailure().retry().atMost(3)
                    .await().indefinitely();
    }
}

要使用此故障处理程序,必须使用 @Identifier 限定符公开 Bean,且连接器配置必须指定 mp.messaging.outgoing.$channel.[key|value]-serialization-failure-handler 属性(对于键或值序列化程序)。

处理程序会通过序列化详细信息调用,包括表示为 Uni<byte[]> 的操作。请注意,方法必须等待结果,并返回序列化的字节数组。

In-memory channels

在某些用例中,方便使用消息模式在同一应用程序内传输消息。当您未将通道连接到如 Kafka 这类消息传递后端时,所有内容都在内存中进行,流通过链接方法创建。每个链仍然是响应式流,并且强制执行背压协议。

框架会验证生产者/消费者链是否完整,这意味着如果应用程序将消息写入内存中通道(使用仅带有 @Outgoing 的方法)或 Emitter,它还必须从应用程序中使用仅带有 @Incoming 的方法或使用非托管流)获取消息。

Broadcasting messages on multiple consumers

默认情况下,通道可以使用 @Incoming 方法或 @Channel 响应式流链接到单个消费者。在应用程序启动时,将验证通道以形成具有单个消费者和生产者的消费者和生产者链。您可以通过在通道中设置 mp.messaging.$channel.broadcast=true 来覆盖此行为。

对于内存中的通道,可以在 @Outgoing 方法中使用 @Broadcast 注释。例如,

import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.annotations.Broadcast;

@ApplicationScoped
public class MultipleConsumer {

    private final Random random = new Random();

    @Outgoing("in-memory-channel")
    @Broadcast
    double generate() {
        return random.nextDouble();
    }

    @Incoming("in-memory-channel")
    void consumeAndLog(double price) {
        System.out.println(price);
    }

    @Incoming("in-memory-channel")
    @Outgoing("prices2")
    double consumeAndSend(double price) {
        return price;
    }
}

反之,可以通过设置 mp.messaging.incoming.$channel.merge=true 来合并同一通道上的多个生产者。在 @Incoming 方法中,您可以使用 @Merge 注释控制如何合并多个通道。

对输出或处理方法重复使用 @Outgoing 注释允许以另一种方式将消息发送到多个输出通道:

import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class MultipleProducers {

    private final Random random = new Random();

    @Outgoing("generated")
    @Outgoing("generated-2")
    double priceBroadcast() {
        return random.nextDouble();
    }

}

在上一个示例中,生成的 price 将广播到两个输出通道。以下示例使用 Targeted 容器对象有选择性地将消息发送到多个输出通道,容器对象包含键(作为通道名称)和值(作为消息负载)。

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.Targeted;

@ApplicationScoped
public class TargetedProducers {

    @Incoming("in")
    @Outgoing("out1")
    @Outgoing("out2")
    @Outgoing("out3")
    public Targeted process(double price) {
        Targeted targeted = Targeted.of("out1", "Price: " + price,
                "out2", "Quote: " + price);
        if (price > 90.0) {
            return targeted.with("out3", price);
        }
        return targeted;
    }

}

请注意,the auto-detection for Kafka serializers 不适用于使用 Targeted 的签名。

有关使用多输出的更多详细信息,请参阅 SmallRye Reactive Messaging documentation

Kafka Transactions

Kafka 事务支持对多个 Kafka 主题和分区进行原子写。Kafka 连接器提供了 KafkaTransactions 自定义发射器,用于在事务内写入 Kafka 记录。它可以作为常规发射器 @Channel 注入:

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;

@ApplicationScoped
public class KafkaTransactionalProducer {

    @Channel("tx-out-example")
    KafkaTransactions<String> txProducer;

    public Uni<Void> emitInTransaction() {
        return txProducer.withTransaction(emitter -> {
            emitter.send(KafkaRecord.of(1, "a"));
            emitter.send(KafkaRecord.of(2, "b"));
            emitter.send(KafkaRecord.of(3, "c"));
            return Uni.createFrom().voidItem();
        });
    }

}

提供给 withTransaction 方法的函数接收一个 TransactionalEmitter 用于生成记录,并返回一个 Uni 提供事务结果。

  • 如果处理成功完成,生产者将刷新,事务提交。

  • 如果处理引发异常,返回失败的 Uni,或将 TransactionalEmitter 标记为中止,事务将中止。

Kafka 事务性生产者需要配置 acks=all 客户端属性和 transactional.id 的唯一 id,这暗示 enable.idempotence=true。当 Quarkus 检测到对传出通道使用 KafkaTransactions 时,它将为通道配置这些属性,并提供 transactional.id 属性的默认值 "${quarkus.application.name}-${channelName}"

请注意,为了在生产中使用, transactional.id 在所有应用程序实例中必须是唯一的。

虽然普通的消息发射器会支持并发调用 send 方法,并且将传出消息排队写到 Kafka,但 KafkaTransactions 发射器一次只支持一个事务。从调用 withTransaction 到返回的 Uni 导致成功或失败,都会认为一个事务正在进行。当一个事务正在进行时,对 withTransaction 的后续调用,包括在给定函数中嵌套的调用,都将抛出 IllegalStateException。 请注意,在 Reactive Messaging 中,处理方法的执行已经序列化,除非使用 @Blocking(ordered = false)。如果 withTransaction 可以并发调用,例如,从 REST 终结点,则建议限制执行的并发性。这可以使用 Microprofile Fault Tolerance@Bulkhead 注释来完成。 可以在 Chaining Kafka Transactions with Hibernate Reactive transactions 中找到一个示例用法。

Transaction-aware consumers

如果您想仅使用 Kafka 事务写入和提交记录,则需要将 isolation.level 属性配置为传入通道:

mp.messaging.incoming.prices-in.isolation.level=read_committed

Kafka Request-Reply

Kafka 请求-响应模式允许将请求记录发布到 Kafka 主题,然后等待响应记录来响应初始请求。Kafka 连接器提供 KafkaRequestReply 自定义发射器,它为 Kafka 传出通道的请求-响应模式实现请求方(或客户端):

它可以注入为常规发射器 @Channel

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply;

@ApplicationScoped
@Path("/kafka")
public class KafkaRequestReplyEmitter {

    @Channel("request-reply")
    KafkaRequestReply<Integer, String> requestReply;

    @POST
    @Path("/req-rep")
    @Produces(MediaType.TEXT_PLAIN)
    public Uni<String> post(Integer request) {
        return requestReply.request(request);
    }

}

请求方法将记录发布到传出通道的已配置目标主题,并轮询回复主题(默认情况下,目标主题带有 -replies 后缀)以获取回复记录。当收到响应时,用记录值完成返回的 Uni。请求发送操作会生成 correlation id,并设置一个标头(默认值为 REPLY_CORRELATION_ID),它期望在响应记录中将其发回。

可以使用 Reactive Messaging 处理器来实现应答器(请参阅 Processing Messages)。

有关 Kafka 请求响应功能和高级配置选项的更多信息,请参阅 Smallrye Reactive Messaging Documentation

Processing Messages

流化数据的应用程序通常需要从主题中获取一些事件,对其进行处理,并将结果发布到另一个主题。可以使用 @Incoming@Outgoing 注释轻松实现一个处理程序方法:

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceProcessor {

    private static final double CONVERSION_RATE = 0.88;

    @Incoming("price-in")
    @Outgoing("price-out")
    public double process(double price) {
        return price * CONVERSION_RATE;
    }

}

process 方法的参数是传入消息负载,而返回值将用作传出消息负载。同样支持前面提到的参数和返回类型签名,例如 Message<T>Record<K, V> 等。

您可以通过使用并返回反应流 Multi<T> 类型来应用异步流处理:

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class PriceProcessor {

    private static final double CONVERSION_RATE = 0.88;

    @Incoming("price-in")
    @Outgoing("price-out")
    public Multi<Double> process(Multi<Integer> prices) {
        return prices.filter(p -> p > 100).map(p -> p * CONVERSION_RATE);
    }

}

Propagating Record Key

处理消息时,可以将传入记录键传播到传出记录。

启用 mp.messaging.outgoing.$channel.propagate-record-key=true 配置后,记录键传播会生成与传入记录具有相同 key 的传出记录。

如果传出记录已包含 key,则它 won’t be overridden 由传入记录键。如果传入记录具有 null 键,则将使用 mp.messaging.outgoing.$channel.key 属性。

Exactly-Once Processing

Kafka 事务允许在事务中管理消费者偏移量以及生成的消息。这使得以 consume-transform-produce 模式将消费者与事务性生产者耦合起来成为可能,这也称为 exactly-once processing

KafkaTransactions 自定义发送器提供一种方法,可在事务中对传入的 Kafka 消息应用完全一次处理。

以下示例在事务中包括一批 Kafka 记录。

import jakarta.enterprise.context.ApplicationScoped;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;

@ApplicationScoped
public class KafkaExactlyOnceProcessor {

    @Channel("prices-out")
    @OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 500) (3)
    KafkaTransactions<Integer> txProducer;

    @Incoming("prices-in")
    public Uni<Void> emitInTransaction(Message<ConsumerRecords<String, Integer>> batch) { (1)
        return txProducer.withTransactionAndAck(batch, emitter -> { (2)
            for (ConsumerRecord<String, Integer> record : batch.getPayload()) {
                emitter.send(KafkaRecord.of(record.key(), record.value() + 1)); (3)
            }
            return Uni.createFrom().voidItem();
        });
    }

}
1 建议将完全一次处理与批处理消费模式一起使用。虽然可以将其与单个 Kafka 消息一起使用,但它会对性能产生重大影响。
2 将使用消息传递给 KafkaTransactions#withTransactionAndAck,以处理偏移量提交和消息确认。
3 send 方法在事务中将记录写入 Kafka,而无需等待来自代理的发送回执。待写入 Kafka 的消息将被缓冲,并在提交事务之前刷新。因此,建议配置 @OnOverflow bufferSize,以适合足够的消息,例如 max.poll.records,一批中返回的最大记录数。
  • 如果处理成功完成,before committing the transaction,将给定批处理消息的主题分区偏移量提交给事务。

  • 如果需要中止处理,after aborting the transaction,则消费者位置将重置为最后提交的偏移量,有效地从该偏移量重新开始使用。如果尚未将消费者偏移量提交给主题分区,则消费者的位置将重置到主题分区的开头,even if the offset reset policy is `latest`

在使用完全一次处理时,已使用消息偏移量提交由事务处理,因此应用程序不应通过其他方式提交偏移量。消费者应具备 enable.auto.commit=false(默认值)并显式设置 commit-strategy=ignore

mp.messaging.incoming.prices-in.commit-strategy=ignore
mp.messaging.incoming.prices-in.failure-strategy=ignore

Error handling for the exactly-once processing

如果事务失败并中止,UniKafkaTransactions#withTransaction 返回的内容会报告一个失败。应用程序可以选择处理错误情况,但是如果从 @Incoming 方法返回一个失败的 Uni,则传入频道实际上会失败并停止反应式流。

KafkaTransactions#withTransactionAndAck 方法确认和不认可消息,但将 not 返回一个失败的 Uni。不认可的消息将由传入频道的失败策略处理(参见 Error Handling Strategies)。配置 failure-strategy=ignore 只需将 Kafka 消费者重置为最后提交的偏移量,并从此处重新开始使用。

Accessing Kafka clients directly

在极少数情况下,你可能需要访问基础 Kafka 客户端。KafkaClientService 提供对 ProducerConsumer 的线程安全访问。

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;

import org.apache.kafka.clients.producer.ProducerRecord;

import io.quarkus.runtime.StartupEvent;
import io.smallrye.reactive.messaging.kafka.KafkaClientService;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.KafkaProducer;

@ApplicationScoped
public class PriceSender {

    @Inject
    KafkaClientService clientService;

    void onStartup(@Observes StartupEvent startupEvent) {
        KafkaProducer<String, Double> producer = clientService.getProducer("generated-price");
        producer.runOnSendingThread(client -> client.send(new ProducerRecord<>("prices", 2.4)))
            .await().indefinitely();
    }
}

KafkaClientService 是一个实验性 API,将来可能会发生变更。

你还可以将 Kafka 配置注入到你的应用程序并直接创建 Kafka 生产者、消费者和管理客户端:

import io.smallrye.common.annotation.Identifier;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.KafkaAdminClient;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import java.util.HashMap;
import java.util.Map;

@ApplicationScoped
public class KafkaClients {

    @Inject
    @Identifier("default-kafka-broker")
    Map<String, Object> config;

    @Produces
    AdminClient getAdmin() {
        Map<String, Object> copy = new HashMap<>();
        for (Map.Entry<String, Object> entry : config.entrySet()) {
            if (AdminClientConfig.configNames().contains(entry.getKey())) {
                copy.put(entry.getKey(), entry.getValue());
            }
        }
        return KafkaAdminClient.create(copy);
    }

}

default-kafka-broker 配置映射包含所有应用程序属性,其前缀为 kafka.KAFKA_。有关更多配置选项,请查看 Kafka Configuration Resolution

JSON serialization

Quarkus 具备处理 JSON Kafka 消息的内置功能。

设想我们有一个 Fruit 数据类如下:

public class Fruit {

    public String name;
    public int price;

    public Fruit() {
    }

    public Fruit(String name, int price) {
        this.name = name;
        this.price = price;
    }
}

并且我们想用它来接收来自 Kafka 的消息、执行一些价格转换,并将消息发回 Kafka。

import io.smallrye.reactive.messaging.annotations.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;

/**
* A bean consuming data from the "fruit-in" channel and applying some price conversion.
* The result is pushed to the "fruit-out" channel.
*/
@ApplicationScoped
public class FruitProcessor {

    private static final double CONVERSION_RATE = 0.88;

    @Incoming("fruit-in")
    @Outgoing("fruit-out")
    @Broadcast
    public Fruit process(Fruit fruit) {
        fruit.price = fruit.price * CONVERSION_RATE;
        return fruit;
    }

}

为此,我们需要使用 Jackson 或 JSON-B 设置 JSON 序列化。

在正确配置 JSON 序列化后,你还可以使用 Publisher<Fruit>Emitter<Fruit>

Serializing via Jackson

Quarkus 对基于 Jackson 的 JSON 序列化和反序列化内置了支持。它还将为您“@3”序列化程序和反序列化程序,因此您不必配置任何内容。如果禁用了生成,您可以按如下所述使用提供的“@1”和“@2”。

存在可用于通过 Jackson 序列化所有数据对象的现有“@4”。如果您想使用“@5”,则可以创建一个空子类。

默认情况下,“@6”序列化空为“@7”字符串,可以通过设置 Kafka 配置属性“@8”来自定义该属性,这会将空序列化为“@9”。在使用压缩主题时这个很方便,因为“@10”用作墓碑,了解在压缩阶段中删除哪些消息。

相应的反序列化类需要被子类化。所以,让我们创建一个扩展“@12”的“@11”。

package com.acme.fruit.jackson;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

最后,配置您的通道以使用 Jackson 序列化程序和反序列化程序。

# Configure the Kafka source (we read from it)
mp.messaging.incoming.fruit-in.topic=fruit-in
mp.messaging.incoming.fruit-in.value.deserializer=com.acme.fruit.jackson.FruitDeserializer

# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.fruit-out.topic=fruit-out
mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

现在,您的 Kafka 消息将包含一个 Jackson 序列化的“@13”数据对象的表示形式。在这种情况下,“@14”配置不是必要的,因为“@15”在默认情况下已启用。

如果您想反序列化水果列表,则需要创建一个使用 Jackson“@16”表示所用泛型集合的反序列化程序。

package com.acme.fruit.jackson;

import java.util.List;
import com.fasterxml.jackson.core.type.TypeReference;
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class ListOfFruitDeserializer extends ObjectMapperDeserializer<List<Fruit>> {
    public ListOfFruitDeserializer() {
        super(new TypeReference<List<Fruit>>() {});
    }
}

Serializing via JSON-B

首先,您需要包括“@17”扩展。

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-jsonb</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-jsonb")

存在可用于通过 JSON-B 序列化所有数据对象的现有“@18”。如果您想使用“@19”,则可以创建一个空子类。

默认情况下,“@20”序列化空为“@21”字符串,可以通过设置 Kafka 配置属性“@22”来自定义该属性,这会将空序列化为“@23”。在使用压缩主题时这个很方便,因为“@24”用作墓碑,了解在压缩阶段中删除哪些消息。

相应的反序列化类需要被子类化。所以,让我们创建一个扩展泛型“@26”的“@25”。

package com.acme.fruit.jsonb;

import io.quarkus.kafka.client.serialization.JsonbDeserializer;

public class FruitDeserializer extends JsonbDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

最后,配置您的通道以使用 JSON-B 序列化程序和反序列化程序。

# Configure the Kafka source (we read from it)
mp.messaging.incoming.fruit-in.connector=smallrye-kafka
mp.messaging.incoming.fruit-in.topic=fruit-in
mp.messaging.incoming.fruit-in.value.deserializer=com.acme.fruit.jsonb.FruitDeserializer

# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.fruit-out.connector=smallrye-kafka
mp.messaging.outgoing.fruit-out.topic=fruit-out
mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer

现在,您的 Kafka 消息将包含一个 JSON-B 序列化的“@27”数据对象的表示形式。

如果您想反序列化水果列表,则需要创建一个带有“@28”的反序列化程序,用来表示所用的泛型集合。

package com.acme.fruit.jsonb;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import io.quarkus.kafka.client.serialization.JsonbDeserializer;

public class ListOfFruitDeserializer extends JsonbDeserializer<List<Fruit>> {
    public ListOfFruitDeserializer() {
        super(new ArrayList<MyEntity>() {}.getClass().getGenericSuperclass());
    }
}

如果您不想为每个数据对象创建一个反序列化程序,则可以使用泛型“@29”,它将反序列化为“@30”。还可以使用相应的序列化程序:“@31”。

Avro Serialization

这在专门指南中进行了描述:@32。

JSON Schema Serialization

这在专门指南中进行了描述:@33。

Serializer/deserializer autodetection

在使用带有 Kafka 的 Quarkus 消息传递(“@34”)时,Quarkus 通常可以自动检测正确的序列化程序和反序列化程序类。此自动检测基于“@35”和“@36”方法的声明以及注入的“@Channel”。

例如,如果您声明

@Outgoing("generated-price")
public Multi<Integer> generate() {
    ...
}

如果您的配置表明 generated-price 通道使用 smallrye-kafka 连接器,Quarkus 会自动将 value.serializer 设置为 Kafka 的内置 IntegerSerializer

类似地,如果您声明

@Incoming("my-kafka-records")
public void consume(Record<Long, byte[]> record) {
    ...
}

而且您的配置表明 my-kafka-records 通道使用 smallrye-kafka 连接器,那么 Quarkus 会自动将 key.deserializer 设置为 Kafka 的内置 LongDeserializer,并将 value.deserializer 设置为 ByteArrayDeserializer

最后,如果您声明

@Inject
@Channel("price-create")
Emitter<Double> priceEmitter;

而且您的配置表明 price-create 通道使用 smallrye-kafka 连接器,那么 Quarkus 会自动将 value.serializer 设置为 Kafka 的内置 DoubleSerializer

序列化程序/反序列化程序自动检测支持的完整类型集为:

  • short and java.lang.Short

  • int and java.lang.Integer

  • long and java.lang.Long

  • float and java.lang.Float

  • double and java.lang.Double

  • byte[]

  • java.lang.String

  • java.util.UUID

  • java.nio.ByteBuffer

  • org.apache.kafka.common.utils.Bytes

  • io.vertx.core.buffer.Buffer

  • io.vertx.core.json.JsonObject

  • io.vertx.core.json.JsonArray

  • 具有 org.apache.kafka.common.serialization.Serializer&lt;T&gt;/org.apache.kafka.common.serialization.Deserializer&lt;T&gt; 的直接实现的类。

    • 实现需要将类型参数 T 指定为(解)序列化的类型。

  • 生成的源自 Avro 架构的类,以及 Avro GenericRecord,如果存在 Confluent 或 Apicurio Registry serde

    • 如果存在多个 Avro serdes,则对于源自 Avro 的类必须手动配置序列化程序/反序列化程序,因为不可能自动检测

    • 有关使用 Confluent 或 Apicurio Registry 库的更多信息,请参阅 Using Apache Kafka with Schema Registry and Avro

  • 具有 ObjectMapperSerializer/ObjectMapperDeserializer 子类的类,如 Serializing via Jackson 所述

    • 技术上无需创建 ObjectMapperSerializer 的子类,但这种情况下不可能进行自动检测

  • 具有 JsonbSerializer/JsonbDeserializer 子类的类,如 Serializing via JSON-B 所述

    • 技术上无需创建 JsonbSerializer 的子类,但这种情况下不可能进行自动检测

如果序列化程序/反序列化程序通过配置设置,则不会被自动检测替换。

如果您在序列化程序自动检测中遇到任何问题,可以通过设置 quarkus.messaging.kafka.serializer-autodetection.enabled=false 完全关闭该功能。如果您需要执行此操作,请在 Quarkus issue tracker 中提交缺陷,以便我们解决您遇到的问题。

JSON Serializer/deserializer generation

Quarkus 会自动为如下位置生成序列化程序和反序列化程序:

  1. 未配置序列化程序/反序列化程序

  2. 自动检测没有找到匹配的序列化器/反序列化器

底层使用 Jackson。

可以使用以下方式禁用此生成:

quarkus.messaging.kafka.serializer-generation.enabled=false

生成不支持 List<Fruit> 等集合。参考 Serializing via Jackson 为此情况编写自己的序列化器/反序列化器。

Using Schema Registry

在 Avro 的专用指南中对此进行了描述: Using Apache Kafka with Schema Registry and Avro。JSON Schema 中的描述有所不同: Using Apache Kafka with Schema Registry and JSON Schema

Health Checks

Quarkus 为 Kafka 提供了多个运行状况检查。这些检查与 quarkus-smallrye-health 扩展一起使用。

Kafka Broker Readiness Check

使用 quarkus-kafka-client 扩展时,可以通过在 application.properties 中将 quarkus.kafka.health.enabled 属性设置为 true 来启用 readiness 运行状况检查。此检查报告与 default Kafka 代理交互的状态(使用 kafka.bootstrap.servers 进行配置)。它要求与 Kafka 代理存在 admin connection,并且默认禁用该代理。如果启用,当您访问应用程序的 /q/health/ready 终结点时,您将具有有关连接验证状态的信息。

Kafka Reactive Messaging Health Checks

使用响应式消息传递和 Kafka 连接器时,每个已配置通道(入站或出站)都提供 startuplivenessreadiness 检查。

  • startup 检查验证是否已建立与 Kafka 群集的通信。

  • liveness 检查捕获在与 Kafka 通信期间发生的任何不可恢复的故障。

  • readiness 检查验证 Kafka 连接器是否已准备好使用已配置 Kafka 主题来消费/生成消息。

对于每个通道,您可以使用以下方式禁用检查:

# Disable both liveness and readiness checks with `health-enabled=false`:

# Incoming channel (receiving records form Kafka)
mp.messaging.incoming.your-channel.health-enabled=false
# Outgoing channel (writing records to Kafka)
mp.messaging.outgoing.your-channel.health-enabled=false

# Disable only the readiness check with `health-readiness-enabled=false`:

mp.messaging.incoming.your-channel.health-readiness-enabled=false
mp.messaging.outgoing.your-channel.health-readiness-enabled=false

您可以使用 mp.messaging.incoming|outgoing.$channel.bootstrap.servers 属性为每个通道配置 bootstrap.servers。默认值为 kafka.bootstrap.servers

响应式消息传递 startupreadiness 检查提供了两种策略。默认策略验证是否已与代理建立活动连接。此方法不会侵入,因为它基于内置 Kafka 客户端指标。

使用 health-topic-verification-enabled=true 属性, startup 探针使用 admin client 检查主题列表。而入站通道的 readiness 探针会检查是否至少为某个分区分配了消耗,出站通道的探针会检查生产者使用的主题是否存在于代理中。

请注意,要实现这一点,需要 admin connection。您可以使用 health-topic-verification-timeout 配置调整对代理执行的主题验证调用的超时时间。

Observability

如果 OpenTelemetry extension 存在,则 Kafka 连接器通道与 OpenTelemetry Tracing 协同工作。写入 Kafka 主题的消息会传播当前跟踪范围。对于入站通道,如果消耗的 Kafka 记录包含跟踪信息,则消息处理会继承消息范围作为父代。

可以为每个通道显式禁用跟踪:

mp.messaging.incoming.data.tracing-enabled=false

如果 Micrometer extension 存在,则 Kafka 生产者和消费者客户端指标会作为 Micrometer 指标公开。

Channel metrics

每个通道指标也可以被收集并公开为 Micrometer 仪表。每个通道可以通过带有 channel 标记标识的以下度量收集:

  • quarkus.messaging.message.count: 已生产或接收的消息数

  • quarkus.messaging.message.acks: 成功处理的消息数

  • quarkus.messaging.message.failures: 带有故障消息处理的数量

  • quarkus.messaging.message.duration: 消息处理的持续时间

出于向后兼容性考虑,默认情况下未启用通道指标,可以通过以下方式启用:

message observation 取决于截获消息,因此不支持消费带有自定义消息类型的通道,如 IncomingKafkaRecord, KafkaRecord, IncomingKafkaRecordBatchKafkaRecordBatch. 消息拦截和观察仍然可以在消费通用 Message 类型或通过 converters 启用的自定义有效负载的通道中使用。

smallrye.messaging.observation.enabled=true

Kafka Streams

这将在专门指南中有所描述: Using Apache Kafka Streams.

Using Snappy for message compression

outgoing 通道上,你可以通过将 compression.type 属性设置为 snappy 来启用 Snappy 压缩:

mp.messaging.outgoing.fruit-out.compression.type=snappy

在 JVM 模式下,它将立即生效。然而,要在本地可执行文件中编译你的应用程序,你需要在 application.properties 中添加 quarkus.kafka.snappy.enabled=true.

在本地模式下,Snappy 默认处于禁用状态,因为使用 Snappy 需要嵌入一个本地库并在应用程序启动时解压它。

Authentication with OAuth

如果你的 Kafka 中介使用 OAuth 作为认证机制,你需要配置 Kafka 消费者来启用这个认证过程。首先,将以下依赖项添加到你的应用程序中:

pom.xml
<dependency>
    <groupId>io.strimzi</groupId>
    <artifactId>kafka-oauth-client</artifactId>
</dependency>
<!-- if compiling to native you'd need also the following dependency -->
<dependency>
    <groupId>io.strimzi</groupId>
    <artifactId>kafka-oauth-common</artifactId>
</dependency>
build.gradle
implementation("io.strimzi:kafka-oauth-client")
// if compiling to native you'd need also the following dependency
implementation("io.strimzi:kafka-oauth-common")

这个依赖项提供了处理 OAuth 工作流所需的回调处理程序。然后,在 application.properties 中添加:

mp.messaging.connector.smallrye-kafka.security.protocol=SASL_PLAINTEXT
mp.messaging.connector.smallrye-kafka.sasl.mechanism=OAUTHBEARER
mp.messaging.connector.smallrye-kafka.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.client.id="team-a-client" \
  oauth.client.secret="team-a-client-secret" \
  oauth.token.endpoint.uri="http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/token" ;
mp.messaging.connector.smallrye-kafka.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

quarkus.ssl.native=true

更新 oauth.client.id, oauth.client.secretoauth.token.endpoint.uri 值。

OAuth 认证适用于 JVM 和本地模式。由于在本地模式下默认没有启用 SSL,因此必须添加 quarkus.ssl.native=true 来支持 JaasClientOauthLoginCallbackHandler,它使用 SSL。(请参阅 Using SSL with Native Executables 指南了解更多详细信息。)

Testing a Kafka application

Testing without a broker

在无需启动 Kafka 代理的情况下测试应用程序非常有用。为此,你可以将 Kafka 连接器管理的通道 switchin-memory.

此方法仅适用于 JVM 测试。它不能用于本地测试(因为它们不支持注入)。

假设我们要测试以下处理器应用程序:

@ApplicationScoped
public class BeverageProcessor {

    @Incoming("orders")
    @Outgoing("beverages")
    Beverage process(Order order) {
        System.out.println("Order received " + order.getProduct());
        Beverage beverage = new Beverage();
        beverage.setBeverage(order.getProduct());
        beverage.setCustomer(order.getCustomer());
        beverage.setOrderId(order.getOrderId());
        beverage.setPreparationState("RECEIVED");
        return beverage;
    }

}

首先,将以下测试依赖项添加到应用程序中:

pom.xml
<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-reactive-messaging-in-memory</artifactId>
    <scope>test</scope>
</dependency>
build.gradle
testImplementation("io.smallrye.reactive:smallrye-reactive-messaging-in-memory")

接着,按照以下步骤创建 Quarkus 测试资源:

public class KafkaTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager {

    @Override
    public Map<String, String> start() {
        Map<String, String> env = new HashMap<>();
        Map<String, String> props1 = InMemoryConnector.switchIncomingChannelsToInMemory("orders");     (1)
        Map<String, String> props2 = InMemoryConnector.switchOutgoingChannelsToInMemory("beverages");  (2)
        env.putAll(props1);
        env.putAll(props2);
        return env;  (3)
    }

    @Override
    public void stop() {
        InMemoryConnector.clear();  (4)
    }
}
1 将传入通道 orders(期待来自 Kafka 的消息)切换到内存内。
2 将传出通道 beverages(向 Kafka 写入消息)切换到内存内。
3 构建并返回一个 Map,其中包含使用内存内通道配置应用程序所需的所有属性。
4 当测试停止时,清除 InMemoryConnector(丢弃所有已接收和已发送的消息)

使用上面创建的测试资源创建一个 Quarkus 测试:

import static org.awaitility.Awaitility.await;

@QuarkusTest
@WithTestResource(KafkaTestResourceLifecycleManager.class)
class BaristaTest {

    @Inject
    @Connector("smallrye-in-memory")
    InMemoryConnector connector; (1)

    @Test
    void testProcessOrder() {
        InMemorySource<Order> ordersIn = connector.source("orders");     (2)
        InMemorySink<Beverage> beveragesOut = connector.sink("beverages");  (3)

        Order order = new Order();
        order.setProduct("coffee");
        order.setName("Coffee lover");
        order.setOrderId("1234");

        ordersIn.send(order);  (4)

        await().<List<? extends Message<Beverage>>>until(beveragesOut::received, t -> t.size() == 1); (5)

        Beverage queuedBeverage = beveragesOut.received().get(0).getPayload();
        Assertions.assertEquals(Beverage.State.READY, queuedBeverage.getPreparationState());
        Assertions.assertEquals("coffee", queuedBeverage.getBeverage());
        Assertions.assertEquals("Coffee lover", queuedBeverage.getCustomer());
        Assertions.assertEquals("1234", queuedBeverage.getOrderId());
    }

}
1 在测试类中注入内存内连接器。
2 检索传入通道 (orders) - 该通道必须已在测试资源中切换到内存内。
3 检索传出通道 (beverages) - 该通道必须已在测试资源中切换到内存内。
4 使用 `send`方法将消息发送到 `orders`通道。应用程序将处理此消息并发送一条消息到 `beverages`通道。
5 在 `beverages`通道上使用 `received`方法检查应用程序生成的消息。

如果 Kafka 消费者基于批处理,你将需要将一批消息发送到该通道(通过手动创建它们)。

例如:

@ApplicationScoped
public class BeverageProcessor {

    @Incoming("orders")
    CompletionStage<Void> process(KafkaRecordBatch<String, Order> orders) {
        System.out.println("Order received " + orders.getPayload().size());
        return orders.ack();
    }
}
import static org.awaitility.Awaitility.await;

@QuarkusTest
@WithTestResource(KafkaTestResourceLifecycleManager.class)
class BaristaTest {

    @Inject
    @Connector("smallrye-in-memory")

    InMemoryConnector connector;

    @Test
    void testProcessOrder() {
        InMemorySource<IncomingKafkaRecordBatch<String, Order>> ordersIn = connector.source("orders");
        var committed = new AtomicBoolean(false);  (1)
        var commitHandler = new KafkaCommitHandler() {
            @Override
            public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record) {
                committed.set(true);  (2)
                return null;
            }
        };
        var failureHandler = new KafkaFailureHandler() {
            @Override
            public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record, Throwable reason, Metadata metadata) {
                return null;
            }
        };

        Order order = new Order();
        order.setProduct("coffee");
        order.setName("Coffee lover");
        order.setOrderId("1234");
        var record = new ConsumerRecord<>("topic", 0, 0, "key", order);
        var records = new ConsumerRecords<>(Map.of(new TopicPartition("topic", 1), List.of(record)));
        var batch = new IncomingKafkaRecordBatch<>(
            records, "kafka", 0, commitHandler, failureHandler, false, false);  (3)

        ordersIn.send(batch);

        await().until(committed::get);  (4)
    }
}
1 创建 `AtomicBoolean`以跟踪批处理是否已提交。
2 当批处理提交时更新 committed
3 使用单个记录创建 IncomingKafkaRecordBatch
4 等到批处理提交。

有了内存通道,我们不必启动 Kafka 代理便可以测试处理消息的应用程序代码。注意,不同的内存通道是独立的,并且将通道连接器切换为内存不会模拟配置为相同 Kafka 主题的通道之间的消息传递。

Context propagation with InMemoryConnector

默认情况下,内存通道在调用线程上调度消息,这将是单元测试中的主线程。

quarkus-test-vertx 依赖项提供了 @io.quarkus.test.vertx.RunOnVertxContext 注释,在测试方法中使用该注释时,将在 Vert.x 上下文中执行测试。

但是,大多数其他连接器都处理上下文传播,并在不同的重复 Vert.x 上下文中调度消息。

如果您的测试依赖于上下文传播,则可以使用 run-on-vertx-context 属性配置内存连接器通道以在 Vert.x 上下文中调度事件,包括消息和确认。或者,您可以使用 InMemorySource#runOnVertxContext 方法切换此行为。

Testing using a Kafka broker

如果您正在使用 Dev Services for Kafka,那么将启动一个 Kafka 代理并在测试期间一直可用,除非在 %test 配置文件中将其禁用。虽然可以使用 Kafka 客户端 API 连接到此代理,但 Kafka Companion Library 提供了一种与 Kafka 代理交互以及在测试中创建使用者、生成器和管理操作的简单方法。

要在测试中使用 KafkaCompanion API,请首先添加以下依赖项:

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-test-kafka-companion</artifactId>
    <scope>test</scope>
</dependency>

其中提供了 io.quarkus.test.kafka.KafkaCompanionResource,它是 io.quarkus.test.common.QuarkusTestResourceLifecycleManager 的实现。

然后使用 @WithTestResource 在测试中配置 Kafka Companion,例如:

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.UUID;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;

import io.quarkus.test.common.WithTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.kafka.InjectKafkaCompanion;
import io.quarkus.test.kafka.KafkaCompanionResource;
import io.smallrye.reactive.messaging.kafka.companion.ConsumerTask;
import io.smallrye.reactive.messaging.kafka.companion.KafkaCompanion;

@QuarkusTest
@WithTestResource(KafkaCompanionResource.class)
public class OrderProcessorTest {

    @InjectKafkaCompanion (1)
    KafkaCompanion companion;

    @Test
    void testProcessor() {
        companion.produceStrings().usingGenerator(i -> new ProducerRecord<>("orders", UUID.randomUUID().toString())); (2)

        // Expect that the tested application processes orders from 'orders' topic and write to 'orders-processed' topic

        ConsumerTask<String, String> orders = companion.consumeStrings().fromTopics("orders-processed", 10); (3)
        orders.awaitCompletion(); (4)
        assertEquals(10, orders.count());
    }
}
1 @InjectKafkaCompanion 注入的 KafkaCompanion 实例被配置为访问为测试创建的 Kafka 代理。
2 使用 KafkaCompanion 创建生产者任务,将 10 个记录写入“订单”主题。
3 创建使用者任务,订阅“已处理的订单”主题,并使用 10 个记录。
4 等待使用者任务完成。

如果在测试期间 Kafka Dev 服务可用,KafkaCompanionResource 将使用已创建的 Kafka 代理,否则会使用 Strimzi Test Container 创建一个 Kafka 代理。 可以使用 @ResourceArg 自定义已创建 Kafka 代理的配置,例如:

@WithTestResource(value = KafkaCompanionResource.class, initArgs = {
        @ResourceArg(name = "strimzi.kafka.image", value = "quay.io/strimzi-test-container/test-container:0.106.0-kafka-3.7.0"), // Image name
        @ResourceArg(name = "kafka.port", value = "9092"), // Fixed port for kafka, by default it will be exposed on a random port
        @ResourceArg(name = "kraft", value = "true"), // Enable Kraft mode
        @ResourceArg(name = "num.partitions", value = "3"), // Other custom broker configurations
})
public class OrderProcessorTest {
    // ...
}

Custom test resource

或者,您可以在测试资源中启动一个 Kafka 代理。以下代码段展示了一个使用 Testcontainers 启动 Kafka 代理的测试资源:

public class KafkaResource implements QuarkusTestResourceLifecycleManager {

    private final KafkaContainer kafka = new KafkaContainer();

    @Override
    public Map<String, String> start() {
        kafka.start();
        return Collections.singletonMap("kafka.bootstrap.servers", kafka.getBootstrapServers());  (1)
    }

    @Override
    public void stop() {
        kafka.close();
    }
}
1 配置 Kafka 启动位置,以便应用程序连接到此代理。

Dev Services for Kafka

如果存在任何与 Kafka 相关的扩展(例如 quarkus-messaging-kafka),Dev Services for Kafka 会在 dev 模式和运行测试时自动启动一个 Kafka 代理。因此,您无需手动启动代理。应用程序会自动配置。

由于启动 Kafka 代理可能需要较长时间,因此 Dev Services for Kafka 使用 Redpanda,一个在约 1 秒内启动的与 Kafka 兼容的代理。

Enabling / Disabling Dev Services for Kafka

除非满足以下条件,否则 Dev Services for Kafka 会被自动启用:

  • 设置 quarkus.kafka.devservices.enabledfalse

  • the kafka.bootstrap.servers is configured

  • 所有反应式消息传递 Kafka 通道都设置了 bootstrap.servers 属性

Dev Services for Kafka 依赖于 Docker 来启动代理。如果您的环境不支持 Docker,您需要手动启动代理,或连接到已运行的代理。您可以使用 kafka.bootstrap.servers 来配置代理地址。

Shared broker

大多数情况下,您需要在应用程序之间共享代理。Dev Services for Kafka 为您的多个,在 dev 模式下运行的 Quarkus 应用程序实现了共享单个代理的 service discovery 机制。

Dev Services for Kafka 用 quarkus-dev-service-kafka 标签启动容器,用于标识容器。

如果您需要多个(共享)代理,您可以配置 quarkus.kafka.devservices.service-name 属性并指示代理名称。它将查找具有相同值的容器,或在找不到容器的情况下启动一个新容器。默认服务名称为 kafka

在 dev 模式下默认启用共享,但在测试模式下禁用共享。您可以使用 quarkus.kafka.devservices.shared=false 禁用共享。

Setting the port

默认情况下,Dev Services for Kafka 选择一个随机端口并配置应用程序。您可以通过配置 quarkus.kafka.devservices.port 属性来设置端口。

请注意,Kafka 通告地址会使用所选端口自动配置。

Configuring the image

Dev Services for Kafka 支持 Redpanda, kafka-nativeStrimzi(处于 Kraft模式)映像。

*Redpanda*是一个与 Kafka 兼容的事件流式传输平台。由于它提供快速启动时间,因此 dev services 默认使用 `vectorized/redpanda`的 Redpanda 映像。您可以从 [role="bare"][role="bare"]https://hub.docker.com/r/vectorized/redpanda中选择任何版本。

*kafka-native*提供使用 Quarkus 和 GraalVM 编译为原生二进制文件的 Apache Kafka 发行版的映像。同时仍然是 experimental,它提供了非常快速启动时间,并且空间占用小。

可以使用配置映像类型

quarkus.kafka.devservices.provider=kafka-native

*Strimzi*为在 Kubernetes 上运行 Apache Kafka 提供容器映像和操作符。虽然 Strimzi 针对 Kubernetes 进行了优化,但这些映像在经典容器环境中也能完美运行。Strimzi 容器映像在 JVM 上运行“真正的”Kafka 代理,启动速度较慢。

quarkus.kafka.devservices.provider=strimzi

对于 Strimzi,您可以从 [role="bare"][role="bare"]https://quay.io/repository/strimzi-test-container/test-container?tab=tags中选择具有 Kraft 支持(2.8.1 及更高版本)的任何 Kafka 版本映像。

quarkus.kafka.devservices.image-name=quay.io/strimzi-test-container/test-container:0.106.0-kafka-3.7.0

Configuring Kafka topics

您可以将 Dev Services for Kafka 配置为在代理启动后创建主题。主题使用给定数量的分区和 1 个副本创建。

以下示例创建一个名为 `test`且有 3 个分区的主题,以及一个名为 `messages`且有 2 个分区的第二个主题。

quarkus.kafka.devservices.topic-partitions.test=3
quarkus.kafka.devservices.topic-partitions.messages=2

如果已经存在具有指定名称的主题,则将跳过创建过程,而不会尝试将现有主题重新分区为其他数量的分区。

您可以使用 `quarkus.kafka.devservices.topic-partitions-timeout`来配置主题创建中使用的 Kafka 管理客户端调用的超时时间,其默认值为 2 秒。

Transactional and Idempotent producers support

默认情况下,Redpanda 代理配置为启用事务和幂等性功能。您可以使用以下方法禁用它们:

quarkus.kafka.devservices.redpanda.transaction-enabled=false

Redpanda 事务不支持完全一次处理。

Kafka Dev UI

如果存在任何与 Kafka 相关的扩展(例如 quarkus-messaging-kafka),Quarkus Dev UI 将扩展为包含有 Kafka 代理管理 UI。该管理 UI 会自动连接到为应用程序配置的 Kafka 代理。

kafka dev ui link

使用 Kafka Dev UI,您可以直接管理您的 Kafka 集群并执行任务,例如:

  • Listing and creating topics

  • Visualizing records

  • Publishing new records

  • 检查使用者组列表及其使用量延迟

kafka dev ui records

Kafka Dev UI 属于 Quarkus Dev UI 的一部分,它仅在开发模式下可用。

Kubernetes Service Bindings

Quarkus Kafka 扩展支持 Service Binding Specification for Kubernetes。您可以通过将 `quarkus-kubernetes-service-binding`扩展添加到您的应用程序中来启用此功能。

在适当地配置的 Kubernetes 集群中运行时,Kafka 扩展将从集群内可用的服务绑定中提取其 Kafka 代理连接配置,而无需用户配置。

Execution model

反应式消息传递在 I/O 线程上调用用户的函数。因此,默认情况下,这些函数不得阻塞。正如 Blocking processing中所述,如果此函数将阻塞调用线程,您需要在函数上添加 `@Blocking`注解。

请参阅 Quarkus Reactive Architecture documentation以了解有关此主题的更多详细信息。

Channel Decorators

SmallRye 反应式消息传递支持装饰传入和传出频道,以便实现横切关注点,例如监视、跟踪或消息拦截。有关实现装饰器和消息拦截器的更多信息,请参阅 SmallRye Reactive Messaging documentation

Configuration Reference

有关 SmallRye 反应式消息传递配置的更多详细信息,请参见 SmallRye Reactive Messaging - Kafka Connector Documentation

每个频道可以通过使用以下方式通过配置来关闭:

mp.messaging.[incoming|outgoing].[channel].enabled=false

最重要的属性列在下面的表格中:

Incoming channel configuration (polling from Kafka)

使用以下内容配置下列属性:

mp.messaging.incoming.your-channel-name.attribute=value

一些属性具有可以在全局范围内配置的别名:

kafka.bootstrap.servers=...

您还可以传递基础 Kafka consumer支持的任何属性。

例如,要配置 `max.poll.records`属性,请使用:

mp.messaging.incoming.[channel].max.poll.records=1000

一些消费者客户端属性被配置为合理的默认值:

如果未设置,则 reconnect.backoff.max.ms`设置为 `10000,以避免在断开连接时产生高负载。

如果未设置,则 key.deserializer`设置为 `org.apache.kafka.common.serialization.StringDeserializer

消费者 `client.id`根据使用 `mp.messaging.incoming.[channel].partitions`属性创建的客户端数进行配置。

  • 如果提供了 client.id,则将其按原样使用,或者如果设置了 `partitions`属性,则用客户端索引作为后缀。

  • 如果没有提供 client.id,则将其生成为 [client-id-prefix][channel-name][-index]

Table 1. Incoming Attributes of the 'smallrye-kafka' connector
Attribute (alias) Description Mandatory Default

[role="no-hyphens"]bootstrap.servers

[role="no-hyphens"](kafka.bootstrap.servers)

用于建立与 Kafka 群集的初始连接的主机:端口的逗号分隔列表。类型: string

false

localhost:9092

[role="no-hyphens"]topic

使用/填充 Kafka 主题。如果未设置此属性或 topics 属性,则使用通道名称类型: string

false

[role="no-hyphens"]health-enabled

启用(默认)或禁用健康报告类型: boolean

false

true

[role="no-hyphens"]health-readiness-enabled

启用(默认)或禁用就绪性健康报告类型: boolean

false

true

[role="no-hyphens"]health-readiness-topic-verification

deprecated - 就绪性检查是否应验证主题在代理上存在。默认为 false。启用它需要管理员连接。已弃用:改为使用 `health-topic-verification-enabled}。类型: boolean

false

[role="no-hyphens"]health-readiness-timeout

deprecated - 在就绪性健康检查期间,连接器将连接到代理并获取主题列表。此属性指定检索的最大持续时间(以毫秒为单位)。如果超出,则通道被视为未就绪。已弃用:改为使用 health-topic-verification-timeout。类型: long

false

[role="no-hyphens"]health-topic-verification-enabled

启动和就绪性检查是否应验证主题在代理上存在。默认为 false。启用它需要管理员客户端连接。类型: boolean

false

false

[role="no-hyphens"]health-topic-verification-timeout

在启动和就绪性健康检查期间,连接器将连接到代理并获取主题列表。此属性指定检索的最大持续时间(以毫秒为单位)。如果超出,则通道被视为未就绪。类型: long

false

2000

[role="no-hyphens"]tracing-enabled

启用(默认)或禁用跟踪类型: boolean

false

true

[role="no-hyphens"]client-id-prefix

Kafka 客户端 client.id 属性的前缀。如果已定义配置或生成,则 client.id 将使用给定值作为前缀,否则 kafka-consumer- 为前缀。类型: string

false

[role="no-hyphens"]checkpoint.state-store

使用 checkpoint 提交策略时,在 @Identifier 中设置的 bean 名称将实现 io.smallrye.reactive.messaging.kafka.StateStore.Factory 以指定状态存储实现。类型: string

false

[role="no-hyphens"]checkpoint.state-type

使用 checkpoint 提交策略时,要保留在状态存储中的状态对象的完全限定类型名称。提供后,状态存储实现可以使用它来帮助保留处理状态对象。类型: string

false

[role="no-hyphens"]checkpoint.unsynced-state-max-age.ms

使用 checkpoint 提交策略时,指定在将连接器标记为不健康之前,处理状态必须保留的最大时间(以毫秒为单位)。将此属性设置为`0`将禁用此监视。类型: int

false

10000

[role="no-hyphens"]cloud-events

启用(默认)或禁用 Cloud Event 支持。如果在 incoming 通道上启用,则连接器将分析传入记录并尝试创建 Cloud Event 元数据。如果在 outgoing 上启用,则连接器将消息包括 Cloud Event 元数据的情况下发送为 Cloud Event。类型: boolean

false

true

[role="no-hyphens"]kafka-configuration

提供对此通道的默认 Kafka 消费者/生产者配置的 CDI bean 的标识符。通道配置仍然可以覆盖任何属性。该 bean 的类型必须是 Map<String, Object>,并且必须使用 @io.smallrye.common.annotation.Identifier 限定符设置标识符。类型: string

false

[role="no-hyphens"]topics

要使用的逗号分隔主题列表。不能与 topicpattern 属性一起使用类型: string

false

[role="no-hyphens"]pattern

指示 topic 属性为正则表达式。必须与 topic 属性一起使用。不能与 topics 属性一起使用类型: boolean

false

false

[role="no-hyphens"]key.deserializer

用于反序列化记录密钥的反序列化类名类型: string

false

org.apache.kafka.common.serialization.StringDeserializer

[role="no-hyphens"]lazy-client

Kafka 客户端是创建延迟还是创建立即。类型: boolean

false

false

[role="no-hyphens"]value.deserializer

用于反序列化记录值的反序列化类名类型: string

true

[role="no-hyphens"]fetch.min.bytes

服务器应为获取请求返回的最小数据量。默认设置为 1 个字节,表示在有 1 个字节的数据可用或获取请求等待数据到达超时后立即响应获取请求。类型:int

false

1

[role="no-hyphens"]group.id

一个标识应用程序所属的使用者组的唯一字符串。如果未设置,则默认为应用程序名称,如 quarkus.application.name`配置属性所设置。如果也未设置该属性,则使用唯一生成的 ID。建议始终定义 `group.id,自动生成仅是开发的一个方便特性。您可以通过将此属性设置为 `${quarkus.uuid}`来显式请求自动生成唯一 ID。类型:string

false

[role="no-hyphens"]enable.auto.commit

如果启用,则使用者的偏移量将由底层 Kafka 客户端在后台定期提交,忽略记录的实际处理结果。建议不要启用此设置,并让反应式消息处理提交。类型:boolean

false

false

[role="no-hyphens"]retry

在发生故障时是否重新尝试连接到代理类型:boolean

false

true

[role="no-hyphens"]retry-attempts

失败前最大重连次数。-1 表示无限重试类型:int

false

-1

[role="no-hyphens"]retry-max-wait

两次重新连接之间的最大延迟(以秒为单位)类型:int

false

30

[role="no-hyphens"]broadcast

是否应将 Kafka 记录分发给多个使用者类型:boolean

false

false

[role="no-hyphens"]auto.offset.reset

Kafka 中没有初始偏移量时该怎么办。可接受的值为 earliest、latest 和 none 类型:string

false

latest

[role="no-hyphens"]failure-strategy

指定当从记录生成的 message 被否定确认(nack)时应用的故障策略。值可以是 fail(默认值)、`ignore`或 `dead-letter-queue`类型:string

false

fail

[role="no-hyphens"]commit-strategy

指定当从记录生成的 message 被确认时应用的提交策略。值可以是 latestignore`或 `throttled。如果 enable.auto.commit`为 true,则默认为 `ignore,否则为 `throttled`类型:string

false

[role="no-hyphens"]throttled.unprocessed-record-max-age.ms

在使用 `throttled`提交策略时,指定未处理的 message 在连接器标记为不正常之前可达到的最大年龄(以毫秒为单位)。将此属性设置为 0 会禁用此监控。类型:int

false

60000

[role="no-hyphens"]dead-letter-queue.topic

当 `failure-strategy`设置为 `dead-letter-queue`时,指示发送记录的主题。默认为 `dead-letter-topic-$channel`类型:string

false

[role="no-hyphens"]dead-letter-queue.key.serializer

当 `failure-strategy`设置为 `dead-letter-queue`时,指示要使用的键序列化器。如果未设置,则使用与键反序列化器关联的序列化器类型:string

false

[role="no-hyphens"]dead-letter-queue.value.serializer

当 `failure-strategy`设置为 `dead-letter-queue`时,指示要使用的值序列化器。如果未设置,则使用与值反序列化器关联的序列化器类型:string

false

[role="no-hyphens"]partitions

要同时使用的分区数。连接器创建指定数量的 Kafka 使用者。它应匹配目标主题的分区数类型:int

false

1

[role="no-hyphens"]requests

当 `partitions`大于 1 时,此属性允许配置每次每个使用者请求的记录数。类型:int

false

128

[role="no-hyphens"]consumer-rebalance-listener.name

@Identifier`中设置的 bean 名称,该 bean 实现 `io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener。如果已设置,则此重新平衡侦听器将应用于使用者。类型:string

false

[role="no-hyphens"]key-deserialization-failure-handler

@Identifier`中设置的 bean 名称,该 bean 实现 `io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler。如果已设置,则反序列化键时发生的反序列化失败将委托给此处理程序,该处理程序可能重试或提供后备值。类型:string

false

[role="no-hyphens"]value-deserialization-failure-handler

@Identifier`中设置的 bean 名称,该 bean 实现 `io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler。如果已设置,则反序列化值时发生的反序列化失败将委托给此处理程序,该处理程序可能重试或提供后备值。类型:string

false

[role="no-hyphens"]fail-on-deserialization-failure

当没有设置反序列化失败处理程序且发生了反序列化失败时,报告失败并将应用程序标记为不正常。如果设置为 false 并且发生了反序列化失败,则将转发一个 null 值。类型:boolean

false

true

[role="no-hyphens"]graceful-shutdown

当应用程序终止时是否应该尝试优雅关闭。类型:boolean

false

true

[role="no-hyphens"]poll-timeout

轮询超时(以毫秒为单位)。轮询记录时,轮询最多等待此持续时间,然后返回记录。默认值为 1000 毫秒类型:int

false

1000

[role="no-hyphens"]pause-if-no-requests

当应用程序不请求项目且在请求时恢复时,轮询是否必须暂停。这允许根据应用程序容量实现基于反压。注意,轮询并未停止,但当暂停时不会检索任何记录。类型:boolean

false

true

[role="no-hyphens"]batch

是否批量消费 Kafka 记录。通道注入点必须消费兼容的类型,例如 List&lt;Payload&gt;KafkaRecordBatch&lt;Payload&gt; 。类型:boolean

false

false

[role="no-hyphens"]max-queue-size-factor

乘法因子,用于使用 max.poll.records * max-queue-size-factor 确定排队等待处理的最大记录数。默认为 2。在 batch 模式中,max.poll.records 被视为 1 。类型:int

false

2

Outgoing channel configuration (writing to Kafka)

使用以下内容配置下列属性:

mp.messaging.outgoing.your-channel-name.attribute=value

一些属性具有可以在全局范围内配置的别名:

kafka.bootstrap.servers=...

您还可以传递底层 Kafka producer支持的任何属性。

例如,若要配置 `max.block.ms`属性,请使用:

mp.messaging.incoming.[channel].max.block.ms=10000

某些生产者客户端属性配置为合理的默认值:

如果未设置,则 reconnect.backoff.max.ms`设置为 `10000,以避免在断开连接时产生高负载。

如果未设置,则 key.serializer`将设置为 `org.apache.kafka.common.serialization.StringSerializer

如果未设置,则生产者 client.id`将生成为 `[client-id-prefix][channel-name]

Table 2. Outgoing Attributes of the 'smallrye-kafka' connector
Attribute (alias) Description Mandatory Default

[role="no-hyphens"]acks

生产者要求领导者在将请求视为完成之前收到的确认数。这控制发送的记录的耐用性。可接受的值: 0、1、allType: string

false

1

[role="no-hyphens"]bootstrap.servers

[role="no-hyphens"](kafka.bootstrap.servers)

用于建立与 Kafka 群集的初始连接的主机:端口的逗号分隔列表。类型: string

false

localhost:9092

[role="no-hyphens"]client-id-prefix

Kafka 客户端 client.id 属性的前缀。如果定义的配置或生成的 client.id 具有给定值的前缀,否则 kafka-producer- 为前缀。类型: string

false

[role="no-hyphens"]buffer.memory

生产者可用来缓冲等待发送到服务器的记录的内存总字节数。类型: long

false

33554432

[role="no-hyphens"]close-timeout

等待 Kafka 生产者正常关机的毫秒数。类型: int

false

10000

[role="no-hyphens"]cloud-events

启用(默认)或禁用 Cloud Event 支持。如果在 incoming 通道上启用,则连接器将分析传入记录并尝试创建 Cloud Event 元数据。如果在 outgoing 上启用,则连接器将消息包括 Cloud Event 元数据的情况下发送为 Cloud Event。类型: boolean

false

true

[role="no-hyphens"]cloud-events-data-content-type

[role="no-hyphens"](cloud-events-default-data-content-type)

配置传出 Cloud 事件的默认 datacontenttype 属性。需要将 cloud-events 设置为 true。如果消息本身未配置 datacontenttype 属性,则使用此值。类型: string

false

[role="no-hyphens"]cloud-events-data-schema

[role="no-hyphens"](cloud-events-default-data-schema)

配置传出 Cloud 事件的默认 dataschema 属性。需要将 cloud-events 设置为 true。如果消息本身未配置 dataschema 属性,则使用此值。类型: string

false

[role="no-hyphens"]cloud-events-insert-timestamp

[role="no-hyphens"](cloud-events-default-timestamp)

是否让连接器自动将 time 属性插入到传出 Cloud 事件中。需要将 cloud-events 设置为 true。如果消息本身未配置 time 属性,则使用此值。类型: boolean

false

true

[role="no-hyphens"]cloud-events-mode

Cloud 事件模式( structuredbinary (默认值))。表示如何将 Cloud 事件写入到传出记录中。类型: string

false

binary

[role="no-hyphens"]cloud-events-source

[role="no-hyphens"](cloud-events-default-source)

配置传出 Cloud 事件的默认 source 属性。需要将 cloud-events 设置为 true。如果消息本身未配置 source 属性,则使用此值。类型: string

false

[role="no-hyphens"]cloud-events-subject

[role="no-hyphens"](cloud-events-default-subject)

配置传出 Cloud 事件的默认 subject 属性。需要将 cloud-events 设置为 true。如果消息本身未配置 subject 属性,则使用此值。类型: string

false

[role="no-hyphens"]cloud-events-type

[role="no-hyphens"](cloud-events-default-type)

配置传出 Cloud 事件的默认 type 属性。需要将 cloud-events 设置为 true。如果消息本身未配置 type 属性,则使用此值。类型: string

false

[role="no-hyphens"]health-enabled

启用(默认)或禁用健康报告类型: boolean

false

true

[role="no-hyphens"]health-readiness-enabled

启用(默认)或禁用就绪性健康报告类型: boolean

false

true

[role="no-hyphens"]health-readiness-timeout

deprecated - 在就绪性健康检查期间,连接器将连接到代理并获取主题列表。此属性指定检索的最大持续时间(以毫秒为单位)。如果超出,则通道被视为未就绪。已弃用:改为使用 health-topic-verification-timeout。类型: long

false

[role="no-hyphens"]health-readiness-topic-verification

deprecated - 就绪性检查是否应验证主题在代理上存在。默认为 false。启用它需要管理员连接。已弃用:改为使用 `health-topic-verification-enabled}。类型: boolean

false

[role="no-hyphens"]health-topic-verification-enabled

启动和就绪性检查是否应验证主题在代理上存在。默认为 false。启用它需要管理员客户端连接。类型: boolean

false

false

[role="no-hyphens"]health-topic-verification-timeout

在启动和就绪性健康检查期间,连接器将连接到代理并获取主题列表。此属性指定检索的最大持续时间(以毫秒为单位)。如果超出,则通道被视为未就绪。类型: long

false

2000

[role="no-hyphens"]kafka-configuration

提供对此通道的默认 Kafka 消费者/生产者配置的 CDI bean 的标识符。通道配置仍然可以覆盖任何属性。该 bean 的类型必须是 Map<String, Object>,并且必须使用 @io.smallrye.common.annotation.Identifier 限定符设置标识符。类型: string

false

[role="no-hyphens"]key

写入记录时使用的键。类型: string

false

[role="no-hyphens"]key-serialization-failure-handler

@Identifier 中设置的实现 io.smallrye.reactive.messaging.kafka.SerializationFailureHandler 的 Bean 的名称。如果设置了序列化失败,则在序列化键时发生的序列化失败将委托给该处理程序,该处理程序可能会提供一个后备值。类型: string

false

[role="no-hyphens"]key.serializer

用于序列化记录键的序列化程序类名。类型: string

false

org.apache.kafka.common.serialization.StringSerializer

[role="no-hyphens"]lazy-client

Kafka 客户端是创建延迟还是创建立即。类型: boolean

false

false

[role="no-hyphens"]max-inflight-messages

与Kafka同时写入的最大消息数。它限制等待写入经代理确认的消息数量。可以把该属性设置为“1”以取消限制类型:“2”

false

1024

[role="no-hyphens"]merge

连接器是否应该允许多个上游类型:“3”

false

false

[role="no-hyphens"]partition

目标分区ID。-1由客户端确定分区类型:“4”

false

-1

[role="no-hyphens"]propagate-headers

传送到传出记录传进记录头逗号分隔列表类型:“5”

false

[role="no-hyphens"]propagate-record-key

把传进记录密钥传播到传出记录类型:“6”

false

false

[role="no-hyphens"]retries

如果设置为正数,连接器会尝试重新发送任何未成功发送(可能是瞬态错误)的记录,直到达到重试次数。如果设置为0,禁用重试。如果未设置,连接器会尝试重新发送任何未能发送(可能是瞬态错误)记录一段时间,由“7”配置类型:“8”

false

2147483647

[role="no-hyphens"]topic

使用/填充 Kafka 主题。如果未设置此属性或 topics 属性,则使用通道名称类型: string

false

[role="no-hyphens"]tracing-enabled

启用(默认)或禁用跟踪类型: boolean

false

true

[role="no-hyphens"]value-serialization-failure-handler

“9”中设置的bean的名称实现“10”。如果设置,在序列化值时发生的序列化失败委派到该处理程序,该处理程序可能提供备用值类型:“11”

false

[role="no-hyphens"]value.serializer

用于序列化载荷的序列化器类名类型:“12”

true

[role="no-hyphens"]waitForWriteCompletion

客户端在确认消息之前是否等待Kafka确认已写入的记录类型:“13”

false

true

Kafka Configuration Resolution

Quarkus 暴露出所有与 Kafka 相关的应用程序属性,这些属性以 `kafka.`或 `KAFKA_`为前缀,位于具有 `default-kafka-broker`名称的配置映射中。此配置用于与 Kafka 代理商建立连接。

除了此默认配置之外,您还可以使用 `kafka-configuration`属性配置 `Map`生产者的名称:

mp.messaging.incoming.my-channel.connector=smallrye-kafka
mp.messaging.incoming.my-channel.kafka-configuration=my-configuration

在这种情况下,连接器将查找与`my-configuration`名称关联的`Map`。如果未设置 kafka-configuration,则会查找与通道名称关联可选的 Map(即前一示例中的 my-channel)。

@Produces
@ApplicationScoped
@Identifier("my-configuration")
Map<String, Object> outgoing() {
    return Map.ofEntries(
            Map.entry("value.serializer", ObjectMapperSerializer.class.getName())
    );
}

如果设置了 kafka-configuration`且找不到 `Map,则部署失败。

属性值按以下方式解析:

  1. 该属性直接在通道配置 (mp.messaging.incoming.my-channel.attribute=value) 上设置,

  2. 如果没有设置,则连接器将查找具有通道名称或已配置 kafka-configuration(如果已设置)的 Map,并且从该 `Map`检索值

  3. 如果解析后的 Map`不包含该值,则使用默认 `Map(该值以 `default-kafka-broker`名称公开)

Conditionally configure channels

您可以使用特定配置文件配置通道。因此,仅当启用了指定配置文件时才配置通道(并将它们添加到应用程序中)。

要实现此目的,您需要做以下操作:

  1. 使用 %my-profile`为 `mp.messaging.[incoming|outgoing].$channel`条目添加前缀,例如: `%my-profile.mp.messaging.[incoming|outgoing].$channel.key=value

  2. 可以使用 @Incoming(channel)@Outgoing(channel) 注解来启用 CDI bean 中的 @IfBuildProfile("my-profile"),但仅当启用配置文件时才需要启用它。

请注意,响应性消息传递会验证图形是否完整。因此,在使用这样的条件配置时,请确保应用程序可与已启用和未启用的配置文件配合使用。

请注意,此方法还可以根据配置文件来更改通道配置。

Integrating with Kafka - Common patterns

Writing to Kafka from an HTTP endpoint

若要从 HTTP 终端向 Kafka 发送消息,请在端点中注入 Emitter(或 MutinyEmitter):

package org.acme;

import java.util.concurrent.CompletionStage;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@Path("/")
public class ResourceSendingToKafka {

    @Channel("kafka") Emitter<String> emitter;          (1)

    @POST
    @Produces(MediaType.TEXT_PLAIN)
    public CompletionStage<Void> send(String payload) { (2)
        return emitter.send(payload);                   (3)
    }
}
1 Inject an Emitter<String>
2 HTTP 方法接收有效负载并返回在将消息写入 Kafka 时完成的 CompletionStage
3 将消息发送到 Kafka,send 方法返回 CompletionStage

该端点将通过 POST HTTP 请求传递的有效负载发送到发送器。发送器的通道在 application.properties 文件中映射到一个 Kafka 主题:

mp.messaging.outgoing.kafka.connector=smallrye-kafka
mp.messaging.outgoing.kafka.topic=my-topic

该端点返回一个 CompletionStage 以指示方法的异步属性。emitter.send 方法返回一个 CompletionStage<Void>。当消息写入 Kafka 后,返回的 future 完成。如果写入失败,则返回的 CompletionStage 异常地完成。

如果端点未返回 CompletionStage ,则可能会在将消息发送到 Kafka 之前写入 HTTP 响应,因此故障不会报告给用户。

如果您需要发送 Kafka 记录,请使用:

package org.acme;

import java.util.concurrent.CompletionStage;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import io.smallrye.reactive.messaging.kafka.Record;

@Path("/")
public class ResourceSendingToKafka {

    @Channel("kafka") Emitter<Record<String,String>> emitter;  (1)


    @POST
    @Produces(MediaType.TEXT_PLAIN)
    public CompletionStage<Void> send(String payload) {
        return emitter.send(Record.of("my-key", payload));    (2)
    }
}
1 请注意 Emitter&lt;Record&lt;K, V&gt;&gt; 的用法
2 使用 Record.of(k, v) 创建记录

Persisting Kafka messages with Hibernate with Panache

要将从 Kafka 接收的对象持久化到数据库中,可以使用带潘纳切的 Hibernate。

如果您使用 Hibernate Reactive,请查看 Persisting Kafka messages with Hibernate Reactive

让我们想象一下您接收 Fruit 对象。出于简单起见,我们的 Fruit 类非常简单:

package org.acme;

import jakarta.persistence.Entity;

import io.quarkus.hibernate.orm.panache.PanacheEntity;

@Entity
public class Fruit extends PanacheEntity {

    public String name;

}

若要使用 Kafka 主题中存储的 Fruit 实例,并将其持久化到数据库,您可以使用以下方法:

package org.acme;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.common.annotation.Blocking;

@ApplicationScoped
public class FruitConsumer {

    @Incoming("fruits")                                     (1)
    @Transactional                                          (2)
    public void persistFruits(Fruit fruit) {                (3)
        fruit.persist();                                    (4)
    }
}
1 配置传入通道。此通道从 Kafka 中读取。
2 由于我们要写入数据库,因此我们必须在事务中。此注解启动一个新事务并在方法返回时提交它。Quarkus 自动将该方法视为 blocking。实际上,使用经典 Hibernate 写入数据库会阻塞。因此,Quarkus 在工作线程上调用该方法,您可以阻止它(而不是 I/O 线程)。
3 该方法接收每个 Fruit。请注意,您需要一个反序列化器才能从 Kafka 记录中重建 Fruit 实例。
4 持久化接收到的 fruit 对象。

如 <4> 中所述,你需要一个反序列化程序,它能够从记录创建一个 Fruit。这可以使用 Jackson 反序列化程序来实现:

package org.acme;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

关联的配置如下:

mp.messaging.incoming.fruits.connector=smallrye-kafka
mp.messaging.incoming.fruits.value.deserializer=org.acme.FruitDeserializer

请查看 Serializing via Jackson 以了解如何将 Jackson 与 Kafka 配合使用的更多详细信息。你还可以使用 Avro。

Persisting Kafka messages with Hibernate Reactive

要将从 Kafka 接收的对象持久化到数据库中,可以使用 Hibernate Reactive with Panache。

让我们想象一下您接收 Fruit 对象。出于简单起见,我们的 Fruit 类非常简单:

package org.acme;

import jakarta.persistence.Entity;

import io.quarkus.hibernate.reactive.panache.PanacheEntity;  (1)

@Entity
public class Fruit extends PanacheEntity {

    public String name;

}
1 务必使用 reactive 变体

若要使用 Kafka 主题中存储的 Fruit 实例,并将其持久化到数据库,您可以使用以下方法:

package org.acme;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.control.ActivateRequestContext;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class FruitStore {

    @Inject
    Mutiny.Session session;                    (1)

    @Incoming("in")
    @ActivateRequestContext (2)
    public Uni<Void> consume(Fruit entity) {
        return session.withTransaction(t -> {  (3)
            return entity.persistAndFlush()    (4)
                    .replaceWithVoid();        (5)
        }).onTermination().call(() -> session.close()); (6)
    }

}
1 注入 Hibernate Reactive Session
2 Hibernate Reactive SessionPanache API 要求一个活动 的 CDI 请求上下文。@ActivateRequestContext 注释创建一个新的请求上下文,当方法返回的 Uni 完成时销毁它。如果未使用 Panache,则可以注入和使用 Mutiny.SessionFactory,而无需激活请求上下文或手动关闭会话。
3 请求一个新的事务。事务在传递的操作完成时完成。
4 持久化实体。它返回一个 Uni&lt;Fruit&gt;
5 切换回一个 Uni&lt;Void&gt;
6 关闭会话 - 这是关闭与数据库的连接。连接可以随后被重新利用。

classic Hibernate 不同,你无法使用 @Transactional。相反,我们要使用 session.withTransaction 并持久化我们的实体。map 用于返回一个 Uni<Void>,而不是 Uni<Fruit>

你需要一个反序列化程序,它能够从记录创建一个 Fruit。这可以使用 Jackson 反序列化程序来实现:

package org.acme;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

关联的配置如下:

mp.messaging.incoming.fruits.connector=smallrye-kafka
mp.messaging.incoming.fruits.value.deserializer=org.acme.FruitDeserializer

请查看 Serializing via Jackson 以了解如何将 Jackson 与 Kafka 配合使用的更多详细信息。你还可以使用 Avro。

Writing entities managed by Hibernate to Kafka

我们想象以下过程:

  1. 你接收了一个带有效负载的 HTTP 请求,

  2. 你从此有效负载中创建一个 Hibernate 实体实例,

  3. 你将该实体持久化到数据库中,

  4. 你将实体发送到一个 Kafka 主题

如果您使用 Hibernate Reactive,请看 Writing entities managed by Hibernate Reactive to Kafka

因为我们要写入数据库,所以必须在这个交易中运行此方法。然而,以异步方式给 Kafka 发送实体。操作返回一个 CompletionStage(或一个 Uni,如果你使用 MutinyEmitter)报告操作完成的情况。我们必须确信该交易仍然在运行,直到对象被写入。否则,您可能会在交易之外访问该对象,这是不允许的。

要实施该进程,您需要以下方法:

package org.acme;

import java.util.concurrent.CompletionStage;

import jakarta.transaction.Transactional;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@Path("/")
public class ResourceSendingToKafka {

    @Channel("kafka") Emitter<Fruit> emitter;

    @POST
    @Path("/fruits")
    @Transactional                                                      (1)
    public CompletionStage<Void> storeAndSendToKafka(Fruit fruit) {     (2)
        fruit.persist();
        return emitter.send(new FruitDto(fruit));                       (3)
    }
}
1 当我们写入数据库时,请确保我们运行在交易当中
2 该方法接收要持续存在的水果实例。它返回一个 CompletionStage,用于交易分界。返回 CompletionStage 完成时,此交易被提交。在我们的例子中,即信息被写入 Kafka 时。
3 将管理实体包装在数据传输对象内,然后将其发送给 Kafka。此举可确保管理实体不受 Kafka 序列化的影响。

Writing entities managed by Hibernate Reactive to Kafka

若要发送由 Hibernate Reactive 管理的实体到 Kafka,我们建议使用:

  • 使用 Quarkus REST 为 HTTP 请求提供服务

  • 一个 MutinyEmitter 向通道发送消息,以便它能够轻松的与由 Hibernate Reactive 或带有 Panache 的 Hibernate Reactive 公开的 Mutiny API 集成。

以下示例演示了如何接收有效载荷,使用带有 Panache 的 Hibernate Reactive 存储在数据库内,并将持续存在的实体发送给 Kafka:

package org.acme;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.MutinyEmitter;

@Path("/")
public class ReactiveGreetingResource {

    @Channel("kafka") MutinyEmitter<Fruit> emitter;     (1)

    @POST
    @Path("/fruits")
    public Uni<Void> sendToKafka(Fruit fruit) {         (2)
        return Panache.withTransaction(() ->            (3)
            fruit.<Fruit>persist()
        )
            .chain(f -> emitter.send(f));               (4)
    }
}
1 注入一个 MutinyEmitter,它公开了一个 Mutiny API。它简化了与由带有 Panache 的 Hibernate Reactive 公开的 Mutiny API 集成。
2 接收有效载荷的 HTTP 方法返回一个 Uni&lt;Void&gt;。HTTP 响应在操作完成时(实体被持续化并且被写入 Kafka)被写入。
3 我们需要在交易中将实体写入数据库中。
4 一旦持续化操作完成,我们给 Kafka 发送实体。该 send 方法返回一个 Uni&lt;Void&gt;

Streaming Kafka topics as server-sent events

将 Kafka 主题作为服务器发送的事件 (SSE) 流是简单的:

  1. 在您的 HTTP 终点中注入代表 Kafka 主题的通道

  2. 从 HTTP 方法中将该通道返回为 Publisher`或 `Multi

以下代码提供了一个示例:

@Channel("fruits")
Multi<Fruit> fruits;

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<Fruit> stream() {
    return fruits;
}

当没有足够的活动时,一些环境会切断 SSE 连接。解决方法包括定期发送 _ping_消息(或空对象)。

@Channel("fruits")
Multi<Fruit> fruits;

@Inject
ObjectMapper mapper;

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> stream() {
    return Multi.createBy().merging()
            .streams(
                    fruits.map(this::toJson),
                    emitAPeriodicPing()
            );
}

Multi<String> emitAPeriodicPing() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(10))
            .onItem().transform(x -> "{}");
}

private String toJson(Fruit f) {
    try {
        return mapper.writeValueAsString(f);
    } catch (JsonProcessingException e) {
        throw new RuntimeException(e);
    }
}

除了发送来自 Kafka 的数据之外,还需要定期发送 ping,因此解决方法稍微复杂一些。要实现此目的,我们将来自 Kafka 的流与每 10 秒发出 {} 的定期流合并。

Chaining Kafka Transactions with Hibernate Reactive transactions

通过将 Kafka 事务与 Hibernate Reactive 事务链接,可以将记录发送到 Kafka 事务,执行数据库更新并仅在数据库事务成功时提交 Kafka 事务。

以下示例演示:

  • 使用 Quarkus REST 处理 HTTP 请求来接收有效负载,

  • 使用 Smallrye 容错限制该 HTTP 端点的并发性,

  • 启动 Kafka 事务并将有效负载发送到 Kafka 记录,

  • 使用带有 Panache 的 Hibernate Reactive 将有效负载存储到数据库中,

  • 仅在成功持久化实体时提交 Kafka 事务。

package org.acme;

import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.hibernate.reactive.mutiny.Mutiny;

import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;

@Path("/")
public class FruitProducer {

    @Channel("kafka") KafkaTransactions<Fruit> kafkaTx; (1)

    @POST
    @Path("/fruits")
    @Consumes(MediaType.APPLICATION_JSON)
    @Bulkhead(1) (2)
    public Uni<Void> post(Fruit fruit) { (3)
        return kafkaTx.withTransaction(emitter -> { (4)
            emitter.send(fruit); (5)
            return Panache.withTransaction(() -> { (6)
                return fruit.<Fruit>persist(); (7)
            });
        }).replaceWithVoid();
    }
}
1 注入一个公开 Mutiny API 的 KafkaTransactions。它允许与 Hibernate Reactive with Panache 公开的 Mutiny API 集成。
2 将 HTTP 端点的并发性限制为“1”,防止在给定时间启动多个事务。
3 接收有效负载的 HTTP 方法返回一个 Uni&lt;Void&gt;。在操作完成后(已持久化实体并提交了 Kafka 事务)将写入 HTTP 响应。
4 Begin a Kafka transaction.
5 在 Kafka 事务中将有效负载发送到 Kafka。
6 在 Hibernate Reactive 事务中将实体持久化到数据库中。
7 一旦完成持久性操作并且没有错误,Kafka 事务将提交。结果将省略并作为 HTTP 响应返回。

在前面的示例中,数据库事务(内部)将提交,然后是 Kafka 事务(外部)。如果您希望首先提交 Kafka 事务,然后是数据库事务,则需要按相反的顺序嵌套它们。

下一个示例演示如何使用 Hibernate Reactive API(不带 Panache):

import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.hibernate.reactive.mutiny.Mutiny;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;

@Path("/")
public class FruitProducer {

    @Channel("kafka") KafkaTransactions<Fruit> kafkaTx;

    @Inject Mutiny.SessionFactory sf; (1)

    @POST
    @Path("/fruits")
    @Consumes(MediaType.APPLICATION_JSON)
    @Bulkhead(1)
    public Uni<Void> post(Fruit fruit) {
        Context context = Vertx.currentContext(); (2)
        return sf.withTransaction(session -> (3)
                kafkaTx.withTransaction(emitter -> (4)
                        session.persist(fruit).invoke(() -> emitter.send(fruit)) (5)
                ).emitOn(context::runOnContext) (6)
        );
    }
}
1 注入 Hibernate Reactive SessionFactory
2 捕获调用者 Vert.x 上下文。
3 开始一个 Hibernate Reactive 事务。
4 Begin a Kafka transaction.
5 将有效载荷持久化,并将实体发送到 Kafka。
6 Kafka 事务在 Kafka 生产者发送器线程上终止。我们需要切换到之前捕获的 Vert.x 上下文,以便在启动它的同一上下文中终止 Hibernate Reactive 事务。

Logging

为了减少 Kafka 客户端写入的日志数量,Quarkus 将以下日志类别的级别设置为 “1”:

  • org.apache.kafka.clients

  • org.apache.kafka.common.utils

  • org.apache.kafka.common.metrics

可以通过将以下行添加到 “2” 中来覆盖配置:

quarkus.log.category."org.apache.kafka.clients".level=INFO
quarkus.log.category."org.apache.kafka.common.utils".level=INFO
quarkus.log.category."org.apache.kafka.common.metrics".level=INFO

Connecting to Managed Kafka clusters

本部分说明如何连接到著名的 Kafka 云服务。

Azure Event Hub

“3” 提供与 Apache Kafka 兼容的端点。

在 “4” 等级中,无法使用用于 Kafka 的 Azure 事件中心。您至少需要 “5” 等级才能使用 Kafka。请参阅 “6” 查看其他选项。

若要使用 TLS 连接到 Azure 事件中心,需要以下配置,并使用 Kafka 协议:

kafka.bootstrap.servers=my-event-hub.servicebus.windows.net:9093 (1)
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ (2)
    username="$ConnectionString" \ (3)
    password="<YOUR.EVENTHUBS.CONNECTION.STRING>"; (4)
1 The port is 9093.
2 您需要使用 JAAS “7”。
3 用户名是 “8” 字符串。
4 由 Azure 提供的事件中心连接字符串。

使用事件中心命名空间的连接字符串替换 “9”。有关获取连接字符串的说明,请参阅 “10”。结果应类似于:

kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="Endpoint=sb://my-event-hub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

此配置可以是全局的(如上所示),也可以设置在通道配置中:

mp.messaging.incoming.$channel.bootstrap.servers=my-event-hub.servicebus.windows.net:9093
mp.messaging.incoming.$channel.security.protocol=SASL_SSL
mp.messaging.incoming.$channel.sasl.mechanism=PLAIN
mp.messaging.incoming.$channel.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="Endpoint=sb://my-event-hub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=...";

Red Hat OpenShift Streams for Apache Kafka

“13” 提供托管的 Kafka 代理。首先,按照 “14” 中的说明创建您的 Kafka 代理实例。确保复制了与您创建的 “12” 关联的客户端 ID 和客户端密钥。

然后,可以将 Quarkus 应用程序配置为如下连接到代理:

kafka.bootstrap.servers=<connection url> (1)
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="${KAFKA_USERNAME}" \ (2)
  password="${KAFKA_PASSWORD}"; (3)
1 在管理控制台上给出的连接字符串,例如 “15”
2 kafka 用户名(服务帐户中的客户端 ID)
3 kafka 密码(服务帐户中的客户端密钥)

通常,这些属性的前缀使用 “16” 来启用它们,仅在运行于生产模式时启用。

正如 Getting started with the rhoas CLI for Red Hat OpenShift Streams for Apache Kafka中解释的那样,要使用 Red Hat OpenShift Streams for Apache Kafka,您必须事先创建主题,创建一个 Service Account,并从该服务帐户提供读取和写入主题的权限。认证数据(客户端 ID 和密钥)与服务帐户相关,这意味着您可以实施细粒度权限并限制对主题的访问。

在使用 Kubernetes 时,建议在 Kubernetes 密钥中设置客户端 ID 和密钥:

apiVersion: v1
kind: Secret
metadata:
  name: kafka-credentials
stringData:
  KAFKA_USERNAME: "..."
  KAFKA_PASSWORD: "..."

要允许您的 Quarkus 应用程序使用该密钥,请将以下行添加到 `application.properties`文件中:

%prod.quarkus.openshift.env.secrets=kafka-credentials

Red Hat OpenShift Service Registry

Red Hat OpenShift Service Registry提供了完全管理的服务注册表,用于处理 Kafka 模式。

您可以按照 Getting started with Red Hat OpenShift Service Registry中的说明进行操作,或使用 `rhoas`CLI 创建新的服务注册表实例:

rhoas service-registry create --name my-schema-registry

请务必注意已创建实例的 Registry URL。对于认证,您可以使用之前创建的相同 ServiceAccount。您需要确保它具有访问服务注册表的必要权限。

例如,可以使用 `rhoas`CLI 将 `MANAGER`角色授予服务帐户:

rhoas service-registry role add --role manager --service-account [SERVICE_ACCOUNT_CLIENT_ID]

然后,您可以将 Quarkus 应用程序配置为连接到模式注册表,如下所示:

mp.messaging.connector.smallrye-kafka.apicurio.registry.url=${RHOAS_SERVICE_REGISTRY_URL} 1
mp.messaging.connector.smallrye-kafka.apicurio.auth.service.token.endpoint=${RHOAS_OAUTH_TOKEN_ENDPOINT} 2
mp.messaging.connector.smallrye-kafka.apicurio.auth.client.id=${RHOAS_CLIENT_ID} 3
mp.messaging.connector.smallrye-kafka.apicurio.auth.client.secret=${RHOAS_CLIENT_ID} 4
1 服务注册表 URL,如 `https://bu98.serviceregistry.rhcloud.com/t/0e95af2c-6e11-475e-82ee-f13bd782df24/apis/registry/v2`所示,显示在管理控制台中
2 OAuth 令牌端点 URL,如 `https://identity.api.openshift.com/auth/realms/rhoas/protocol/openid-connect/token`所示
3 客户端 ID(来自服务帐户)
4 客户端密钥(来自服务帐户)

Binding Red Hat OpenShift managed services to Quarkus application using the Service Binding Operator

如果您的 Quarkus 应用程序部署在安装了 Service Binding OperatorOpenShift Application Services操作符的 Kubernetes 或 OpenShift 集群上,则可以使用 Kubernetes Service Binding向应用程序注入访问 Red Hat OpenShift Streams for Apache Kafka 及服务注册表所需的配置。

为了设置服务绑定,您首先需要将 OpenShift 托管服务连接到您的集群。对于 OpenShift 集群,您可以按照 Connecting a Kafka and Service Registry instance to your OpenShift cluster中的说明进行操作。

在将您的集群与 RHOAS Kafka 及服务注册表实例连接后,请确保已向新创建的服务帐户授予必要的权限。

然后,您可以使用 Kubernetes Service Binding扩展将 Quarkus 应用程序配置为生成这些服务的 `ServiceBinding`资源:

quarkus.kubernetes-service-binding.detect-binding-resources=true

quarkus.kubernetes-service-binding.services.kafka.api-version=rhoas.redhat.com/v1alpha1
quarkus.kubernetes-service-binding.services.kafka.kind=KafkaConnection
quarkus.kubernetes-service-binding.services.kafka.name=my-kafka

quarkus.kubernetes-service-binding.services.serviceregistry.api-version=rhoas.redhat.com/v1alpha1
quarkus.kubernetes-service-binding.services.serviceregistry.kind=ServiceRegistryConnection
quarkus.kubernetes-service-binding.services.serviceregistry.name=my-schema-registry

对于此示例,Quarkus 构建将生成以下 `ServiceBinding`资源:

apiVersion: binding.operators.coreos.com/v1alpha1
kind: ServiceBinding
metadata:
  name: my-app-kafka
spec:
  application:
    group: apps.openshift.io
    name: my-app
    version: v1
    kind: DeploymentConfig
  services:
    - group: rhoas.redhat.com
      version: v1alpha1
      kind: KafkaConnection
      name: my-kafka
  detectBindingResources: true
  bindAsFiles: true
---
apiVersion: binding.operators.coreos.com/v1alpha1
kind: ServiceBinding
metadata:
  name: my-app-serviceregistry
spec:
  application:
    group: apps.openshift.io
    name: my-app
    version: v1
    kind: DeploymentConfig
  services:
    - group: rhoas.redhat.com
      version: v1alpha1
      kind: ServiceRegistryConnection
      name: my-schema-registry
  detectBindingResources: true
  bindAsFiles: true

您可以按照 Deploying to OpenShift部署您的应用程序,包括生成的 `ServiceBinding`资源。用于访问 Kafka 及模式注册表实例的配置属性将在部署时自动注入到应用程序中。

Going further

本指南展示了如何使用 Quarkus 与 Kafka 进行交互。它利用 Quarkus Messaging 构建数据流应用程序。

如果您想进一步了解,请查看 SmallRye Reactive Messaging的文档,这是Quarkus中使用的实现。