Using Apache Kafka with Schema Registry and JSON Schema

This guide shows how your Quarkus application can use Apache Kafka, JSON Schema serialized records, and connect to a schema registry (such as the Confluent Schema Registry or Apicurio Registry).

If you are not familiar with Kafka and Kafka in Quarkus in particular, consider first going through the Using Apache Kafka with Reactive Messaging guide.

Prerequisites

Unresolved directive in kafka-schema-registry-json-schema.adoc - include::{includes}/prerequisites.adoc[]

Architecture

In this guide we are going to implement a REST resource, namely MovieResource, that will consume movie DTOs and put them in a Kafka topic.

Then, we will implement a consumer that will consume and collect messages from the same topic. The collected messages will then be exposed by another resource, ConsumedMovieResource, via Server-Sent Events.

The Movies will be serialized and deserialized using JSON Schema. The schema, describing the Movie, is stored in Apicurio Registry. The same concept applies if you are using the Confluent JSON Schema serde and Confluent Schema Registry.
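
The flow described above can be pictured with a small in-memory sketch. It is not Kafka code: a BlockingQueue stands in for the movies topic, and the class, the nested Movie type, and the post and drain method names are hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in for the guide's architecture; no Kafka broker or registry is involved.
final class MovieFlowSketch {

    // Hypothetical DTO mirroring the Movie fields used in this guide.
    static final class Movie {
        final String title;
        final Integer year;

        Movie(String title, Integer year) {
            this.title = title;
            this.year = year;
        }
    }

    // Stand-in for the `movies` Kafka topic.
    private final BlockingQueue<Movie> topic = new LinkedBlockingQueue<>();

    // Role of MovieResource: accept a movie and put it on the topic.
    void post(Movie movie) {
        topic.add(movie);
    }

    // Role of ConsumedMovieResource: take what is on the topic and format it for streaming.
    List<String> drain() {
        List<Movie> batch = new ArrayList<>();
        topic.drainTo(batch);
        List<String> events = new ArrayList<>();
        for (Movie m : batch) {
            events.add(String.format("'%s' from %s", m.title, m.year));
        }
        return events;
    }

    public static void main(String[] args) {
        MovieFlowSketch flow = new MovieFlowSketch();
        flow.post(new Movie("The Godfather", 1972));
        flow.drain().forEach(System.out::println);
    }
}
```

In the real application, the queue is replaced by a Kafka topic, and the records on it are serialized with JSON Schema.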

Solution

We recommend that you follow the instructions in the next sections and create the application step by step. However, you can go right to the completed example.

Clone the Git repository: git clone {quickstarts-clone-url}, or download an {quickstarts-archive-url}[archive].

The solution is located in the kafka-json-schema-quickstart directory.

Creating the Maven Project

First, we need a new project. Create a new project with the following command:

Unresolved directive in kafka-schema-registry-json-schema.adoc - include::{includes}/devtools/create-app.adoc[]

If you use Confluent Schema Registry, you don’t need the quarkus-apicurio-registry-json-schema extension. Instead, you need the quarkus-confluent-registry-json-schema extension and a few more dependencies. See Using the Confluent Schema Registry for details.

JSON Schema

JSON Schema is a vocabulary for describing and validating the structure of JSON data. The first thing we need to do is to create a schema describing the Movie structure. Create a file called src/main/resources/json-schema.json with the schema for our record (Kafka message):

{
  "$id": "https://example.com/person.schema.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Movie",
  "type": "object",
  "properties": {
    "title": {
      "type": "string",
      "description": "The movie's title."
    },
    "year": {
      "type": "integer",
      "description": "The movie's year."
    }
  }
}

Note that auto-generating the Java class from the JSON Schema definition is not possible. Therefore, you must define the Java class as follows so it can be used by the serialization process:

package org.acme.kafka;

public class Movie {

    private String title;
    private Integer year;

    public Movie() {
    }

    public Movie(String title, Integer year) {
        this.title = title;
        this.year = year;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public Integer getYear() {
        return year;
    }

    public void setYear(Integer year) {
        this.year = year;
    }
}
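
To make concrete what the schema above accepts, here is a hand-written check in plain Java. It only illustrates the two property constraints of the schema and is not how the Apicurio or Confluent serde validates records. The class and method names are hypothetical, and the input is assumed to be a JSON object already decoded into a Map.

```java
import java.util.Map;

// Hand-rolled illustration of the Movie schema's constraints; real validation is done by the serde.
final class MovieSchemaCheck {

    // Returns true if the decoded JSON object satisfies the property constraints of the schema.
    // The schema declares no "required" keyword, so absent properties are accepted.
    static boolean conformsToMovieSchema(Map<String, Object> json) {
        if (json.containsKey("title") && !(json.get("title") instanceof String)) {
            return false; // "title" must be a string when present
        }
        if (json.containsKey("year") && !isInteger(json.get("year"))) {
            return false; // "year" must be an integer when present
        }
        return true;
    }

    // Treats any finite number without a fractional part as an integer.
    private static boolean isInteger(Object value) {
        if (!(value instanceof Number)) {
            return false;
        }
        double d = ((Number) value).doubleValue();
        return !Double.isNaN(d) && !Double.isInfinite(d) && d == Math.rint(d);
    }

    public static void main(String[] args) {
        Map<String, Object> movie = Map.of("title", "The Godfather", "year", 1972);
        System.out.println(conformsToMovieSchema(movie));
    }
}
```

Because the schema lists no required properties, an empty object also passes this check. Adding a "required" array to the schema would tighten that.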

The Movie producer

Having defined the schema, we can now jump to implementing the MovieResource.

Let’s open the MovieResource, inject an Emitter of Movie DTOs, and implement a @POST method that consumes a Movie and sends it through the Emitter:

package org.acme.kafka;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.jboss.logging.Logger;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;

@Path("/movies")
public class MovieResource {
    private static final Logger LOGGER = Logger.getLogger(MovieResource.class);

    @Channel("movies")
    Emitter<Movie> emitter;

    @POST
    public Response enqueueMovie(Movie movie) {
        LOGGER.infof("Sending movie %s to Kafka", movie.getTitle());
        emitter.send(movie);
        return Response.accepted().build();
    }

}

Now, we need to map the movies channel (the Emitter emits to this channel) to a Kafka topic and also map the schema to be used on this channel. To achieve this, edit the application.properties file, and add the following content:

# set the connector for the outgoing channel to `smallrye-kafka`
mp.messaging.outgoing.movies.connector=smallrye-kafka

# disable automatic detection of the serializers
quarkus.messaging.kafka.serializer-autodetection.enabled=false

# Set the value serializer for the channel `movies`
mp.messaging.outgoing.movies.value.serializer=io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer

# set the topic name for the channel to `movies`
mp.messaging.outgoing.movies.topic=movies

# set the schema to be used for the channel `movies`. Note that this property accepts just a name or a path and the serializer will look for the resource on the classpath.
mp.messaging.outgoing.movies.apicurio.registry.artifact.schema.location=json-schema.json

# automatically register the schema with the registry, if not present
mp.messaging.outgoing.movies.apicurio.registry.auto-register=true

Note that, unlike with Avro serialization, serializer autodetection can’t be used with JSON Schema, so we must define the value.serializer. Just like with Avro, we still have to define the apicurio.registry.auto-register property.

If you use Confluent Schema Registry, in this case you must define the value.serializer as well with the value io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer. It is also detected automatically. The Confluent Schema Registry analogue of apicurio.registry.auto-register is called auto.register.schemas. It defaults to true, so it doesn’t have to be configured in this example. It can be explicitly set to false if you want to disable automatic schema registration.
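
The outgoing channel properties above all follow the pattern mp.messaging.<direction>.<channel>.<attribute>. The following sketch composes the same keys in plain Java to make that pattern visible. The helper class and its method names are hypothetical and not part of Quarkus or SmallRye Reactive Messaging.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that spells out the naming pattern of channel properties.
final class ChannelConfigSketch {

    // Composes a channel-scoped key such as mp.messaging.outgoing.movies.topic
    static String key(String direction, String channel, String attribute) {
        return "mp.messaging." + direction + "." + channel + "." + attribute;
    }

    // Rebuilds the outgoing `movies` channel settings shown in this guide.
    static Map<String, String> outgoingMovies() {
        Map<String, String> props = new LinkedHashMap<>();
        props.put(key("outgoing", "movies", "connector"), "smallrye-kafka");
        props.put(key("outgoing", "movies", "value.serializer"),
                "io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer");
        props.put(key("outgoing", "movies", "topic"), "movies");
        props.put(key("outgoing", "movies", "apicurio.registry.artifact.schema.location"),
                "json-schema.json");
        props.put(key("outgoing", "movies", "apicurio.registry.auto-register"), "true");
        return props;
    }

    public static void main(String[] args) {
        // Prints the entries in key=value form, as they appear in application.properties.
        outgoingMovies().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The channel name in the key (movies) is the name used in @Channel, while the topic attribute holds the Kafka topic name. They happen to be equal here but do not have to be.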

The Movie consumer

So, we can write records into Kafka containing our Movie data. That data is serialized using JSON Schema. Now, it’s time to implement a consumer for them.

Let’s create a ConsumedMovieResource that consumes Movie messages from the movies-from-kafka channel and exposes them via Server-Sent Events:

package org.acme.kafka;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.jboss.resteasy.reactive.RestStreamElementType;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
@Path("/consumed-movies")
public class ConsumedMovieResource {

    @Channel("movies-from-kafka")
    Multi<Movie> movies;

    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<String> stream() {
        return movies.map(movie -> String.format("'%s' from %s", movie.getTitle(), movie.getYear()));
    }
}

The last bit of the application’s code is the configuration of the movies-from-kafka channel in application.properties:

# set the connector for the incoming channel to `smallrye-kafka`
mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka

# set the topic name for the channel to `movies`
mp.messaging.incoming.movies-from-kafka.topic=movies

# set the deserializer for the incoming channel
mp.messaging.incoming.movies-from-kafka.value.deserializer=io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer

# disable auto-commit, Reactive Messaging handles it itself
mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false

# start from the earliest available record when the consumer group has no committed offset
mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest

Again, unlike with Avro, we have to define the value.deserializer.

If you use Confluent Schema Registry, you must configure value.deserializer as well with the value io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer. They are both detected automatically.

Running the application

Start the application in dev mode:

Unresolved directive in kafka-schema-registry-json-schema.adoc - include::{includes}/devtools/dev.adoc[]

A Kafka broker and an Apicurio Registry instance are started automatically thanks to Dev Services. See Dev Services for Kafka and Dev Services for Apicurio Registry for more details.

You might have noticed that we didn’t configure the schema registry URL anywhere. This is because Dev Services for Apicurio Registry configures all Kafka channels in Quarkus Messaging to use the automatically started registry instance.

Apicurio Registry, in addition to its native API, also exposes an endpoint that is API-compatible with Confluent Schema Registry. Therefore, this automatic configuration works both for Apicurio Registry serde and Confluent Schema Registry serde.

However, note that there’s no Dev Services support for running Confluent Schema Registry itself. If you want to use a running instance of Confluent Schema Registry, configure its URL, together with the URL of a Kafka broker:

kafka.bootstrap.servers=PLAINTEXT://localhost:9092
mp.messaging.connector.smallrye-kafka.schema.registry.url=http://localhost:8081

In a second terminal, query the ConsumedMovieResource resource with curl:

curl -N http://localhost:8080/consumed-movies

In a third terminal, post a few movies:

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"The Shawshank Redemption","year":1994}' \
  http://localhost:8080/movies

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"The Godfather","year":1972}' \
  http://localhost:8080/movies

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"The Dark Knight","year":2008}' \
  http://localhost:8080/movies

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"12 Angry Men","year":1957}' \
  http://localhost:8080/movies
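
The same requests can also be sent from Java. The sketch below uses the JDK's built-in java.net.http client to build a request equivalent to the curl commands above. The class and method names are hypothetical, the base URL is the dev-mode address used in this guide, and the title is assumed to contain no characters that need JSON escaping.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical Java counterpart of the curl commands above, using only the JDK HTTP client.
final class PostMovieSketch {

    // Builds the JSON body by hand; assumes the title needs no JSON escaping.
    static String movieJson(String title, int year) {
        return "{\"title\":\"" + title + "\",\"year\":" + year + "}";
    }

    // Builds a POST request to <baseUrl>/movies with a JSON content type.
    static HttpRequest buildRequest(String baseUrl, String title, int year) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/movies"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(movieJson(title, year)))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Requires the application from this guide to be running locally.
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<Void> response = client.send(
                buildRequest("http://localhost:8080", "The Godfather", 1972),
                HttpResponse.BodyHandlers.discarding());
        System.out.println("Status: " + response.statusCode());
    }
}
```

For anything beyond a quick experiment, a JSON library such as Jackson is a safer way to build the body than string concatenation.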

Observe what is printed in the second terminal. You should see something along the lines of:

data:'The Shawshank Redemption' from 1994

data:'The Godfather' from 1972

data:'The Dark Knight' from 2008

data:'12 Angry Men' from 1957

Running in JVM or Native mode

When not running in dev or test mode, you will need to start your own Kafka broker and Apicurio Registry. The easiest way to get them running is to use docker-compose to start the appropriate containers.

If you use Confluent Schema Registry, you already have a Kafka broker and Confluent Schema Registry instance running and configured. You can ignore the docker-compose instructions here, as well as the Apicurio Registry configuration.

Create a docker-compose.yaml file at the root of the project with the following content:

version: '2'

services:

  zookeeper:
    image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
    command: [
        "sh", "-c",
        "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs

  kafka:
    image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
    command: [
        "sh", "-c",
        "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

  schema-registry:
    image: apicurio/apicurio-registry-mem:2.4.2.Final
    ports:
      - 8081:8080
    depends_on:
      - kafka
    environment:
      QUARKUS_PROFILE: prod

Before starting the application, let’s first start the Kafka broker and Apicurio Registry:

docker-compose up

To stop the containers, use docker-compose down. You can also clean up the containers with docker-compose rm.

You can build the application with:

Unresolved directive in kafka-schema-registry-json-schema.adoc - include::{includes}/devtools/build.adoc[]

And run it in JVM mode with:

java -Dmp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2 -jar target/quarkus-app/quarkus-run.jar

By default, the application tries to connect to a Kafka broker listening at localhost:9092. You can configure the bootstrap server using: java -Dkafka.bootstrap.servers=…​ -jar target/quarkus-app/quarkus-run.jar

Specifying the registry URL on the command line is not very convenient, so you can add a configuration property only for the prod profile:

%prod.mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2
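
The %prod. prefix makes the property apply only when the prod profile is active. The sketch below is a simplified model of that lookup rule, written in plain Java for illustration. It is not Quarkus' configuration implementation, and the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.Optional;

// Simplified model of profile-prefixed configuration lookup; not Quarkus' actual implementation.
final class ProfileLookupSketch {

    // Prefers "%<profile>.<key>" over the plain "<key>" when both are present.
    static Optional<String> resolve(Map<String, String> props, String activeProfile, String key) {
        String profiled = props.get("%" + activeProfile + "." + key);
        return Optional.ofNullable(profiled != null ? profiled : props.get(key));
    }

    public static void main(String[] args) {
        String key = "mp.messaging.connector.smallrye-kafka.apicurio.registry.url";
        Map<String, String> props = Map.of(
                "%prod." + key, "http://localhost:8081/apis/registry/v2");

        // Visible under the prod profile, absent under dev (where Dev Services supply the URL).
        System.out.println(resolve(props, "prod", key).isPresent());
        System.out.println(resolve(props, "dev", key).isPresent());
    }
}
```

This is why the property does not interfere with dev mode and tests, where Dev Services for Apicurio Registry configure the registry URL.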

You can build a native executable with:

Unresolved directive in kafka-schema-registry-json-schema.adoc - include::{includes}/devtools/build-native.adoc[]

and run it with:

./target/kafka-json-schema-quickstart-1.0.0-SNAPSHOT-runner -Dkafka.bootstrap.servers=localhost:9092

Testing the application

As mentioned above, Dev Services for Kafka and Apicurio Registry automatically start and configure a Kafka broker and Apicurio Registry instance in dev mode and for tests. Hence, we don’t have to set up Kafka and Apicurio Registry ourselves. We can just focus on writing the test.

First, let’s add test dependencies on REST Client and Awaitility to the build file:

pom.xml
<!-- we'll use Jakarta REST Client for talking to the SSE endpoint -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-rest-client</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>
build.gradle
testImplementation("io.quarkus:quarkus-rest-client")
testImplementation("org.awaitility:awaitility")

In the test, we will send movies in a loop and check if the ConsumedMovieResource returns what we send.

package org.acme.kafka;

import io.quarkus.test.common.WithTestResource;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.http.ContentType;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;

import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.sse.SseEventSource;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static io.restassured.RestAssured.given;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;

@QuarkusTest
public class MovieResourceTest {

    @TestHTTPResource("/consumed-movies")
    URI consumedMovies;

    @Test
    public void testHelloEndpoint() throws InterruptedException {
        // create a client for `ConsumedMovieResource` and collect the consumed resources in a list
        Client client = ClientBuilder.newClient();
        WebTarget target = client.target(consumedMovies);

        List<String> received = new CopyOnWriteArrayList<>();

        SseEventSource source = SseEventSource.target(target).build();
        source.register(inboundSseEvent -> received.add(inboundSseEvent.readData()));

        // in a separate thread, feed the `MovieResource`
        ExecutorService movieSender = startSendingMovies();

        source.open();

        // check if, after at most 5 seconds, we have at least 2 items collected, and they are what we expect
        await().atMost(5, SECONDS).until(() -> received.size() >= 2);
        assertThat(received, Matchers.hasItems("'The Shawshank Redemption' from 1994",
                "'12 Angry Men' from 1957"));
        source.close();

        // shutdown the executor that is feeding the `MovieResource`
        movieSender.shutdownNow();
        movieSender.awaitTermination(5, SECONDS);
    }

    private ExecutorService startSendingMovies() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(() -> {
            while (true) {
                given()
                        .contentType(ContentType.JSON)
                        .body("{\"title\":\"The Shawshank Redemption\",\"year\":1994}")
                .when()
                        .post("/movies")
                .then()
                        .statusCode(202);

                given()
                        .contentType(ContentType.JSON)
                        .body("{\"title\":\"12 Angry Men\",\"year\":1957}")
                .when()
                        .post("/movies")
                .then()
                        .statusCode(202);

                try {
                    Thread.sleep(200L);
                } catch (InterruptedException e) {
                    break;
                }
            }
        });
        return executorService;
    }

}

We modified the MovieResourceTest that was generated together with the project. This test class has a subclass, NativeMovieResourceIT, that runs the same test against the native executable. To run it, execute:

Unresolved directive in kafka-schema-registry-json-schema.adoc - include::{includes}/devtools/build-native.adoc[]
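
The await().atMost(5, SECONDS).until(...) call in the test above polls a condition until it holds or the timeout expires. For readers unfamiliar with Awaitility, the idea can be sketched in plain Java as follows. This is an illustration only, with hypothetical names: Awaitility itself fails with a ConditionTimeoutException on timeout, whereas this sketch simply returns false.

```java
import java.util.function.BooleanSupplier;

// Plain-Java illustration of polling with a timeout; not Awaitility's implementation.
final class PollingSketch {

    // Re-checks the condition every pollMillis until it holds or timeoutMillis has elapsed.
    // Returns true if the condition held, false on timeout or interruption.
    static boolean awaitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // The condition becomes true once roughly 300 ms have passed.
        boolean ok = awaitUntil(() -> System.currentTimeMillis() - start >= 300, 5_000, 50);
        System.out.println(ok);
    }
}
```

Polling fits here because records arrive asynchronously: the SSE client fills the received list at some point after the movies are posted, so a single immediate assertion would be flaky.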

Manual setup

If we couldn’t use Dev Services and wanted to start a Kafka broker and Apicurio Registry instance manually, we would define a QuarkusTestResourceLifecycleManager.

pom.xml
<dependency>
    <groupId>io.strimzi</groupId>
    <artifactId>strimzi-test-container</artifactId>
    <version>0.105.0</version>
    <scope>test</scope>
    <exclusions>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
build.gradle
testImplementation("io.strimzi:strimzi-test-container:0.105.0") {
    exclude group: "org.apache.logging.log4j", module: "log4j-core"
}

package org.acme.kafka;

import java.util.HashMap;
import java.util.Map;

import org.testcontainers.containers.GenericContainer;

import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.strimzi.StrimziKafkaContainer;

public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager {

    private final StrimziKafkaContainer kafka = new StrimziKafkaContainer();

    private GenericContainer<?> registry;

    @Override
    public Map<String, String> start() {
        kafka.start();
        registry = new GenericContainer<>("apicurio/apicurio-registry-mem:2.4.2.Final")
                .withExposedPorts(8080)
                .withEnv("QUARKUS_PROFILE", "prod");
        registry.start();
        Map<String, String> properties = new HashMap<>();
        properties.put("mp.messaging.connector.smallrye-kafka.apicurio.registry.url",
                "http://" + registry.getHost() + ":" + registry.getMappedPort(8080) + "/apis/registry/v2");
        properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers());
        return properties;
    }

    @Override
    public void stop() {
        registry.stop();
        kafka.stop();
    }
}

@QuarkusTest
@WithTestResource(KafkaAndSchemaRegistryTestResource.class)
public class MovieResourceTest {
    ...
}

Using compatible versions of the Apicurio Registry

The quarkus-apicurio-registry-json-schema extension depends on recent versions of the Apicurio Registry client, and most versions of the Apicurio Registry server and client are backwards compatible. For some versions, you need to make sure that the client used by the serdes is compatible with the server.

For example, if you set the image name for Dev Services for Apicurio Registry to use version 2.1.5.Final:

quarkus.apicurio-registry.devservices.image-name=quay.io/apicurio/apicurio-registry-mem:2.1.5.Final

You need to make sure that the apicurio-registry-serdes-json-schema-serde dependency and the REST client apicurio-common-rest-client-vertx dependency are set to compatible versions:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-apicurio-registry-json-schema</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-common-rest-client-vertx</artifactId>
        </exclusion>
        <exclusion>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-registry-serdes-json-schema-serde</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.apicurio</groupId>
    <artifactId>apicurio-registry-client</artifactId>
    <version>2.1.5.Final</version>
</dependency>
<dependency>
    <groupId>io.apicurio</groupId>
    <artifactId>apicurio-registry-common</artifactId>
    <version>2.1.5.Final</version>
</dependency>
<dependency>
    <groupId>io.apicurio</groupId>
    <artifactId>apicurio-registry-serdes-json-schema-serde</artifactId>
    <version>2.1.5.Final</version>
    <exclusions>
        <exclusion>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-common-rest-client-jdk</artifactId>
        </exclusion>
        <exclusion>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-registry-client</artifactId>
        </exclusion>
        <exclusion>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-registry-common</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.apicurio</groupId>
    <artifactId>apicurio-common-rest-client-vertx</artifactId>
    <version>0.1.5.Final</version>
</dependency>
build.gradle
dependencies {
    implementation(platform("{quarkus-platform-groupid}:quarkus-bom:2.12.3.Final"))

    ...

    implementation("io.quarkus:quarkus-apicurio-registry-json-schema")
    implementation("io.apicurio:apicurio-registry-serdes-json-schema-serde") {
        exclude group: "io.apicurio", module: "apicurio-common-rest-client-jdk"
        exclude group: "io.apicurio", module: "apicurio-registry-client"
        exclude group: "io.apicurio", module: "apicurio-registry-common"
        version {
            strictly "2.1.5.Final"
        }
    }
    implementation("io.apicurio:apicurio-registry-client") {
        version {
            strictly "2.1.5.Final"
        }
    }
    implementation("io.apicurio:apicurio-registry-common") {
        version {
            strictly "2.1.5.Final"
        }
    }
    implementation("io.apicurio:apicurio-common-rest-client-vertx") {
        version {
            strictly "0.1.5.Final"
        }
    }
}

Known previous compatible versions for apicurio-registry-client and apicurio-common-rest-client-vertx are the following:

  • apicurio-registry-client 2.1.5.Final with apicurio-common-rest-client-vertx 0.1.5.Final

  • apicurio-registry-client 2.3.1.Final with apicurio-common-rest-client-vertx 0.1.13.Final

Using the Confluent Schema Registry

If you want to use the Confluent Schema Registry, you need the quarkus-confluent-registry-json-schema extension, instead of the quarkus-apicurio-registry-json-schema extension. Also, you need to add a few dependencies and a custom Maven repository to your pom.xml / build.gradle file:

pom.xml
<dependencies>
    ...
    <!-- the extension -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-confluent-registry-json-schema</artifactId>
    </dependency>
    <!-- Confluent registry libraries use Jakarta REST client -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-rest-client</artifactId>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-json-schema-serializer</artifactId>
        <version>7.2.0</version>
        <exclusions>
            <exclusion>
                <groupId>jakarta.ws.rs</groupId>
                <artifactId>jakarta.ws.rs-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

<repositories>
    <!-- io.confluent:kafka-json-schema-serializer is only available from this repository: -->
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>
build.gradle
repositories {
    ...

    maven {
        url "https://packages.confluent.io/maven/"
    }
}

dependencies {
    ...

    implementation("io.quarkus:quarkus-confluent-registry-json-schema")

    // Confluent registry libraries use Jakarta REST client
    implementation("io.quarkus:quarkus-rest-client")

    implementation("io.confluent:kafka-json-schema-serializer:7.2.0") {
        exclude group: "jakarta.ws.rs", module: "jakarta.ws.rs-api"
    }
}

In JVM mode, any version of io.confluent:kafka-json-schema-serializer can be used. In native mode, Quarkus supports the following versions: 6.2.x, 7.0.x, 7.1.x, 7.2.x, 7.3.x.

For versions 7.4.x and 7.5.x, due to an issue with the Confluent Schema Serializer, you need to add another dependency:

pom.xml
<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-csv</artifactId>
</dependency>
build.gradle
dependencies {
    implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-csv")
}

For any other versions, the native configuration may need to be adjusted.
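
The native-mode version rules above can be summarized as a small decision function. The sketch below encodes only what this section states. The class, enum, and method names are hypothetical, and the version string is assumed to have the form major.minor.patch.

```java
// Encodes the native-mode notes from this section; names are hypothetical.
final class ConfluentNativeSupportSketch {

    enum Support {
        SUPPORTED,            // 6.2.x, 7.0.x, 7.1.x, 7.2.x, 7.3.x
        NEEDS_JACKSON_CSV,    // 7.4.x, 7.5.x: add jackson-dataformat-csv
        MAY_NEED_ADJUSTMENT   // anything else: native configuration may need changes
    }

    // Classifies a kafka-json-schema-serializer version by its major.minor line.
    static Support nativeSupport(String serializerVersion) {
        String[] parts = serializerVersion.split("\\.");
        if (parts.length < 2) {
            return Support.MAY_NEED_ADJUSTMENT;
        }
        switch (parts[0] + "." + parts[1]) {
            case "6.2":
            case "7.0":
            case "7.1":
            case "7.2":
            case "7.3":
                return Support.SUPPORTED;
            case "7.4":
            case "7.5":
                return Support.NEEDS_JACKSON_CSV;
            default:
                return Support.MAY_NEED_ADJUSTMENT;
        }
    }

    public static void main(String[] args) {
        System.out.println(nativeSupport("7.2.0"));
    }
}
```

In JVM mode none of this applies, since any version of the serializer can be used there.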

Further reading