Rabbitmq 简明教程

RabbitMQ - Quick Guide

RabbitMQ - Overview

What is RabbitMQ?

RabbitMQ 是一个用 Java 编写的开源消息代理。它完全符合 JMS 1.1 标准。它由 Apache 软件基金会开发和维护,并根据 Apache 许可证获得许可。它为企业级消息传递应用程序提供高可用性、可扩展性、可靠性、性能和安全性。

JMS 是一项允许开发基于消息的系统的规范。RabbitMQ 作为消息代理,位于应用程序之间,并允许它们以异步且可靠的方式进行通信。

amq

Types of Messaging

为了便于理解,下面解释了两种类型的消息传递选项。

Point to Point

在这种通信方式中,代理只会向一个消费者发送消息,而其他消费者将等待从代理获得消息。没有消费者会收到相同的消息。

如果没有消费者,代理将一直持有消息,直到它得到消费者。这种通信方式也称为 Queue based communication ,生产者将消息发送到队列,并且只有一个消费者从队列中获取一条消息。如果有超过一个消费者,他们可能会获得下一条消息,但他们不会收到与其他消费者相同的消息。

point to point messaging

Publish/Subscribe

在这种通信方式中,代理会向所有活跃的消费者发送消息的同份副本。这种通信方式也称为 Topic based communication ,代理向所有为特定主题订阅的活跃消费者发送相同的消息。此模型支持单向通信,其中不期望对已传输的消息进行验证。

publish subscribe messaging

RabbitMQ - Environment Setup

本章节将指导您如何准备开发环境以便开始使用 RabbitMQ。它还将教您如何在设置 RabbitMQ 之前在计算机上设置 JDK、Maven 和 Eclipse −

Setup Java Development Kit (JDK)

你可以从 Oracle 的 Java 站点下载 SDK 的最新版本 − Java SE Downloads. 你在下载文件中可以找到安装 JDK 的说明,按照给定的说明进行安装和配置。最后设置 PATH 和 JAVA_HOME 环境变量以引用包含 java 和 javac 的目录,通常分别为 java_install_dir/bin 和 java_install_dir。

如果您运行的是 Windows 且在 C:\jdk-11.0.11 中安装了 JDK,那么您将不得不把以下代码行放入您的 C:\autoexec.bat 文件中。

set PATH=C:\jdk-11.0.11;%PATH%
set JAVA_HOME=C:\jdk-11.0.11

或者,在 Windows NT/2000/XP 中,你必须右键单击我的电脑,选择属性 → 高级 → 环境变量。然后,你将不得不更新 PATH 值并单击确定按钮。

在 Unix(Solaris、Linux 等)中,如果 SDK 安装在 /usr/local/jdk-11.0.11 且您使用 C shell,那么您将不得不把以下代码行放入您的 .cshrc 文件中。

setenv PATH /usr/local/jdk-11.0.11/bin:$PATH
setenv JAVA_HOME /usr/local/jdk-11.0.11

或者,如果你使用诸如 Borland JBuilder、Eclipse、IntelliJ IDEA 或 Sun ONE Studio 这样的集成开发环境 (IDE),则必须编译并运行一个简单程序来确认 IDE 知道你在何处安装了 Java。否则,你必须按照 IDE 文档中给出的内容执行正确的设置。

Setup Eclipse IDE

本教程中的所有示例都是使用 Eclipse IDE 编写的。因此,我们建议你应该在你机器上安装 Eclipse 的最新版本。

要安装 Eclipse IDE,请从 www.eclipse.org/downloads 下载最新的 Eclipse 二进制文件。下载安装文件后,将二进制分发包解压到方便的位置。例如,在 Windows 上的 C:\eclipse 中,或 Linux/Unix 上的 /usr/local/eclipse 中,最后相应地设置 PATH 变量。

可以通过在 Windows 机器上执行以下命令来启动 Eclipse,或者您也可以简单地双击 eclipse.exe。

