Using Apache Kafka Streams

Prerequisites

如要完成本指南,您需要:

  • Roughly 15 minutes

  • An IDE

  • 安装了 JDK 17+,已正确配置 JAVA_HOME

  • Apache Maven ${proposed-maven-version}

  • 如果你想使用 Quarkus CLI, 则可以选择使用

  • 如果你想构建一个本机可执行文件(或如果你使用本机容器构建,则使用 Docker),则可以选择安装 Mandrel 或 GraalVM 以及 configured appropriately

建议您事先阅读 Kafka quickstart

用于 Kafka Streams 的 Quarkus 扩展允许在开发期间通过支持 Quarkus Dev 模式(例如,通过 ./mvnw compile quarkus:dev)实现非常快速的周转时间。在更改 Kafka Streams 拓扑代码后,当下一个输入消息到达时,应用程序将自动重新加载。 建议的开发设置是,备有一些生成器,以在固定间隔(例如每秒)内在经过处理的主题上创建测试消息,并使用 kafkacat 等工具观察流应用程序的输出主题。使用 dev 模式,在保存的时候,您会立即在输出主题上看到您流应用程序的最新版本生成的邮件。 为了获得最佳的开发体验,我们建议将以下配置设置应用到您的 Kafka 代理:

group.min.session.timeout.ms=250

还在您的 Quarkus application.properties 中指定以下设置:

kafka-streams.consumer.session.timeout.ms=250
kafka-streams.consumer.heartbeat.interval.ms=200

这些设置组合在一起将确保应用程序在 dev 模式下重新启动后可以非常快速地重新连接到代理。

Architecture

在本指南中,我们将在一个组件(名为 generator)中生成(随机)温度值。这些值与给定的气象站相关联,并写入 Kafka 主题(temperature-values)。另一个主题(weather-stations)仅包含天气台本身的主要数据(id 和名称)。

第二个组件(aggregator)从两个 Kafka 主题中读取信息,并在流管道中处理它们:

  • 这两个主题根据气象台 id 进行关联

  • 根据气象台 id 确定最低值、最高值和平均温度

  • 这些聚合数据被写入第三个主题(temperatures-aggregated)中

可以通过检查输出主题来检查数据。通过公开一个 Kafka Streams interactive query,也可以通过一个简单的 REST 查询来获得每个气象站的最新结果。

整体架构如下所示:

kafka streams guide architecture

Solution

我们建议您遵循接下来的部分中的说明,按部就班地创建应用程序。然而,您可以直接跳到完成的示例。

克隆 Git 存储库: git clone $${quickstarts-base-url}.git,或下载 $${quickstarts-base-url}/archive/main.zip[存档]。

解决方案位于 kafka-streams-quickstart directory

Creating the Producer Maven Project

首先,我们需要一个带有温度值生产者的新项目。使用以下命令创建一个新项目:

CLI
quarkus create app {create-app-group-id}:{create-app-artifact-id} \
    --no-code
cd {create-app-artifact-id}

要创建一个 Gradle 项目,添加 --gradle--gradle-kotlin-dsl 选项。 有关如何安装和使用 Quarkus CLI 的详细信息,请参见 Quarkus CLI 指南。

Maven
mvn {quarkus-platform-groupid}:quarkus-maven-plugin:{quarkus-version}:create \
    -DprojectGroupId={create-app-group-id} \
    -DprojectArtifactId={create-app-artifact-id} \
    -DnoCode
cd {create-app-artifact-id}

要创建一个 Gradle 项目,添加 -DbuildTool=gradle-DbuildTool=gradle-kotlin-dsl 选项。

适用于 Windows 用户:

  • 如果使用 cmd,(不要使用反斜杠 \ ,并将所有内容放在同一行上)

  • 如果使用 Powershell,将 -D 参数用双引号引起来,例如 "-DprojectArtifactId={create-app-artifact-id}"

此命令生成一个 Maven 项目,导入 Reactive Messaging 和 Kafka 连接器扩展。

如果你已经配置了你的 Quarkus 项目,你可以在项目基本目录中运行以下命令,向你的项目添加 messaging-kafka 扩展:

CLI
quarkus extension add {add-extension-extensions}
Maven
./mvnw quarkus:add-extension -Dextensions='{add-extension-extensions}'
Gradle
./gradlew addExtension --extensions='{add-extension-extensions}'

这会将以下内容添加到构建文件中:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-messaging-kafka")

The Temperature Value Producer

创建一个 producer/src/main/java/org/acme/kafka/streams/producer/generator/ValuesGenerator.java 文件,内容如下:

package org.acme.kafka.streams.producer.generator;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.kafka.Record;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.Logger;

/**
 * A bean producing random temperature data every second.
 * The values are written to a Kafka topic (temperature-values).
 * Another topic contains the name of weather stations (weather-stations).
 * The Kafka configuration is specified in the application configuration.
 */
