Using Apache Kafka Streams
此指南演示了您的 Quarkus 应用程序如何利用 Apache Kafka Streams API 来基于 Apache Kafka 实现流处理应用程序。
Prerequisites
如要完成本指南,您需要:
-
Roughly 15 minutes
-
An IDE
-
安装了 JDK 17+,已正确配置
JAVA_HOME
-
Apache Maven ${proposed-maven-version}
-
如果你想使用 Quarkus CLI, 则可以选择使用
-
如果你想构建一个本机可执行文件(或如果你使用本机容器构建,则使用 Docker),则可以选择安装 Mandrel 或 GraalVM 以及 configured appropriately
建议您事先阅读 Kafka quickstart 。
用于 Kafka Streams 的 Quarkus 扩展允许在开发期间通过支持 Quarkus Dev 模式(例如,通过
还在您的 Quarkus
这些设置组合在一起将确保应用程序在 dev 模式下重新启动后可以非常快速地重新连接到代理。 |
Architecture
在本指南中,我们将在一个组件(名为 generator
)中生成(随机)温度值。这些值与给定的气象站相关联,并写入 Kafka 主题(temperature-values
)。另一个主题(weather-stations
)仅包含天气台本身的主要数据(id 和名称)。
第二个组件(aggregator
)从两个 Kafka 主题中读取信息,并在流管道中处理它们:
-
这两个主题根据气象台 id 进行关联
-
根据气象台 id 确定最低值、最高值和平均温度
-
这些聚合数据被写入第三个主题(
temperatures-aggregated
)中
可以通过检查输出主题来检查数据。通过公开一个 Kafka Streams interactive query,也可以通过一个简单的 REST 查询来获得每个气象站的最新结果。
整体架构如下所示:
Solution
我们建议您遵循接下来的部分中的说明,按部就班地创建应用程序。然而,您可以直接跳到完成的示例。
克隆 Git 存储库: git clone $${quickstarts-base-url}.git
,或下载 $${quickstarts-base-url}/archive/main.zip[存档]。
解决方案位于 kafka-streams-quickstart
directory。
Creating the Producer Maven Project
首先,我们需要一个带有温度值生产者的新项目。使用以下命令创建一个新项目:
quarkus create app {create-app-group-id}:{create-app-artifact-id} \
--no-code
cd {create-app-artifact-id}
要创建一个 Gradle 项目,添加 --gradle
或 --gradle-kotlin-dsl
选项。
有关如何安装和使用 Quarkus CLI 的详细信息,请参见 Quarkus CLI 指南。
mvn {quarkus-platform-groupid}:quarkus-maven-plugin:{quarkus-version}:create \
-DprojectGroupId={create-app-group-id} \
-DprojectArtifactId={create-app-artifact-id} \
-DnoCode
cd {create-app-artifact-id}
要创建一个 Gradle 项目,添加 -DbuildTool=gradle
或 -DbuildTool=gradle-kotlin-dsl
选项。
适用于 Windows 用户:
-
如果使用 cmd,(不要使用反斜杠
\
,并将所有内容放在同一行上) -
如果使用 Powershell,将
-D
参数用双引号引起来,例如"-DprojectArtifactId={create-app-artifact-id}"
此命令生成一个 Maven 项目,导入 Reactive Messaging 和 Kafka 连接器扩展。
如果你已经配置了你的 Quarkus 项目,你可以在项目基本目录中运行以下命令,向你的项目添加 messaging-kafka
扩展:
quarkus extension add {add-extension-extensions}
./mvnw quarkus:add-extension -Dextensions='{add-extension-extensions}'
./gradlew addExtension --extensions='{add-extension-extensions}'
这会将以下内容添加到构建文件中:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
implementation("io.quarkus:quarkus-messaging-kafka")
The Temperature Value Producer
创建一个 producer/src/main/java/org/acme/kafka/streams/producer/generator/ValuesGenerator.java
文件,内容如下:
package org.acme.kafka.streams.producer.generator;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import jakarta.enterprise.context.ApplicationScoped;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.kafka.Record;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.Logger;
/**
* A bean producing random temperature data every second.
* The values are written to a Kafka topic (temperature-values).
* Another topic contains the name of weather stations (weather-stations).
* The Kafka configuration is specified in the application configuration.
*/
@ApplicationScoped
public class ValuesGenerator {
private static final Logger LOG = Logger.getLogger(ValuesGenerator.class);
private Random random = new Random();
private List<WeatherStation> stations = List.of(
new WeatherStation(1, "Hamburg", 13),
new WeatherStation(2, "Snowdonia", 5),
new WeatherStation(3, "Boston", 11),
new WeatherStation(4, "Tokio", 16),
new WeatherStation(5, "Cusco", 12),
new WeatherStation(6, "Svalbard", -7),
new WeatherStation(7, "Porthsmouth", 11),
new WeatherStation(8, "Oslo", 7),
new WeatherStation(9, "Marrakesh", 20));
@Outgoing("temperature-values") (1)
public Multi<Record<Integer, String>> generate() {
return Multi.createFrom().ticks().every(Duration.ofMillis(500)) (2)
.onOverflow().drop()
.map(tick -> {
WeatherStation station = stations.get(random.nextInt(stations.size()));
double temperature = BigDecimal.valueOf(random.nextGaussian() * 15 + station.averageTemperature)
.setScale(1, RoundingMode.HALF_UP)
.doubleValue();
LOG.infov("station: {0}, temperature: {1}", station.name, temperature);
return Record.of(station.id, Instant.now() + ";" + temperature);
});
}
@Outgoing("weather-stations") (3)
public Multi<Record<Integer, String>> weatherStations() {
return Multi.createFrom().items(stations.stream()
.map(s -> Record.of(
s.id,
"{ \"id\" : " + s.id +
", \"name\" : \"" + s.name + "\" }"))
);
}
private static class WeatherStation {
int id;
String name;
int averageTemperature;
public WeatherStation(int id, String name, int averageTemperature) {
this.id = id;
this.name = name;
this.averageTemperature = averageTemperature;
}
}
}
1 | 指示 Reactive Messaging 将返回的 Multi 中的项目调度到 temperature-values 。 |
2 | 该方法返回了一个 Mutiny stream (Multi ),每 0.5 秒发出一个随机温度值。 |
3 | 指示 Reactive Messaging 将返回的 Multi (气象站的静态列表)中的项目调度到 weather-stations 。 |
这两种方法各自返回一个 reactive stream,其项目会分别发送到名为 temperature-values
和 weather-stations
的流中。
Topic Configuration
这两个通道使用 Quarkus 配置文件 application.properties
映射到 Kafka 主题。为此,将以下内容添加到文件 producer/src/main/resources/application.properties
中:
# Configure the Kafka broker location
kafka.bootstrap.servers=localhost:9092
mp.messaging.outgoing.temperature-values.connector=smallrye-kafka
mp.messaging.outgoing.temperature-values.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.temperature-values.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.weather-stations.connector=smallrye-kafka
mp.messaging.outgoing.weather-stations.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.weather-stations.value.serializer=org.apache.kafka.common.serialization.StringSerializer
这配置了 Kafka 引导服务器、两个主题以及相应的 (de-) 序列化器。有关不同配置选项的更多详细信息,请访问 Kafka 文档中的 Producer configuration 和 Consumer configuration 部分。
Creating the Aggregator Maven Project
在生产者应用程序到位后,就可以实现实际的聚合器应用程序了,它将运行 Kafka Streams 管道。像这样创建另一个项目:
quarkus create app {create-app-group-id}:{create-app-artifact-id} \
--no-code
cd {create-app-artifact-id}
要创建一个 Gradle 项目,添加 --gradle
或 --gradle-kotlin-dsl
选项。
有关如何安装和使用 Quarkus CLI 的详细信息,请参见 Quarkus CLI 指南。
mvn {quarkus-platform-groupid}:quarkus-maven-plugin:{quarkus-version}:create \
-DprojectGroupId={create-app-group-id} \
-DprojectArtifactId={create-app-artifact-id} \
-DnoCode
cd {create-app-artifact-id}
要创建一个 Gradle 项目,添加 -DbuildTool=gradle
或 -DbuildTool=gradle-kotlin-dsl
选项。
适用于 Windows 用户:
-
如果使用 cmd,(不要使用反斜杠
\
,并将所有内容放在同一行上) -
如果使用 Powershell,将
-D
参数用双引号引起来,例如"-DprojectArtifactId={create-app-artifact-id}"
这使用用于 Kafka 流的 Quarkus 扩展和用于 Quarkus REST(以前是 RESTEasy Reactive)的 Jackson 支持创建了 aggregator
项目。
如果你已经配置了你的 Quarkus 项目,你可以在项目基本目录中运行以下命令,向你的项目添加 kafka-streams
扩展:
quarkus extension add {add-extension-extensions}
./mvnw quarkus:add-extension -Dextensions='{add-extension-extensions}'
./gradlew addExtension --extensions='{add-extension-extensions}'
这会将以下内容添加到您的 pom.xml
:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kafka-streams</artifactId>
</dependency>
implementation("io.quarkus:quarkus-kafka-streams")
The Pipeline Implementation
让我们通过创建一些值对象来表示温度测量值、气象站以及跟踪聚合值来开始实现流处理应用程序。
首先,使用以下内容创建一个 aggregator/src/main/java/org/acme/kafka/streams/aggregator/model/WeatherStation.java
文件,表示一个气象站:
package org.acme.kafka.streams.aggregator.model;
import io.quarkus.runtime.annotations.RegisterForReflection;
@RegisterForReflection (1)
public class WeatherStation {
public int id;
public String name;
}
1 | @RegisterForReflection 注解指示 Quarkus 在本机编译期间保留类及其成员。有关 @RegisterForReflection 注解的更多详细信息,请参见 native application tips 页面。 |
然后使用以下内容创建一个 aggregator/src/main/java/org/acme/kafka/streams/aggregator/model/TemperatureMeasurement.java
文件,表示给定站点的温度测量值:
package org.acme.kafka.streams.aggregator.model;
import java.time.Instant;
public class TemperatureMeasurement {
public int stationId;
public String stationName;
public Instant timestamp;
public double value;
public TemperatureMeasurement(int stationId, String stationName, Instant timestamp,
double value) {
this.stationId = stationId;
this.stationName = stationName;
this.timestamp = timestamp;
this.value = value;
}
}
最后是 aggregator/src/main/java/org/acme/kafka/streams/aggregator/model/Aggregation.java
,它用于在流式管道中处理事件时跟踪聚合值:
package org.acme.kafka.streams.aggregator.model;
import java.math.BigDecimal;
import java.math.RoundingMode;
import io.quarkus.runtime.annotations.RegisterForReflection;
@RegisterForReflection
public class Aggregation {
public int stationId;
public String stationName;
public double min = Double.MAX_VALUE;
public double max = Double.MIN_VALUE;
public int count;
public double sum;
public double avg;
public Aggregation updateFrom(TemperatureMeasurement measurement) {
stationId = measurement.stationId;
stationName = measurement.stationName;
count++;
sum += measurement.value;
avg = BigDecimal.valueOf(sum / count)
.setScale(1, RoundingMode.HALF_UP).doubleValue();
min = Math.min(min, measurement.value);
max = Math.max(max, measurement.value);
return this;
}
}
接下来,让我们在 aggregator/src/main/java/org/acme/kafka/streams/aggregator/streams/TopologyProducer.java
文件中创建实际的流式查询实现。我们需要做的就是声明一个返还 Kafka 流 Topology
;Quarkus 扩展将负责配置、启动和停止实际的 Kafka 流引擎的 CDI 生产程序方法。
package org.acme.kafka.streams.aggregator.streams;
import java.time.Instant;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import org.acme.kafka.streams.aggregator.model.Aggregation;
import org.acme.kafka.streams.aggregator.model.TemperatureMeasurement;
import org.acme.kafka.streams.aggregator.model.WeatherStation;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import io.quarkus.kafka.client.serialization.ObjectMapperSerde;
@ApplicationScoped
public class TopologyProducer {
static final String WEATHER_STATIONS_STORE = "weather-stations-store";
private static final String WEATHER_STATIONS_TOPIC = "weather-stations";
private static final String TEMPERATURE_VALUES_TOPIC = "temperature-values";
private static final String TEMPERATURES_AGGREGATED_TOPIC = "temperatures-aggregated";
@Produces
public Topology buildTopology() {
StreamsBuilder builder = new StreamsBuilder();
ObjectMapperSerde<WeatherStation> weatherStationSerde = new ObjectMapperSerde<>(
WeatherStation.class);
ObjectMapperSerde<Aggregation> aggregationSerde = new ObjectMapperSerde<>(Aggregation.class);
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(
WEATHER_STATIONS_STORE);
GlobalKTable<Integer, WeatherStation> stations = builder.globalTable( (1)
WEATHER_STATIONS_TOPIC,
Consumed.with(Serdes.Integer(), weatherStationSerde));
builder.stream( (2)
TEMPERATURE_VALUES_TOPIC,
Consumed.with(Serdes.Integer(), Serdes.String())
)
.join( (3)
stations,
(stationId, timestampAndValue) -> stationId,
(timestampAndValue, station) -> {
String[] parts = timestampAndValue.split(";");
return new TemperatureMeasurement(station.id, station.name,
Instant.parse(parts[0]), Double.valueOf(parts[1]));
}
)
.groupByKey() (4)
.aggregate( (5)
Aggregation::new,
(stationId, value, aggregation) -> aggregation.updateFrom(value),
Materialized.<Integer, Aggregation> as(storeSupplier)
.withKeySerde(Serdes.Integer())
.withValueSerde(aggregationSerde)
)
.toStream()
.to( (6)
TEMPERATURES_AGGREGATED_TOPIC,
Produced.with(Serdes.Integer(), aggregationSerde)
);
return builder.build();
}
}
1 | weather-stations 表读入到 GlobalKTable 中,表示每个天气站的当前状态 |
2 | temperature-values 主题读入到 KStream 中;每当有新消息到达此主题时,都会为此测量处理管道 |
3 | 来自 temperature-values 主题的消息使用主题密钥(天气站 ID)与对应的天气站连接;连接结果包含测量和相关天气站消息的数据 |
4 | 值按消息密钥(天气站 ID)分组 |
5 | 在每个组内,该站的所有测量值都会聚合,通过跟踪最小值和最大值,并计算该站所有测量值的平均值(Aggregation 类型) |
6 | 将管道结果写入 temperatures-aggregated 主题 |
通过 Quarkus 配置文件 application.properties
配置 Kafka 流扩展。使用以下内容创建文件 aggregator/src/main/resources/application.properties
:
quarkus.kafka-streams.bootstrap-servers=localhost:9092
quarkus.kafka-streams.application-server=${hostname}:8080
quarkus.kafka-streams.topics=weather-stations,temperature-values
# pass-through options
kafka-streams.cache.max.bytes.buffering=10240
kafka-streams.commit.interval.ms=1000
kafka-streams.metadata.max.age.ms=500
kafka-streams.auto.offset.reset=earliest
kafka-streams.metrics.recording.level=DEBUG
具有 quarkus.kafka-streams
前缀的选项可以在应用程序启动时动态更改,例如,通过环境变量或系统属性。bootstrap-servers
和 application-server
分别映射到 Kafka 流属性 bootstrap.servers
和 application.server
。topics
特定于 Quarkus:应用程序会在启动 Kafka 流引擎之前等待所有给定主题存在。这样做是为了在应用程序启动时优雅地等待尚未存在的主题的创建。
或者,您可以使用 |
一旦您准备好将您的应用程序推广到生产环境中,请考虑更改上述配置值。虽然 |
kafka-streams
命名空间中的所有属性会按原样传递到 Kafka 流引擎。更改其值需要重新构建应用程序。
Building and Running the Applications
我们现在可以构建 producer
和 aggregator
应用程序:
./mvnw clean package -f producer/pom.xml
./mvnw clean package -f aggregator/pom.xml
我们不会使用 Quarkus 开发模式直接在主机机器上运行它们,而是将它们打包到容器映像中,并通过 Docker Compose 启动它们。这样做是为了演示如何将 aggregator
聚合扩展到多个节点。
Quarkus 创建的 Dockerfile
默认需要对 aggregator
应用程序进行一项调整,以便运行 Kafka 流管道。为此,请编辑文件 aggregator/src/main/docker/Dockerfile.jvm
并将 FROM fabric8/java-alpine-openjdk8-jre
行替换为 FROM fabric8/java-centos-openjdk8-jdk
。
接下来,创建一个 Docker Compose 文件(docker-compose.yaml
)来启动这两个应用程序以及 Apache Kafka 和 ZooKeeper,如下所示:
version: '3.5'
services:
zookeeper:
image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
command: [
"sh", "-c",
"bin/zookeeper-server-start.sh config/zookeeper.properties"
]
ports:
- "2181:2181"
environment:
LOG_DIR: /tmp/logs
networks:
- kafkastreams-network
kafka:
image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
command: [
"sh", "-c",
"bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT} --override num.partitions=$${KAFKA_NUM_PARTITIONS}"
]
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
LOG_DIR: "/tmp/logs"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_NUM_PARTITIONS: 3
networks:
- kafkastreams-network
producer:
image: quarkus-quickstarts/kafka-streams-producer:1.0
build:
context: producer
dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
networks:
- kafkastreams-network
aggregator:
image: quarkus-quickstarts/kafka-streams-aggregator:1.0
build:
context: aggregator
dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
environment:
QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS: kafka:9092
networks:
- kafkastreams-network
networks:
kafkastreams-network:
name: ks
要启动所有容器、构建 producer
和 aggregator
容器映像,请运行 docker-compose up --build
。
您可以使用 |
您应该能看到 producer
应用的日志记录发送到"temperature-values"主题的消息。
现在运行 debezium/tooling 镜像的实例,附着到所有其他容器运行的相同网络。此镜像提供多个有用工具,如 kafkacat 和 httpie:
docker run --tty --rm -i --network ks debezium/tooling:1.1
在工具容器内,运行 kafkacat 检查流式处理管道运行结果:
kafkacat -b kafka:9092 -C -o beginning -q -t temperatures-aggregated
{"avg":34.7,"count":4,"max":49.4,"min":16.8,"stationId":9,"stationName":"Marrakesh","sum":138.8}
{"avg":15.7,"count":1,"max":15.7,"min":15.7,"stationId":2,"stationName":"Snowdonia","sum":15.7}
{"avg":12.8,"count":7,"max":25.5,"min":-13.8,"stationId":7,"stationName":"Porthsmouth","sum":89.7}
...
您应该能看到新值不断涌现,因为生产者持续发送温度测量值,每个传出主题值显示所代表气象站的最小、最大和平均温度值。
Interactive Queries
订阅 temperatures-aggregated
主题是响应任何新温度值的好方法。如果您只对给定气象站的最新聚合值感兴趣,那会有点浪费。这正是 Kafka Streams 交互式查询发挥作用的地方:它们让您直接查询管道的底层状态存储以获取与给定键关联的值。通过公开查询状态存储的简单 REST 端点,无需订阅任何 Kafka 主题即可检索最新聚合结果。
让我们从在文件 aggregator/src/main/java/org/acme/kafka/streams/aggregator/streams/InteractiveQueries.java
中创建一个新类 InteractiveQueries
开始:
向 KafkaStreamsPipeline
类添加一个新方法,该方法获取给定键的当前状态:
package org.acme.kafka.streams.aggregator.streams;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.acme.kafka.streams.aggregator.model.Aggregation;
import org.acme.kafka.streams.aggregator.model.WeatherStationData;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
@ApplicationScoped
public class InteractiveQueries {
@Inject
KafkaStreams streams;
public GetWeatherStationDataResult getWeatherStationData(int id) {
Aggregation result = getWeatherStationStore().get(id);
if (result != null) {
return GetWeatherStationDataResult.found(WeatherStationData.from(result)); (1)
}
else {
return GetWeatherStationDataResult.notFound(); (2)
}
}
private ReadOnlyKeyValueStore<Integer, Aggregation> getWeatherStationStore() {
while (true) {
try {
return streams.store(TopologyProducer.WEATHER_STATIONS_STORE, QueryableStoreTypes.keyValueStore());
} catch (InvalidStateStoreException e) {
// ignore, store not ready yet
}
}
}
}
1 | 找到了给定站点 ID 的值,因此将返回该值 |
2 | 未找到任何值,可能是因为查询了不存在的站点,或给定站点尚无任何测量值 |
还可以在文件 aggregator/src/main/java/org/acme/kafka/streams/aggregator/streams/GetWeatherStationDataResult.java
中创建该方法的返回类型:
package org.acme.kafka.streams.aggregator.streams;
import java.util.Optional;
import java.util.OptionalInt;
import org.acme.kafka.streams.aggregator.model.WeatherStationData;
public class GetWeatherStationDataResult {
private static GetWeatherStationDataResult NOT_FOUND =
new GetWeatherStationDataResult(null);
private final WeatherStationData result;
private GetWeatherStationDataResult(WeatherStationData result) {
this.result = result;
}
public static GetWeatherStationDataResult found(WeatherStationData data) {
return new GetWeatherStationDataResult(data);
}
public static GetWeatherStationDataResult notFound() {
return NOT_FOUND;
}
public Optional<WeatherStationData> getResult() {
return Optional.ofNullable(result);
}
}
还创建 aggregator/src/main/java/org/acme/kafka/streams/aggregator/model/WeatherStationData.java
,表示某气象站的实际聚合结果:
package org.acme.kafka.streams.aggregator.model;
import io.quarkus.runtime.annotations.RegisterForReflection;
@RegisterForReflection
public class WeatherStationData {
public int stationId;
public String stationName;
public double min = Double.MAX_VALUE;
public double max = Double.MIN_VALUE;
public int count;
public double avg;
private WeatherStationData(int stationId, String stationName, double min, double max,
int count, double avg) {
this.stationId = stationId;
this.stationName = stationName;
this.min = min;
this.max = max;
this.count = count;
this.avg = avg;
}
public static WeatherStationData from(Aggregation aggregation) {
return new WeatherStationData(
aggregation.stationId,
aggregation.stationName,
aggregation.min,
aggregation.max,
aggregation.count,
aggregation.avg);
}
}
我们现在可以添加一个简单的 REST 端点 (aggregator/src/main/java/org/acme/kafka/streams/aggregator/rest/WeatherStationEndpoint.java
),该端点调用 getWeatherStationData()
并向客户端返回数据:
package org.acme.kafka.streams.aggregator.rest;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status;
import org.acme.kafka.streams.aggregator.streams.GetWeatherStationDataResult;
import org.acme.kafka.streams.aggregator.streams.KafkaStreamsPipeline;
@ApplicationScoped
@Path("/weather-stations")
public class WeatherStationEndpoint {
@Inject
InteractiveQueries interactiveQueries;
@GET
@Path("/data/{id}")
public Response getWeatherStationData(int id) {
GetWeatherStationDataResult result = interactiveQueries.getWeatherStationData(id);
if (result.getResult().isPresent()) { (1)
return Response.ok(result.getResult().get()).build();
}
else {
return Response.status(Status.NOT_FOUND.getStatusCode(),
"No data found for weather station " + id).build();
}
}
}
1 | 根据是否获取到值,返回该值或 404 响应 |
有了此代码,是时候在 Docker Compose 中重新构建应用和 aggregator
服务了:
./mvnw clean package -f aggregator/pom.xml
docker-compose stop aggregator
docker-compose up --build -d
这将重新构建 aggregator
容器并重启其服务。完成后,您可以调用该服务的 REST API 以获取现有某个站点温度数据。为此,您可以在之前启动的工具容器中使用 httpie
:
http aggregator:8080/weather-stations/data/1
HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 85
Content-Type: application/json
Date: Tue, 18 Jun 2019 19:29:16 GMT
{
"avg": 12.9,
"count": 146,
"max": 41.0,
"min": -25.6,
"stationId": 1,
"stationName": "Hamburg"
}
Scaling Out
Kafka Streams 应用的一个非常有趣的特性是它们可以扩展,即负载和状态可以在运行相同管道的多个应用实例中分配。然后,每个节点都将包含聚合结果的一个子集,但 Kafka Streams 为您提供了 an API,以获取给定键位于哪个节点的信息。然后,该应用可以直接从另一个实例获取数据,或者只将客户端指向该另一个节点的位置。
启动 aggregator
应用的多个实例将使整体架构看起来像这样:
InteractiveQueries
类必须针对此分布式架构稍作调整:
public GetWeatherStationDataResult getWeatherStationData(int id) {
StreamsMetadata metadata = streams.metadataForKey( (1)
TopologyProducer.WEATHER_STATIONS_STORE,
id,
Serdes.Integer().serializer()
);
if (metadata == null || metadata == StreamsMetadata.NOT_AVAILABLE) {
LOG.warn("Found no metadata for key {}", id);
return GetWeatherStationDataResult.notFound();
}
else if (metadata.host().equals(host)) { (2)
LOG.info("Found data for key {} locally", id);
Aggregation result = getWeatherStationStore().get(id);
if (result != null) {
return GetWeatherStationDataResult.found(WeatherStationData.from(result));
}
else {
return GetWeatherStationDataResult.notFound();
}
}
else { (3)
LOG.info(
"Found data for key {} on remote host {}:{}",
id,
metadata.host(),
metadata.port()
);
return GetWeatherStationDataResult.foundRemotely(metadata.host(), metadata.port());
}
}
public List<PipelineMetadata> getMetaData() { (4)
return streams.allMetadataForStore(TopologyProducer.WEATHER_STATIONS_STORE)
.stream()
.map(m -> new PipelineMetadata(
m.hostInfo().host() + ":" + m.hostInfo().port(),
m.topicPartitions()
.stream()
.map(TopicPartition::toString)
.collect(Collectors.toSet()))
)
.collect(Collectors.toList());
}
1 | 获取给定气象站 ID 的流元数据 |
2 | 本地应用程序节点维护给定的键(气象站 ID),也就是说,它可以自行回答查询。 |
3 | 给定的键由另一个应用程序节点维护;在这种情况下,将返回该节点的信息(主机和端口)。 |
4 | 添加 getMetaData() 方法以向调用方提供应用程序集群中所有节点的列表。 |
GetWeatherStationDataResult
类型必须相应地进行调整:
package org.acme.kafka.streams.aggregator.streams;
import java.util.Optional;
import java.util.OptionalInt;
import org.acme.kafka.streams.aggregator.model.WeatherStationData;
public class GetWeatherStationDataResult {
private static GetWeatherStationDataResult NOT_FOUND =
new GetWeatherStationDataResult(null, null, null);
private final WeatherStationData result;
private final String host;
private final Integer port;
private GetWeatherStationDataResult(WeatherStationData result, String host,
Integer port) {
this.result = result;
this.host = host;
this.port = port;
}
public static GetWeatherStationDataResult found(WeatherStationData data) {
return new GetWeatherStationDataResult(data, null, null);
}
public static GetWeatherStationDataResult foundRemotely(String host, int port) {
return new GetWeatherStationDataResult(null, host, port);
}
public static GetWeatherStationDataResult notFound() {
return NOT_FOUND;
}
public Optional<WeatherStationData> getResult() {
return Optional.ofNullable(result);
}
public Optional<String> getHost() {
return Optional.ofNullable(host);
}
public OptionalInt getPort() {
return port != null ? OptionalInt.of(port) : OptionalInt.empty();
}
}
此外,必须定义 getMetaData()
的返回类型(aggregator/src/main/java/org/acme/kafka/streams/aggregator/streams/PipelineMetadata.java
):
package org.acme.kafka.streams.aggregator.streams;
import java.util.Set;
public class PipelineMetadata {
public String host;
public Set<String> partitions;
public PipelineMetadata(String host, Set<String> partitions) {
this.host = host;
this.partitions = partitions;
}
}
最后,必须更新 REST 端点类:
package org.acme.kafka.streams.aggregator.rest;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status;
import org.acme.kafka.streams.aggregator.streams.GetWeatherStationDataResult;
import org.acme.kafka.streams.aggregator.streams.KafkaStreamsPipeline;
import org.acme.kafka.streams.aggregator.streams.PipelineMetadata;
@ApplicationScoped
@Path("/weather-stations")
public class WeatherStationEndpoint {
@Inject
InteractiveQueries interactiveQueries;
@GET
@Path("/data/{id}")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response getWeatherStationData(int id) {
GetWeatherStationDataResult result = interactiveQueries.getWeatherStationData(id);
if (result.getResult().isPresent()) { (1)
return Response.ok(result.getResult().get()).build();
}
else if (result.getHost().isPresent()) { (2)
URI otherUri = getOtherUri(result.getHost().get(), result.getPort().getAsInt(),
id);
return Response.seeOther(otherUri).build();
}
else { (3)
return Response.status(Status.NOT_FOUND.getStatusCode(),
"No data found for weather station " + id).build();
}
}
@GET
@Path("/meta-data")
@Produces(MediaType.APPLICATION_JSON)
public List<PipelineMetadata> getMetaData() { (4)
return interactiveQueries.getMetaData();
}
private URI getOtherUri(String host, int port, int id) {
try {
return new URI("http://" + host + ":" + port + "/weather-stations/data/" + id);
}
catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
}
1 | 在本地找到数据,因此返回数据。 |
2 | 如果给定键的数据存储在其他节点之一上,则数据由另一个节点维护,因此请使用重定向(HTTP 状态代码 303)进行回复。 |
3 | 未找到给定气象站 ID 的数据。 |
4 | 公开有关构成应用程序集群的所有主机的的信息。 |
现在重新停止 aggregator
服务并重新编译它。然后让我们启动三个实例:
./mvnw clean package -f aggregator/pom.xml
docker-compose stop aggregator
docker-compose up --build -d --scale aggregator=3
在三个实例中的任何一个上调用 REST API 时,请求的气象站 id 聚合可能存储在接收查询的节点上,或者可能存储在另外两个节点中的一个上。
由于 Docker Compose 的负载平衡器将以循环方式将请求分发给 aggregator
服务,因此我们将直接调用实际节点。该应用程序通过 REST 公开有关所有主机名的信息:
http aggregator:8080/weather-stations/meta-data
HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 202
Content-Type: application/json
Date: Tue, 18 Jun 2019 20:00:23 GMT
[
{
"host": "2af13fe516a9:8080",
"partitions": [
"temperature-values-2"
]
},
{
"host": "32cc8309611b:8080",
"partitions": [
"temperature-values-1"
]
},
{
"host": "1eb39af8d587:8080",
"partitions": [
"temperature-values-0"
]
}
]
从响应中显示的三个主机之一中检索数据(您的实际主机名将有所不同):
http 2af13fe516a9:8080/weather-stations/data/1
如果该节点持有键“1”的数据,您将得到如下的响应:
HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 74
Content-Type: application/json
Date: Tue, 11 Jun 2019 19:16:31 GMT
{
"avg": 11.9,
"count": 259,
"max": 50.0,
"min": -30.1,
"stationId": 1,
"stationName": "Hamburg"
}
否则,该服务将发送重定向:
HTTP/1.1 303 See Other
Connection: keep-alive
Content-Length: 0
Date: Tue, 18 Jun 2019 20:01:03 GMT
Location: http://1eb39af8d587:8080/weather-stations/data/1
通过传递 --follow option
,您还可以让 httpie 自动遵循重定向:
http --follow 2af13fe516a9:8080/weather-stations/data/1
Running Natively
适用于 Kafka Streams 的 Quarkus 扩展通过 GraalVM 以本机方式启用流处理应用程序的执行,而无需进一步配置。
若要在本机模式下运行 producer
和 aggregator
应用程序,可以使用 -Dnative
执行 Maven 构建:
./mvnw clean package -f producer/pom.xml -Dnative -Dnative-image.container-runtime=docker
./mvnw clean package -f aggregator/pom.xml -Dnative -Dnative-image.container-runtime=docker
现在创建一个名为 QUARKUS_MODE
的环境变量,并将值设置为“native”:
export QUARKUS_MODE=native
Docker Compose 文件使用此变量在构建 producer
和 aggregator
映像时使用正确的 Dockerfile
。Kafka Streams 应用程序在 native 模式下可以使用少于 50 MB 的 RSS。为此,将 Xmx
选项添加到 aggregator/src/main/docker/Dockerfile.native
中的程序调用中:
CMD ["./application", "-Dquarkus.http.host=0.0.0.0", "-Xmx32m"]
现在按照上述说明启动 Docker Compose(别忘了重新构建容器映像)。
Kafka Streams Health Checks
如果你使用 quarkus-smallrye-health
扩展,quarkus-kafka-streams
将自动添加:
-
一个就绪性运行状况检查,以验证
quarkus.kafka-streams.topics
属性中声明的所有主题已创建, -
一个基于 Kafka Streams 状态的正常运行状况检查。
因此,当你访问应用程序的 /q/health
端点时,你将获得有关 Kafka Streams 状态以及可用和/或缺少主题的信息。
以下是在状态为 DOWN
时的一个示例:
curl -i http://aggregator:8080/q/health
HTTP/1.1 503 Service Unavailable
content-type: application/json; charset=UTF-8
content-length: 454
{
"status": "DOWN",
"checks": [
{
"name": "Kafka Streams state health check", 1
"status": "DOWN",
"data": {
"state": "CREATED"
}
},
{
"name": "Kafka Streams topics health check", 2
"status": "DOWN",
"data": {
"available_topics": "weather-stations,temperature-values",
"missing_topics": "hygrometry-values"
}
}
]
}
1 | 正常运行状况检查。也可以在 /q/health/live 端点获得。 |
2 | 就绪性运行状况检查。也可以在 /q/health/ready 端点获得。 |
因此,正如你所见,只要缺少了其中一个 quarkus.kafka-streams.topics
,或者 Kafka Streams state
不是 RUNNING
,状态就是 DOWN
。
如果没有可用主题,available_topics
键将不会出现在 Kafka Streams topics health check
的 data
字段中。同样,如果没有缺少主题,missing_topics
键将不会出现在 Kafka Streams topics health check
的 data
字段中。
你当然可以通过在 application.properties
中将 quarkus.kafka-streams.health.enabled
属性设置为 false
来禁用 quarkus-kafka-streams
扩展的运行状况检查。
显然,你可以根据各自的端点 /q/health/live
和 /q/health/ready
来创建你的正常运行状况和就绪性探测。
Liveness health check
以下是用例运行状况检查示例:
curl -i http://aggregator:8080/q/health/live
HTTP/1.1 503 Service Unavailable
content-type: application/json; charset=UTF-8
content-length: 225
{
"status": "DOWN",
"checks": [
{
"name": "Kafka Streams state health check",
"status": "DOWN",
"data": {
"state": "CREATED"
}
}
]
}
state
来自 KafkaStreams.State
枚举。
Readiness health check
以下是用例就绪性检查示例:
curl -i http://aggregator:8080/q/health/ready
HTTP/1.1 503 Service Unavailable
content-type: application/json; charset=UTF-8
content-length: 265
{
"status": "DOWN",
"checks": [
{
"name": "Kafka Streams topics health check",
"status": "DOWN",
"data": {
"missing_topics": "weather-stations,temperature-values"
}
}
]
}
Going Further
本指南展示了如何使用 Quarkus 和 Kafka Streams API 在 JVM 和 native 模式下构建流处理应用程序。为了在生产环境中运行你的 KStreams 应用程序,你还可以为数据管道添加运行状况检查和指标。请参考 Quarkus 指南中的 Micrometer、SmallRye Metrics 和 SmallRye Health,了解更多信息。