%C:\eclipse\eclipse.exe

可以通过在 Unix(Solaris、Linux 等)机器上执行以下命令启动 Eclipse −

$/usr/local/eclipse/eclipse

成功启动后,如果一切正常,它应该显示以下结果 −

eclipsehomepage

Set Maven

本教程中,我们使用 maven 运行和构建基于 Spring 的示例,以运行基于 RabbitMQ 的应用程序。按照 Maven - Environment Setup 安装 maven。

RabbitMQ - Features

RabbitMQ 是最受欢迎的开源消息代理之一。它旨在为企业级消息传递应用程序提供高可用性、可扩展性、可靠性、性能和安全性。以下是一些 RabbitMQ 的突出特点。

  1. LightWeight − RabbitMQ 轻量且易于在本地和云端安装。

  2. Connectivity Options − RabbitMQ 支持多种消息传递协议,并且可以部署在分布式/联合配置中以满足高可用性、可扩展性的要求。

  3. Pluggable Architecture − RabbitMQ 允许选择持久性机制,还提供选项来自定义针对身份验证和授权的安全性,以满足应用程序需求。

  4. Multi-Platform − RabbitMQ 为许多流行语言(如 Java、Python、JavaScript、Ruby 等)提供客户端 API。

  5. Broker Cluster − RabbitMQ 可以部署为集群以实现高可用性和吞吐量。它可以在多个可用区域和区域之间联合使用。

  6. Features Rich − RabbitMQ 为代理和客户端提供许多高级功能。

  7. Simple Administration Interface − RabbitMQ 管理控制台易于使用,但仍然提供许多强大的管理功能。

  8. Enterprise and Cloud Ready − RabbitMQ 支持可插拔的身份验证和授权。它支持 LDAP 和 TLS。它可以轻松部署在公有和私有云中。

  9. Features Rich − RabbitMQ 为代理和客户端提供许多高级功能。它提供插件来支持持续集成、操作指标以及与其他企业系统的集成等。

  10. Management − RabbitMQ 提供 HTTP API、命令行工具和 UI 来管理和监视 RabbitMQ。

RabbitMQ - Installation

RabbitMQ 构建于 Erlang 运行时,所以我们必须先下载 Erlang,然后再安装 RabbitMQ。确保你在使用管理员权限安装 Erlang 和 RabbitMQ。

Erlang

Erlang 是一门通用编程语言和运行时环境。你可以从其主页 Download Erlang/OTP. 下载 Erlang 的最新版本。我们正在 Windows 上安装 Erlang 并下载了适用于 Windows 的 Erlang/OTP 24.2.2 安装程序 - otp_win64_24.2.2.exe。

erlang download

现在,双击安装程序并按照默认选择安装 Erlang,然后完成设置。

erlang installer

RabbitMQ Installation

从其 official downloads page 下载 RabbitMQ 最新二进制文件。我们下载了 windows 版的 3.9.13 作为 rabbitmq-server-3.9.13.exe。

rabbitmq download

现在,双击安装程序并按照默认选择安装 RabbitMQ,然后完成设置。

rabbitmq installer

默认情况下,RabbitMQ 作为 Windows 服务运行。若要启用基于 Web 的管理 UI,你需要执行以下步骤。

转到 RabbitMQ 安装目录并按如下所示键入命令 −

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.9.13\sbin>rabbitmq-plugins.bat enable rabbitmq_management
Enabling plugins on node rabbit@DESKTOP-86KD9FC:
rabbitmq_management
The following plugins have been configured:
   rabbitmq_management
   rabbitmq_management_agent
   rabbitmq_web_dispatch
Applying plugin configuration to rabbit@DESKTOP-86KD9FC...
The following plugins have been enabled:
   rabbitmq_management
   rabbitmq_management_agent
   rabbitmq_web_dispatch

