Getting Started to Quarkus Messaging with RabbitMQ

本指南演示了您的 Quarkus 应用程序如何利用 Quarkus 消息传递与 RabbitMQ 交互。 :iokays-category: quarkus :iokays-path: modules/ROOT/pages/_includes/extension-status.adoc :keywords: Quarkus, 中文文档, 编程技术

该技术被认为是 {extension-status}。 有关可能状态的完整列表,请查看我们的 FAQ entry.

Prerequisites

如要完成本指南,您需要:

  • Roughly 15 minutes

  • An IDE

  • 安装了 JDK 17+,已正确配置 JAVA_HOME

  • Apache Maven ${proposed-maven-version}

  • 如果你想使用 Quarkus CLI, 则可以选择使用

  • 如果你想构建一个本机可执行文件(或如果你使用本机容器构建,则使用 Docker),则可以选择安装 Mandrel 或 GraalVM 以及 configured appropriately

Architecture

在本指南中,我们将开发两个与 RabbitMQ 代理通信的应用程序。第一个应用程序将 quote request 发送到 RabbitMQ quote requests 交换并从 quote 队列中使用消息。第二个应用程序接收 quote request 并发送回 quote

amqp qs architecture

第一个应用程序 producer 将允许用户通过 HTTP 端点请求一些报价。对于每个报价请求,都会生成一个随机标识符并返还给用户,以将报价请求置于 pending 中。与此同时,生成的请求 id 会被发送到 quote-requests 交换。

amqp qs app screenshot

第二个应用程序 processor 接下来将从 quote-requests 队列读取内容,为报价添加随机价格,然后将其发送到名为 quotes 的交换中。

最后,producer 将读取报价并使用服务器端发送的事件将它们发送到浏览器。因此,用户将看到报价价格从 pending 实时更新为接收到的价格。

Solution

我们建议您按照下一节中的说明一步步创建应用程序。但是,您可以直接转到已完成的示例。

克隆 Git 存储库: git clone $${quickstarts-base-url}.git,或下载 $${quickstarts-base-url}/archive/main.zip[存档]。

该解决方案位于 rabbitmq-quickstart directory 中。

Creating the Maven Project

首先,我们需要创建两个项目:producer_和 _processor

要在终端中创建 _producer_项目,请运行:

CLI
quarkus create app {create-app-group-id}:{create-app-artifact-id} \
    --no-code
cd {create-app-artifact-id}

要创建一个 Gradle 项目,添加 --gradle--gradle-kotlin-dsl 选项。 有关如何安装和使用 Quarkus CLI 的详细信息,请参见 Quarkus CLI 指南。

Maven
mvn {quarkus-platform-groupid}:quarkus-maven-plugin:{quarkus-version}:create \
    -DprojectGroupId={create-app-group-id} \
    -DprojectArtifactId={create-app-artifact-id} \
    -DnoCode
cd {create-app-artifact-id}

要创建一个 Gradle 项目,添加 -DbuildTool=gradle-DbuildTool=gradle-kotlin-dsl 选项。

适用于 Windows 用户:

  • 如果使用 cmd,(不要使用反斜杠 \ ,并将所有内容放在同一行上)

  • 如果使用 Powershell,将 -D 参数用双引号引起来,例如 "-DprojectArtifactId={create-app-artifact-id}"

此命令创建项目结构并选择我们将使用的两个 Quarkus 扩展:

  1. 反应式消息传递 RabbitMQ 连接器

  2. Quarkus REST(以前的 RESTEasy Reactive)及其 Jackson 支持来处理 JSON 有效负载

如果您已配置 Quarkus 项目,则可以通过在项目基本目录中运行以下命令将 messaging-rabbitmq 扩展添加到您的项目: :iokays-category: quarkus :iokays-path: modules/ROOT/pages/_includes/devtools/extension-add.adoc :keywords: Quarkus, 中文文档, 编程技术

CLI
quarkus extension add {add-extension-extensions}
Maven
./mvnw quarkus:add-extension -Dextensions='{add-extension-extensions}'
Gradle
./gradlew addExtension --extensions='{add-extension-extensions}'