@ApplicationScoped
public class ValuesGenerator {

    private static final Logger LOG = Logger.getLogger(ValuesGenerator.class);

    private Random random = new Random();

    private List<WeatherStation> stations = List.of(
                        new WeatherStation(1, "Hamburg", 13),
                        new WeatherStation(2, "Snowdonia", 5),
                        new WeatherStation(3, "Boston", 11),
                        new WeatherStation(4, "Tokio", 16),
                        new WeatherStation(5, "Cusco", 12),
                        new WeatherStation(6, "Svalbard", -7),
                        new WeatherStation(7, "Porthsmouth", 11),
                        new WeatherStation(8, "Oslo", 7),
                        new WeatherStation(9, "Marrakesh", 20));

    @Outgoing("temperature-values")                                        (1)
    public Multi<Record<Integer, String>> generate() {
        return Multi.createFrom().ticks().every(Duration.ofMillis(500))    (2)
                .onOverflow().drop()
                .map(tick -> {
                    WeatherStation station = stations.get(random.nextInt(stations.size()));
                    double temperature = BigDecimal.valueOf(random.nextGaussian() * 15 + station.averageTemperature)
                            .setScale(1, RoundingMode.HALF_UP)
                            .doubleValue();

                    LOG.infov("station: {0}, temperature: {1}", station.name, temperature);
                    return Record.of(station.id, Instant.now() + ";" + temperature);
                });
    }

    @Outgoing("weather-stations")                                          (3)
    public Multi<Record<Integer, String>> weatherStations() {
        return Multi.createFrom().items(stations.stream()
            .map(s -> Record.of(
                    s.id,
                    "{ \"id\" : " + s.id +
                    ", \"name\" : \"" + s.name + "\" }"))
        );
    }

    private static class WeatherStation {

        int id;
        String name;
        int averageTemperature;

        public WeatherStation(int id, String name, int averageTemperature) {
            this.id = id;
            this.name = name;
            this.averageTemperature = averageTemperature;
        }
    }
}
1 指示 Reactive Messaging 将返回的 Multi 中的项目调度到 temperature-values
2 该方法返回了一个 Mutiny stream (Multi),每 0.5 秒发出一个随机温度值。
3 指示 Reactive Messaging 将返回的 Multi(气象站的静态列表)中的项目调度到 weather-stations

这两种方法各自返回一个 reactive stream,其项目会分别发送到名为 temperature-valuesweather-stations 的流中。

Topic Configuration

这两个通道使用 Quarkus 配置文件 application.properties 映射到 Kafka 主题。为此,将以下内容添加到文件 producer/src/main/resources/application.properties 中:

# Configure the Kafka broker location
kafka.bootstrap.servers=localhost:9092

mp.messaging.outgoing.temperature-values.connector=smallrye-kafka
mp.messaging.outgoing.temperature-values.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.temperature-values.value.serializer=org.apache.kafka.common.serialization.StringSerializer

mp.messaging.outgoing.weather-stations.connector=smallrye-kafka
mp.messaging.outgoing.weather-stations.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.weather-stations.value.serializer=org.apache.kafka.common.serialization.StringSerializer

这配置了 Kafka 引导服务器、两个主题以及相应的 (de-) 序列化器。有关不同配置选项的更多详细信息,请访问 Kafka 文档中的 Producer configurationConsumer configuration 部分。

Creating the Aggregator Maven Project

在生产者应用程序到位后,就可以实现实际的聚合器应用程序了,它将运行 Kafka Streams 管道。像这样创建另一个项目:

CLI
quarkus create app {create-app-group-id}:{create-app-artifact-id} \
    --no-code
cd {create-app-artifact-id}

要创建一个 Gradle 项目,添加 --gradle--gradle-kotlin-dsl 选项。 有关如何安装和使用 Quarkus CLI 的详细信息,请参见 Quarkus CLI 指南。

Maven
mvn {quarkus-platform-groupid}:quarkus-maven-plugin:{quarkus-version}:create \
    -DprojectGroupId={create-app-group-id} \
    -DprojectArtifactId={create-app-artifact-id} \
    -DnoCode
cd {create-app-artifact-id}

要创建一个 Gradle 项目,添加 -DbuildTool=gradle-DbuildTool=gradle-kotlin-dsl 选项。

适用于 Windows 用户:

  • 如果使用 cmd,(不要使用反斜杠 \ ,并将所有内容放在同一行上)

  • 如果使用 Powershell,将 -D 参数用双引号引起来,例如 "-DprojectArtifactId={create-app-artifact-id}"

这使用用于 Kafka 流的 Quarkus 扩展和用于 Quarkus REST(以前是 RESTEasy Reactive)的 Jackson 支持创建了 aggregator 项目。

