Rabbitmq 简明教程
RabbitMQ - Overview
What is RabbitMQ?
RabbitMQ 是一个用 Java 编写的开源消息代理。它完全符合 JMS 1.1 标准。它由 Apache 软件基金会开发和维护,并根据 Apache 许可证获得许可。它为企业级消息传递应用程序提供高可用性、可扩展性、可靠性、性能和安全性。
JMS 是一项允许开发基于消息的系统的规范。RabbitMQ 作为消息代理,位于应用程序之间,并允许它们以异步且可靠的方式进行通信。
RabbitMQ - Environment Setup
本章节将指导您如何准备开发环境以便开始使用 RabbitMQ。它还将教您如何在设置 RabbitMQ 之前在计算机上设置 JDK、Maven 和 Eclipse −
Setup Java Development Kit (JDK)
你可以从 Oracle 的 Java 站点下载 SDK 的最新版本 − Java SE Downloads. 你在下载文件中可以找到安装 JDK 的说明,按照给定的说明进行安装和配置。最后设置 PATH 和 JAVA_HOME 环境变量以引用包含 java 和 javac 的目录,通常分别为 java_install_dir/bin 和 java_install_dir。
如果您运行的是 Windows 且在 C:\jdk-11.0.11 中安装了 JDK,那么您将不得不把以下代码行放入您的 C:\autoexec.bat 文件中。
set PATH=C:\jdk-11.0.11;%PATH%
set JAVA_HOME=C:\jdk-11.0.11
或者,在 Windows NT/2000/XP 中,你必须右键单击我的电脑,选择属性 → 高级 → 环境变量。然后,你将不得不更新 PATH 值并单击确定按钮。
在 Unix(Solaris、Linux 等)中,如果 SDK 安装在 /usr/local/jdk-11.0.11 且您使用 C shell,那么您将不得不把以下代码行放入您的 .cshrc 文件中。
setenv PATH /usr/local/jdk-11.0.11/bin:$PATH
setenv JAVA_HOME /usr/local/jdk-11.0.11
或者,如果你使用诸如 Borland JBuilder、Eclipse、IntelliJ IDEA 或 Sun ONE Studio 这样的集成开发环境 (IDE),则必须编译并运行一个简单程序来确认 IDE 知道你在何处安装了 Java。否则,你必须按照 IDE 文档中给出的内容执行正确的设置。
Setup Eclipse IDE
本教程中的所有示例都是使用 Eclipse IDE 编写的。因此,我们建议你应该在你机器上安装 Eclipse 的最新版本。
要安装 Eclipse IDE,请从 www.eclipse.org/downloads 下载最新的 Eclipse 二进制文件。下载安装文件后,将二进制分发包解压到方便的位置。例如,在 Windows 上的 C:\eclipse 中,或 Linux/Unix 上的 /usr/local/eclipse 中,最后相应地设置 PATH 变量。
可以通过在 Windows 机器上执行以下命令来启动 Eclipse,或者您也可以简单地双击 eclipse.exe。
%C:\eclipse\eclipse.exe
可以通过在 Unix(Solaris、Linux 等)机器上执行以下命令启动 Eclipse −
$/usr/local/eclipse/eclipse
成功启动后,如果一切正常,它应该显示以下结果 −
Set Maven
本教程中,我们使用 maven 运行和构建基于 Spring 的示例,以运行基于 RabbitMQ 的应用程序。按照 Maven - Environment Setup 安装 maven。
RabbitMQ - Features
RabbitMQ 是最受欢迎的开源消息代理之一。它旨在为企业级消息传递应用程序提供高可用性、可扩展性、可靠性、性能和安全性。以下是一些 RabbitMQ 的突出特点。
-
LightWeight − RabbitMQ 轻量且易于在本地和云端安装。
-
Connectivity Options − RabbitMQ 支持多种消息传递协议,并且可以部署在分布式/联合配置中以满足高可用性、可扩展性的要求。
-
Pluggable Architecture − RabbitMQ 允许选择持久性机制,还提供选项来自定义针对身份验证和授权的安全性,以满足应用程序需求。
-
Multi-Platform − RabbitMQ 为许多流行语言(如 Java、Python、JavaScript、Ruby 等)提供客户端 API。
-
Broker Cluster − RabbitMQ 可以部署为集群以实现高可用性和吞吐量。它可以在多个可用区域和区域之间联合使用。
-
Features Rich − RabbitMQ 为代理和客户端提供许多高级功能。
-
Simple Administration Interface − RabbitMQ 管理控制台易于使用,但仍然提供许多强大的管理功能。
-
Enterprise and Cloud Ready − RabbitMQ 支持可插拔的身份验证和授权。它支持 LDAP 和 TLS。它可以轻松部署在公有和私有云中。
-
Features Rich − RabbitMQ 为代理和客户端提供许多高级功能。它提供插件来支持持续集成、操作指标以及与其他企业系统的集成等。
-
Management − RabbitMQ 提供 HTTP API、命令行工具和 UI 来管理和监视 RabbitMQ。
RabbitMQ - Installation
RabbitMQ 构建于 Erlang 运行时,所以我们必须先下载 Erlang,然后再安装 RabbitMQ。确保你在使用管理员权限安装 Erlang 和 RabbitMQ。
Erlang
Erlang 是一门通用编程语言和运行时环境。你可以从其主页 Download Erlang/OTP. 下载 Erlang 的最新版本。我们正在 Windows 上安装 Erlang 并下载了适用于 Windows 的 Erlang/OTP 24.2.2 安装程序 - otp_win64_24.2.2.exe。
现在,双击安装程序并按照默认选择安装 Erlang,然后完成设置。
RabbitMQ Installation
从其 official downloads page 下载 RabbitMQ 最新二进制文件。我们下载了 windows 版的 3.9.13 作为 rabbitmq-server-3.9.13.exe。
现在,双击安装程序并按照默认选择安装 RabbitMQ,然后完成设置。
默认情况下,RabbitMQ 作为 Windows 服务运行。若要启用基于 Web 的管理 UI,你需要执行以下步骤。
转到 RabbitMQ 安装目录并按如下所示键入命令 −
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.9.13\sbin>rabbitmq-plugins.bat enable rabbitmq_management
Enabling plugins on node rabbit@DESKTOP-86KD9FC:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@DESKTOP-86KD9FC...
The following plugins have been enabled:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
started 3 plugins.
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.9.13\sbin>rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management
Enabling plugins on node rabbit@DESKTOP-86KD9FC:
rabbitmq_shovel
rabbitmq_shovel_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_shovel
rabbitmq_shovel_management
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@DESKTOP-86KD9FC...
The following plugins have been enabled:
rabbitmq_shovel
rabbitmq_shovel_management
started 2 plugins.
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.9.13\sbin>
使用管理员权限编辑 C:\Windows\System32\drivers\etc\hosts 文件,并向其中添加以下行 −
127.0.0.1 rabbitmq
RabbitMQ - Producer Application
现在,我们创建一个生产者应用程序,它将向 RabbitMQ 队列发送消息。
Create Project
使用 Eclipse 选择 File → New * → *Maven Project 。勾选 创建简单项目(跳过原型选择),然后单击下一步。
按照以下所示输入详细信息 −
-
groupId − com.tutorialspoint
-
artifactId − producer
-
version − 0.0.1-SNAPSHOT
-
name − RabbitMQ Producer
单击“完成”按钮,将创建新项目。
pom.xml
现在更新 pom.xml 的内容以包含 RabbitMQ 的依赖项。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tutorialspoint.activemq</groupId>
<artifactId>producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>RabbitMQ Producer</name>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.26</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.26</version>
</dependency>
</dependencies>
</project>
现在创建一个生产者类,它将向 RabbitMQ 队列发送消息。
package com.tutorialspoint.rabbitmq;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
private static String QUEUE = "MyFirstQueue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE, false, false, false, null);
Scanner input = new Scanner(System.in);
String message;
do {
System.out.println("Enter message: ");
message = input.nextLine();
channel.basicPublish("", QUEUE, null, message.getBytes());
} while (!message.equalsIgnoreCase("Quit"));
}
}
}
生产者类创建一个连接,创建一个通道,连接到一个队列。如果用户输入“退出”,则应用程序终止,否则它将使用 basicPublish 方法向队列发送消息。
我们在 RabbitMQ - Test Application 章节中运行此应用程序。
RabbitMQ - Consumer Application
现在,让我们创建一个将从 RabbitMQ 队列接收消息的消费程序应用程序。
Create Project
使用 Eclipse 选择 File → New * → *Maven Project 。勾选 创建简单项目(跳过原型选择),然后单击下一步。
输入详细信息,如下所示:
-
groupId − com.tutorialspoint
-
artifactId − consumer
-
version − 0.0.1-SNAPSHOT
-
name − RabbitMQ Consumer
单击“完成”按钮,将创建新项目。
pom.xml
现在,更新 pom.xml 的内容,以包含 ActiveMQ 的依赖关系。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tutorialspoint.activemq</groupId>
<artifactId>consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>RabbitMQ Consumer</name>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.26</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.26</version>
</dependency>
</dependencies>
</project>
现在创建一个将从 RabbitMQ 队列接收消息的消费者类。
package com.tutorialspoint.rabbitmq;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Consumer {
private static String QUEUE = "MyFirstQueue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE, false, false, false, null);
System.out.println("Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("Received '" + message + "'");
};
channel.basicConsume(QUEUE, true, deliverCallback, consumerTag -> { });
}
}
消费者类创建一个连接,创建一个通道,如果不存在则创建一个队列,然后从队列接收消息(如果有)并保持轮询队列以获取消息。一旦消息被传递,它将由 basicConsume() 方法使用 deliverCallback 处理。
我们在 RabbitMQ - Test Application 章节中运行此应用程序。
RabbitMQ - Test Application
Start the Producer Application
在 eclipse 中,右击 Producer.java 源文件,选择以 → Java 应用程序运行。生成程序应用程序将开始运行,您将看到如下输出 −
Enter message:
Start the Consumer Application
在 eclipse 中,右击 Consumer.java 源文件,选择以 → Java 应用程序运行。消费程序应用程序将开始运行,您将看到如下输出 −
Waiting for messages. To exit press CTRL+C
Receive Message
在使用者控制台窗口中确认消息已收到。
Waiting for messages. To exit press CTRL+C
Received = Hi
发送“退出”作为消息以终止生成窗口会话并终止客户端窗口会话。
Verification
现在,在您的浏览器中打开 http://rabbitmq:15672/ 。它将要求输入凭据。使用 guest/guest 作为用户名/密码,它将加载 RabbitMQ 管理控制台,您可以在其中检查队列以查看状态。它将显示已排队和已交付的消息。
RabbitMQ - Publisher Application
现在,让我们创建一个将消息发送到 RabbitMQ Exchange 的发布程序应用程序。此交换将把消息传递给与交换绑定的队列。
Create Project
使用 Eclipse 选择 File → New * → *Maven Project 。勾选 创建简单项目(跳过原型选择),然后单击下一步。
按照以下所示输入详细信息 −
-
groupId − com.tutorialspoint
-
artifactId − publisher
-
version − 0.0.1-SNAPSHOT
-
name − RabbitMQ Publisher
单击“完成”按钮,将创建新项目。
pom.xml
现在更新 pom.xml 的内容以包含 RabbitMQ 的依赖项。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tutorialspoint.activemq</groupId>
<artifactId>publisher</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>RabbitMQ Publisher</name>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.26</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.26</version>
</dependency>
</dependencies>
</project>
现在创建一个将消息发送到 RabbitMQ 主题的发布程序类以将其广播到所有订阅者。
package com.tutorialspoint.rabbitmq;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Publisher {
private static final String EXCHANGE = "MyExchange";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE, "fanout");
Scanner input = new Scanner(System.in);
String message;
do {
System.out.println("Enter message: ");
message = input.nextLine();
channel.basicPublish(EXCHANGE, "", null, message.getBytes());
} while (!message.equalsIgnoreCase("Quit"));
}
}
}
生成程序类创建一个连接,创建一个通道,声明一个交换,然后要求用户输入消息。消息被发送到交换,并且作为队列名称,我们没有传递队列名称,因此所有与该交换绑定的队列都将收到该消息。如果用户输入退出,则应用程序终止,否则它会将消息发送到主题。
我们将在 RabbitMQ - Test Application 一章中运行此应用程序。
RabbitMQ - Subscriber Application
现在,我们创建一个订阅者应用程序,它将从 RabbitMQ 主题接收消息。
Create Project
使用 Eclipse 选择 File → New * → *Maven Project 。勾选 创建简单项目(跳过原型选择),然后单击下一步。
按照以下所示输入详细信息 −
-
groupId − com.tutorialspoint
-
artifactId − subscriber
-
version − 0.0.1-SNAPSHOT
-
name − RabbitMQ Subscriber
单击“完成”按钮,将创建新项目。
pom.xml
现在更新 pom.xml 的内容以包含 RabbitMQ 的依赖项。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tutorialspoint.activemq</groupId>
<artifactId>subscriber</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>RabbitMQ Subscriber</name>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.26</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.26</version>
</dependency>
</dependencies>
</project>
现在创建一个订阅者类,它将从 RabbitMQ 队列接收消息。
package com.tutorialspoint.rabbitmq;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Subscriber {
private static String EXCHANGE = "MyExchange";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE, "");
System.out.println("Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
订阅者类创建一个连接,创建一个通道,声明该交换,创建一个随机队列并将其绑定到交换,然后在有消息时从主题接收消息。按 Ctrl + C 终止,否则它将继续轮询队列以获取消息。
我们在 RabbitMQ - Test Application 章节中将多次运行此应用程序以创建多个订阅者。
RabbitMQ - Test Application Topic
Start the Publisher Application
在 eclipse 中,鼠标右键单击 Publisher.java 源,然后选择以 Java 应用程序运行。Publisher 应用程序将开始运行,您会看到如下所示的输出:
Enter message:
Start the Subscriber Application
在 eclipse 中,鼠标右键单击 Subscriber.java 源,然后选择以 Java 应用程序运行。Subscriber 应用程序将开始运行,您会看到如下所示的输出:
Waiting for messages. To exit press CTRL+C
Start another Subscriber Application
在 eclipse 中,再次鼠标右键单击 Subscriber.java 源,然后选择以 Java 应用程序运行。另一个 Subscriber 应用程序将开始运行,您会看到如下所示的输出:
Waiting for messages. To exit press CTRL+C
Verification
现在,在您的浏览器中打开 http://rabbitmq:15672/ 。它将要求输入凭据。使用 guest/guest 作为用户名/密码,它将加载 RabbitMQ 管理控制台,您可以在其中检查队列和交换以检查已传递的消息和绑定的状态。