Getting Started to Quarkus Messaging with Apache Pulsar

Prerequisites

如要完成本指南,您需要:

  • Roughly 15 minutes

  • An IDE

  • 安装了 JDK 17+,已正确配置 JAVA_HOME

  • Apache Maven ${proposed-maven-version}

  • 如果你想使用 Quarkus CLI, 则可以选择使用

  • 如果你想构建一个本机可执行文件(或如果你使用本机容器构建,则使用 Docker),则可以选择安装 Mandrel 或 GraalVM 以及 configured appropriately

Architecture

在本指南中,我们要开发两个与 Pulsar 通信的应用程序。第一个应用程序将 quote request_发送到 Pulsar,并使用 _quote_主题来使用 Pulsar 消息。第二个应用程序接收 _quote request_并发送回 _quote

pulsar qs architecture

第一个应用程序 producer_将允许用户通过 HTTP 端点请求一些报价。对于每个报价请求,将生成一个随机标识符并返回给用户,以将报价请求标记为 _pending。同时,生成的请求 ID 通过 Pulsar 主题 `quote-requests`发送。

pulsar qs app screenshot

第二个应用程序 _processor_将从 `quote-requests`主题中读取,向报价提供随机价格,并将其发送到名为 `quotes`的 Pulsar 主题。

最后,_producer_将读取报价并使用服务器发送事件将它们发送到浏览器。因此,用户将看到报价价格从 _pending_实时更新为收到的价格。

Solution

我们建议您按照下一节中的说明一步步创建应用程序。但是,您可以直接转到已完成的示例。

克隆 Git 存储库: git clone $${quickstarts-base-url}.git,或下载 $${quickstarts-base-url}/archive/main.zip[存档]。

解决方案位于 pulsar-quickstart directory 中。

Creating the Maven Project

首先,我们需要创建两个项目:producer_和 _processor

要在终端中创建 _producer_项目,请运行:

CLI
quarkus create app {create-app-group-id}:{create-app-artifact-id} \
    --no-code
cd {create-app-artifact-id}

要创建一个 Gradle 项目,添加 --gradle--gradle-kotlin-dsl 选项。 有关如何安装和使用 Quarkus CLI 的详细信息,请参见 Quarkus CLI 指南。

Maven
mvn {quarkus-platform-groupid}:quarkus-maven-plugin:{quarkus-version}:create \
    -DprojectGroupId={create-app-group-id} \
    -DprojectArtifactId={create-app-artifact-id} \
    -DnoCode
cd {create-app-artifact-id}

要创建一个 Gradle 项目,添加 -DbuildTool=gradle-DbuildTool=gradle-kotlin-dsl 选项。

适用于 Windows 用户:

  • 如果使用 cmd,(不要使用反斜杠 \ ,并将所有内容放在同一行上)

  • 如果使用 Powershell,将 -D 参数用双引号引起来,例如 "-DprojectArtifactId={create-app-artifact-id}"

此命令可创建项目结构,并选择我们将要使用的两个 Quarkus 扩展:

  1. Quarkus REST(以前称为 RESTEasy Reactive)及其 Jackson 支持(用来处理 JSON)用于提供 HTTP 终端。

  2. Reactive Messaging 的 Pulsar 连接器

要从相同目录创建 _processor_项目,请运行:

CLI
quarkus create app {create-app-group-id}:{create-app-artifact-id} \
    --no-code
cd {create-app-artifact-id}

要创建一个 Gradle 项目,添加 --gradle--gradle-kotlin-dsl 选项。 有关如何安装和使用 Quarkus CLI 的详细信息,请参见 Quarkus CLI 指南。

Maven
mvn {quarkus-platform-groupid}:quarkus-maven-plugin:{quarkus-version}:create \
    -DprojectGroupId={create-app-group-id} \
    -DprojectArtifactId={create-app-artifact-id} \
    -DnoCode
cd {create-app-artifact-id}

要创建一个 Gradle 项目,添加 -DbuildTool=gradle-DbuildTool=gradle-kotlin-dsl 选项。

