Using Apache Kafka with Schema Registry and Avro
本指南展示了 Quarkus 应用程序如何使用 Apache Kafka、“ Avro”序列化的记录,以及连接到模式注册表(如“ Confluent Schema Registry”或“ Apicurio Registry”)。 如果您不熟悉 Kafka 和 Quarkus 中的 Kafka,请考虑首先阅读“Using Apache Kafka with Reactive Messaging”指南。
- Prerequisites
- Architecture
- Solution
- Creating the Maven Project
- Avro schema
- The
Movie
producer - The
Movie
consumer - Running the application
- Running in JVM or Native mode
- Testing the application
- Using compatible versions of the Apicurio Registry
- Using the Confluent Schema Registry
- Avro code generation details
- Further reading
Prerequisites
如要完成本指南,您需要:
-
Roughly 15 minutes
-
An IDE
-
安装了 JDK 17+,已正确配置
JAVA_HOME
-
Apache Maven ${proposed-maven-version}
-
如果你想使用 Quarkus CLI, 则可以选择使用
-
如果你想构建一个本机可执行文件(或如果你使用本机容器构建,则使用 Docker),则可以选择安装 Mandrel 或 GraalVM 以及 configured appropriately
Architecture
在本指南中,我们要实现一个 REST 资源,即“MovieResource
”,它将消耗电影 DTO 并将它们放入 Kafka 主题。
然后,我们将实现一个消费者,它将消耗并收集来自同一主题的消息。收集到的消息将随后通过“ Server-Sent Events”公开另一个资源“ConsumedMovieResource
”。
“Movies”将使用 Avro 序列化和反序列化。“Movie”的模式存储在 Apicurio Registry 中。如果您使用的是 Confluent Avro “serde”和 Confluent Schema Registry,则适用于相同概念。
Solution
我们建议您遵循接下来的部分中的说明,按部就班地创建应用程序。然而,您可以直接跳到完成的示例。
克隆 Git 存储库: git clone $${quickstarts-base-url}.git
,或下载 $${quickstarts-base-url}/archive/main.zip[存档]。
解决方案位于“kafka-avro-schema-quickstart
” directory”。
Creating the Maven Project
首先,我们需要一个新项目。使用以下命令创建一个新项目:
quarkus create app {create-app-group-id}:{create-app-artifact-id} \
--no-code
cd {create-app-artifact-id}
要创建一个 Gradle 项目,添加 --gradle
或 --gradle-kotlin-dsl
选项。
有关如何安装和使用 Quarkus CLI 的详细信息,请参见 Quarkus CLI 指南。
mvn {quarkus-platform-groupid}:quarkus-maven-plugin:{quarkus-version}:create \
-DprojectGroupId={create-app-group-id} \
-DprojectArtifactId={create-app-artifact-id} \
-DnoCode
cd {create-app-artifact-id}
要创建一个 Gradle 项目,添加 -DbuildTool=gradle
或 -DbuildTool=gradle-kotlin-dsl
选项。
适用于 Windows 用户:
-
如果使用 cmd,(不要使用反斜杠
\
,并将所有内容放在同一行上) -
如果使用 Powershell,将
-D
参数用双引号引起来,例如"-DprojectArtifactId={create-app-artifact-id}"
如果您使用的是 Confluent Schema Registry,则不需要“ |
Avro schema
Apache Avro 是一个数据序列化系统。数据结构使用模式进行描述。我们需要做的第一件事是创建一个描述“Movie
”结构的模式。使用我们记录(Kafka 消息)的模式创建一个名为“src/main/avro/movie.avsc
”的文件:
{
"namespace": "org.acme.kafka.quarkus",
"type": "record",
"name": "Movie",
"fields": [
{
"name": "title",
"type": "string"
},
{
"name": "year",
"type": "int"
}
]
}
如果您使用以下方式构建项目:
quarkus build
./mvnw install
./gradlew build
“movies.avsc
”将被编译为“Movie.java
”文件,并放置在“target/generated-sources/avsc
”目录中。
查看“ Avro specification”以了解更多关于 Avro 语法和受支持类型的知识。
使用 Quarkus 无需使用特定 Maven 插件来处理 Avro 模式,所有这些操作都将通过“ |
如果您使用以下方式运行该项目:
quarkus dev
./mvnw quarkus:dev
./gradlew --console=plain quarkusDev
您对模式文件所做的更改将自动应用到生成的 Java 文件中。
The Movie
producer
定义了架构后,我们现在可以跳至实现 MovieResource
。
让我们打开 MovieResource
,注入一个 Movie
DTO 的 Emitter
,并实现一个 @POST
方法,该方法消耗 Movie
并通过 Emitter
发送它:
package org.acme.kafka;
import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.jboss.logging.Logger;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;
@Path("/movies")
public class MovieResource {
private static final Logger LOGGER = Logger.getLogger(MovieResource.class);
@Channel("movies")
Emitter<Movie> emitter;
@POST
public Response enqueueMovie(Movie movie) {
LOGGER.infof("Sending movie %s to Kafka", movie.getTitle());
emitter.send(movie);
return Response.accepted().build();
}
}
现在,我们需要将 movies
通道(Emitter
发送到此通道)map 到 Kafka 主题。为了实现这一点,编辑 application.properties
文件,并添加以下内容:
# set the connector for the outgoing channel to `smallrye-kafka`
mp.messaging.outgoing.movies.connector=smallrye-kafka
# set the topic name for the channel to `movies`
mp.messaging.outgoing.movies.topic=movies
# automatically register the schema with the registry, if not present
mp.messaging.outgoing.movies.apicurio.registry.auto-register=true
你可能已经注意到,我们没有定义 |
The Movie
consumer
因此,我们可以将包含 Movie
数据的记录写入 Kafka 中。该数据使用 Avro 序列化。现在,是时候为它们实现一个消费者了。
让我们创建一个将消费 movies-from-kafka
通道中的 Movie
消息并通过服务器端事件公开它的 ConsumedMovieResource
:
package org.acme.kafka;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.jboss.resteasy.reactive.RestStreamElementType;
import io.smallrye.mutiny.Multi;
@ApplicationScoped
@Path("/consumed-movies")
public class ConsumedMovieResource {
@Channel("movies-from-kafka")
Multi<Movie> movies;
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Multi<String> stream() {
return movies.map(movie -> String.format("'%s' from %s", movie.getTitle(), movie.getYear()));
}
}
应用程序代码的最后一部分是在 application.properties
中配置 movies-from-kafka
通道:
# set the connector for the incoming channel to `smallrye-kafka`
mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka
# set the topic name for the channel to `movies`
mp.messaging.incoming.movies-from-kafka.topic=movies
# disable auto-commit, Reactive Messaging handles it itself
mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false
mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest
您可能已经注意到我们没有定义 |
Running the application
在开发模式下启动应用程序:
quarkus dev
./mvnw quarkus:dev
./gradlew --console=plain quarkusDev
多亏了开发服务,Kafka 代理和 Apicurio Registry 实例会自动启动。有关更多详细信息,请参见 Dev Services for Kafka 和 Dev Services for Apicurio Registry。
您可能已经注意到我们没有在任何地方配置架构注册表 URL。这是因为 Apicurio Registry 的开发服务将 Quarkus Messaging 中的 Kafka 通道配置为使用自动启动的注册表实例。 Apicurio Registry 除了其原生 API 以外,还公开了与 Confluent Schema Registry 在 API 兼容的端点。因此,这种自动配置对 Apicurio Registry serde 和 Confluent Schema Registry serde 都有效。 但是,请注意,没有任何 Dev Services 支持可以运行 Confluent Schema Registry 本身。如果你想使用 Confluent Schema Registry 的运行中实例,请与 Kafka 代理一起配置其 URL:
|
在第二个终端中,使用 curl
查询 ConsumedMovieResource
资源:
curl -N http://localhost:8080/consumed-movies
在第三个中,发布一些电影:
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Shawshank Redemption","year":1994}' \
http://localhost:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Godfather","year":1972}' \
http://localhost:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Dark Knight","year":2008}' \
http://localhost:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"12 Angry Men","year":1957}' \
http://localhost:8080/movies
观察在第二个终端中打印的内容。您应该看到类似以下的内容:
data:'The Shawshank Redemption' from 1994
data:'The Godfather' from 1972
data:'The Dark Knight' from 2008
data:'12 Angry Men' from 1957
Running in JVM or Native mode
在非开发或测试模式下运行时,您需要启动自己的 Kafka 代理和 Apicurio Registry。使它们运行的最简单方法是使用 docker-compose
启动适当的容器。
如果您使用 Confluent Schema Registry,则您已经运行了 Kafka 代理和 Confluent Schema Registry 实例并将其配置好了。您可以忽略此处的 |
在项目的根目录创建一个 docker-compose.yaml
文件,内容如下:
version: '2'
services:
zookeeper:
image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
command: [
"sh", "-c",
"bin/zookeeper-server-start.sh config/zookeeper.properties"
]
ports:
- "2181:2181"
environment:
LOG_DIR: /tmp/logs
kafka:
image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
command: [
"sh", "-c",
"bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
]
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
LOG_DIR: "/tmp/logs"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
schema-registry:
image: apicurio/apicurio-registry-mem:2.4.2.Final
ports:
- 8081:8080
depends_on:
- kafka
environment:
QUARKUS_PROFILE: prod
在启动应用程序之前,我们首先启动 Kafka 代理和 Apicurio Registry:
docker-compose up
要停止容器,请使用 |
您可以使用以下命令构建应用程序:
quarkus build
./mvnw install
./gradlew build
并使用以下命令在 JVM 模式中运行它:
java -Dmp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2 -jar target/quarkus-app/quarkus-run.jar
默认情况下,应用程序尝试连接到在 |
在命令行上指定注册表 URL 不太方便,因此您只能为 prod
配置文件添加一个配置属性:
%prod.mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2
您可以使用以下命令构建一个本机可执行文件:
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
并使用以下命令运行它:
./target/kafka-avro-schema-quickstart-1.0.0-SNAPSHOT-runner -Dkafka.bootstrap.servers=localhost:9092
Testing the application
如上所述,Kafka 和 Apicurio Registry 的开发服务在开发模式和测试中自动启动并配置一个 Kafka 代理和 Apicurio Registry 实例。因此,我们不必自己设置 Kafka 和 Apicurio Registry。我们只需专注于编写测试。
首先,让我们将对 REST 客户端和 Awaitility 的测试依赖关系添加到构建文件中:
<!-- we'll use Jakarta REST Client for talking to the SSE endpoint -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
testImplementation("io.quarkus:quarkus-rest-client")
testImplementation("org.awaitility:awaitility")
在测试中,我们将循环发送电影并检查 ConsumedMovieResource
是否返回了我们发送的内容。
package org.acme.kafka;
import io.quarkus.test.common.WithTestResource;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.http.ContentType;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.sse.SseEventSource;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static io.restassured.RestAssured.given;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
@QuarkusTest
public class MovieResourceTest {
@TestHTTPResource("/consumed-movies")
URI consumedMovies;
@Test
public void testHelloEndpoint() throws InterruptedException {
// create a client for `ConsumedMovieResource` and collect the consumed resources in a list
Client client = ClientBuilder.newClient();
WebTarget target = client.target(consumedMovies);
List<String> received = new CopyOnWriteArrayList<>();
SseEventSource source = SseEventSource.target(target).build();
source.register(inboundSseEvent -> received.add(inboundSseEvent.readData()));
// in a separate thread, feed the `MovieResource`
ExecutorService movieSender = startSendingMovies();
source.open();
// check if, after at most 5 seconds, we have at least 2 items collected, and they are what we expect
await().atMost(5, SECONDS).until(() -> received.size() >= 2);
assertThat(received, Matchers.hasItems("'The Shawshank Redemption' from 1994",
"'12 Angry Men' from 1957"));
source.close();
// shutdown the executor that is feeding the `MovieResource`
movieSender.shutdownNow();
movieSender.awaitTermination(5, SECONDS);
}
private ExecutorService startSendingMovies() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
while (true) {
given()
.contentType(ContentType.JSON)
.body("{\"title\":\"The Shawshank Redemption\",\"year\":1994}")
.when()
.post("/movies")
.then()
.statusCode(202);
given()
.contentType(ContentType.JSON)
.body("{\"title\":\"12 Angry Men\",\"year\":1957}")
.when()
.post("/movies")
.then()
.statusCode(202);
try {
Thread.sleep(200L);
} catch (InterruptedException e) {
break;
}
}
});
return executorService;
}
}
我们修改了与该项目一起生成的 |
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
Manual setup
如果我们无法使用开发服务并希望手动启动 Kafka 代理和 Apicurio Registry 实例,我们将定义一个 QuarkusTestResourceLifecycleManager。
<dependency>
<groupId>io.strimzi</groupId>
<artifactId>strimzi-test-container</artifactId>
<version>0.105.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
</exclusions>
</dependency>
testImplementation("io.strimzi:strimzi-test-container:0.105.0") {
exclude group: "org.apache.logging.log4j", module: "log4j-core"
}
package org.acme.kafka;
import java.util.HashMap;
import java.util.Map;
import org.testcontainers.containers.GenericContainer;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.strimzi.StrimziKafkaContainer;
public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager {
private final StrimziKafkaContainer kafka = new StrimziKafkaContainer();
private GenericContainer<?> registry;
@Override
public Map<String, String> start() {
kafka.start();
registry = new GenericContainer<>("apicurio/apicurio-registry-mem:2.4.2.Final")
.withExposedPorts(8080)
.withEnv("QUARKUS_PROFILE", "prod");
registry.start();
Map<String, String> properties = new HashMap<>();
properties.put("mp.messaging.connector.smallrye-kafka.apicurio.registry.url",
"http://" + registry.getHost() + ":" + registry.getMappedPort(8080) + "/apis/registry/v2");
properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers());
return properties;
}
@Override
public void stop() {
registry.stop();
kafka.stop();
}
}
@QuarkusTest
@WithTestResource(KafkaAndSchemaRegistryTestResource.class)
public class MovieResourceTest {
...
}
Using compatible versions of the Apicurio Registry
quarkus-apicurio-registry-avro
扩展依赖于 Apicurio Registry 客户端的最新版本,并且大多数版本的 Apicurio Registry 服务器和客户端都向后兼容。对于某些版本,您需要确保 Serdes 使用的客户端与服务器兼容。
例如,如果您将 Apicurio dev 服务的映像名称设置为使用版本 2.1.5.Final
:
quarkus.apicurio-registry.devservices.image-name=quay.io/apicurio/apicurio-registry-mem:2.1.5.Final
您需要确保 apicurio-registry-serdes-avro-serde
依赖项和 REST 客户端 apicurio-common-rest-client-vertx
依赖项都设置为兼容版本:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-avro</artifactId>
<exclusions>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-vertx</artifactId>
</exclusion>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-avro-serde</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-client</artifactId>
<version>2.1.5.Final</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-common</artifactId>
<version>2.1.5.Final</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-avro-serde</artifactId>
<version>2.1.5.Final</version>
<exclusions>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-jdk</artifactId>
</exclusion>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-client</artifactId>
</exclusion>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-vertx</artifactId>
<version>0.1.5.Final</version>
</dependency>
dependencies {
implementation(platform("{quarkus-platform-groupid}:quarkus-bom:2.12.3.Final"))
...
implementation("io.quarkus:quarkus-apicurio-registry-avro")
implementation("io.apicurio:apicurio-registry-serdes-avro-serde") {
exclude group: "io.apicurio", module: "apicurio-common-rest-client-jdk"
exclude group: "io.apicurio", module: "apicurio-registry-client"
exclude group: "io.apicurio", module: "apicurio-registry-common"
version {
strictly "2.1.5.Final"
}
}
implementation("io.apicurio:apicurio-registry-client") {
version {
strictly "2.1.5.Final"
}
}
implementation("io.apicurio:apicurio-registry-common") {
version {
strictly "2.1.5.Final"
}
}
implementation("io.apicurio:apicurio-common-rest-client-vertx") {
version {
strictly "0.1.5.Final"
}
}
}
apicurio-registry-client
和 apicurio-common-rest-client-vertx
的已知先前兼容版本如下所列:
-
apicurio-registry-client
2.1.5.Final 与apicurio-common-rest-client-vertx
0.1.5.Final -
apicurio-registry-client
2.3.1.Final 与apicurio-common-rest-client-vertx
0.1.13.Final
Using the Confluent Schema Registry
如果您想使用 Confluent Schema Registry,则需要 quarkus-confluent-registry-avro
扩展,而不是 quarkus-apicurio-registry-avro
扩展。此外,您需要向 pom.xml
/ build.gradle
文件中添加一些依赖项和一个自定义 Maven 存储库:
<dependencies>
...
<!-- the extension -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-avro</artifactId>
</dependency>
<!-- Confluent registry libraries use Jakarta REST client -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client</artifactId>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.2.0</version>
<exclusions>
<exclusion>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<repositories>
<!-- io.confluent:kafka-avro-serializer is only available from this repository: -->
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
repositories {
...
maven {
url "https://packages.confluent.io/maven/"
}
}
dependencies {
...
implementation("io.quarkus:quarkus-confluent-registry-avro")
// Confluent registry libraries use Jakarta REST client
implementation("io.quarkus:quarkus-rest-client")
implementation("io.confluent:kafka-avro-serializer:7.2.0") {
exclude group: "jakarta.ws.rs", module: "jakarta.ws.rs-api"
}
}
在 JVM 模式下,可以使用任何版本的 io.confluent:kafka-avro-serializer
。在原生模式中,Quarkus 支持以下版本: 6.2.x
、 7.0.x
、 7.1.x
、 7.2.x
、 7.3.x
。
对于版本 7.4.x
和 7.5.x
,由于 Confluent Schema Serializer 出现问题,因此您需要添加另一个依赖项:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-csv</artifactId>
</dependency>
dependencies {
implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-csv")
}
对于任何其他版本,可能需要调整原生配置。
Avro code generation details
在本指南中,我们使用 Quarkus 代码生成机制从 Avro 模式生成 Java 文件。
实际上,该机制使用 org.apache.avro:avro-compiler
。
您可以使用以下配置属性来更改其工作方式:
-
avro.codegen.[avsc|avdl|avpr].imports
- 应首先编译的文件或目录列表,从而使随后编译的模式可以导入它们。请注意,导入的文件不应相互引用。所有路径都应相对于构建系统配置的任何源目录中的src/[main|test]/avro
目录或avro
子目录。以逗号分隔的列表形式传递。 -
avro.codegen.stringType
- 用于 Avro 字符串的 Java 类型。可以是CharSequence
、String
或Utf8
之一。默认为String
-
avro.codegen.createOptionalGetters
- 启用生成返回所请求类型可选值的getOptional…​
方法。默认为false
-
avro.codegen.enableDecimalLogicalType
- 确定是否为十进制类型使用 Java 类,默认为false
-
avro.codegen.createSetters
- 确定是否为记录的字段创建设置器。默认为false
-
avro.codegen.gettersReturnOptional
- 启用生成返回所请求类型可选值的get…​
方法。默认为false
-
avro.codegen.optionalGettersForNullableFieldsOnly
,与gettersReturnOptional
选项配合使用。如果设置了它,则只会为可为空的字段生成Optional
获取器。如果字段是必需的,则将生成常规获取器。默认为false
Further reading
-
How to Use Kafka, Schema Registry and Avro with Quarkus - 本指南所基于的一篇博文。它对 Apache Avro 和模式注册表概念进行了很好的介绍