Using Apache Kafka with Schema Registry and Avro

本指南展示了 Quarkus 应用程序如何使用 Apache Kafka、“ Avro”序列化的记录,以及连接到模式注册表(如“ Confluent Schema Registry”或“ Apicurio Registry”)。 如果您不熟悉 Kafka 和 Quarkus 中的 Kafka,请考虑首先阅读“Using Apache Kafka with Reactive Messaging”指南。

Prerequisites

如要完成本指南,您需要:

  • Roughly 15 minutes

  • An IDE

  • 安装了 JDK 17+,已正确配置 JAVA_HOME

  • Apache Maven ${proposed-maven-version}

  • 如果你想使用 Quarkus CLI, 则可以选择使用

  • 如果你想构建一个本机可执行文件(或如果你使用本机容器构建,则使用 Docker),则可以选择安装 Mandrel 或 GraalVM 以及 configured appropriately

Architecture

在本指南中,我们要实现一个 REST 资源,即“MovieResource”,它将消耗电影 DTO 并将它们放入 Kafka 主题。

然后,我们将实现一个消费者,它将消耗并收集来自同一主题的消息。收集到的消息将随后通过“ Server-Sent Events”公开另一个资源“ConsumedMovieResource”。

Movies”将使用 Avro 序列化和反序列化。“Movie”的模式存储在 Apicurio Registry 中。如果您使用的是 Confluent Avro “serde”和 Confluent Schema Registry,则适用于相同概念。

Solution

我们建议您遵循接下来的部分中的说明,按部就班地创建应用程序。然而,您可以直接跳到完成的示例。

克隆 Git 存储库: git clone $${quickstarts-base-url}.git,或下载 $${quickstarts-base-url}/archive/main.zip[存档]。

解决方案位于“kafka-avro-schema-quickstartdirectory”。

Creating the Maven Project

首先,我们需要一个新项目。使用以下命令创建一个新项目:

CLI
quarkus create app {create-app-group-id}:{create-app-artifact-id} \
    --no-code
cd {create-app-artifact-id}

要创建一个 Gradle 项目,添加 --gradle--gradle-kotlin-dsl 选项。 有关如何安装和使用 Quarkus CLI 的详细信息,请参见 Quarkus CLI 指南。

Maven
mvn {quarkus-platform-groupid}:quarkus-maven-plugin:{quarkus-version}:create \
    -DprojectGroupId={create-app-group-id} \
    -DprojectArtifactId={create-app-artifact-id} \
    -DnoCode
cd {create-app-artifact-id}

要创建一个 Gradle 项目,添加 -DbuildTool=gradle-DbuildTool=gradle-kotlin-dsl 选项。

适用于 Windows 用户:

  • 如果使用 cmd,(不要使用反斜杠 \ ,并将所有内容放在同一行上)

  • 如果使用 Powershell,将 -D 参数用双引号引起来,例如 "-DprojectArtifactId={create-app-artifact-id}"

如果您使用的是 Confluent Schema Registry,则不需要“quarkus-apicurio-registry-avro”扩展。相反,您需要“quarkus-confluent-registry-avro”扩展和更多一些依赖项。有关详细信息,请参阅“Using the Confluent Schema Registry”。

Avro schema

Apache Avro 是一个数据序列化系统。数据结构使用模式进行描述。我们需要做的第一件事是创建一个描述“Movie”结构的模式。使用我们记录(Kafka 消息)的模式创建一个名为“src/main/avro/movie.avsc”的文件:

{
  "namespace": "org.acme.kafka.quarkus",
  "type": "record",
  "name": "Movie",
  "fields": [
    {
      "name": "title",
      "type": "string"
    },
    {
      "name": "year",
      "type": "int"
    }
  ]
}

如果您使用以下方式构建项目:

CLI
quarkus build
Maven
./mvnw install
Gradle
./gradlew build

movies.avsc”将被编译为“Movie.java”文件,并放置在“target/generated-sources/avsc”目录中。

查看“ Avro specification”以了解更多关于 Avro 语法和受支持类型的知识。

使用 Quarkus 无需使用特定 Maven 插件来处理 Avro 模式,所有这些操作都将通过“quarkus-avro”扩展为您完成!

如果您使用以下方式运行该项目:

CLI
quarkus dev
Maven
./mvnw quarkus:dev
Gradle
./gradlew --console=plain quarkusDev

您对模式文件所做的更改将自动应用到生成的 Java 文件中。

The Movie producer

定义了架构后,我们现在可以跳至实现 MovieResource

让我们打开 MovieResource,注入一个 Movie DTO 的 Emitter,并实现一个 @POST 方法,该方法消耗 Movie 并通过 Emitter 发送它:

package org.acme.kafka;

import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.jboss.logging.Logger;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;

@Path("/movies")
public class MovieResource {
    private static final Logger LOGGER = Logger.getLogger(MovieResource.class);

    @Channel("movies")
    Emitter<Movie> emitter;

    @POST
    public Response enqueueMovie(Movie movie) {
        LOGGER.infof("Sending movie %s to Kafka", movie.getTitle());
        emitter.send(movie);
        return Response.accepted().build();
    }

}

现在,我们需要将 movies 通道(Emitter 发送到此通道)map 到 Kafka 主题。为了实现这一点,编辑 application.properties 文件,并添加以下内容:

# set the connector for the outgoing channel to `smallrye-kafka`
mp.messaging.outgoing.movies.connector=smallrye-kafka

# set the topic name for the channel to `movies`
mp.messaging.outgoing.movies.topic=movies

# automatically register the schema with the registry, if not present
mp.messaging.outgoing.movies.apicurio.registry.auto-register=true

你可能已经注意到,我们没有定义 value.serializer。那是因为 Quarkus 可以 autodetect 适当的 io.apicurio.registry.serde.avro.AvroKafkaSerializer 在这里,基于 @Channel 声明、Movie 类型结构和 Apicurio 注册中心库的存在。我们仍要定义 apicurio.registry.auto-register 属性。 如果您使用 Confluent Schema Registry,那么您也无需配置 value.serializer。它也会被自动检测到。Confluent Schema Registry 的 apicurio.registry.auto-register 类似物被称为 auto.register.schemas。其默认值为 true,因此在本示例中无需对其进行配置。如果您想禁用自动架构注册,可以将其显式设置为 false

The Movie consumer

因此,我们可以将包含 Movie 数据的记录写入 Kafka 中。该数据使用 Avro 序列化。现在,是时候为它们实现一个消费者了。

让我们创建一个将消费 movies-from-kafka 通道中的 Movie 消息并通过服务器端事件公开它的 ConsumedMovieResource

package org.acme.kafka;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.jboss.resteasy.reactive.RestStreamElementType;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
@Path("/consumed-movies")
public class ConsumedMovieResource {

    @Channel("movies-from-kafka")
    Multi<Movie> movies;

    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<String> stream() {
        return movies.map(movie -> String.format("'%s' from %s", movie.getTitle(), movie.getYear()));
    }
}

应用程序代码的最后一部分是在 application.properties 中配置 movies-from-kafka 通道:

# set the connector for the incoming channel to `smallrye-kafka`
mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka

# set the topic name for the channel to `movies`
mp.messaging.incoming.movies-from-kafka.topic=movies

# disable auto-commit, Reactive Messaging handles it itself
mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false

mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest

您可能已经注意到我们没有定义 value.deserializer。这是因为 Quarkus 可以 autodetect,即 io.apicurio.registry.serde.avro.AvroKafkaDeserializer 在此处合适,具体取决于 @Channel 声明、Movie 类型的结构以及 Apicurio Registry 库的存在。我们也不必定义 apicurio.registry.use-specific-avro-reader 属性,它也会自动配置。 如果您使用 Confluent Schema Registry,那么您也不必配置 value.deserializerspecific.avro.reader。它们都会自动被检测到。

Running the application

在开发模式下启动应用程序:

CLI
quarkus dev
Maven
./mvnw quarkus:dev
Gradle
./gradlew --console=plain quarkusDev

多亏了开发服务,Kafka 代理和 Apicurio Registry 实例会自动启动。有关更多详细信息,请参见 Dev Services for KafkaDev Services for Apicurio Registry

您可能已经注意到我们没有在任何地方配置架构注册表 URL。这是因为 Apicurio Registry 的开发服务将 Quarkus Messaging 中的 Kafka 通道配置为使用自动启动的注册表实例。 Apicurio Registry 除了其原生 API 以外,还公开了与 Confluent Schema Registry 在 API 兼容的端点。因此,这种自动配置对 Apicurio Registry serde 和 Confluent Schema Registry serde 都有效。 但是,请注意,没有任何 Dev Services 支持可以运行 Confluent Schema Registry 本身。如果你想使用 Confluent Schema Registry 的运行中实例,请与 Kafka 代理一起配置其 URL:

kafka.bootstrap.servers=PLAINTEXT://localhost:9092
mp.messaging.connector.smallrye-kafka.schema.registry.url=http://localhost:8081

在第二个终端中,使用 curl 查询 ConsumedMovieResource 资源:

curl -N http://localhost:8080/consumed-movies