适用于 Windows 用户:

  • 如果使用 cmd,(不要使用反斜杠 \ ,并将所有内容放在同一行上)

  • 如果使用 Powershell,将 -D 参数用双引号引起来,例如 "-DprojectArtifactId={create-app-artifact-id}"

那时,你应该有以下结构:

.
├── pulsar-quickstart-processor
│  ├── README.md
│  ├── mvnw
│  ├── mvnw.cmd
│  ├── pom.xml
│  └── src
│     └── main
│        ├── docker
│        ├── java
│        └── resources
│           └── application.properties
└── pulsar-quickstart-producer
   ├── README.md
   ├── mvnw
   ├── mvnw.cmd
   ├── pom.xml
   └── src
      └── main
         ├── docker
         ├── java
         └── resources
            └── application.properties

在你的首选 IDE 中打开这两个项目。

Dev Services

使用开发模式或测试时,无需启动 Pulsar 代理。Quarkus 会自动为你启动代理。有关详细信息,请参见 Dev Services for Pulsar

The Quote object

`Quote`类将用在 _producer_和 _processor_项目中。为了简单起见,我们将复制该类。在这两个项目中,创建 `src/main/java/org/acme/pulsar/model/Quote.java`文件,内容如下:

package org.acme.pulsar.model;

public class Quote {

    public String id;
    public int price;

    /**
    * Default constructor required for Jackson serializer
    */
    public Quote() { }

    public Quote(String id, int price) {
        this.id = id;
        this.price = price;
    }

    @Override
    public String toString() {
        return "Quote{" +
                "id='" + id + '\'' +
                ", price=" + price +
                '}';
    }
}

`Quote`对象的 JSON 表示将用在发送到 Pulsar 主题的消息中,以及发送到 Web 浏览器中的服务器发送事件中。

Quarkus 具有处理 JSON Pulsar 消息的内置功能。在以下部分中,我们将为 Jackson 创建序列化程序/反序列化程序类。

Sending quote request

在 _producer_项目内,创建 `src/main/java/org/acme/pulsar/producer/QuotesResource.java`文件并添加以下内容:

package org.acme.pulsar.producer;

import java.util.UUID;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.acme.pulsar.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@Path("/quotes")
public class QuotesResource {

    @Channel("quote-requests")
    Emitter<String> quoteRequestEmitter; (1)

    /**
     * Endpoint to generate a new quote request id and send it to "quote-requests" Pulsar topic using the emitter.
     */
    @POST
    @Path("/request")
    @Produces(MediaType.TEXT_PLAIN)
    public String createRequest() {
        UUID uuid = UUID.randomUUID();
        quoteRequestEmitter.send(uuid.toString()); (2)
        return uuid.toString(); (3)
    }
}
1 注入 Reactive Messaging Emitter,以便向 `quote-requests`通道发送消息。
2 在 post 请求上,生成随机 UUID 并使用发布器将其发送到 Pulsar 主题。
3 将相同 UUID 返回给客户端。

`quote-requests`通道将作为一个 Pulsar 主题进行管理,因为这是类路径上的唯一连接器。如果没有特别说明,比如在本示例中,Quarkus 会将通道名称用作主题名称。因此,在本示例中,应用程序将写入 `quote-requests`主题。Quarkus 还会自动配置序列化程序,因为它发现 `Emitter`生成 `String`值。

当你有多个连接器时,你需要在应用程序配置中指明你要在其中使用哪个连接器。

Processing quote requests

现在,让我们处理报价请求并给出价格。在 processor 项目中,创建 src/main/java/org/acme/pulsar/processor/QuotesProcessor.java 文件并添加以下内容:

package org.acme.pulsar.processor;

import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.acme.pulsar.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.annotations.Blocking;

/**
 * A bean consuming data from the "quote-requests" Pulsar topic (mapped to "requests" channel) and giving out a random quote.
 * The result is pushed to the "quotes" Pulsar topic.
 */
@ApplicationScoped
public class QuotesProcessor {

    private Random random = new Random();