如果你已经配置了你的 Quarkus 项目,你可以在项目基本目录中运行以下命令,向你的项目添加 kafka-streams 扩展:

CLI
quarkus extension add {add-extension-extensions}
Maven
./mvnw quarkus:add-extension -Dextensions='{add-extension-extensions}'
Gradle
./gradlew addExtension --extensions='{add-extension-extensions}'

这会将以下内容添加到您的 pom.xml

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-kafka-streams</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-kafka-streams")

The Pipeline Implementation

让我们通过创建一些值对象来表示温度测量值、气象站以及跟踪聚合值来开始实现流处理应用程序。

首先,使用以下内容创建一个 aggregator/src/main/java/org/acme/kafka/streams/aggregator/model/WeatherStation.java 文件,表示一个气象站:

package org.acme.kafka.streams.aggregator.model;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection (1)
public class WeatherStation {

    public int id;
    public String name;
}
1 @RegisterForReflection 注解指示 Quarkus 在本机编译期间保留类及其成员。有关 @RegisterForReflection 注解的更多详细信息,请参见 native application tips 页面。

然后使用以下内容创建一个 aggregator/src/main/java/org/acme/kafka/streams/aggregator/model/TemperatureMeasurement.java 文件,表示给定站点的温度测量值:

package org.acme.kafka.streams.aggregator.model;

import java.time.Instant;

public class TemperatureMeasurement {

    public int stationId;
    public String stationName;
    public Instant timestamp;
    public double value;

    public TemperatureMeasurement(int stationId, String stationName, Instant timestamp,
            double value) {
        this.stationId = stationId;
        this.stationName = stationName;
        this.timestamp = timestamp;
        this.value = value;
    }
}

最后是 aggregator/src/main/java/org/acme/kafka/streams/aggregator/model/Aggregation.java,它用于在流式管道中处理事件时跟踪聚合值:

package org.acme.kafka.streams.aggregator.model;

import java.math.BigDecimal;
import java.math.RoundingMode;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class Aggregation {

    public int stationId;
    public String stationName;
    public double min = Double.MAX_VALUE;
    public double max = Double.MIN_VALUE;
    public int count;
    public double sum;
    public double avg;

    public Aggregation updateFrom(TemperatureMeasurement measurement) {
        stationId = measurement.stationId;
        stationName = measurement.stationName;

        count++;
        sum += measurement.value;
        avg = BigDecimal.valueOf(sum / count)
                .setScale(1, RoundingMode.HALF_UP).doubleValue();

        min = Math.min(min, measurement.value);
        max = Math.max(max, measurement.value);

        return this;
    }
}

接下来,让我们在 aggregator/src/main/java/org/acme/kafka/streams/aggregator/streams/TopologyProducer.java 文件中创建实际的流式查询实现。我们需要做的就是声明一个返还 Kafka 流 Topology;Quarkus 扩展将负责配置、启动和停止实际的 Kafka 流引擎的 CDI 生产程序方法。

package org.acme.kafka.streams.aggregator.streams;

import java.time.Instant;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

import org.acme.kafka.streams.aggregator.model.Aggregation;
import org.acme.kafka.streams.aggregator.model.TemperatureMeasurement;
import org.acme.kafka.streams.aggregator.model.WeatherStation;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;

import io.quarkus.kafka.client.serialization.ObjectMapperSerde;

@ApplicationScoped
public class TopologyProducer {

    static final String WEATHER_STATIONS_STORE = "weather-stations-store";

    private static final String WEATHER_STATIONS_TOPIC = "weather-stations";
    private static final String TEMPERATURE_VALUES_TOPIC = "temperature-values";
    private static final String TEMPERATURES_AGGREGATED_TOPIC = "temperatures-aggregated";

    @Produces
    public Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        ObjectMapperSerde<WeatherStation> weatherStationSerde = new ObjectMapperSerde<>(
                WeatherStation.class);
        ObjectMapperSerde<Aggregation> aggregationSerde = new ObjectMapperSerde<>(Aggregation.class);

        KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(
                WEATHER_STATIONS_STORE);

        GlobalKTable<Integer, WeatherStation> stations = builder.globalTable( (1)
                WEATHER_STATIONS_TOPIC,
                Consumed.with(Serdes.Integer(), weatherStationSerde));

        builder.stream(                                                       (2)
                        TEMPERATURE_VALUES_TOPIC,
                        Consumed.with(Serdes.Integer(), Serdes.String())
                )
                .join(                                                        (3)
                        stations,
                        (stationId, timestampAndValue) -> stationId,
                        (timestampAndValue, station) -> {
                            String[] parts = timestampAndValue.split(";");
                            return new TemperatureMeasurement(station.id, station.name,
                                    Instant.parse(parts[0]), Double.valueOf(parts[1]));
                        }
                )
                .groupByKey()                                                 (4)
                .aggregate(                                                   (5)
                        Aggregation::new,
                        (stationId, value, aggregation) -> aggregation.updateFrom(value),
                        Materialized.<Integer, Aggregation> as(storeSupplier)
                            .withKeySerde(Serdes.Integer())
                            .withValueSerde(aggregationSerde)
                )
                .toStream()
                .to(                                                          (6)
                        TEMPERATURES_AGGREGATED_TOPIC,
                        Produced.with(Serdes.Integer(), aggregationSerde)
                );

