Getting Started with Quarkus Messaging and RabbitMQ
This guide demonstrates how your Quarkus application can use Quarkus Messaging to interact with RabbitMQ.
Architecture
In this guide, we are going to develop two applications communicating with a RabbitMQ broker. The first application sends a quote request to the RabbitMQ quote requests exchange and consumes messages from the quote queue. The second application receives the quote request and sends a quote back.
The first application, the producer, will let the user request some quotes over an HTTP endpoint.
For each quote request, a random identifier is generated and returned to the user, and the quote request is shown as pending.
At the same time, the generated request id is sent to the quote-requests exchange.
The second application, the processor, will in turn read from the quote-requests queue, add a random price to the quote, and send it to an exchange named quotes.
Lastly, the producer will read the quotes and send them to the browser using server-sent events.
The user will therefore see the quote price updated from pending to the received price in real time.
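To make the message flow concrete, here is a minimal in-memory sketch of the round trip. It is not part of the quickstart: the two BlockingQueue instances merely stand in for the quote-requests and quotes destinations, and the names QuoteFlowSketch, requestQuote and processOne are hypothetical.

```java
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in for the flow described above; no RabbitMQ broker is involved.
public class QuoteFlowSketch {

    // Hypothetical payload holding the request id and the price.
    public static final class Quote {
        public final String id;
        public final int price;

        public Quote(String id, int price) {
            this.id = id;
            this.price = price;
        }
    }

    // Stand-ins for the "quote-requests" and "quotes" destinations.
    public static final BlockingQueue<String> QUOTE_REQUESTS = new LinkedBlockingQueue<>();
    public static final BlockingQueue<Quote> QUOTES = new LinkedBlockingQueue<>();

    // Producer side: generate a request id, publish it, and return it to the caller.
    public static String requestQuote() throws InterruptedException {
        String id = UUID.randomUUID().toString();
        QUOTE_REQUESTS.put(id);
        return id;
    }

    // Processor side: take one request and publish a quote with a random price in [0, 100).
    public static void processOne(Random random) throws InterruptedException {
        String id = QUOTE_REQUESTS.take();
        QUOTES.put(new Quote(id, random.nextInt(100)));
    }

    public static void main(String[] args) throws InterruptedException {
        String pendingId = requestQuote();
        processOne(new Random());
        Quote quote = QUOTES.take();
        System.out.println("Quote " + quote.id + " priced at " + quote.price
                + " (matches request: " + pendingId.equals(quote.id) + ")");
    }
}
```

The id returned to the caller is the only link between the pending request and the priced quote, which is why the same id travels through both destinations.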
Solution
We recommend that you follow the instructions in the next sections and create the applications step by step. However, you can go right to the completed example.
Clone the Git repository: git clone {quickstarts-clone-url}, or download an {quickstarts-archive-url}[archive].
The solution is located in the rabbitmq-quickstart directory.
Creating the Maven Project
First, we need to create two projects: the producer and the processor.
To create the producer project, in a terminal run:
include::{includes}/devtools/create-app.adoc[]
This command creates the project structure and selects the two Quarkus extensions we will be using:
- The Reactive Messaging RabbitMQ connector
- Quarkus REST (formerly RESTEasy Reactive) and its Jackson support to handle JSON payloads
If you already have your Quarkus project configured, you can add the RabbitMQ messaging extension to your project by running the corresponding command in your project base directory. This will add the matching dependency to your pom.xml or build.gradle.
To create the processor project, from the same directory, run:
include::{includes}/devtools/create-app.adoc[]
At that point you should have the following structure:
.
├── rabbitmq-quickstart-processor
│   ├── README.md
│   ├── mvnw
│   ├── mvnw.cmd
│   ├── pom.xml
│   └── src
│       └── main
│           ├── docker
│           ├── java
│           └── resources
│               └── application.properties
└── rabbitmq-quickstart-producer
    ├── README.md
    ├── mvnw
    ├── mvnw.cmd
    ├── pom.xml
    └── src
        └── main
            ├── docker
            ├── java
            └── resources
                └── application.properties
Open the two projects in your favorite IDE.
The Quote object
The Quote class will be used in both the producer and processor projects.
For the sake of simplicity, we will duplicate the class.
In both projects, create the src/main/java/org/acme/rabbitmq/model/Quote.java file with the following content:
package org.acme.rabbitmq.model;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class Quote {

    public String id;
    public int price;

    /**
     * Default constructor required for Jackson serializer
     */
    public Quote() { }

    public Quote(String id, int price) {
        this.id = id;
        this.price = price;
    }

    @Override
    public String toString() {
        return "Quote{" +
                "id='" + id + '\'' +
                ", price=" + price +
                '}';
    }
}
The JSON representation of Quote objects will be used in messages sent to the RabbitMQ queues and also in the server-sent events sent to browser clients.
Quarkus has built-in capabilities to deal with JSON RabbitMQ messages.
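As a rough illustration of the payload shape, the sketch below builds the JSON by hand. It assumes Jackson's default mapping of the two public fields (id and price); the real applications rely on Jackson rather than on string concatenation, and QuoteJsonSketch is a hypothetical name.

```java
// Hand-rolled illustration of the JSON shape; the quickstart itself relies on Jackson.
public class QuoteJsonSketch {

    // Builds {"id":"...","price":N}.
    // Assumption: the id needs no JSON escaping, which holds for UUID strings.
    public static String toJson(String id, int price) {
        return "{\"id\":\"" + id + "\",\"price\":" + price + "}";
    }

    public static void main(String[] args) {
        // prints {"id":"3f1c","price":42}
        System.out.println(toJson("3f1c", 42));
    }
}
```

The browser page reads exactly these two properties (json.id and json.price) from each event.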
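The @RegisterForReflection annotation instructs Quarkus to keep the class, including its fields and methods, when creating the native executable.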
Sending quote request
Inside the producer project, locate the generated src/main/java/org/acme/rabbitmq/producer/QuotesResource.java file, and update the content to be:
package org.acme.rabbitmq.producer;

import java.util.UUID;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.acme.rabbitmq.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import io.smallrye.mutiny.Multi;

@Path("/quotes")
public class QuotesResource {

    @Channel("quote-requests")
    Emitter<String> quoteRequestEmitter; // (1)

    /**
     * Endpoint to generate a new quote request id and send it to "quote-requests" channel (which
     * maps to the "quote-requests" RabbitMQ exchange) using the emitter.
     */
    @POST
    @Path("/request")
    @Produces(MediaType.TEXT_PLAIN)
    public String createRequest() {
        UUID uuid = UUID.randomUUID();
        quoteRequestEmitter.send(uuid.toString()); // (2)
        return uuid.toString();
    }
}
(1) Inject a Reactive Messaging Emitter to send messages to the quote-requests channel.
(2) On a POST request, generate a random UUID and send it to the RabbitMQ exchange using the emitter.
This channel is mapped to a RabbitMQ exchange using the configuration we will add to the application.properties file.
Open the src/main/resources/application.properties file and add:
# Configure the outgoing RabbitMQ exchange `quote-requests`
mp.messaging.outgoing.quote-requests.connector=smallrye-rabbitmq
mp.messaging.outgoing.quote-requests.exchange.name=quote-requests
All we need to specify is the smallrye-rabbitmq connector.
By default, reactive messaging maps the channel name quote-requests to the same RabbitMQ exchange name.
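The defaulting rule can be pictured with plain java.util.Properties. This is only a sketch of the lookup logic, not the connector's actual implementation, and ExchangeNameSketch and exchangeFor are hypothetical names.

```java
import java.util.Properties;

// Sketch of the "exchange name defaults to channel name" rule for outgoing channels.
public class ExchangeNameSketch {

    // Returns the explicitly configured exchange name, or the channel name when none is set.
    public static String exchangeFor(Properties config, String channel) {
        String key = "mp.messaging.outgoing." + channel + ".exchange.name";
        return config.getProperty(key, channel);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        // No exchange.name configured: falls back to the channel name.
        System.out.println(exchangeFor(config, "quote-requests")); // prints quote-requests

        // Explicit value wins over the default.
        config.setProperty("mp.messaging.outgoing.quote-requests.exchange.name", "custom-exchange");
        System.out.println(exchangeFor(config, "quote-requests")); // prints custom-exchange
    }
}
```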
Processing quote requests
Now let's consume the quote request and give out a price.
Inside the processor project, locate the src/main/java/org/acme/rabbitmq/processor/QuoteProcessor.java file and add the following:
package org.acme.rabbitmq.processor;

import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.acme.rabbitmq.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.annotations.Blocking;

/**
 * A bean consuming data from the "quote-requests" RabbitMQ queue and giving out a random quote.
 * The result is pushed to the "quotes" RabbitMQ exchange.
 */
@ApplicationScoped
public class QuoteProcessor {

    private Random random = new Random();

    @Incoming("requests")  // (1)
    @Outgoing("quotes")    // (2)
    @Blocking              // (3)
    public Quote process(String quoteRequest) throws InterruptedException {
        // simulate some hard-working task
        Thread.sleep(1000);
        return new Quote(quoteRequest, random.nextInt(100));
    }
}
(1) Indicates that the method consumes the items from the requests channel.
(2) Indicates that the objects returned by the method are sent to the quotes channel.
(3) Indicates that the processing is blocking and cannot be run on the caller thread.
The process method is called for every RabbitMQ message from the quote-requests queue, and sends a Quote object to the quotes exchange.
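The idea behind marking the method as blocking can be illustrated with plain java.util.concurrent: slow work is handed to a worker thread so that the calling thread is not stalled by it. The sketch below is an analogy under that assumption, not a description of how Quarkus implements @Blocking; BlockingOffloadSketch and runOnWorker are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Analogy for offloading blocking work to a worker thread.
public class BlockingOffloadSketch {

    // Runs a simulated slow task on a worker thread and returns that thread's name.
    public static String runOnWorker() throws Exception {
        ExecutorService workers = Executors.newSingleThreadExecutor();
        try {
            Future<String> result = workers.submit(() -> {
                Thread.sleep(100); // simulated blocking work
                return Thread.currentThread().getName();
            });
            return result.get(); // wait for the worker to finish
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("worker: " + runOnWorker());
    }
}
```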
As with the previous example, we need to configure the connectors in the application.properties file.
Open the src/main/resources/application.properties file and add:
# Configure the incoming RabbitMQ queue `quote-requests`
mp.messaging.incoming.requests.connector=smallrye-rabbitmq
mp.messaging.incoming.requests.queue.name=quote-requests
mp.messaging.incoming.requests.exchange.name=quote-requests
# Configure the outgoing RabbitMQ exchange `quotes`
mp.messaging.outgoing.quotes.connector=smallrye-rabbitmq
mp.messaging.outgoing.quotes.exchange.name=quotes
Note that in this case we have one incoming and one outgoing connector configuration, each one distinctly named. The configuration properties are structured as follows:
mp.messaging.[outgoing|incoming].{channel-name}.property=value
The channel-name segment must match the value set in the @Incoming and @Outgoing annotations:
- requests → the channel bound to the quote-requests RabbitMQ queue from which we read the quote requests
- quotes → the RabbitMQ exchange to which we write the quotes
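To see how a property key splits into its parts, the following sketch parses keys with a regular expression. It assumes channel names that contain no dots, and ChannelKeySketch and describe are hypothetical names used only for illustration.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Splits "mp.messaging.<direction>.<channel-name>.<attribute>" into its three parts.
public class ChannelKeySketch {

    // Assumption: the channel name itself contains no dot.
    private static final Pattern KEY =
            Pattern.compile("mp\\.messaging\\.(incoming|outgoing)\\.([^.]+)\\.(.+)");

    public static String describe(String key) {
        Matcher m = KEY.matcher(key);
        if (!m.matches()) {
            return "not a channel property: " + key;
        }
        return m.group(1) + " channel '" + m.group(2) + "', attribute '" + m.group(3) + "'";
    }

    public static void main(String[] args) {
        // prints: incoming channel 'requests', attribute 'queue.name'
        System.out.println(describe("mp.messaging.incoming.requests.queue.name"));
        // prints: outgoing channel 'quotes', attribute 'connector'
        System.out.println(describe("mp.messaging.outgoing.quotes.connector"));
    }
}
```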
Receiving quotes
Back to our producer project.
Let's modify the QuotesResource to consume quotes and bind them to an HTTP endpoint that sends events to clients:
import io.smallrye.mutiny.Multi;
//...

@Channel("quotes")
Multi<Quote> quotes; // (1)

/**
 * Endpoint retrieving the "quotes" queue and sending the items to a server sent event.
 */
@GET
@Produces(MediaType.SERVER_SENT_EVENTS) // (2)
public Multi<Quote> stream() {
    return quotes; // (3)
}
(1) Injects the quotes channel using the @Channel qualifier.
(2) Indicates that the content is sent using Server Sent Events.
(3) Returns the stream (Reactive Stream).
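On the wire, each server-sent event is a text frame made of a data line followed by a blank line, which the browser's EventSource exposes as event.data. The sketch below formats one such frame by hand; it assumes a single-line JSON payload, and SseFrameSketch and frame are hypothetical names (the framework produces these frames for you).

```java
// Formats a single server-sent event frame for a one-line payload.
public class SseFrameSketch {

    // "data: <payload>" followed by an empty line terminates one event.
    public static String frame(String jsonPayload) {
        return "data: " + jsonPayload + "\n\n";
    }

    public static void main(String[] args) {
        System.out.print(frame("{\"id\":\"3f1c\",\"price\":42}"));
    }
}
```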
Again, we need to configure the incoming quotes channel inside the producer project.
Add the following to the application.properties file:
# Configure the outgoing `quote-requests` exchange
mp.messaging.outgoing.quote-requests.connector=smallrye-rabbitmq
# Configure the incoming `quotes` queue
mp.messaging.incoming.quotes.connector=smallrye-rabbitmq
The HTML page
Final touch: the HTML page that reads the quote prices using SSE.
Inside the producer project, create the src/main/resources/META-INF/resources/quotes.html file with the following content:
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Quotes</title>
    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">
    <div class="card">
        <div class="card-body">
            <h2 class="card-title">Quotes</h2>
            <button class="btn btn-info" id="request-quote">Request Quote</button>
            <div class="quotes"></div>
        </div>
    </div>
</div>
</body>
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
<script>
    $("#request-quote").click((event) => {
        fetch("/quotes/request", {method: "POST"})
        .then(res => res.text())
        .then(qid => {
            var row = $(`<h4 class='col-md-12' id='${qid}'>Quote # <i>${qid}</i> | <strong>Pending</strong></h4>`);
            $(".quotes").append(row);
        });
    });
    var source = new EventSource("/quotes");
    source.onmessage = (event) => {
        var json = JSON.parse(event.data);
        $(`#${json.id}`).html(function(index, html) {
            return html.replace("Pending", `\$\xA0${json.price}`);
        });
    };
</script>
</html>
Nothing spectacular here. On each received quote, it updates the page.
Get it running
You just need to run both applications using:
> mvn -f rabbitmq-quickstart-producer quarkus:dev
And, in a separate terminal:
> mvn -f rabbitmq-quickstart-processor quarkus:dev
Quarkus starts a RabbitMQ broker automatically, configures the application, and shares the broker instance between the different applications. See Dev Services for RabbitMQ for more details.
Open http://localhost:8080/quotes.html in your browser and request some quotes by clicking the button.
Running in JVM or Native mode
When not running in dev or test mode, you will need to start your RabbitMQ broker.
You can follow the instructions from the RabbitMQ Docker website or create a docker-compose.yaml file with the following content:
version: '2'

services:

  rabbit:
    image: rabbitmq:3.12-management
    ports:
      - "5672:5672"
    networks:
      - rabbitmq-quickstart-network

  producer:
    image: quarkus-quickstarts/rabbitmq-quickstart-producer:1.0-${QUARKUS_MODE:-jvm}
    build:
      context: rabbitmq-quickstart-producer
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    environment:
      RABBITMQ_HOST: rabbit
      RABBITMQ_PORT: 5672
    ports:
      - "8080:8080"
    networks:
      - rabbitmq-quickstart-network

  processor:
    image: quarkus-quickstarts/rabbitmq-quickstart-processor:1.0-${QUARKUS_MODE:-jvm}
    build:
      context: rabbitmq-quickstart-processor
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    environment:
      RABBITMQ_HOST: rabbit
      RABBITMQ_PORT: 5672
    networks:
      - rabbitmq-quickstart-network

networks:
  rabbitmq-quickstart-network:
    name: rabbitmq-quickstart
Note how the RabbitMQ broker location is configured.
The rabbitmq-host and rabbitmq-port properties (set here through the RABBITMQ_HOST and RABBITMQ_PORT environment variables) configure the location.
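The link between a property name and its environment variable follows the MicroProfile Config naming convention: characters that are neither alphanumeric nor an underscore become underscores, and the result is upper-cased. A minimal sketch of that conversion, with EnvNameSketch and toEnvName as hypothetical names:

```java
import java.util.Locale;

// Converts a configuration property name to its environment-variable form.
public class EnvNameSketch {

    // Replace every character that is not a letter, digit, or underscore, then upper-case.
    public static String toEnvName(String property) {
        return property.replaceAll("[^A-Za-z0-9_]", "_").toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(toEnvName("rabbitmq-host")); // prints RABBITMQ_HOST
        System.out.println(toEnvName("rabbitmq-port")); // prints RABBITMQ_PORT
    }
}
```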
First, make sure you have stopped the applications, then build both applications in JVM mode with:
> mvn -f rabbitmq-quickstart-producer clean package
> mvn -f rabbitmq-quickstart-processor clean package
Once packaged, run docker compose up --build.
The UI is exposed on http://localhost:8080/quotes.html
To run your applications as native, we first need to build the native executables:
> mvn -f rabbitmq-quickstart-producer package -Dnative -Dquarkus.native.container-build=true
> mvn -f rabbitmq-quickstart-processor package -Dnative -Dquarkus.native.container-build=true
The -Dquarkus.native.container-build=true option instructs Quarkus to build Linux 64-bit native executables, which can run inside containers.
Then, run the system using:
> export QUARKUS_MODE=native
> docker compose up --build
As before, the UI is exposed on http://localhost:8080/quotes.html
Going further
This guide has shown how you can interact with RabbitMQ using Quarkus. It uses SmallRye Reactive Messaging to build data streaming applications.
If you have followed the Kafka guide, you will have noticed that the code is the same. The only differences are the connector configuration and the JSON mapping.