started 3 plugins.

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.9.13\sbin>rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management
Enabling plugins on node rabbit@DESKTOP-86KD9FC:
rabbitmq_shovel
rabbitmq_shovel_management
The following plugins have been configured:
   rabbitmq_management
   rabbitmq_management_agent
   rabbitmq_shovel
   rabbitmq_shovel_management
   rabbitmq_web_dispatch
Applying plugin configuration to rabbit@DESKTOP-86KD9FC...
The following plugins have been enabled:
   rabbitmq_shovel
   rabbitmq_shovel_management

started 2 plugins.

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.9.13\sbin>

使用管理员权限编辑 C:\Windows\System32\drivers\etc\hosts 文件,并向其中添加以下行 −

127.0.0.1 rabbitmq

Verify the Installation

现在打开 http://rabbitmq:15672/ 以打开管理控制台。使用 Guest/Guest 登录。

rabbitmq management console

RabbitMQ - Producer Application

现在,我们创建一个生产者应用程序,它将向 RabbitMQ 队列发送消息。

Create Project

使用 Eclipse 选择 FileNew * → *Maven Project 。勾选 创建简单项目(跳过原型选择),然后单击下一步。

按照以下所示输入详细信息 −

  1. groupId − com.tutorialspoint

  2. artifactId − producer

  3. version − 0.0.1-SNAPSHOT

  4. name − RabbitMQ Producer

单击“完成”按钮,将创建新项目。

pom.xml

现在更新 pom.xml 的内容以包含 RabbitMQ 的依赖项。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.tutorialspoint.activemq</groupId>
   <artifactId>producer</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>RabbitMQ Producer</name>
   <properties>
      <java.version>11</java.version>
   </properties>
   <dependencies>
      <dependency>
         <groupId>com.rabbitmq</groupId>
         <artifactId>amqp-client</artifactId>
         <version>5.14.2</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
         <version>1.7.26</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-simple</artifactId>
         <version>1.7.26</version>
      </dependency>
   </dependencies>
</project>

现在创建一个生产者类,它将向 RabbitMQ 队列发送消息。

package com.tutorialspoint.rabbitmq;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
   private static String QUEUE = "MyFirstQueue";

   public static void main(String[] args) throws IOException, TimeoutException {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
         channel.queueDeclare(QUEUE, false, false, false, null);

         Scanner input = new Scanner(System.in);
         String message;
         do {
            System.out.println("Enter message: ");
            message = input.nextLine();
            channel.basicPublish("", QUEUE, null, message.getBytes());
         } while (!message.equalsIgnoreCase("Quit"));
      }
   }
}

生产者类创建一个连接,创建一个通道,连接到一个队列。如果用户输入“退出”,则应用程序终止,否则它将使用 basicPublish 方法向队列发送消息。

我们在 RabbitMQ - Test Application 章节中运行此应用程序。

RabbitMQ - Consumer Application

现在,让我们创建一个将从 RabbitMQ 队列接收消息的消费程序应用程序。

Create Project

使用 Eclipse 选择 FileNew * → *Maven Project 。勾选 创建简单项目(跳过原型选择),然后单击下一步。

输入详细信息,如下所示:

  1. groupId − com.tutorialspoint

  2. artifactId − consumer

  3. version − 0.0.1-SNAPSHOT

  4. name − RabbitMQ Consumer

单击“完成”按钮,将创建新项目。

pom.xml

现在,更新 pom.xml 的内容,以包含 ActiveMQ 的依赖关系。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.tutorialspoint.activemq</groupId>
   <artifactId>consumer</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>RabbitMQ Consumer</name>
      <properties>
      <java.version>11</java.version>
   </properties>
   <dependencies>
      <dependency>
         <groupId>com.rabbitmq</groupId>
         <artifactId>amqp-client</artifactId>
         <version>5.14.2</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
         <version>1.7.26</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-simple</artifactId>
         <version>1.7.26</version>
      </dependency>
   </dependencies>
