Getting Started to Quarkus Messaging with AMQP 1.0

本指南演示了 Quarkus 应用程序如何利用 Quarkus Messaging 与 AMQP 1.0 进行交互。

如果你想使用 RabbitMQ,你应使用 Quarkus Messaging RabbitMQ extension。或者,如果你想将 RabbitMQ 与 AMQP 1.0 一起使用,你需要在 RabbitMQ 代理中启用 AMQP 1.0 插件;查看 connecting to RabbitMQ文档。

Prerequisites

如要完成本指南,您需要:

  • Roughly 15 minutes

  • An IDE

  • 安装了 JDK 17+,已正确配置 JAVA_HOME

  • Apache Maven ${proposed-maven-version}

  • 如果你想使用 Quarkus CLI, 则可以选择使用

  • 如果你想构建一个本机可执行文件(或如果你使用本机容器构建,则使用 Docker),则可以选择安装 Mandrel 或 GraalVM 以及 configured appropriately

Architecture

在本指南中,我们将开发两个与 AMQP 代理通信的应用程序。我们将使用 Artemis,但你可以使用任何 AMQP 1.0 代理。第一个应用程序向 AMQP 队列发送 quote request,并使用 quote_队列中的消息。第二个应用程序接收 _quote request,然后发送回 quote

amqp qs architecture

第一个应用程序 `producer`将让用户通过 HTTP 端点请求一些报价。对于每个报价请求,都会生成一个随机标识符并返回给用户,以将报价请求放在 _pending_上。同时,生成的请求 ID 会通过 `quote-requests`队列发送。

amqp qs app screenshot

第二个应用程序 `processor`反过来将从 `quote-requests`队列读取,向报价添加随机价格,并将其发送到名为 `quotes`的队列。

最后,producer 将读取报价并使用服务器端发送的事件将它们发送到浏览器。因此,用户将看到报价价格从 pending 实时更新为接收到的价格。

Solution

我们建议您按照下一节中的说明一步步创建应用程序。但是,您可以直接转到已完成的示例。

克隆 Git 存储库: git clone $${quickstarts-base-url}.git,或下载 $${quickstarts-base-url}/archive/main.zip[存档]。

该解决方案位于 amqp-quickstart directory中。

Creating the Maven Project

首先,我们需要创建两个项目:producer_和 _processor

要在终端中创建 _producer_项目,请运行:

CLI
quarkus create app {create-app-group-id}:{create-app-artifact-id} \
    --no-code
cd {create-app-artifact-id}

要创建一个 Gradle 项目,添加 --gradle--gradle-kotlin-dsl 选项。 有关如何安装和使用 Quarkus CLI 的详细信息,请参见 Quarkus CLI 指南。

Maven
mvn {quarkus-platform-groupid}:quarkus-maven-plugin:{quarkus-version}:create \
    -DprojectGroupId={create-app-group-id} \
    -DprojectArtifactId={create-app-artifact-id} \
    -DnoCode
cd {create-app-artifact-id}

要创建一个 Gradle 项目,添加 -DbuildTool=gradle-DbuildTool=gradle-kotlin-dsl 选项。

适用于 Windows 用户:

  • 如果使用 cmd,(不要使用反斜杠 \ ,并将所有内容放在同一行上)

  • 如果使用 Powershell,将 -D 参数用双引号引起来,例如 "-DprojectArtifactId={create-app-artifact-id}"

此命令创建项目结构并选择我们将使用的两个 Quarkus 扩展:

  1. Quarkus REST(以前的 RESTEasy Reactive)及其 Jackson 支持来处理 JSON 有效负载

  2. Reactive Messaging AMQP 连接器

要从相同目录创建 _processor_项目,请运行:

CLI
quarkus create app {create-app-group-id}:{create-app-artifact-id} \
    --no-code
cd {create-app-artifact-id}

要创建一个 Gradle 项目,添加 --gradle--gradle-kotlin-dsl 选项。 有关如何安装和使用 Quarkus CLI 的详细信息,请参见 Quarkus CLI 指南。

Maven
mvn {quarkus-platform-groupid}:quarkus-maven-plugin:{quarkus-version}:create \
    -DprojectGroupId={create-app-group-id} \
    -DprojectArtifactId={create-app-artifact-id} \
    -DnoCode
cd {create-app-artifact-id}

要创建一个 Gradle 项目,添加 -DbuildTool=gradle-DbuildTool=gradle-kotlin-dsl 选项。

适用于 Windows 用户:

  • 如果使用 cmd,(不要使用反斜杠 \ ,并将所有内容放在同一行上)

  • 如果使用 Powershell,将 -D 参数用双引号引起来,例如 "-DprojectArtifactId={create-app-artifact-id}"

此时,您应该拥有以下结构:

.
├── amqp-quickstart-processor
│  ├── README.md
│  ├── mvnw
│  ├── mvnw.cmd
│  ├── pom.xml
│  └── src
│     └── main
│        ├── docker
│        ├── java
│        └── resources
│           └── application.properties
└── amqp-quickstart-producer
   ├── README.md
   ├── mvnw
   ├── mvnw.cmd
   ├── pom.xml
   └── src
      └── main
         ├── docker
         ├── java
         └── resources
            └── application.properties

在你的首选 IDE 中打开这两个项目。

The Quote object

该 `Quote`类将同时用于 `producer`和 `processor`项目。为简单起见,我们将复制该类。在这两个项目中,创建 `src/main/java/org/acme/amqp/model/Quote.java`文件,内容如下:

package org.acme.amqp.model;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class Quote {

    public String id;
    public int price;

    /**
    * Default constructor required for Jackson serializer
    */
    public Quote() { }

    public Quote(String id, int price) {
        this.id = id;
        this.price = price;
    }

    @Override
    public String toString() {
        return "Quote{" +
                "id='" + id + '\'' +
                ", price=" + price +
                '}';
    }
}

发送到 AMQP 队列的消息和发送到浏览器客户端的服务器端发送的事件都将使用 `Quote`对象的 JSON 表示形式。

Quarkus 具有处理 JSON AMQP 消息的内置功能。

@RegisterForReflection

@RegisterForReflection 注释指示 Quarkus 在创建本机可执行文件时保留类、其字段和方法。当我们稍后在容器中以本机可执行文件的形式运行应用程序时,这点很重要。如果没有此注释,本机编译过程将在死代码消除阶段丢弃字段和方法,这会导致运行时错误。有关 @RegisterForReflection 注释的更多详细信息,请参阅 native application tips 页面。

Sending quote request

在 `producer`项目内部,找到生成的 `src/main/java/org/acme/amqp/producer/QuotesResource.java`文件,并将内容更新为:

package org.acme.amqp.producer;

import java.util.UUID;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.acme.amqp.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import io.smallrye.mutiny.Multi;

@Path("/quotes")
public class QuotesResource {

    @Channel("quote-requests") Emitter<String> quoteRequestEmitter; (1)

    /**
     * Endpoint to generate a new quote request id and send it to "quote-requests" AMQP queue using the emitter.
     */
    @POST
    @Path("/request")
    @Produces(MediaType.TEXT_PLAIN)
    public String createRequest() {
        UUID uuid = UUID.randomUUID();
        quoteRequestEmitter.send(uuid.toString()); (2)
        return uuid.toString();
    }
}
1 注入 Reactive Messaging Emitter,以便向 `quote-requests`通道发送消息。
2 在 post 请求上,生成一个随机 UUID 并使用发射器将其发送到 AMQP 队列。

`quote-requests`通道将被管理为 AMQP 队列,因为那是类路径上的唯一连接器。如果没有另行指明,就像此示例中一样,Quarkus 会使用通道名称作为 AMQP 队列名称。因此,在本例中,该应用程序会向 `quote-requests`队列发送消息。

当你有多个连接器时,你需要在应用程序配置中指明你要在其中使用哪个连接器。

Processing quote requests

现在,让我们使用该报价请求并给出价格。在 `processor`项目内部,找到 `src/main/java/org/acme/amqp/processor/QuoteProcessor.java`文件并添加以下内容:

package org.acme.amqp.processor;

import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.acme.amqp.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.annotations.Blocking;

/**
 * A bean consuming data from the "request" AMQP queue and giving out a random quote.
 * The result is pushed to the "quotes" AMQP queue.
 */
@ApplicationScoped
public class QuoteProcessor {

    private Random random = new Random();

    @Incoming("requests")       (1)
    @Outgoing("quotes")         (2)
    @Blocking                   (3)
    public Quote process(String quoteRequest) throws InterruptedException {
        // simulate some hard-working task
        Thread.sleep(200);
        return new Quote(quoteRequest, random.nextInt(100));
    }
}
1 表明方法使用 requests 通道中的项目
2 表明方法返回的对象被发送到 quotes 通道
3 指示处理为 blocking,并且不能在调用方线程上运行。

`process`方法是为来自 `quote-requests`队列的每条 AMQP 消息调用的,它会将 `Quote`对象发送到 `quotes`队列。

因为我们希望使用 `requests`通道将消息从 `quotes-requests`队列中使用,我们需要配置此关联。打开 `src/main/resources/application.properties`文件并添加:

mp.messaging.incoming.requests.address=quote-requests

配置属性的构建方式如下:

mp.messaging.[outgoing|incoming].{channel-name}.property=value

在我们的示例中,我们要配置 `address`属性以指明队列名称。

Receiving quotes

回到我们的 producer 项目。让我们修改 QuotesResource 以使用报价、将其绑定到一个 HTTP 端点以向客户端发送事件:

import io.smallrye.mutiny.Multi;
//...

