Using Apache Kafka with Schema Registry and JSON Schema
本指南展示了你的 Quarkus 应用程序如何使用 Apache Kakfa、 JSON Schema 序列化记录,以及如何连至模式注册表(例如 Confluent Schema Registry 或 Apicurio Registry)。 如果您不熟悉 Kafka 和 Quarkus 中的 Kafka,请考虑首先阅读“Using Apache Kafka with Reactive Messaging”指南。
Prerequisites
如要完成本指南,您需要:
-
Roughly 15 minutes
-
An IDE
-
安装了 JDK 17+,已正确配置
JAVA_HOME
-
Apache Maven ${proposed-maven-version}
-
如果你想使用 Quarkus CLI, 则可以选择使用
-
如果你想构建一个本机可执行文件(或如果你使用本机容器构建,则使用 Docker),则可以选择安装 Mandrel 或 GraalVM 以及 configured appropriately
Architecture
在本指南中,我们要实现一个 REST 资源,即“MovieResource
”,它将消耗电影 DTO 并将它们放入 Kafka 主题。
然后,我们将实现一个消费者,它将消耗并收集来自同一主题的消息。收集到的消息将随后通过“ Server-Sent Events”公开另一个资源“ConsumedMovieResource
”。
JSON Schema 将对 Movies 执行序列化和反序列化。描述 Movie 的模式存储在 Apicurio Registry 中。如果你使用 Confluent JSON Schema serde 和 Confluent Schema Registry,则也适用相同概念。
Solution
我们建议您遵循接下来的部分中的说明,按部就班地创建应用程序。然而,您可以直接跳到完成的示例。
克隆 Git 存储库: git clone $${quickstarts-base-url}.git
,或下载 $${quickstarts-base-url}/archive/main.zip[存档]。
解决方案位于 kafka-json-schema-quickstart
directory 中。
Creating the Maven Project
首先,我们需要一个新项目。使用以下命令创建一个新项目:
quarkus create app {create-app-group-id}:{create-app-artifact-id} \
--no-code
cd {create-app-artifact-id}
要创建一个 Gradle 项目,添加 --gradle
或 --gradle-kotlin-dsl
选项。
有关如何安装和使用 Quarkus CLI 的详细信息,请参见 Quarkus CLI 指南。
mvn {quarkus-platform-groupid}:quarkus-maven-plugin:{quarkus-version}:create \
-DprojectGroupId={create-app-group-id} \
-DprojectArtifactId={create-app-artifact-id} \
-DnoCode
cd {create-app-artifact-id}
要创建一个 Gradle 项目,添加 -DbuildTool=gradle
或 -DbuildTool=gradle-kotlin-dsl
选项。
适用于 Windows 用户:
-
如果使用 cmd,(不要使用反斜杠
\
,并将所有内容放在同一行上) -
如果使用 Powershell,将
-D
参数用双引号引起来,例如"-DprojectArtifactId={create-app-artifact-id}"
如果你使用 Confluent Schema Registry,则不需要 |
Json Schema
Json Schema 是一个数据序列化系统。数据结构通过模式来描述。我们首先需要做的就是创建一个模式来描述 Movie
结构。为我们的记录创建名为 src/main/resources/json-schema.json
的模式(Kafka 消息):
{
"$id": "https://example.com/person.schema.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Movie",
"type": "object",
"properties": {
"title": {
"type": "string",
"description": "The movie's title."
},
"year": {
"type": "integer",
"description": "The movie's year."
}
}
}
请注意,无法从 JSON Schema 定义自动生成 Java 类。因此,您必须按如下定义 Java 类,以便串行化过程能使用它:
package org.acme.kafka;
public class Movie {
private String title;
private Integer year;
public Movie() {
}
public Movie(String title, Integer year) {
this.title = title;
this.year = year;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public Integer getYear() {
return year;
}
public void setYear(Integer year) {
this.year = year;
}
}
The Movie
producer
定义了架构后,我们现在可以跳至实现 MovieResource
。
让我们打开 MovieResource
,注入一个 Movie
DTO 的 Emitter
,并实现一个 @POST
方法,该方法消耗 Movie
并通过 Emitter
发送它:
package org.acme.kafka;
import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.jboss.logging.Logger;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;
@Path("/movies")
public class MovieResource {
private static final Logger LOGGER = Logger.getLogger(MovieResource.class);
@Channel("movies")
Emitter<Movie> emitter;
@POST
public Response enqueueMovie(Movie movie) {
LOGGER.infof("Sending movie %s to Kafka", movie.getTitle());
emitter.send(movie);
return Response.accepted().build();
}
}
现在,我们需要将 map 的 movies
通道(Emitter
发出的信号给此通道)map 到一个 Kafka 主题,并 map 在此通道上使用的模式。为了实现这一点,请编辑 application.properties
文件,并添加如下内容:
# set the connector for the outgoing channel to `smallrye-kafka`
mp.messaging.outgoing.movies.connector=smallrye-kafka
# disable automatic detection of the serializers
quarkus.messaging.kafka.serializer-autodetection.enabled=false
# Set the value serializer for the channel `movies`
mp.messaging.outgoing.movies.value.serializer=io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer
# set the topic name for the channel to `movies`
mp.messaging.outgoing.movies.topic=movies
# set the schema to be used for the channel `movies`. Note that this property accepts just a name or a path and the serializer will look for the resource on the classpath.
mp.messaging.outgoing.movies.apicurio.registry.artifact.schema.location=json-schema.json
# automatically register the schema with the registry, if not present
mp.messaging.outgoing.movies.apicurio.registry.auto-register=true
请注意,与 avro 序列化不同,autodetect 不能用于 JSON 模式,所以我们必须定义 |
The Movie
consumer
因此,我们可以向 Kafka 中写入包含 Movie
数据的记录。该数据使用 JSON Schema 进行序列化。现在,是时候为它们实现一个消费者了。
让我们创建一个将消费 movies-from-kafka
通道中的 Movie
消息并通过服务器端事件公开它的 ConsumedMovieResource
:
package org.acme.kafka;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.jboss.resteasy.reactive.RestStreamElementType;
import io.smallrye.mutiny.Multi;
@ApplicationScoped
@Path("/consumed-movies")
public class ConsumedMovieResource {
@Channel("movies-from-kafka")
Multi<Movie> movies;
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Multi<String> stream() {
return movies.map(movie -> String.format("'%s' from %s", movie.getTitle(), movie.getYear()));
}
}
应用程序代码的最后一部分是在 application.properties
中配置 movies-from-kafka
通道:
# set the connector for the incoming channel to `smallrye-kafka`
mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka
# set the topic name for the channel to `movies`
mp.messaging.incoming.movies-from-kafka.topic=movies
# set the deserializer for the incoming channel
mp.messaging.incoming.movies-from-kafka.value.deserializer=io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer
# disable auto-commit, Reactive Messaging handles it itself
mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false
mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest
同样,与 Avro 不同,我们必须定义 |
Running the application
在开发模式下启动应用程序:
quarkus dev
./mvnw quarkus:dev
./gradlew --console=plain quarkusDev
多亏了开发服务,Kafka 代理和 Apicurio Registry 实例会自动启动。有关更多详细信息,请参见 Dev Services for Kafka 和 Dev Services for Apicurio Registry。
您可能已经注意到我们没有在任何地方配置架构注册表 URL。这是因为 Apicurio Registry 的开发服务将 Quarkus Messaging 中的 Kafka 通道配置为使用自动启动的注册表实例。 Apicurio Registry 除了其原生 API 以外,还公开了与 Confluent Schema Registry 在 API 兼容的端点。因此,这种自动配置对 Apicurio Registry serde 和 Confluent Schema Registry serde 都有效。 但是,请注意,没有任何 Dev Services 支持可以运行 Confluent Schema Registry 本身。如果你想使用 Confluent Schema Registry 的运行中实例,请与 Kafka 代理一起配置其 URL:
|
在第二个终端中,使用 curl
查询 ConsumedMovieResource
资源:
curl -N http://localhost:8080/consumed-movies
在第三个中,发布一些电影:
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Shawshank Redemption","year":1994}' \
http://localhost:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Godfather","year":1972}' \
http://localhost:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Dark Knight","year":2008}' \
http://localhost:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"12 Angry Men","year":1957}' \
http://localhost:8080/movies
观察在第二个终端中打印的内容。您应该看到类似以下的内容:
data:'The Shawshank Redemption' from 1994
data:'The Godfather' from 1972
data:'The Dark Knight' from 2008
data:'12 Angry Men' from 1957
Running in JVM or Native mode
在非开发或测试模式下运行时,您需要启动自己的 Kafka 代理和 Apicurio Registry。使它们运行的最简单方法是使用 docker-compose
启动适当的容器。
如果您使用 Confluent Schema Registry,则您已经运行了 Kafka 代理和 Confluent Schema Registry 实例并将其配置好了。您可以忽略此处的 |
在项目的根目录创建一个 docker-compose.yaml
文件,内容如下:
version: '2'
services:
zookeeper:
image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
command: [
"sh", "-c",
"bin/zookeeper-server-start.sh config/zookeeper.properties"
]
ports:
- "2181:2181"
environment:
LOG_DIR: /tmp/logs
kafka:
image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
command: [
"sh", "-c",
"bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
]
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
LOG_DIR: "/tmp/logs"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
schema-registry:
image: apicurio/apicurio-registry-mem:2.4.2.Final
ports:
- 8081:8080
depends_on:
- kafka
environment:
QUARKUS_PROFILE: prod
在启动应用程序之前,我们首先启动 Kafka 代理和 Apicurio Registry:
docker-compose up
要停止容器,请使用 |
您可以使用以下命令构建应用程序:
quarkus build
./mvnw install
./gradlew build
并使用以下命令在 JVM 模式中运行它:
java -Dmp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2 -jar target/quarkus-app/quarkus-run.jar
默认情况下,应用程序尝试连接到在 |
在命令行上指定注册表 URL 不太方便,因此您只能为 prod
配置文件添加一个配置属性:
%prod.mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2
您可以使用以下命令构建一个本机可执行文件:
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
并使用以下命令运行它:
./target/kafka-json-schema-schema-quickstart-1.0.0-SNAPSHOT-runner -Dkafka.bootstrap.servers=localhost:9092
Testing the application
如上所述,Kafka 和 Apicurio Registry 的开发服务在开发模式和测试中自动启动并配置一个 Kafka 代理和 Apicurio Registry 实例。因此,我们不必自己设置 Kafka 和 Apicurio Registry。我们只需专注于编写测试。
首先,让我们将对 REST 客户端和 Awaitility 的测试依赖关系添加到构建文件中:
<!-- we'll use Jakarta REST Client for talking to the SSE endpoint -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
testImplementation("io.quarkus:quarkus-rest-client")
testImplementation("org.awaitility:awaitility")
在测试中,我们将循环发送电影并检查 ConsumedMovieResource
是否返回了我们发送的内容。
package org.acme.kafka;
import io.quarkus.test.common.WithTestResource;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.http.ContentType;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.sse.SseEventSource;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static io.restassured.RestAssured.given;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
@QuarkusTest
public class MovieResourceTest {
@TestHTTPResource("/consumed-movies")
URI consumedMovies;
@Test
public void testHelloEndpoint() throws InterruptedException {
// create a client for `ConsumedMovieResource` and collect the consumed resources in a list
Client client = ClientBuilder.newClient();
WebTarget target = client.target(consumedMovies);
List<String> received = new CopyOnWriteArrayList<>();
SseEventSource source = SseEventSource.target(target).build();
source.register(inboundSseEvent -> received.add(inboundSseEvent.readData()));
// in a separate thread, feed the `MovieResource`
ExecutorService movieSender = startSendingMovies();
source.open();
// check if, after at most 5 seconds, we have at least 2 items collected, and they are what we expect
await().atMost(5, SECONDS).until(() -> received.size() >= 2);
assertThat(received, Matchers.hasItems("'The Shawshank Redemption' from 1994",
"'12 Angry Men' from 1957"));
source.close();
// shutdown the executor that is feeding the `MovieResource`
movieSender.shutdownNow();
movieSender.awaitTermination(5, SECONDS);
}
private ExecutorService startSendingMovies() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
while (true) {
given()
.contentType(ContentType.JSON)
.body("{\"title\":\"The Shawshank Redemption\",\"year\":1994}")
.when()
.post("/movies")
.then()
.statusCode(202);
given()
.contentType(ContentType.JSON)
.body("{\"title\":\"12 Angry Men\",\"year\":1957}")
.when()
.post("/movies")
.then()
.statusCode(202);
try {
Thread.sleep(200L);
} catch (InterruptedException e) {
break;
}
}
});
return executorService;
}
}
我们修改了与该项目一起生成的 |
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
Manual setup
如果我们无法使用开发服务并希望手动启动 Kafka 代理和 Apicurio Registry 实例,我们将定义一个 QuarkusTestResourceLifecycleManager。
<dependency>
<groupId>io.strimzi</groupId>
<artifactId>strimzi-test-container</artifactId>
<version>0.105.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
</exclusions>
</dependency>
testImplementation("io.strimzi:strimzi-test-container:0.105.0") {
exclude group: "org.apache.logging.log4j", module: "log4j-core"
}
package org.acme.kafka;
import java.util.HashMap;
import java.util.Map;
import org.testcontainers.containers.GenericContainer;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.strimzi.StrimziKafkaContainer;
public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager {
private final StrimziKafkaContainer kafka = new StrimziKafkaContainer();
private GenericContainer<?> registry;
@Override
public Map<String, String> start() {
kafka.start();
registry = new GenericContainer<>("apicurio/apicurio-registry-mem:2.4.2.Final")
.withExposedPorts(8080)
.withEnv("QUARKUS_PROFILE", "prod");
registry.start();
Map<String, String> properties = new HashMap<>();
properties.put("mp.messaging.connector.smallrye-kafka.apicurio.registry.url",
"http://" + registry.getHost() + ":" + registry.getMappedPort(8080) + "/apis/registry/v2");
properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers());
return properties;
}
@Override
public void stop() {
registry.stop();
kafka.stop();
}
}
@QuarkusTest
@WithTestResource(KafkaAndSchemaRegistryTestResource.class)
public class MovieResourceTest {
...
}
Using compatible versions of the Apicurio Registry
quarkus-apicurio-registry-json-schema
扩展取决于 Apticurio Registry 客户端的最新版本,并且大多数版本的 Apicurio Registry 服务端和客户端都具有向后兼容性。对于某些版本,您需要确保 Serdes 中使用的客户端与服务端兼容。
例如,如果您将 Apicurio dev 服务的映像名称设置为使用版本 2.1.5.Final
:
quarkus.apicurio-registry.devservices.image-name=quay.io/apicurio/apicurio-registry-mem:2.1.5.Final
您需要确保 apicurio-registry-serdes-json-schema-serde
依赖项和 REST 客户端 apicurio-common-rest-client-vertx
依赖项的版本设置兼容:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-json-schema</artifactId>
<exclusions>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-vertx</artifactId>
</exclusion>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-json-schema-serde</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-client</artifactId>
<version>2.1.5.Final</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-common</artifactId>
<version>2.1.5.Final</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-json-schema-serde</artifactId>
<version>2.1.5.Final</version>
<exclusions>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-jdk</artifactId>
</exclusion>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-client</artifactId>
</exclusion>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-vertx</artifactId>
<version>0.1.5.Final</version>
</dependency>
dependencies {
implementation(platform("{quarkus-platform-groupid}:quarkus-bom:2.12.3.Final"))
...
implementation("io.quarkus:quarkus-apicurio-registry-json-schema")
implementation("io.apicurio:apicurio-registry-serdes-json-schema-serde") {
exclude group: "io.apicurio", module: "apicurio-common-rest-client-jdk"
exclude group: "io.apicurio", module: "apicurio-registry-client"
exclude group: "io.apicurio", module: "apicurio-registry-common"
version {
strictly "2.1.5.Final"
}
}
implementation("io.apicurio:apicurio-registry-client") {
version {
strictly "2.1.5.Final"
}
}
implementation("io.apicurio:apicurio-registry-common") {
version {
strictly "2.1.5.Final"
}
}
implementation("io.apicurio:apicurio-common-rest-client-vertx") {
version {
strictly "0.1.5.Final"
}
}
}
apicurio-registry-client
和 apicurio-common-rest-client-vertx
的已知先前兼容版本如下所列:
-
apicurio-registry-client
2.1.5.Final 与apicurio-common-rest-client-vertx
0.1.5.Final -
apicurio-registry-client
2.3.1.Final 与apicurio-common-rest-client-vertx
0.1.13.Final
Using the Confluent Schema Registry
如果您想使用 Confluent Schema Registry,那么您需要使用 quarkus-confluent-registry-json-schema
扩展,而不是 quarkus-apicurio-registry-json-schema
扩展。此外,您需要向 pom.xml
/ build.gradle
文件中添加一些依赖项和自定义 Maven 仓库:
<dependencies>
...
<!-- the extension -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-json-schema</artifactId>
</dependency>
<!-- Confluent registry libraries use Jakarta REST client -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client</artifactId>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-json-schema-serializer</artifactId>
<version>7.2.0</version>
<exclusions>
<exclusion>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<repositories>
<!-- io.confluent:kafka-json-schema-serializer is only available from this repository: -->
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
repositories {
...
maven {
url "https://packages.confluent.io/maven/"
}
}
dependencies {
...
implementation("io.quarkus:quarkus-confluent-registry-json-schema")
// Confluent registry libraries use Jakarta REST client
implementation("io.quarkus:quarkus-rest-client")
implementation("io.confluent:kafka-json-schema-serializer:7.2.0") {
exclude group: "jakarta.ws.rs", module: "jakarta.ws.rs-api"
}
}
在 JVM 模式下,可以使用的 io.confluent:kafka-json-schema-serializer
版本不限。在原生模式下,Quarkus 支持以下版本:6.2.x
、7.0.x
、7.1.x
、7.2.x
、7.3.x
。
对于版本 7.4.x
和 7.5.x
,由于 Confluent Schema Serializer 出现问题,因此您需要添加另一个依赖项:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-csv</artifactId>
</dependency>
dependencies {
implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-csv")
}
对于任何其他版本,可能需要调整原生配置。