Getting Started to Quarkus Messaging with RabbitMQ
本指南演示了您的 Quarkus 应用程序如何利用 Quarkus 消息传递与 RabbitMQ 交互。 :iokays-category: quarkus :iokays-path: modules/ROOT/pages/_includes/extension-status.adoc :keywords: Quarkus, 中文文档, 编程技术
该技术被认为是 {extension-status}。 有关可能状态的完整列表,请查看我们的 FAQ entry. |
Prerequisites
如要完成本指南,您需要:
-
Roughly 15 minutes
-
An IDE
-
安装了 JDK 17+,已正确配置
JAVA_HOME
-
Apache Maven ${proposed-maven-version}
-
如果你想使用 Quarkus CLI, 则可以选择使用
-
如果你想构建一个本机可执行文件(或如果你使用本机容器构建,则使用 Docker),则可以选择安装 Mandrel 或 GraalVM 以及 configured appropriately
Architecture
在本指南中,我们将开发两个与 RabbitMQ 代理通信的应用程序。第一个应用程序将 quote request 发送到 RabbitMQ quote requests 交换并从 quote 队列中使用消息。第二个应用程序接收 quote request 并发送回 quote 。
第一个应用程序 producer
将允许用户通过 HTTP 端点请求一些报价。对于每个报价请求,都会生成一个随机标识符并返还给用户,以将报价请求置于 pending 中。与此同时,生成的请求 id 会被发送到 quote-requests
交换。
第二个应用程序 processor
接下来将从 quote-requests
队列读取内容,为报价添加随机价格,然后将其发送到名为 quotes
的交换中。
最后,producer
将读取报价并使用服务器端发送的事件将它们发送到浏览器。因此,用户将看到报价价格从 pending 实时更新为接收到的价格。
Solution
我们建议您按照下一节中的说明一步步创建应用程序。但是,您可以直接转到已完成的示例。
克隆 Git 存储库: git clone $${quickstarts-base-url}.git
,或下载 $${quickstarts-base-url}/archive/main.zip[存档]。
该解决方案位于 rabbitmq-quickstart
directory 中。
Creating the Maven Project
首先,我们需要创建两个项目:producer_和 _processor。
要在终端中创建 _producer_项目,请运行:
quarkus create app {create-app-group-id}:{create-app-artifact-id} \
--no-code
cd {create-app-artifact-id}
要创建一个 Gradle 项目,添加 --gradle
或 --gradle-kotlin-dsl
选项。
有关如何安装和使用 Quarkus CLI 的详细信息,请参见 Quarkus CLI 指南。
mvn {quarkus-platform-groupid}:quarkus-maven-plugin:{quarkus-version}:create \
-DprojectGroupId={create-app-group-id} \
-DprojectArtifactId={create-app-artifact-id} \
-DnoCode
cd {create-app-artifact-id}
要创建一个 Gradle 项目,添加 -DbuildTool=gradle
或 -DbuildTool=gradle-kotlin-dsl
选项。
适用于 Windows 用户:
-
如果使用 cmd,(不要使用反斜杠
\
,并将所有内容放在同一行上) -
如果使用 Powershell,将
-D
参数用双引号引起来,例如"-DprojectArtifactId={create-app-artifact-id}"
此命令创建项目结构并选择我们将使用的两个 Quarkus 扩展:
-
反应式消息传递 RabbitMQ 连接器
-
Quarkus REST(以前的 RESTEasy Reactive)及其 Jackson 支持来处理 JSON 有效负载
如果您已配置 Quarkus 项目,则可以通过在项目基本目录中运行以下命令将 CLI
Maven
Gradle
这会将以下内容添加到您的 pom.xml
build.gradle
|
要从相同目录创建 _processor_项目,请运行:
quarkus create app {create-app-group-id}:{create-app-artifact-id} \
--no-code
cd {create-app-artifact-id}
要创建一个 Gradle 项目,添加 --gradle
或 --gradle-kotlin-dsl
选项。
有关如何安装和使用 Quarkus CLI 的详细信息,请参见 Quarkus CLI 指南。
mvn {quarkus-platform-groupid}:quarkus-maven-plugin:{quarkus-version}:create \
-DprojectGroupId={create-app-group-id} \
-DprojectArtifactId={create-app-artifact-id} \
-DnoCode
cd {create-app-artifact-id}
要创建一个 Gradle 项目,添加 -DbuildTool=gradle
或 -DbuildTool=gradle-kotlin-dsl
选项。
适用于 Windows 用户:
-
如果使用 cmd,(不要使用反斜杠
\
,并将所有内容放在同一行上) -
如果使用 Powershell,将
-D
参数用双引号引起来,例如"-DprojectArtifactId={create-app-artifact-id}"
此时,您应该拥有以下结构:
.
├── rabbitmq-quickstart-processor
│ ├── README.md
│ ├── mvnw
│ ├── mvnw.cmd
│ ├── pom.xml
│ └── src
│ └── main
│ ├── docker
│ ├── java
│ └── resources
│ └── application.properties
└── rabbitmq-quickstart-producer
├── README.md
├── mvnw
├── mvnw.cmd
├── pom.xml
└── src
└── main
├── docker
├── java
└── resources
└── application.properties
在你的首选 IDE 中打开这两个项目。
The Quote object
Quote
类将用于 producer
和 processor
项目。为了简单起见,我们将复制该类。在这两个项目中,创建 src/main/java/org/acme/rabbitmq/model/Quote.java
文件,其内容如下:
package org.acme.rabbitmq.model;
import io.quarkus.runtime.annotations.RegisterForReflection;
@RegisterForReflection
public class Quote {
public String id;
public int price;
/**
* Default constructor required for Jackson serializer
*/
public Quote() { }
public Quote(String id, int price) {
this.id = id;
this.price = price;
}
@Override
public String toString() {
return "Quote{" +
"id='" + id + '\'' +
", price=" + price +
'}';
}
}
发送到 RabbitMQ 队列的消息和发送到浏览器客户端的服务器发送事件中将使用 Quote
对象的 JSON 表示形式。
Quarkus 具有内置功能来处理 JSON RabbitMQ 消息。
@RegisterForReflection
|
Sending quote request
在 producer
项目中找到生成的 src/main/java/org/acme/rabbitmq/producer/QuotesResource.java
文件,并将内容更新为:
package org.acme.rabbitmq.producer;
import java.util.UUID;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.acme.rabbitmq.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import io.smallrye.mutiny.Multi;
@Path("/quotes")
public class QuotesResource {
@Channel("quote-requests") Emitter<String> quoteRequestEmitter; (1)
/**
* Endpoint to generate a new quote request id and send it to "quote-requests" channel (which
* maps to the "quote-requests" RabbitMQ exchange) using the emitter.
*/
@POST
@Path("/request")
@Produces(MediaType.TEXT_PLAIN)
public String createRequest() {
UUID uuid = UUID.randomUUID();
quoteRequestEmitter.send(uuid.toString()); (2)
return uuid.toString();
}
}
1 | 注入 Reactive Messaging Emitter ,以便向 `quote-requests`通道发送消息。 |
2 | 在 post 请求中,生成一个随机 UUID 并使用发送器将其发送到 RabbitMQ 队列。 |
此通道使用我们将添加到 application.properties
文件的配置映射到 RabbitMQ 交换。打开 src/main/resource/application.properties
文件并添加:
# Configure the outgoing RabbitMQ exchange `quote-requests`
mp.messaging.outgoing.quote-requests.connector=smallrye-rabbitmq
mp.messaging.outgoing.quote-requests.exchange.name=quote-requests
我们只需要指定 smallrye-rabbitmq
连接器。默认情况下,响应式消息传递将通道名 quote-requests
映射到相同的 RabbitMQ 交换名称。
Processing quote requests
现在,让我们使用报价请求并给出价格。在 processor
项目中,找到 src/main/java/org/acme/rabbitmq/processor/QuoteProcessor.java
文件并添加以下内容:
package org.acme.rabbitmq.processor;
import java.util.Random;
import jakarta.enterprise.context.ApplicationScoped;
import org.acme.rabbitmq.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.reactive.messaging.annotations.Blocking;
/**
* A bean consuming data from the "quote-requests" RabbitMQ queue and giving out a random quote.
* The result is pushed to the "quotes" RabbitMQ exchange.
*/
@ApplicationScoped
public class QuoteProcessor {
private Random random = new Random();
@Incoming("requests") (1)
@Outgoing("quotes") (2)
@Blocking (3)
public Quote process(String quoteRequest) throws InterruptedException {
// simulate some hard-working task
Thread.sleep(1000);
return new Quote(quoteRequest, random.nextInt(100));
}
}
1 | 表明方法使用 requests 通道中的项目 |
2 | 表明方法返回的对象被发送到 quotes 通道 |
3 | 指示处理为 blocking,并且不能在调用方线程上运行。 |
process
方法将针对来自 quote-requests
队列的每个 RabbitMQ 消息而调用,并将 Quote
对象发送到 quotes
交换。
与前面的示例一样,我们需要在 application.properties
文件中配置连接器。打开 src/main/resources/application.properties
文件并添加:
# Configure the incoming RabbitMQ queue `quote-requests`
mp.messaging.incoming.requests.connector=smallrye-rabbitmq
mp.messaging.incoming.requests.queue.name=quote-requests
mp.messaging.incoming.requests.exchange.name=quote-requests
# Configure the outgoing RabbitMQ exchange `quotes`
mp.messaging.outgoing.quotes.connector=smallrye-rabbitmq
mp.messaging.outgoing.quotes.exchange.name=quotes
请注意,在这种情况下,我们有一个入站和一个出站连接器配置,每个都明确命名。配置属性的结构如下:
mp.messaging.[outgoing|incoming].{channel-name}.property=value
channel-name
片段必须与 @Incoming
和 @Outgoing
注释中设置的值相匹配:
-
quote-requests
→ RabbitMQ 队列,我们从其中读取报价请求 -
quotes
→ RabbitMQ 交换,我们向其中写入报价
Receiving quotes
回到我们的 producer
项目。让我们修改 QuotesResource
以使用报价、将其绑定到一个 HTTP 端点以向客户端发送事件:
import io.smallrye.mutiny.Multi;
//...
@Channel("quotes") Multi<Quote> quotes; (1)
/**
* Endpoint retrieving the "quotes" queue and sending the items to a server sent event.
*/
@GET
@Produces(MediaType.SERVER_SENT_EVENTS) (2)
public Multi<Quote> stream() {
return quotes; (3)
}
1 | 使用 @Channel 限定符注入 quotes 通道 |
2 | 表明内容是使用 Server Sent Events 发送的 |
3 | 返回流(Reactive Stream) |
我们再次需要配置 producer
项目内的传入 quotes
通道。在 application.properties
文件内添加以下内容:
# Configure the outgoing `quote-requests` queue
mp.messaging.outgoing.quote-requests.connector=smallrye-rabbitmq
# Configure the incoming `quotes` queue
mp.messaging.incoming.quotes.connector=smallrye-rabbitmq
The HTML page
最后一步,使用 SSE 读取已转换价格的 HTML 页面。
在 producer
项目内创建 src/main/resources/META-INF/resources/quotes.html
文件,并包含以下内容:
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Quotes</title>
<link rel="stylesheet" type="text/css"
href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
<link rel="stylesheet" type="text/css"
href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">
<div class="card">
<div class="card-body">
<h2 class="card-title">Quotes</h2>
<button class="btn btn-info" id="request-quote">Request Quote</button>
<div class="quotes"></div>
</div>
</div>
</div>
</body>
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
<script>
$("#request-quote").click((event) => {
fetch("/quotes/request", {method: "POST"})
.then(res => res.text())
.then(qid => {
var row = $(`<h4 class='col-md-12' id='${qid}'>Quote # <i>${qid}</i> | <strong>Pending</strong></h4>`);
$(".quotes").append(row);
});
});
var source = new EventSource("/quotes");
source.onmessage = (event) => {
var json = JSON.parse(event.data);
$(`#${json.id}`).html(function(index, html) {
return html.replace("Pending", `\$\xA0${json.price}`);
});
};
</script>
</html>
这里没有特别之处。它会更新每个收到的页面的报价。
Get it running
你只需要使用以下命令运行这两个应用程序:
> mvn -f rabbitmq-quickstart-producer quarkus:dev
在另一个终端中:
> mvn -f rabbitmq-quickstart-processor quarkus:dev
Quarkus 会自动启动 RabbitMQ 代理,配置应用程序,并在不同应用程序之间共享代理实例。有关更多详细信息,请参见 Dev Services for RabbitMQ。
在浏览器中打开 http://localhost:8080/quotes.html
,然后通过单击按钮请求一些报价。
Running in JVM or Native mode
在非开发或测试模式下运行时,你需要启动 RabbitMQ 代理。你可以按照 RabbitMQ Docker website 中的说明进行操作,或创建一个包含以下内容的 docker-compose.yaml
文件:
version: '2'
services:
rabbit:
image: rabbitmq:3.12-management
ports:
- "5672:5672"
networks:
- rabbitmq-quickstart-network
producer:
image: quarkus-quickstarts/rabbitmq-quickstart-producer:1.0-${QUARKUS_MODE:-jvm}
build:
context: rabbitmq-quickstart-producer
dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
environment:
RABBITMQ_HOST: rabbit
RABBITMQ_PORT: 5672
ports:
- "8080:8080"
networks:
- rabbitmq-quickstart-network
processor:
image: quarkus-quickstarts/rabbitmq-quickstart-processor:1.0-${QUARKUS_MODE:-jvm}
build:
context: rabbitmq-quickstart-processor
dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
environment:
RABBITMQ_HOST: rabbit
RABBITMQ_PORT: 5672
networks:
- rabbitmq-quickstart-network
networks:
rabbitmq-quickstart-network:
name: rabbitmq-quickstart
注意 RabbitMQ 代理位置的配置方式。rabbitmq-host
和 rabbitmq-port
属性(AMQP_HOST
和 AMQP_PORT
环境变量)配置位置。
首先,确保已停止应用程序,并使用以下命令以 JVM 模式构建这两个应用程序:
> mvn -f rabbitmq-quickstart-producer clean package
> mvn -f rabbitmq-quickstart-processor clean package
打包完成后,运行 docker compose up --build
。用户界面在 [role="bare"][role="bare"]http://localhost:8080/quotes.html 上公开
要以本机方式运行应用程序,我们首先需要构建本机可执行文件:
> mvn -f rabbitmq-quickstart-producer package -Dnative -Dquarkus.native.container-build=true
> mvn -f rabbitmq-quickstart-processor package -Dnative -Dquarkus.native.container-build=true
-Dquarkus.native.container-build=true
指示 Quarkus 构建可在容器内运行的 Linux 64 位本机可执行文件。然后,使用以下命令运行系统:
> export QUARKUS_MODE=native
> docker compose up --build
和之前一样,用户界面在 [role="bare"][role="bare"]http://localhost:8080/quotes.html 上公开
Going further
本指南展示了如何使用 Quarkus 与 RabbitMQ 交互。它利用 SmallRye Reactive Messaging 构建数据流应用程序。
如果你已使用过 Kafka,你就会意识到代码是相同的。唯一的区别是连接器配置和 JSON 映射。