        return builder.build();
    }
}
1 weather-stations 表读入到 GlobalKTable 中,表示每个天气站的当前状态
2 temperature-values 主题读入到 KStream 中;每当有新消息到达此主题时,都会为此测量处理管道
3 来自 temperature-values 主题的消息使用主题密钥(天气站 ID)与对应的天气站连接;连接结果包含测量和相关天气站消息的数据
4 值按消息密钥(天气站 ID)分组
5 在每个组内,该站的所有测量值都会聚合,通过跟踪最小值和最大值,并计算该站所有测量值的平均值(Aggregation 类型)
6 将管道结果写入 temperatures-aggregated 主题

通过 Quarkus 配置文件 application.properties 配置 Kafka 流扩展。使用以下内容创建文件 aggregator/src/main/resources/application.properties

quarkus.kafka-streams.bootstrap-servers=localhost:9092
quarkus.kafka-streams.application-server=${hostname}:8080
quarkus.kafka-streams.topics=weather-stations,temperature-values

# pass-through options
kafka-streams.cache.max.bytes.buffering=10240
kafka-streams.commit.interval.ms=1000
kafka-streams.metadata.max.age.ms=500
kafka-streams.auto.offset.reset=earliest
kafka-streams.metrics.recording.level=DEBUG

具有 quarkus.kafka-streams 前缀的选项可以在应用程序启动时动态更改,例如,通过环境变量或系统属性。bootstrap-serversapplication-server 分别映射到 Kafka 流属性 bootstrap.serversapplication.servertopics 特定于 Quarkus:应用程序会在启动 Kafka 流引擎之前等待所有给定主题存在。这样做是为了在应用程序启动时优雅地等待尚未存在的主题的创建。

或者,您可以使用 kafka.bootstrap.servers 而不是 quarkus.kafka-streams.bootstrap-servers,如上面 generator 中的项目所做的那样。

一旦您准备好将您的应用程序推广到生产环境中,请考虑更改上述配置值。虽然 cache.max.bytes.buffering=10240 会以更快的速度通过拓扑移动您的记录,但 10485760 的默认值对吞吐量更加友好。同时,将 metadata.max.age.ms500 增加,其会快速更新集群元数据,但会产生大量冗余请求,将值较接近 300000 的默认值。commit.interval.ms1000 正好适合一次处理,但对于采用 30000 默认值的默认至少一次处理,则可能会产生过多的负载。

kafka-streams 命名空间中的所有属性会按原样传递到 Kafka 流引擎。更改其值需要重新构建应用程序。

Building and Running the Applications

我们现在可以构建 produceraggregator 应用程序:

./mvnw clean package -f producer/pom.xml
./mvnw clean package -f aggregator/pom.xml

我们不会使用 Quarkus 开发模式直接在主机机器上运行它们,而是将它们打包到容器映像中,并通过 Docker Compose 启动它们。这样做是为了演示如何将 aggregator 聚合扩展到多个节点。

Quarkus 创建的 Dockerfile 默认需要对 aggregator 应用程序进行一项调整,以便运行 Kafka 流管道。为此,请编辑文件 aggregator/src/main/docker/Dockerfile.jvm 并将 FROM fabric8/java-alpine-openjdk8-jre 行替换为 FROM fabric8/java-centos-openjdk8-jdk

接下来,创建一个 Docker Compose 文件(docker-compose.yaml)来启动这两个应用程序以及 Apache Kafka 和 ZooKeeper,如下所示:

version: '3.5'