在第三个中,发布一些电影:

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"The Shawshank Redemption","year":1994}' \
  http://localhost:8080/movies

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"The Godfather","year":1972}' \
  http://localhost:8080/movies

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"The Dark Knight","year":2008}' \
  http://localhost:8080/movies

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"12 Angry Men","year":1957}' \
  http://localhost:8080/movies

观察在第二个终端中打印的内容。您应该看到类似以下的内容:

data:'The Shawshank Redemption' from 1994

data:'The Godfather' from 1972

data:'The Dark Knight' from 2008

data:'12 Angry Men' from 1957

Running in JVM or Native mode

在非开发或测试模式下运行时,您需要启动自己的 Kafka 代理和 Apicurio Registry。使它们运行的最简单方法是使用 docker-compose 启动适当的容器。

如果您使用 Confluent Schema Registry,则您已经运行了 Kafka 代理和 Confluent Schema Registry 实例并将其配置好了。您可以忽略此处的 docker-compose 说明,以及 Apicurio Registry 配置。

在项目的根目录创建一个 docker-compose.yaml 文件,内容如下:

version: '2'

services:

  zookeeper:
    image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
    command: [
        "sh", "-c",
        "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs

  kafka:
    image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
    command: [
        "sh", "-c",
        "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

  schema-registry:
    image: apicurio/apicurio-registry-mem:2.4.2.Final
    ports:
      - 8081:8080
    depends_on:
      - kafka
    environment:
      QUARKUS_PROFILE: prod

在启动应用程序之前,我们首先启动 Kafka 代理和 Apicurio Registry:

docker-compose up

要停止容器,请使用 docker-compose down。您还可以使用 docker-compose rm 清理容器。

您可以使用以下命令构建应用程序:

CLI
quarkus build
Maven
./mvnw install
Gradle
./gradlew build

并使用以下命令在 JVM 模式中运行它:

java -Dmp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2 -jar target/quarkus-app/quarkus-run.jar

默认情况下,应用程序尝试连接到在 localhost:9092 侦听的 Kafka 代理。您可以使用以下命令配置引导服务器: java -Dkafka.bootstrap.servers=…​ -jar target/quarkus-app/quarkus-run.jar

在命令行上指定注册表 URL 不太方便,因此您只能为 prod 配置文件添加一个配置属性:

%prod.mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2

您可以使用以下命令构建一个本机可执行文件:

CLI
quarkus build --native
Maven
./mvnw install -Dnative
Gradle
./gradlew build -Dquarkus.native.enabled=true

并使用以下命令运行它:

./target/kafka-avro-schema-quickstart-1.0.0-SNAPSHOT-runner -Dkafka.bootstrap.servers=localhost:9092

Testing the application

如上所述,Kafka 和 Apicurio Registry 的开发服务在开发模式和测试中自动启动并配置一个 Kafka 代理和 Apicurio Registry 实例。因此,我们不必自己设置 Kafka 和 Apicurio Registry。我们只需专注于编写测试。

首先,让我们将对 REST 客户端和 Awaitility 的测试依赖关系添加到构建文件中:

pom.xml
<!-- we'll use Jakarta REST Client for talking to the SSE endpoint -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-rest-client</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>
build.gradle
testImplementation("io.quarkus:quarkus-rest-client")
testImplementation("org.awaitility:awaitility")

在测试中,我们将循环发送电影并检查 ConsumedMovieResource 是否返回了我们发送的内容。

package org.acme.kafka;

import io.quarkus.test.common.WithTestResource;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.http.ContentType;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;

import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.sse.SseEventSource;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static io.restassured.RestAssured.given;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;

@QuarkusTest
public class MovieResourceTest {

    @TestHTTPResource("/consumed-movies")
    URI consumedMovies;

    @Test
    public void testHelloEndpoint() throws InterruptedException {
        // create a client for `ConsumedMovieResource` and collect the consumed resources in a list
        Client client = ClientBuilder.newClient();
        WebTarget target = client.target(consumedMovies);

        List<String> received = new CopyOnWriteArrayList<>();

        SseEventSource source = SseEventSource.target(target).build();
        source.register(inboundSseEvent -> received.add(inboundSseEvent.readData()));

        // in a separate thread, feed the `MovieResource`
        ExecutorService movieSender = startSendingMovies();

        source.open();

        // check if, after at most 5 seconds, we have at least 2 items collected, and they are what we expect
        await().atMost(5, SECONDS).until(() -> received.size() >= 2);
        assertThat(received, Matchers.hasItems("'The Shawshank Redemption' from 1994",
                "'12 Angry Men' from 1957"));
        source.close();

        // shutdown the executor that is feeding the `MovieResource`
        movieSender.shutdownNow();
        movieSender.awaitTermination(5, SECONDS);
    }

    private ExecutorService startSendingMovies() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(() -> {
            while (true) {
                given()
                        .contentType(ContentType.JSON)
                        .body("{\"title\":\"The Shawshank Redemption\",\"year\":1994}")
                .when()
                        .post("/movies")
                .then()
                        .statusCode(202);

                given()
                        .contentType(ContentType.JSON)
                        .body("{\"title\":\"12 Angry Men\",\"year\":1957}")
                .when()
                        .post("/movies")
                .then()
                        .statusCode(202);

                try {
                    Thread.sleep(200L);
                } catch (InterruptedException e) {
                    break;
                }
            }
        });
        return executorService;
    }

}

我们修改了与该项目一起生成的 MovieResourceTest。此测试类具有一个子类 NativeMovieResourceIT,它针对本机可执行文件运行相同的测试。要运行它,请执行:

CLI
quarkus build --native
Maven
./mvnw install -Dnative
Gradle
./gradlew build -Dquarkus.native.enabled=true

Manual setup

如果我们无法使用开发服务并希望手动启动 Kafka 代理和 Apicurio Registry 实例,我们将定义一个 QuarkusTestResourceLifecycleManager

pom.xml
<dependency>
    <groupId>io.strimzi</groupId>
    <artifactId>strimzi-test-container</artifactId>
    <version>0.105.0</version>
    <scope>test</scope>
    <exclusions>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
build.gradle
testImplementation("io.strimzi:strimzi-test-container:0.105.0") {
    exclude group: "org.apache.logging.log4j", module: "log4j-core"
}
package org.acme.kafka;

import java.util.HashMap;
import java.util.Map;

import org.testcontainers.containers.GenericContainer;

import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.strimzi.StrimziKafkaContainer;

public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager {

    private final StrimziKafkaContainer kafka = new StrimziKafkaContainer();

    private GenericContainer<?> registry;

    @Override
    public Map<String, String> start() {
        kafka.start();
        registry = new GenericContainer<>("apicurio/apicurio-registry-mem:2.4.2.Final")
                .withExposedPorts(8080)
                .withEnv("QUARKUS_PROFILE", "prod");
        registry.start();
        Map<String, String> properties = new HashMap<>();
        properties.put("mp.messaging.connector.smallrye-kafka.apicurio.registry.url",
                "http://" + registry.getHost() + ":" + registry.getMappedPort(8080) + "/apis/registry/v2");
        properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers());
        return properties;
    }

    @Override
    public void stop() {
        registry.stop();
        kafka.stop();
    }
}
@QuarkusTest
@WithTestResource(KafkaAndSchemaRegistryTestResource.class)
public class MovieResourceTest {
    ...
}