</project>

现在创建一个将从 RabbitMQ 队列接收消息的消费者类。

package com.tutorialspoint.rabbitmq;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Consumer {
   private static String QUEUE = "MyFirstQueue";

   public static void main(String[] args) throws IOException, TimeoutException {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();

      channel.queueDeclare(QUEUE, false, false, false, null);
      System.out.println("Waiting for messages. To exit press CTRL+C");

      DeliverCallback deliverCallback = (consumerTag, delivery) -> {
         String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
         System.out.println("Received '" + message + "'");
      };
      channel.basicConsume(QUEUE, true, deliverCallback, consumerTag -> { });
   }
}

消费者类创建一个连接,创建一个通道,如果不存在则创建一个队列,然后从队列接收消息(如果有)并保持轮询队列以获取消息。一旦消息被传递,它将由 basicConsume() 方法使用 deliverCallback 处理。

我们在 RabbitMQ - Test Application 章节中运行此应用程序。

RabbitMQ - Test Application

Start the Producer Application

在 eclipse 中,右击 Producer.java 源文件,选择以 → Java 应用程序运行。生成程序应用程序将开始运行,您将看到如下输出 −

Enter message:

Start the Consumer Application

在 eclipse 中,右击 Consumer.java 源文件,选择以 → Java 应用程序运行。消费程序应用程序将开始运行,您将看到如下输出 −

Waiting for messages. To exit press CTRL+C

Send Message

在生产者控制台窗口中,键入 Hi 并按 enter 按钮发送消息。

Enter message:
Hi

Receive Message

在使用者控制台窗口中确认消息已收到。

Waiting for messages. To exit press CTRL+C
Received = Hi

发送“退出”作为消息以终止生成窗口会话并终止客户端窗口会话。

Verification

现在,在您的浏览器中打开 http://rabbitmq:15672/ 。它将要求输入凭据。使用 guest/guest 作为用户名/密码,它将加载 RabbitMQ 管理控制台,您可以在其中检查队列以查看状态。它将显示已排队和已交付的消息。

rabbitmq queues

RabbitMQ - Publisher Application

现在,让我们创建一个将消息发送到 RabbitMQ Exchange 的发布程序应用程序。此交换将把消息传递给与交换绑定的队列。

Create Project

使用 Eclipse 选择 FileNew * → *Maven Project 。勾选 创建简单项目(跳过原型选择),然后单击下一步。

按照以下所示输入详细信息 −

  1. groupId − com.tutorialspoint

  2. artifactId − publisher

  3. version − 0.0.1-SNAPSHOT

  4. name − RabbitMQ Publisher

单击“完成”按钮,将创建新项目。

pom.xml

现在更新 pom.xml 的内容以包含 RabbitMQ 的依赖项。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.tutorialspoint.activemq</groupId>
   <artifactId>publisher</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>RabbitMQ Publisher</name>
   <properties>
      <java.version>11</java.version>
   </properties>
   <dependencies>
      <dependency>
         <groupId>com.rabbitmq</groupId>
         <artifactId>amqp-client</artifactId>
         <version>5.14.2</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
         <version>1.7.26</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-simple</artifactId>
         <version>1.7.26</version>
      </dependency>
   </dependencies>
</project>

现在创建一个将消息发送到 RabbitMQ 主题的发布程序类以将其广播到所有订阅者。

package com.tutorialspoint.rabbitmq;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Publisher {
   private static final String EXCHANGE = "MyExchange";
   public static void main(String[] args) throws IOException, TimeoutException {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      try (Connection connection = factory.newConnection();
      Channel channel = connection.createChannel()) {
         channel.exchangeDeclare(EXCHANGE, "fanout");
         Scanner input = new Scanner(System.in);
         String message;
         do {
            System.out.println("Enter message: ");
            message = input.nextLine();
            channel.basicPublish(EXCHANGE, "", null, message.getBytes());
         } while (!message.equalsIgnoreCase("Quit"));
      }
   }
}