services:
  zookeeper:
    image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
    command: [
      "sh", "-c",
      "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs
    networks:
      - kafkastreams-network
  kafka:
    image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
    command: [
      "sh", "-c",
      "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT} --override num.partitions=$${KAFKA_NUM_PARTITIONS}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_NUM_PARTITIONS: 3
    networks:
      - kafkastreams-network

  producer:
    image: quarkus-quickstarts/kafka-streams-producer:1.0
    build:
      context: producer
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
    networks:
      - kafkastreams-network

  aggregator:
    image: quarkus-quickstarts/kafka-streams-aggregator:1.0
    build:
      context: aggregator
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    environment:
      QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS: kafka:9092
    networks:
      - kafkastreams-network

networks:
  kafkastreams-network:
    name: ks

要启动所有容器、构建 produceraggregator 容器映像,请运行 docker-compose up --build

您可以使用 KAFKA_BOOTSTRAP_SERVERS 代替 QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS

您应该能看到 producer 应用的日志记录发送到"temperature-values"主题的消息。

现在运行 debezium/tooling 镜像的实例,附着到所有其他容器运行的相同网络。此镜像提供多个有用工具,如 kafkacathttpie

docker run --tty --rm -i --network ks debezium/tooling:1.1

在工具容器内,运行 kafkacat 检查流式处理管道运行结果:

kafkacat -b kafka:9092 -C -o beginning -q -t temperatures-aggregated

{"avg":34.7,"count":4,"max":49.4,"min":16.8,"stationId":9,"stationName":"Marrakesh","sum":138.8}
{"avg":15.7,"count":1,"max":15.7,"min":15.7,"stationId":2,"stationName":"Snowdonia","sum":15.7}
{"avg":12.8,"count":7,"max":25.5,"min":-13.8,"stationId":7,"stationName":"Porthsmouth","sum":89.7}
...

您应该能看到新值不断涌现,因为生产者持续发送温度测量值,每个传出主题值显示所代表气象站的最小、最大和平均温度值。

Interactive Queries

订阅 temperatures-aggregated 主题是响应任何新温度值的好方法。如果您只对给定气象站的最新聚合值感兴趣,那会有点浪费。这正是 Kafka Streams 交互式查询发挥作用的地方:它们让您直接查询管道的底层状态存储以获取与给定键关联的值。通过公开查询状态存储的简单 REST 端点,无需订阅任何 Kafka 主题即可检索最新聚合结果。

让我们从在文件 aggregator/src/main/java/org/acme/kafka/streams/aggregator/streams/InteractiveQueries.java 中创建一个新类 InteractiveQueries 开始:

KafkaStreamsPipeline 类添加一个新方法,该方法获取给定键的当前状态:

package org.acme.kafka.streams.aggregator.streams;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.acme.kafka.streams.aggregator.model.Aggregation;
import org.acme.kafka.streams.aggregator.model.WeatherStationData;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

@ApplicationScoped
public class InteractiveQueries {

    @Inject
    KafkaStreams streams;

    public GetWeatherStationDataResult getWeatherStationData(int id) {
        Aggregation result = getWeatherStationStore().get(id);

        if (result != null) {
            return GetWeatherStationDataResult.found(WeatherStationData.from(result)); (1)
        }
        else {
            return GetWeatherStationDataResult.notFound();                             (2)
        }
    }

    private ReadOnlyKeyValueStore<Integer, Aggregation> getWeatherStationStore() {
        while (true) {
            try {
                return streams.store(TopologyProducer.WEATHER_STATIONS_STORE, QueryableStoreTypes.keyValueStore());
            } catch (InvalidStateStoreException e) {
                // ignore, store not ready yet
            }
        }
    }
}
1 找到了给定站点 ID 的值,因此将返回该值
2 未找到任何值,可能是因为查询了不存在的站点,或给定站点尚无任何测量值

还可以在文件 aggregator/src/main/java/org/acme/kafka/streams/aggregator/streams/GetWeatherStationDataResult.java 中创建该方法的返回类型:

package org.acme.kafka.streams.aggregator.streams;

import java.util.Optional;
import java.util.OptionalInt;

import org.acme.kafka.streams.aggregator.model.WeatherStationData;

public class GetWeatherStationDataResult {

    private static GetWeatherStationDataResult NOT_FOUND =
            new GetWeatherStationDataResult(null);

    private final WeatherStationData result;

    private GetWeatherStationDataResult(WeatherStationData result) {
        this.result = result;
    }

    public static GetWeatherStationDataResult found(WeatherStationData data) {
        return new GetWeatherStationDataResult(data);
    }

    public static GetWeatherStationDataResult notFound() {
        return NOT_FOUND;
    }

    public Optional<WeatherStationData> getResult() {
        return Optional.ofNullable(result);
    }
}

还创建 aggregator/src/main/java/org/acme/kafka/streams/aggregator/model/WeatherStationData.java,表示某气象站的实际聚合结果:

package org.acme.kafka.streams.aggregator.model;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class WeatherStationData {

    public int stationId;
    public String stationName;
    public double min = Double.MAX_VALUE;
    public double max = Double.MIN_VALUE;
    public int count;
    public double avg;

    private WeatherStationData(int stationId, String stationName, double min, double max,
            int count, double avg) {
        this.stationId = stationId;
        this.stationName = stationName;
        this.min = min;
        this.max = max;
        this.count = count;
        this.avg = avg;
    }

    public static WeatherStationData from(Aggregation aggregation) {
        return new WeatherStationData(
                aggregation.stationId,
                aggregation.stationName,
                aggregation.min,
                aggregation.max,
                aggregation.count,
                aggregation.avg);
    }
}

我们现在可以添加一个简单的 REST 端点 (aggregator/src/main/java/org/acme/kafka/streams/aggregator/rest/WeatherStationEndpoint.java),该端点调用 getWeatherStationData() 并向客户端返回数据:

package org.acme.kafka.streams.aggregator.rest;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status;

import org.acme.kafka.streams.aggregator.streams.GetWeatherStationDataResult;
import org.acme.kafka.streams.aggregator.streams.KafkaStreamsPipeline;

@ApplicationScoped
@Path("/weather-stations")
public class WeatherStationEndpoint {

    @Inject
    InteractiveQueries interactiveQueries;

    @GET
    @Path("/data/{id}")
    public Response getWeatherStationData(int id) {
        GetWeatherStationDataResult result = interactiveQueries.getWeatherStationData(id);

        if (result.getResult().isPresent()) {  (1)
            return Response.ok(result.getResult().get()).build();
        }
        else {
            return Response.status(Status.NOT_FOUND.getStatusCode(),
                    "No data found for weather station " + id).build();
        }
    }
}
1 根据是否获取到值,返回该值或 404 响应

有了此代码,是时候在 Docker Compose 中重新构建应用和 aggregator 服务了:

./mvnw clean package -f aggregator/pom.xml
docker-compose stop aggregator
docker-compose up --build -d

这将重新构建 aggregator 容器并重启其服务。完成后,您可以调用该服务的 REST API 以获取现有某个站点温度数据。为此,您可以在之前启动的工具容器中使用 httpie

http aggregator:8080/weather-stations/data/1

HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 85
Content-Type: application/json
Date: Tue, 18 Jun 2019 19:29:16 GMT

{
    "avg": 12.9,
    "count": 146,
    "max": 41.0,
    "min": -25.6,
    "stationId": 1,
    "stationName": "Hamburg"
}

Scaling Out

Kafka Streams 应用的一个非常有趣的特性是它们可以扩展,即负载和状态可以在运行相同管道的多个应用实例中分配。然后,每个节点都将包含聚合结果的一个子集,但 Kafka Streams 为您提供了 an API,以获取给定键位于哪个节点的信息。然后,该应用可以直接从另一个实例获取数据,或者只将客户端指向该另一个节点的位置。

启动 aggregator 应用的多个实例将使整体架构看起来像这样:

kafka streams guide architecture distributed

InteractiveQueries 类必须针对此分布式架构稍作调整:

public GetWeatherStationDataResult getWeatherStationData(int id) {
    StreamsMetadata metadata = streams.metadataForKey(                  (1)
            TopologyProducer.WEATHER_STATIONS_STORE,
            id,
            Serdes.Integer().serializer()
    );

    if (metadata == null || metadata == StreamsMetadata.NOT_AVAILABLE) {
        LOG.warn("Found no metadata for key {}", id);
        return GetWeatherStationDataResult.notFound();
    }
    else if (metadata.host().equals(host)) {                            (2)
        LOG.info("Found data for key {} locally", id);
        Aggregation result = getWeatherStationStore().get(id);

        if (result != null) {
            return GetWeatherStationDataResult.found(WeatherStationData.from(result));
        }
        else {
            return GetWeatherStationDataResult.notFound();
        }
    }
    else {                                                              (3)
        LOG.info(
            "Found data for key {} on remote host {}:{}",
            id,
            metadata.host(),
            metadata.port()
        );
        return GetWeatherStationDataResult.foundRemotely(metadata.host(), metadata.port());
    }
}

public List<PipelineMetadata> getMetaData() {                           (4)
    return streams.allMetadataForStore(TopologyProducer.WEATHER_STATIONS_STORE)
            .stream()
            .map(m -> new PipelineMetadata(
                    m.hostInfo().host() + ":" + m.hostInfo().port(),
                    m.topicPartitions()
                        .stream()
                        .map(TopicPartition::toString)
                        .collect(Collectors.toSet()))
            )
            .collect(Collectors.toList());
}
1 获取给定气象站 ID 的流元数据
2 本地应用程序节点维护给定的键(气象站 ID),也就是说,它可以自行回答查询。
3 给定的键由另一个应用程序节点维护;在这种情况下,将返回该节点的信息(主机和端口)。
4 添加 getMetaData() 方法以向调用方提供应用程序集群中所有节点的列表。

GetWeatherStationDataResult 类型必须相应地进行调整:

package org.acme.kafka.streams.aggregator.streams;

import java.util.Optional;
import java.util.OptionalInt;

import org.acme.kafka.streams.aggregator.model.WeatherStationData;

public class GetWeatherStationDataResult {

    private static GetWeatherStationDataResult NOT_FOUND =
            new GetWeatherStationDataResult(null, null, null);

    private final WeatherStationData result;
    private final String host;
    private final Integer port;

    private GetWeatherStationDataResult(WeatherStationData result, String host,
            Integer port) {
        this.result = result;
        this.host = host;
        this.port = port;
    }

    public static GetWeatherStationDataResult found(WeatherStationData data) {
        return new GetWeatherStationDataResult(data, null, null);
    }

    public static GetWeatherStationDataResult foundRemotely(String host, int port) {
        return new GetWeatherStationDataResult(null, host, port);
    }

    public static GetWeatherStationDataResult notFound() {
        return NOT_FOUND;
    }

    public Optional<WeatherStationData> getResult() {
        return Optional.ofNullable(result);
    }

    public Optional<String> getHost() {
        return Optional.ofNullable(host);
    }

    public OptionalInt getPort() {
        return port != null ? OptionalInt.of(port) : OptionalInt.empty();
    }
}

此外,必须定义 getMetaData() 的返回类型(aggregator/src/main/java/org/acme/kafka/streams/aggregator/streams/PipelineMetadata.java):

package org.acme.kafka.streams.aggregator.streams;

import java.util.Set;

public class PipelineMetadata {

    public String host;
    public Set<String> partitions;

    public PipelineMetadata(String host, Set<String> partitions) {
        this.host = host;
        this.partitions = partitions;
    }
}

最后,必须更新 REST 端点类:

package org.acme.kafka.streams.aggregator.rest;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status;

import org.acme.kafka.streams.aggregator.streams.GetWeatherStationDataResult;
import org.acme.kafka.streams.aggregator.streams.KafkaStreamsPipeline;
import org.acme.kafka.streams.aggregator.streams.PipelineMetadata;

@ApplicationScoped
@Path("/weather-stations")
public class WeatherStationEndpoint {

    @Inject
    InteractiveQueries interactiveQueries;

    @GET
    @Path("/data/{id}")
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    public Response getWeatherStationData(int id) {
        GetWeatherStationDataResult result = interactiveQueries.getWeatherStationData(id);

        if (result.getResult().isPresent()) {                     (1)
            return Response.ok(result.getResult().get()).build();
        }
        else if (result.getHost().isPresent()) {                  (2)
            URI otherUri = getOtherUri(result.getHost().get(), result.getPort().getAsInt(),
                    id);
            return Response.seeOther(otherUri).build();
        }
        else {                                                    (3)
            return Response.status(Status.NOT_FOUND.getStatusCode(),
                    "No data found for weather station " + id).build();
        }
    }

    @GET
    @Path("/meta-data")
    @Produces(MediaType.APPLICATION_JSON)
    public List<PipelineMetadata> getMetaData() {                 (4)
        return interactiveQueries.getMetaData();
    }

    private URI getOtherUri(String host, int port, int id) {
        try {
            return new URI("http://" + host + ":" + port + "/weather-stations/data/" + id);
        }
        catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
    }
}
1 在本地找到数据,因此返回数据。
2 如果给定键的数据存储在其他节点之一上,则数据由另一个节点维护,因此请使用重定向(HTTP 状态代码 303)进行回复。
3 未找到给定气象站 ID 的数据。
4 公开有关构成应用程序集群的所有主机的的信息。

现在重新停止 aggregator 服务并重新编译它。然后让我们启动三个实例:

./mvnw clean package -f aggregator/pom.xml
docker-compose stop aggregator
docker-compose up --build -d --scale aggregator=3

在三个实例中的任何一个上调用 REST API 时,请求的气象站 id 聚合可能存储在接收查询的节点上,或者可能存储在另外两个节点中的一个上。

由于 Docker Compose 的负载平衡器将以循环方式将请求分发给 aggregator 服务,因此我们将直接调用实际节点。该应用程序通过 REST 公开有关所有主机名的信息:

http aggregator:8080/weather-stations/meta-data

HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 202
Content-Type: application/json
Date: Tue, 18 Jun 2019 20:00:23 GMT

[
    {
        "host": "2af13fe516a9:8080",
        "partitions": [
            "temperature-values-2"
        ]
    },
    {
        "host": "32cc8309611b:8080",
        "partitions": [
            "temperature-values-1"
        ]
    },
    {
        "host": "1eb39af8d587:8080",
        "partitions": [
            "temperature-values-0"
        ]
    }
]

从响应中显示的三个主机之一中检索数据(您的实际主机名将有所不同):

http 2af13fe516a9:8080/weather-stations/data/1

如果该节点持有键“1”的数据,您将得到如下的响应:

HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 74
Content-Type: application/json
Date: Tue, 11 Jun 2019 19:16:31 GMT

{
  "avg": 11.9,
  "count": 259,
  "max": 50.0,
  "min": -30.1,
  "stationId": 1,
  "stationName": "Hamburg"
}

否则,该服务将发送重定向:

HTTP/1.1 303 See Other
Connection: keep-alive
Content-Length: 0
Date: Tue, 18 Jun 2019 20:01:03 GMT
Location: http://1eb39af8d587:8080/weather-stations/data/1

通过传递 --follow option ,您还可以让 httpie 自动遵循重定向:

http --follow 2af13fe516a9:8080/weather-stations/data/1

Running Natively

适用于 Kafka Streams 的 Quarkus 扩展通过 GraalVM 以本机方式启用流处理应用程序的执行,而无需进一步配置。

若要在本机模式下运行 produceraggregator 应用程序,可以使用 -Dnative 执行 Maven 构建:

./mvnw clean package -f producer/pom.xml -Dnative -Dnative-image.container-runtime=docker
./mvnw clean package -f aggregator/pom.xml -Dnative -Dnative-image.container-runtime=docker

现在创建一个名为 QUARKUS_MODE 的环境变量,并将值设置为“native”:

export QUARKUS_MODE=native

Docker Compose 文件使用此变量在构建 produceraggregator 映像时使用正确的 Dockerfile。Kafka Streams 应用程序在 native 模式下可以使用少于 50 MB 的 RSS。为此,将 Xmx 选项添加到 aggregator/src/main/docker/Dockerfile.native 中的程序调用中:

CMD ["./application", "-Dquarkus.http.host=0.0.0.0", "-Xmx32m"]

现在按照上述说明启动 Docker Compose(别忘了重新构建容器映像)。

Kafka Streams Health Checks

如果你使用 quarkus-smallrye-health 扩展,quarkus-kafka-streams 将自动添加:

  • 一个就绪性运行状况检查,以验证 quarkus.kafka-streams.topics 属性中声明的所有主题已创建,

  • 一个基于 Kafka Streams 状态的正常运行状况检查。

因此,当你访问应用程序的 /q/health 端点时,你将获得有关 Kafka Streams 状态以及可用和/或缺少主题的信息。

以下是在状态为 DOWN 时的一个示例:

curl -i http://aggregator:8080/q/health

HTTP/1.1 503 Service Unavailable
content-type: application/json; charset=UTF-8
content-length: 454

{
    "status": "DOWN",
    "checks": [
        {
            "name": "Kafka Streams state health check",  1
            "status": "DOWN",
            "data": {
                "state": "CREATED"
            }
        },
        {
            "name": "Kafka Streams topics health check",  2
            "status": "DOWN",
            "data": {
                "available_topics": "weather-stations,temperature-values",
                "missing_topics": "hygrometry-values"
            }
        }
    ]
}
1 正常运行状况检查。也可以在 /q/health/live 端点获得。
2 就绪性运行状况检查。也可以在 /q/health/ready 端点获得。

因此,正如你所见,只要缺少了其中一个 quarkus.kafka-streams.topics,或者 Kafka Streams state 不是 RUNNING,状态就是 DOWN

如果没有可用主题,available_topics 键将不会出现在 Kafka Streams topics health checkdata 字段中。同样,如果没有缺少主题,missing_topics 键将不会出现在 Kafka Streams topics health checkdata 字段中。

你当然可以通过在 application.properties 中将 quarkus.kafka-streams.health.enabled 属性设置为 false 来禁用 quarkus-kafka-streams 扩展的运行状况检查。

显然,你可以根据各自的端点 /q/health/live/q/health/ready 来创建你的正常运行状况和就绪性探测。

Liveness health check

以下是用例运行状况检查示例:

curl -i http://aggregator:8080/q/health/live

HTTP/1.1 503 Service Unavailable
content-type: application/json; charset=UTF-8
content-length: 225

{
    "status": "DOWN",
    "checks": [
        {
            "name": "Kafka Streams state health check",
            "status": "DOWN",
            "data": {
                "state": "CREATED"
            }
        }
    ]
}

state 来自 KafkaStreams.State 枚举。

Readiness health check

以下是用例就绪性检查示例:

curl -i http://aggregator:8080/q/health/ready

HTTP/1.1 503 Service Unavailable
content-type: application/json; charset=UTF-8
content-length: 265

{
    "status": "DOWN",
    "checks": [
        {
            "name": "Kafka Streams topics health check",
            "status": "DOWN",
            "data": {
                "missing_topics": "weather-stations,temperature-values"
            }
        }
    ]
}

Going Further

本指南展示了如何使用 Quarkus 和 Kafka Streams API 在 JVM 和 native 模式下构建流处理应用程序。为了在生产环境中运行你的 KStreams 应用程序,你还可以为数据管道添加运行状况检查和指标。请参考 Quarkus 指南中的 MicrometerSmallRye MetricsSmallRye Health,了解更多信息。

Configuration Reference

Unresolved include directive in modules/ROOT/pages/kafka-streams.adoc - include::../../../target/quarkus-generated-doc/config/quarkus-kafka-streams.adoc[]