@Channel("quotes") Multi<Quote> quotes;     (1)

/**
 * Endpoint retrieving the "quotes" queue and sending the items to a server sent event.
 */
@GET
@Produces(MediaType.SERVER_SENT_EVENTS) (2)
public Multi<Quote> stream() {
    return quotes; (3)
}
1 使用 @Channel 限定符注入 quotes 通道
2 表明内容是使用 Server Sent Events 发送的
3 返回流(Reactive Stream

The HTML page

最后一步,使用 SSE 读取已转换价格的 HTML 页面。

producer 项目内创建 src/main/resources/META-INF/resources/quotes.html 文件,并包含以下内容:

<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Quotes</title>

    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">
    <div class="card">
        <div class="card-body">
            <h2 class="card-title">Quotes</h2>
            <button class="btn btn-info" id="request-quote">Request Quote</button>
            <div class="quotes"></div>
        </div>
    </div>
</div>
</body>
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
<script>
    $("#request-quote").click((event) => {
        fetch("/quotes/request", {method: "POST"})
        .then(res => res.text())
        .then(qid => {
            var row = $(`<h4 class='col-md-12' id='${qid}'>Quote # <i>${qid}</i> | <strong>Pending</strong></h4>`);
            $(".quotes").append(row);
        });
    });
    var source = new EventSource("/quotes");
    source.onmessage = (event) => {
      var json = JSON.parse(event.data);
      $(`#${json.id}`).html(function(index, html) {
        return html.replace("Pending", `\$\xA0${json.price}`);
      });
    };
</script>
</html>

这里没有特别之处。它会更新每个收到的页面的报价。

Get it running

你只需要使用以下命令运行这两个应用程序:

> mvn -f amqp-quickstart-producer quarkus:dev

在另一个终端中:

> mvn -f amqp-quickstart-processor quarkus:dev

Quarkus 自动启动 AMQP 代理,为应用程序进行配置,并在不同应用程序之间共享代理实例,有关更多详细信息,请见 Dev Services for AMQP

在浏览器中打开 http://localhost:8080/quotes.html,然后通过单击按钮请求一些报价。

Running in JVM or Native mode

在非开发或测试模式下运行时,你需要启动 AMQP 代理。你可以遵循 Apache ActiveMQ Artemis website中的说明或创建一个 `docker-compose.yaml`文件,其内容如下:

version: '2'

services:

  artemis:
    image: quay.io/artemiscloud/activemq-artemis-broker:1.0.25
    ports:
      - "8161:8161"
      - "61616:61616"
      - "5672:5672"
    environment:
      AMQ_USER: quarkus
      AMQ_PASSWORD: quarkus
    networks:
      - amqp-quickstart-network

  producer:
    image: quarkus-quickstarts/amqp-quickstart-producer:1.0-${QUARKUS_MODE:-jvm}
    build:
      context: amqp-quickstart-producer
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    environment:
      AMQP_HOST: artemis
      AMQP_PORT: 5672
    ports:
      - "8080:8080"
    networks:
      - amqp-quickstart-network

  processor:
    image: quarkus-quickstarts/amqp-quickstart-processor:1.0-${QUARKUS_MODE:-jvm}
    build:
      context: amqp-quickstart-processor
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    environment:
      AMQP_HOST: artemis
      AMQP_PORT: 5672
    networks:
      - amqp-quickstart-network

networks:
  amqp-quickstart-network:
    name: amqp-quickstart

注意 AMQP 代理位置的配置方式。amqp.host`和 `amqp.port(`AMQP_HOST`和 `AMQP_PORT`环境变量) 属性配置位置。

首先,确保已停止应用程序,并使用以下命令以 JVM 模式构建这两个应用程序:

> mvn -f amqp-quickstart-producer clean package
> mvn -f amqp-quickstart-processor clean package

打包完成后,运行 docker compose up --build。用户界面在 [role="bare"][role="bare"]http://localhost:8080/quotes.html 上公开

要以本机方式运行应用程序,我们首先需要构建本机可执行文件:

> mvn -f amqp-quickstart-producer package -Dnative  -Dquarkus.native.container-build=true
> mvn -f amqp-quickstart-processor package -Dnative -Dquarkus.native.container-build=true

-Dquarkus.native.container-build=true 指示 Quarkus 构建可在容器内运行的 Linux 64 位本机可执行文件。然后,使用以下命令运行系统:

> export QUARKUS_MODE=native
> docker compose up --build

和之前一样,用户界面在 [role="bare"][role="bare"]http://localhost:8080/quotes.html 上公开

Going further

本指南展示了 Quarkus 如何与 AMQP 1.0 交互。它利用 SmallRye Reactive Messaging构建数据流应用程序。

如果你已经完成 Kafka 快速入门,你就会发现它是同样的代码。唯一的差异是连接器配置和 JSON 映射。