这会将以下内容添加到您的 pom.xml

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-rabbitmq</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-messaging-rabbitmq")

要从相同目录创建 _processor_项目,请运行:

CLI
quarkus create app {create-app-group-id}:{create-app-artifact-id} \
    --no-code
cd {create-app-artifact-id}

要创建一个 Gradle 项目,添加 --gradle--gradle-kotlin-dsl 选项。 有关如何安装和使用 Quarkus CLI 的详细信息,请参见 Quarkus CLI 指南。

Maven
mvn {quarkus-platform-groupid}:quarkus-maven-plugin:{quarkus-version}:create \
    -DprojectGroupId={create-app-group-id} \
    -DprojectArtifactId={create-app-artifact-id} \
    -DnoCode
cd {create-app-artifact-id}

要创建一个 Gradle 项目,添加 -DbuildTool=gradle-DbuildTool=gradle-kotlin-dsl 选项。

适用于 Windows 用户:

  • 如果使用 cmd,(不要使用反斜杠 \ ,并将所有内容放在同一行上)

  • 如果使用 Powershell,将 -D 参数用双引号引起来,例如 "-DprojectArtifactId={create-app-artifact-id}"

此时,您应该拥有以下结构:

.
├── rabbitmq-quickstart-processor
│  ├── README.md
│  ├── mvnw
│  ├── mvnw.cmd
│  ├── pom.xml
│  └── src
│     └── main
│        ├── docker
│        ├── java
│        └── resources
│           └── application.properties
└── rabbitmq-quickstart-producer
   ├── README.md
   ├── mvnw
   ├── mvnw.cmd
   ├── pom.xml
   └── src
      └── main
         ├── docker
         ├── java
         └── resources
            └── application.properties

在你的首选 IDE 中打开这两个项目。

The Quote object

Quote 类将用于 producerprocessor 项目。为了简单起见,我们将复制该类。在这两个项目中,创建 src/main/java/org/acme/rabbitmq/model/Quote.java 文件,其内容如下:

package org.acme.rabbitmq.model;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class Quote {

    public String id;
    public int price;

    /**
    * Default constructor required for Jackson serializer
    */
    public Quote() { }

    public Quote(String id, int price) {
        this.id = id;
        this.price = price;
    }

    @Override
    public String toString() {
        return "Quote{" +
                "id='" + id + '\'' +
                ", price=" + price +
                '}';
    }
}

发送到 RabbitMQ 队列的消息和发送到浏览器客户端的服务器发送事件中将使用 Quote 对象的 JSON 表示形式。

Quarkus 具有内置功能来处理 JSON RabbitMQ 消息。

@RegisterForReflection

@RegisterForReflection 注释指示 Quarkus 在创建本机可执行文件时保留类、其字段和方法。当我们稍后在容器中以本机可执行文件的形式运行应用程序时,这点很重要。如果没有此注释,本机编译过程将在死代码消除阶段丢弃字段和方法,这会导致运行时错误。有关 @RegisterForReflection 注释的更多详细信息,请参阅 native application tips 页面。

Sending quote request

producer 项目中找到生成的 src/main/java/org/acme/rabbitmq/producer/QuotesResource.java 文件,并将内容更新为:

package org.acme.rabbitmq.producer;

import java.util.UUID;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.acme.rabbitmq.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import io.smallrye.mutiny.Multi;

@Path("/quotes")
public class QuotesResource {

    @Channel("quote-requests") Emitter<String> quoteRequestEmitter; (1)

    /**
     * Endpoint to generate a new quote request id and send it to "quote-requests" channel (which
     * maps to the "quote-requests" RabbitMQ exchange) using the emitter.
     */
    @POST
    @Path("/request")
    @Produces(MediaType.TEXT_PLAIN)
    public String createRequest() {
        UUID uuid = UUID.randomUUID();
        quoteRequestEmitter.send(uuid.toString()); (2)
        return uuid.toString();
    }
}
1 注入 Reactive Messaging Emitter,以便向 `quote-requests`通道发送消息。
2 在 post 请求中,生成一个随机 UUID 并使用发送器将其发送到 RabbitMQ 队列。