生成程序类创建一个连接,创建一个通道,声明一个交换,然后要求用户输入消息。消息被发送到交换,并且作为队列名称,我们没有传递队列名称,因此所有与该交换绑定的队列都将收到该消息。如果用户输入退出,则应用程序终止,否则它会将消息发送到主题。

我们将在 RabbitMQ - Test Application 一章中运行此应用程序。

RabbitMQ - Subscriber Application

现在,我们创建一个订阅者应用程序,它将从 RabbitMQ 主题接收消息。

Create Project

使用 Eclipse 选择 FileNew * → *Maven Project 。勾选 创建简单项目(跳过原型选择),然后单击下一步。

按照以下所示输入详细信息 −

  1. groupId − com.tutorialspoint

  2. artifactId − subscriber

  3. version − 0.0.1-SNAPSHOT

  4. name − RabbitMQ Subscriber

单击“完成”按钮,将创建新项目。

pom.xml

现在更新 pom.xml 的内容以包含 RabbitMQ 的依赖项。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.tutorialspoint.activemq</groupId>
   <artifactId>subscriber</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>RabbitMQ Subscriber</name>
   <dependencies>
      <dependency>
         <groupId>com.rabbitmq</groupId>
         <artifactId>amqp-client</artifactId>
         <version>5.14.2</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
         <version>1.7.26</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-simple</artifactId>
         <version>1.7.26</version>
      </dependency>
   </dependencies>
</project>

现在创建一个订阅者类,它将从 RabbitMQ 队列接收消息。

package com.tutorialspoint.rabbitmq;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Subscriber {
   private static String EXCHANGE = "MyExchange";
   public static void main(String[] args) throws IOException, TimeoutException {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();
      channel.exchangeDeclare(EXCHANGE, "fanout");

      String queueName = channel.queueDeclare().getQueue();
      channel.queueBind(queueName, EXCHANGE, "");
      System.out.println("Waiting for messages. To exit press CTRL+C");

      DeliverCallback deliverCallback = (consumerTag, delivery) -> {
         String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
         System.out.println("Received '" + message + "'");
      };
      channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
   }
}

订阅者类创建一个连接,创建一个通道,声明该交换,创建一个随机队列并将其绑定到交换,然后在有消息时从主题接收消息。按 Ctrl + C 终止,否则它将继续轮询队列以获取消息。

我们在 RabbitMQ - Test Application 章节中将多次运行此应用程序以创建多个订阅者。

RabbitMQ - Test Application Topic

Start the Publisher Application

在 eclipse 中,鼠标右键单击 Publisher.java 源,然后选择以 Java 应用程序运行。Publisher 应用程序将开始运行,您会看到如下所示的输出:

Enter message:

Start the Subscriber Application

在 eclipse 中,鼠标右键单击 Subscriber.java 源,然后选择以 Java 应用程序运行。Subscriber 应用程序将开始运行,您会看到如下所示的输出:

Waiting for messages. To exit press CTRL+C

Start another Subscriber Application

在 eclipse 中,再次鼠标右键单击 Subscriber.java 源,然后选择以 Java 应用程序运行。另一个 Subscriber 应用程序将开始运行,您会看到如下所示的输出:

Waiting for messages. To exit press CTRL+C

Send Message

在 Publisher 控制台窗口中,输入 Hi 并按回车键以发送消息。

Enter message:
Hi

Receive Message

在 Subscriber 控制台窗口中验证,消息在每个窗口中都能收到。

Received = Hi

发送退出作为消息以终止所有发布者和订阅者控制台窗口会话。

Verification

现在,在您的浏览器中打开 http://rabbitmq:15672/ 。它将要求输入凭据。使用 guest/guest 作为用户名/密码,它将加载 RabbitMQ 管理控制台,您可以在其中检查队列和交换以检查已传递的消息和绑定的状态。