    @Incoming("requests") (1)
    @Outgoing("quotes")   (2)
    @Blocking             (3)
    public Quote process(String quoteRequest) throws InterruptedException {
        // simulate some hard working task
        Thread.sleep(200);
        return new Quote(quoteRequest, random.nextInt(100));
    }
}
1 指示此方法将使用 requests 通道中的项。
2 指示此方法返回的对象将发送到 quotes 通道。
3 指示处理为 blocking,并且不能在调用方线程上运行。

对于来自 quote-requests 主题的每个 Pulsar message,Reactive Messaging 将调用 process 方法,并将返回的 Quote 对象发送到 quotes 通道。在这种情况下,我们需要在 application.properties 文件中配置通道,以配置 requestsquotes 通道:

%dev.quarkus.http.port=8081

# Configure the incoming `quote-requests` Pulsar topic
mp.messaging.incoming.requests.topic=quote-requests
mp.messaging.incoming.requests.subscriptionInitialPosition=Earliest

请注意,在这种情况下,我们有一个入站和一个出站连接器配置,每个都明确命名。配置属性的结构如下:

mp.messaging.[outgoing|incoming].{channel-name}.property=value

channel-name 片段必须与 @Incoming@Outgoing 注释中设置的值相匹配:

  • quote-requests → 读取报价请求的 Pulsar 主题

  • quotes → 编写报价的 Pulsar 主题

有关此配置的更多详细信息,请参阅 Pulsar 文档中的 [role="bare"][role="bare"]https://pulsar.apache.org/docs/3.0.x/concepts-messaging/ 部分。这些属性使用前缀 pulsar 进行配置。可以在 Pulsar Reference Guide - Configuration 中找到配置属性的详尽列表。

当之前没有已确认的消息时,mp.messaging.incoming.requests.subscriptionInitialPosition=Earliest 指示应用程序从主题上的第一条消息开始读取该主题。换言之,它还将处理在启动处理器应用程序之前发送的消息。

无需设置模式。Quarkus 会检测它们,如果没有找到,则会使用适当的模式类型生成它们。像 Quote bean 这样的结构化类型使用 JSON 模式。

Receiving quotes

回到我们的 producer 项目。让我们修改 QuotesResource,以便从 Pulsar 消费报价,并通过服务器发送事件将它们发送回客户端:

import io.smallrye.mutiny.Multi;

...

@Channel("quotes")
Multi<Quote> quotes; (1)

/**
 * Endpoint retrieving the "quotes" Pulsar topic and sending the items to a server sent event.
 */