Using compatible versions of the Apicurio Registry

quarkus-apicurio-registry-avro 扩展依赖于 Apicurio Registry 客户端的最新版本,并且大多数版本的 Apicurio Registry 服务器和客户端都向后兼容。对于某些版本,您需要确保 Serdes 使用的客户端与服务器兼容。

例如,如果您将 Apicurio dev 服务的映像名称设置为使用版本 2.1.5.Final

quarkus.apicurio-registry.devservices.image-name=quay.io/apicurio/apicurio-registry-mem:2.1.5.Final

您需要确保 apicurio-registry-serdes-avro-serde 依赖项和 REST 客户端 apicurio-common-rest-client-vertx 依赖项都设置为兼容版本:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-apicurio-registry-avro</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-common-rest-client-vertx</artifactId>
        </exclusion>
        <exclusion>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-registry-serdes-avro-serde</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.apicurio</groupId>
    <artifactId>apicurio-registry-client</artifactId>
    <version>2.1.5.Final</version>
</dependency>
<dependency>
    <groupId>io.apicurio</groupId>
    <artifactId>apicurio-registry-common</artifactId>
    <version>2.1.5.Final</version>
</dependency>
<dependency>
    <groupId>io.apicurio</groupId>
    <artifactId>apicurio-registry-serdes-avro-serde</artifactId>
    <version>2.1.5.Final</version>
    <exclusions>
        <exclusion>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-common-rest-client-jdk</artifactId>
        </exclusion>
        <exclusion>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-registry-client</artifactId>
        </exclusion>
        <exclusion>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-registry-common</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.apicurio</groupId>
    <artifactId>apicurio-common-rest-client-vertx</artifactId>
    <version>0.1.5.Final</version>