此通道使用我们将添加到 application.properties 文件的配置映射到 RabbitMQ 交换。打开 src/main/resource/application.properties 文件并添加:

# Configure the outgoing RabbitMQ exchange `quote-requests`
mp.messaging.outgoing.quote-requests.connector=smallrye-rabbitmq
mp.messaging.outgoing.quote-requests.exchange.name=quote-requests

我们只需要指定 smallrye-rabbitmq 连接器。默认情况下,响应式消息传递将通道名 quote-requests 映射到相同的 RabbitMQ 交换名称。

Processing quote requests

现在,让我们使用报价请求并给出价格。在 processor 项目中,找到 src/main/java/org/acme/rabbitmq/processor/QuoteProcessor.java 文件并添加以下内容:

package org.acme.rabbitmq.processor;

import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.acme.rabbitmq.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.annotations.Blocking;

/**
 * A bean consuming data from the "quote-requests" RabbitMQ queue and giving out a random quote.
 * The result is pushed to the "quotes" RabbitMQ exchange.
 */
@ApplicationScoped
public class QuoteProcessor {

    private Random random = new Random();

    @Incoming("requests")       (1)
    @Outgoing("quotes")         (2)
    @Blocking                   (3)
    public Quote process(String quoteRequest) throws InterruptedException {
        // simulate some hard-working task
        Thread.sleep(1000);
        return new Quote(quoteRequest, random.nextInt(100));
    }
}
1 表明方法使用 requests 通道中的项目
2 表明方法返回的对象被发送到 quotes 通道
3 指示处理为 blocking,并且不能在调用方线程上运行。

process 方法将针对来自 quote-requests 队列的每个 RabbitMQ 消息而调用,并将 Quote 对象发送到 quotes 交换。

与前面的示例一样,我们需要在 application.properties 文件中配置连接器。打开 src/main/resources/application.properties 文件并添加:

# Configure the incoming RabbitMQ queue `quote-requests`
mp.messaging.incoming.requests.connector=smallrye-rabbitmq
mp.messaging.incoming.requests.queue.name=quote-requests
mp.messaging.incoming.requests.exchange.name=quote-requests

# Configure the outgoing RabbitMQ exchange `quotes`
mp.messaging.outgoing.quotes.connector=smallrye-rabbitmq
mp.messaging.outgoing.quotes.exchange.name=quotes

请注意,在这种情况下,我们有一个入站和一个出站连接器配置,每个都明确命名。配置属性的结构如下:

mp.messaging.[outgoing|incoming].{channel-name}.property=value

channel-name 片段必须与 @Incoming@Outgoing 注释中设置的值相匹配:

  • quote-requests → RabbitMQ 队列,我们从其中读取报价请求

  • quotes → RabbitMQ 交换,我们向其中写入报价

Receiving quotes

回到我们的 producer 项目。让我们修改 QuotesResource 以使用报价、将其绑定到一个 HTTP 端点以向客户端发送事件:

import io.smallrye.mutiny.Multi;
//...

@Channel("quotes") Multi<Quote> quotes;     (1)

/**
 * Endpoint retrieving the "quotes" queue and sending the items to a server sent event.
 */