@GET
@Produces(MediaType.SERVER_SENT_EVENTS) (2)
public Multi<Quote> stream() {
    return quotes; (3)
}
1 使用 @Channel 限定符注入 quotes 通道
2 表明内容是使用 Server Sent Events 发送的
3 返回流(Reactive Stream

无需配置任何内容,因为 Quarkus 会自动将 quotes 通道关联到 quotes Pulsar 主题。它还会为 Quote 类生成一个反序列化程序。

Message Schemas in Pulsar

在此示例中,我们在 Pulsar 消息中使用了 JSON 模式。有关 Pulsar 架构的更多信息,请参阅 Pulsar Reference Guide - Schema

The HTML page

最后一步,请求报价并显示通过 SSE 获取的价格的 HTML 页面。

在 _pulsar-quickstart-producer_项目中,创建包含以下内容的 `src/main/resources/META-INF/resources/quotes.html`文件:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Prices</title>

    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">
    <div class="card">
        <div class="card-body">
            <h2 class="card-title">Quotes</h2>
            <button class="btn btn-info" id="request-quote">Request Quote</button>
            <div class="quotes"></div>
        </div>
    </div>
</div>
</body>
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
<script>
    $("#request-quote").click((event) => {
        fetch("/quotes/request", {method: "POST"})
        .then(res => res.text())
        .then(qid => {
            var row = $(`<h4 class='col-md-12' id='${qid}'>Quote # <i>${qid}</i> | <strong>Pending</strong></h4>`);
            $(".quotes").prepend(row);
        });
    });

    var source = new EventSource("/quotes");
    source.onmessage = (event) => {
      var json = JSON.parse(event.data);
      $(`#${json.id}`).html((index, html) => {
        return html.replace("Pending", `\$\xA0${json.price}`);
      });
    };
</script>
</html>

这里没有什么特别之处。当用户单击按钮时,将发出 HTTP 请求以请求报价,并将待处理报价添加到列表中。对于通过 SSE 收到的每个报价,都会更新列表中的相应项目。

Get it running

您只需要运行这两个应用程序。在一个终端中,运行:

mvn -f pulsar-quickstart-producer quarkus:dev

在另一个终端中,运行:

mvn -f pulsar-quickstart-processor quarkus:dev

Quarkus 会自动启动 Pulsar 代理,配置应用程序并在不同的应用程序之间共享 Pulsar 代理实例。有关更多详细信息,请参见 Dev Services for Pulsar

在浏览器中打开 http://localhost:8080/quotes.html,然后通过单击按钮请求一些报价。

Running in JVM or Native mode

如果不在开发或测试模式下运行,您需要启动 Pulsar 代理。您可以按照 Run a standalone Pulsar cluster in Docker中的说明进行操作,也可以使用以下内容创建 `docker-compose.yaml`文件:

version: '3.8'

services:

  pulsar:
    image: apachepulsar/pulsar:3.0.0
    command: [
      "sh", "-c",
      "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone -nfw -nss"
    ]

    ports:
      - "6650:6650"
      - "8080:8080"
    tmpfs:
      - /pulsar/data
    healthcheck:
      test: curl --fail http://localhost:8080/admin/v2/clusters || exit 1
      interval: 10s
      timeout: 10s
      retries: 5
      start_period: 5s
    environment:
      PULSAR_PREFIX_advertisedListeners: internal:pulsar://localhost:6650,external:pulsar://pulsar:6650
      PULSAR_PREFIX_transactionCoordinatorEnabled: true
      PULSAR_PREFIX_systemTopicEnabled: true
    networks:
      - pulsar-quickstart-network

  producer:
    image: quarkus-quickstarts/pulsar-quickstart-producer:1.0-${QUARKUS_MODE:-jvm}
    depends_on:
      pulsar:
        condition: service_healthy
    build:
      context: pulsar-quickstart-producer
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    deploy:
      restart_policy:
        condition: on-failure
    environment:
      PULSAR_CLIENT_SERVICE_URL: pulsar://pulsar:6650
    ports:
      - "8082:8080"
    networks:
      - pulsar-quickstart-network

  processor:
    image: quarkus-quickstarts/pulsar-quickstart-processor:1.0-${QUARKUS_MODE:-jvm}
    depends_on:
      pulsar:
        condition: service_healthy
    build:
      context: pulsar-quickstart-processor
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    deploy:
      restart_policy:
        condition: on-failure
    environment:
      QUARKUS_HTTP_PORT: 8082
      PULSAR_CLIENT_SERVICE_URL: pulsar://pulsar:6650
    ports:
      - "8083:8080"
    networks:
      - pulsar-quickstart-network

networks:
  pulsar-quickstart-network:
    name: pulsar-quickstart

确保首先使用以下内容在 JVM 模式下构建这两个应用程序:

mvn -f pulsar-quickstart-producer package
mvn -f pulsar-quickstart-processor package

打包后,运行 docker-compose up

这是一个开发集群,请勿在生产中使用。

您还可以将我们的应用程序构建并运行为原生可执行文件。首先,将这两个应用程序编译为原生:

mvn -f pulsar-quickstart-producer package -Dnative -Dquarkus.native.container-build=true
mvn -f pulsar-quickstart-processor package -Dnative -Dquarkus.native.container-build=true

通过以下方式运行系统:

export QUARKUS_MODE=native
docker-compose up --build

Going further

本指南展示了如何使用 Quarkus 与 Pulsar 进行交互。它利用 SmallRye Reactive Messaging构建数据流应用程序。

有关功能和配置选项的详尽列表,请查看 Reference guide for Apache Pulsar Extension

在本指南中,我们探讨如何通过使用 Quarkus Messaging 扩展来与 Apache Pulsar 进行交互。using Pulsar clients directly