</dependency>
build.gradle
dependencies {
    implementation(platform("{quarkus-platform-groupid}:quarkus-bom:2.12.3.Final"))

    ...

    implementation("io.quarkus:quarkus-apicurio-registry-avro")
    implementation("io.apicurio:apicurio-registry-serdes-avro-serde") {
        exclude group: "io.apicurio", module: "apicurio-common-rest-client-jdk"
        exclude group: "io.apicurio", module: "apicurio-registry-client"
        exclude group: "io.apicurio", module: "apicurio-registry-common"
        version {
            strictly "2.1.5.Final"
        }
    }
    implementation("io.apicurio:apicurio-registry-client") {
        version {
            strictly "2.1.5.Final"
        }
    }
    implementation("io.apicurio:apicurio-registry-common") {
        version {
            strictly "2.1.5.Final"
        }
    }
    implementation("io.apicurio:apicurio-common-rest-client-vertx") {
        version {
            strictly "0.1.5.Final"
        }
    }
}

apicurio-registry-clientapicurio-common-rest-client-vertx 的已知先前兼容版本如下所列:

  • apicurio-registry-client 2.1.5.Final 与 apicurio-common-rest-client-vertx 0.1.5.Final

  • apicurio-registry-client 2.3.1.Final 与 apicurio-common-rest-client-vertx 0.1.13.Final

Using the Confluent Schema Registry

如果您想使用 Confluent Schema Registry,则需要 quarkus-confluent-registry-avro 扩展,而不是 quarkus-apicurio-registry-avro 扩展。此外,您需要向 pom.xml / build.gradle 文件中添加一些依赖项和一个自定义 Maven 存储库:

pom.xml
<dependencies>
    ...
    <!-- the extension -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-confluent-registry-avro</artifactId>
    </dependency>
    <!-- Confluent registry libraries use Jakarta REST client -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-rest-client</artifactId>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>7.2.0</version>
        <exclusions>
            <exclusion>
                <groupId>jakarta.ws.rs</groupId>
                <artifactId>jakarta.ws.rs-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

<repositories>
    <!-- io.confluent:kafka-avro-serializer is only available from this repository: -->
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>
build.gradle
repositories {
    ...

    maven {
        url "https://packages.confluent.io/maven/"
    }
}

dependencies {
    ...

    implementation("io.quarkus:quarkus-confluent-registry-avro")

    // Confluent registry libraries use Jakarta REST client
    implementation("io.quarkus:quarkus-rest-client")

    implementation("io.confluent:kafka-avro-serializer:7.2.0") {
        exclude group: "jakarta.ws.rs", module: "jakarta.ws.rs-api"
    }
}

在 JVM 模式下,可以使用任何版本的 io.confluent:kafka-avro-serializer。在原生模式中,Quarkus 支持以下版本: 6.2.x7.0.x7.1.x7.2.x7.3.x

对于版本 7.4.x7.5.x,由于 Confluent Schema Serializer 出现问题,因此您需要添加另一个依赖项:

pom.xml
<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-csv</artifactId>
</dependency>
build.gradle
dependencies {
    implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-csv")
}

对于任何其他版本,可能需要调整原生配置。

Avro code generation details

在本指南中,我们使用 Quarkus 代码生成机制从 Avro 模式生成 Java 文件。

实际上,该机制使用 org.apache.avro:avro-compiler

您可以使用以下配置属性来更改其工作方式:

  • avro.codegen.[avsc|avdl|avpr].imports - 应首先编译的文件或目录列表,从而使随后编译的模式可以导入它们。请注意,导入的文件不应相互引用。所有路径都应相对于构建系统配置的任何源目录中的 src/[main|test]/avro 目录或 avro 子目录。以逗号分隔的列表形式传递。

  • avro.codegen.stringType - 用于 Avro 字符串的 Java 类型。可以是 CharSequenceStringUtf8 之一。默认为 String

  • avro.codegen.createOptionalGetters - 启用生成返回所请求类型可选值的 getOptional&#8230;&#8203; 方法。默认为 false

  • avro.codegen.enableDecimalLogicalType - 确定是否为十进制类型使用 Java 类,默认为 false

  • avro.codegen.createSetters - 确定是否为记录的字段创建设置器。默认为 false

  • avro.codegen.gettersReturnOptional - 启用生成返回所请求类型可选值的 get&#8230;&#8203; 方法。默认为 false

  • avro.codegen.optionalGettersForNullableFieldsOnly,与 gettersReturnOptional 选项配合使用。如果设置了它,则只会为可为空的字段生成 Optional 获取器。如果字段是必需的,则将生成常规获取器。默认为 false

Further reading