@GET
@Produces(MediaType.SERVER_SENT_EVENTS) (2)
public Multi<Quote> stream() {
    return quotes; (3)
}
1 使用 @Channel 限定符注入 quotes 通道
2 表明内容是使用 Server Sent Events 发送的
3 返回流(Reactive Stream

我们再次需要配置 producer 项目内的传入 quotes 通道。在 application.properties 文件内添加以下内容:

# Configure the outgoing `quote-requests` queue
mp.messaging.outgoing.quote-requests.connector=smallrye-rabbitmq

# Configure the incoming `quotes` queue
mp.messaging.incoming.quotes.connector=smallrye-rabbitmq

The HTML page

最后一步,使用 SSE 读取已转换价格的 HTML 页面。

producer 项目内创建 src/main/resources/META-INF/resources/quotes.html 文件,并包含以下内容:

<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Quotes</title>

    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">
    <div class="card">
        <div class="card-body">
            <h2 class="card-title">Quotes</h2>
            <button class="btn btn-info" id="request-quote">Request Quote</button>
            <div class="quotes"></div>
        </div>
    </div>
</div>
</body>
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
<script>
    $("#request-quote").click((event) => {
        fetch("/quotes/request", {method: "POST"})
        .then(res => res.text())
        .then(qid => {
            var row = $(`<h4 class='col-md-12' id='${qid}'>Quote # <i>${qid}</i> | <strong>Pending</strong></h4>`);
            $(".quotes").append(row);
        });
    });
    var source = new EventSource("/quotes");
    source.onmessage = (event) => {
      var json = JSON.parse(event.data);
      $(`#${json.id}`).html(function(index, html) {
        return html.replace("Pending", `\$\xA0${json.price}`);
      });
    };
</script>
</html>

这里没有特别之处。它会更新每个收到的页面的报价。

Get it running

你只需要使用以下命令运行这两个应用程序:

> mvn -f rabbitmq-quickstart-producer quarkus:dev

在另一个终端中:

> mvn -f rabbitmq-quickstart-processor quarkus:dev

Quarkus 会自动启动 RabbitMQ 代理,配置应用程序,并在不同应用程序之间共享代理实例。有关更多详细信息,请参见 Dev Services for RabbitMQ

在浏览器中打开 http://localhost:8080/quotes.html,然后通过单击按钮请求一些报价。

Running in JVM or Native mode

在非开发或测试模式下运行时,你需要启动 RabbitMQ 代理。你可以按照 RabbitMQ Docker website 中的说明进行操作,或创建一个包含以下内容的 docker-compose.yaml 文件:

version: '2'

services:

  rabbit:
    image: rabbitmq:3.12-management
    ports:
      - "5672:5672"
    networks:
      - rabbitmq-quickstart-network

  producer:
    image: quarkus-quickstarts/rabbitmq-quickstart-producer:1.0-${QUARKUS_MODE:-jvm}
    build:
      context: rabbitmq-quickstart-producer
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    environment:
      RABBITMQ_HOST: rabbit
      RABBITMQ_PORT: 5672
    ports:
      - "8080:8080"
    networks:
      - rabbitmq-quickstart-network

  processor:
    image: quarkus-quickstarts/rabbitmq-quickstart-processor:1.0-${QUARKUS_MODE:-jvm}
    build:
      context: rabbitmq-quickstart-processor
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    environment:
      RABBITMQ_HOST: rabbit
      RABBITMQ_PORT: 5672
    networks:
      - rabbitmq-quickstart-network

networks:
  rabbitmq-quickstart-network:
    name: rabbitmq-quickstart

注意 RabbitMQ 代理位置的配置方式。rabbitmq-hostrabbitmq-port 属性(AMQP_HOSTAMQP_PORT 环境变量)配置位置。

首先,确保已停止应用程序,并使用以下命令以 JVM 模式构建这两个应用程序:

> mvn -f rabbitmq-quickstart-producer clean package
> mvn -f rabbitmq-quickstart-processor clean package

打包完成后,运行 docker compose up --build。用户界面在 [role="bare"][role="bare"]http://localhost:8080/quotes.html 上公开

要以本机方式运行应用程序,我们首先需要构建本机可执行文件:

> mvn -f rabbitmq-quickstart-producer package -Dnative  -Dquarkus.native.container-build=true
> mvn -f rabbitmq-quickstart-processor package -Dnative -Dquarkus.native.container-build=true

-Dquarkus.native.container-build=true 指示 Quarkus 构建可在容器内运行的 Linux 64 位本机可执行文件。然后,使用以下命令运行系统:

> export QUARKUS_MODE=native
> docker compose up --build

和之前一样,用户界面在 [role="bare"][role="bare"]http://localhost:8080/quotes.html 上公开

Going further

本指南展示了如何使用 Quarkus 与 RabbitMQ 交互。它利用 SmallRye Reactive Messaging 构建数据流应用程序。

如果你已使用过 Kafka,你就会意识到代码是相同的。唯一的区别是连接器配置和 JSON 映射。