Getting Started to Quarkus Messaging with Apache Kafka
本指南演示了 Quarkus 应用如何使用 Quarkus Messaging 与 Apache Kafka 进行交互。
Prerequisites
如要完成本指南,您需要:
-
Roughly 15 minutes
-
An IDE
-
安装了 JDK 17+,已正确配置
JAVA_HOME
-
Apache Maven ${proposed-maven-version}
-
如果你想使用 Quarkus CLI, 则可以选择使用
-
如果你想构建一个本机可执行文件(或如果你使用本机容器构建,则使用 Docker),则可以选择安装 Mandrel 或 GraalVM 以及 configured appropriately
Architecture
在本指南中,我们将开发两个与 Kafka 通信的应用。第一个应用向 Kafka 发送 quote request,并从 quote 主题使用 Kafka 消息。第二个应用接收 quote request,并回复 quote。
第一个应用 producer,将让用户通过 HTTP 终端请求一些报价。对于每个报价请求,都会生成一个随机识别符并返回给用户,以标记报价请求为 pending。同时,生成的请求 ID 会通过 Kafka 主题 quote-requests
发送。
第二个应用 processor,将读取 quote-requests
主题,向报价添加一个随机价格,并发送到名为 quotes
的 Kafka 主题。
最后,_producer_将读取报价并使用服务器发送事件将它们发送到浏览器。因此,用户将看到报价价格从 _pending_实时更新为收到的价格。
Solution
我们建议您按照下一节中的说明一步步创建应用程序。但是,您可以直接转到已完成的示例。
克隆 Git 存储库: git clone $${quickstarts-base-url}.git
,或下载 $${quickstarts-base-url}/archive/main.zip[存档]。
解决方案位于 kafka-quickstart
directory。
Creating the Maven Project
首先,我们需要创建两个项目:producer_和 _processor。
要在终端中创建 _producer_项目,请运行:
quarkus create app {create-app-group-id}:{create-app-artifact-id} \
--no-code
cd {create-app-artifact-id}
要创建一个 Gradle 项目,添加 --gradle
或 --gradle-kotlin-dsl
选项。
有关如何安装和使用 Quarkus CLI 的详细信息,请参见 Quarkus CLI 指南。
mvn {quarkus-platform-groupid}:quarkus-maven-plugin:{quarkus-version}:create \
-DprojectGroupId={create-app-group-id} \
-DprojectArtifactId={create-app-artifact-id} \
-DnoCode
cd {create-app-artifact-id}
要创建一个 Gradle 项目,添加 -DbuildTool=gradle
或 -DbuildTool=gradle-kotlin-dsl
选项。
适用于 Windows 用户:
-
如果使用 cmd,(不要使用反斜杠
\
,并将所有内容放在同一行上) -
如果使用 Powershell,将
-D
参数用双引号引起来,例如"-DprojectArtifactId={create-app-artifact-id}"
此命令可创建项目结构,并选择我们将要使用的两个 Quarkus 扩展:
-
Quarkus REST(以前称为 RESTEasy Reactive)及其 Jackson 支持(用来处理 JSON)用于提供 HTTP 终端。
-
反应消息的 Kafka 连接器
要从相同目录创建 _processor_项目,请运行:
quarkus create app {create-app-group-id}:{create-app-artifact-id} \
--no-code
cd {create-app-artifact-id}
要创建一个 Gradle 项目,添加 --gradle
或 --gradle-kotlin-dsl
选项。
有关如何安装和使用 Quarkus CLI 的详细信息,请参见 Quarkus CLI 指南。
mvn {quarkus-platform-groupid}:quarkus-maven-plugin:{quarkus-version}:create \
-DprojectGroupId={create-app-group-id} \
-DprojectArtifactId={create-app-artifact-id} \
-DnoCode
cd {create-app-artifact-id}
要创建一个 Gradle 项目,添加 -DbuildTool=gradle
或 -DbuildTool=gradle-kotlin-dsl
选项。
适用于 Windows 用户:
-
如果使用 cmd,(不要使用反斜杠
\
,并将所有内容放在同一行上) -
如果使用 Powershell,将
-D
参数用双引号引起来,例如"-DprojectArtifactId={create-app-artifact-id}"
那时,你应该有以下结构:
.
├── kafka-quickstart-processor
│ ├── README.md
│ ├── mvnw
│ ├── mvnw.cmd
│ ├── pom.xml
│ └── src
│ └── main
│ ├── docker
│ ├── java
│ └── resources
│ └── application.properties
└── kafka-quickstart-producer
├── README.md
├── mvnw
├── mvnw.cmd
├── pom.xml
└── src
└── main
├── docker
├── java
└── resources
└── application.properties
在你的首选 IDE 中打开这两个项目。
Dev Services
使用 dev 模式或进行测试时无需启动 Kafka 代理。Quarkus 会自动为您启动一个代理。详情请参见 Dev Services for Kafka。 |
The Quote object
Quote
类将同时用于 producer 和 processor 项目。为了简单起见,我们将复制代码。在两个项目中,都创建 src/main/java/org/acme/kafka/model/Quote.java
文件,内容如下:
package org.acme.kafka.model;
public class Quote {
public String id;
public int price;
/**
* Default constructor required for Jackson serializer
*/
public Quote() { }
public Quote(String id, int price) {
this.id = id;
this.price = price;
}
@Override
public String toString() {
return "Quote{" +
"id='" + id + '\'' +
", price=" + price +
'}';
}
}
发送到 Kafka 主题的消息和发送到 Web 浏览器的服务器发送事件中将使用 Quote
对象的 JSON 表示形式。
Quarkus 具有处理 JSON Kafka 消息的内置功能。在下一节中,我们将为 Jackson 创建序列化程序/解序列化程序类。
Sending quote request
在 producer 项目中,创建 src/main/java/org/acme/kafka/producer/QuotesResource.java
文件,并添加以下内容:
package org.acme.kafka.producer;
import java.util.UUID;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.acme.kafka.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
@Path("/quotes")
public class QuotesResource {
@Channel("quote-requests")
Emitter<String> quoteRequestEmitter; (1)
/**
* Endpoint to generate a new quote request id and send it to "quote-requests" Kafka topic using the emitter.
*/
@POST
@Path("/request")
@Produces(MediaType.TEXT_PLAIN)
public String createRequest() {
UUID uuid = UUID.randomUUID();
quoteRequestEmitter.send(uuid.toString()); (2)
return uuid.toString(); (3)
}
}
1 | 注入 Reactive Messaging Emitter ,以便向 `quote-requests`通道发送消息。 |
2 | 在 post 请求中,生成一个随机 UUID 并使用发射器发送到 Kafka 主题。 |
3 | 将相同 UUID 返回给客户端。 |
quote-requests
通道将被管理为 Kafka 主题,因为它是类路径中的唯一连接器。如果未另行指示(比如在本示例中),Quarkus 将通道名称用作主题名称。因此在本示例中,应用会写入 quote-requests
主题。Quarkus 还会自动配置序列化程序,因为它发现 Emitter
会生成 String
值。
当你有多个连接器时,你需要在应用程序配置中指明你要在其中使用哪个连接器。 |
Processing quote requests
现在,让我们使用报价请求并提供价格。在 processor 项目中,创建 src/main/java/org/acme/kafka/processor/QuotesProcessor.java
文件,并添加以下内容:
package org.acme.kafka.processor;
import java.util.Random;
import jakarta.enterprise.context.ApplicationScoped;
import org.acme.kafka.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.reactive.messaging.annotations.Blocking;
/**
* A bean consuming data from the "quote-requests" Kafka topic (mapped to "requests" channel) and giving out a random quote.
* The result is pushed to the "quotes" Kafka topic.
*/
@ApplicationScoped
public class QuotesProcessor {
private Random random = new Random();
@Incoming("requests") (1)
@Outgoing("quotes") (2)
@Blocking (3)
public Quote process(String quoteRequest) throws InterruptedException {
// simulate some hard working task
Thread.sleep(200);
return new Quote(quoteRequest, random.nextInt(100));
}
}
1 | 指示此方法将使用 requests 通道中的项。 |
2 | 指示此方法返回的对象将发送到 quotes 通道。 |
3 | 指示处理为 blocking,并且不能在调用方线程上运行。 |
对于来自 quote-requests
主题的每个 Kafka record,反应消息都会调用 process
方法,并将返回的 Quote
对象发送到 quotes
通道。在这种情况下,我们需要在 application.properties
文件中配置通道,以配置 requests
和 quotes
通道:
%dev.quarkus.http.port=8081
# Configure the incoming `quote-requests` Kafka topic
mp.messaging.incoming.requests.topic=quote-requests
mp.messaging.incoming.requests.auto.offset.reset=earliest
请注意,在这种情况下,我们有一个入站和一个出站连接器配置,每个都明确命名。配置属性的结构如下:
mp.messaging.[outgoing|incoming].{channel-name}.property=value
channel-name
片段必须与 @Incoming
和 @Outgoing
注释中设置的值相匹配:
-
quote-requests
→ 我们从中读取报价请求的 Kafka 主题 -
quotes
→ 我们用于编写引号的 Kafka 主题
Kafka 文档的 Producer configuration 和 Consumer configuration 部分提供了此配置的更多详细信息。这些属性使用前缀 |
当消费者组没有提交的偏移量时,mp.messaging.incoming.requests.auto.offset.reset=earliest
指示应用程序从第一个偏移量开始读取主题。换句话说,它还将处理我们在启动处理器应用程序之前发送的消息。
无需设置序列化器或反序列化器。Quarkus 会检测它们,如果未找到任何,则使用 JSON 序列化生成它们。
Receiving quotes
回到我们的 producer 项目。让我们修改 QuotesResource
以使用 Kafka 的引号并通过服务器发送事件将它们发送回客户端:
import io.smallrye.mutiny.Multi;
...
@Channel("quotes")
Multi<Quote> quotes; (1)
/**
* Endpoint retrieving the "quotes" Kafka topic and sending the items to a server sent event.
*/
@GET
@Produces(MediaType.SERVER_SENT_EVENTS) (2)
public Multi<Quote> stream() {
return quotes; (3)
}
1 | 使用 @Channel 限定符注入 quotes 通道 |
2 | 表明内容是使用 Server Sent Events 发送的 |
3 | 返回流(Reactive Stream) |
无需配置任何内容,因为 Quarkus 会自动将 quotes
通道关联到 quotes
Kafka 主题。它还将为 Quote
类生成一个反序列化器。
Message serialization in Kafka
在此示例中,我们使用 Jackson 序列化/反序列化 Kafka 消息。有关消息序列化的更多选项,请参见 Kafka Reference Guide - Serialization。 我们强烈建议采用使用模式注册表的“合同优先”方法。若要了解有关如何将 Apache Kafka 与模式注册表和 Avro 一起使用的更多信息,请遵循适用于 Avro 的 Using Apache Kafka with Schema Registry and Avro 指南,或者可以遵循 Using Apache Kafka with Schema Registry and JSON Schema 指南。 |
The HTML page
最后一步,请求报价并显示通过 SSE 获取的价格的 HTML 页面。
在 producer 项目内,使用以下内容创建 src/main/resources/META-INF/resources/quotes.html
文件:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Prices</title>
<link rel="stylesheet" type="text/css"
href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
<link rel="stylesheet" type="text/css"
href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">
<div class="card">
<div class="card-body">
<h2 class="card-title">Quotes</h2>
<button class="btn btn-info" id="request-quote">Request Quote</button>
<div class="quotes"></div>
</div>
</div>
</div>
</body>
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
<script>
$("#request-quote").click((event) => {
fetch("/quotes/request", {method: "POST"})
.then(res => res.text())
.then(qid => {
var row = $(`<h4 class='col-md-12' id='${qid}'>Quote # <i>${qid}</i> | <strong>Pending</strong></h4>`);
$(".quotes").prepend(row);
});
});
var source = new EventSource("/quotes");
source.onmessage = (event) => {
var json = JSON.parse(event.data);
$(`#${json.id}`).html((index, html) => {
return html.replace("Pending", `\$\xA0${json.price}`);
});
};
</script>
</html>
这里没有什么特别之处。当用户单击按钮时,将发出 HTTP 请求以请求报价,并将待处理报价添加到列表中。对于通过 SSE 收到的每个报价,都会更新列表中的相应项目。
Get it running
您只需要运行这两个应用程序。在一个终端中,运行:
mvn -f producer quarkus:dev
在另一个终端中,运行:
mvn -f processor quarkus:dev
Quarkus 自动启动一个 Kafka 代理,配置应用程序,并在不同应用程序之间共享 Kafka 代理实例。有关更多详细信息,请参见 Dev Services for Kafka。
在浏览器中打开 http://localhost:8080/quotes.html
,然后通过单击按钮请求一些报价。
Running in JVM or Native mode
当不在 dev 或测试模式下运行时,需要启动 Kafka 代理。您可以遵循 Apache Kafka website 中的说明,或使用以下内容创建一个 docker-compose.yaml
文件:
version: '3.5'
services:
zookeeper:
image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
command: [
"sh", "-c",
"bin/zookeeper-server-start.sh config/zookeeper.properties"
]
ports:
- "2181:2181"
environment:
LOG_DIR: /tmp/logs
networks:
- kafka-quickstart-network
kafka:
image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
command: [
"sh", "-c",
"bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
]
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
LOG_DIR: "/tmp/logs"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
networks:
- kafka-quickstart-network
producer:
image: quarkus-quickstarts/kafka-quickstart-producer:1.0-${QUARKUS_MODE:-jvm}
build:
context: producer
dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
depends_on:
- kafka
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
ports:
- "8080:8080"
networks:
- kafka-quickstart-network
processor:
image: quarkus-quickstarts/kafka-quickstart-processor:1.0-${QUARKUS_MODE:-jvm}
build:
context: processor
dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
depends_on:
- kafka
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
networks:
- kafka-quickstart-network
networks:
kafka-quickstart-network:
name: kafkaquickstart
确保首先使用以下内容在 JVM 模式下构建这两个应用程序:
mvn -f producer package
mvn -f processor package
打包后,运行 docker-compose up
。
这是一个开发集群,请勿在生产中使用。 |
您还可以将我们的应用程序构建并运行为原生可执行文件。首先,将这两个应用程序编译为原生:
mvn -f producer package -Dnative -Dquarkus.native.container-build=true
mvn -f processor package -Dnative -Dquarkus.native.container-build=true
通过以下方式运行系统:
export QUARKUS_MODE=native
docker-compose up --build
Going further
本指南已说明了如何使用 Quarkus 与 Kafka 进行交互。它利用 SmallRye Reactive Messaging 构建数据流应用程序。
有关功能和配置选项的详尽列表,请检查 Reference guide for Apache Kafka Extension。
在本指南中,我们探讨了如何使用 Quarkus 消息传递扩展与 Apache Kafka 进行交互。用于 Kafka 的 Quarkus 扩展还允许 using Kafka clients directly。 |