Sample Applications

  1. Hello World 示例:演示同步和异步消息接收,以及如何使用 Maven 和 Spring 配置来创建连接工厂、队列和消息模板。

  2. 股票交易示例:演示股票交易用例中的更高级交互,包括:

    • 使用主题交换推送市场数据

    • 基于路由键的订阅

    • 请求-响应式消息传递

    • 从非 Spring 应用程序接收 JSON

Spring AMQP Samples 项目包括两个示例应用程序。第一个是一个简单的 “Hello World” 示例,演示了同步和异步消息接收。它为理解基本组件提供了极好的起点。第二个示例基于股票交易用例,演示现实世界应用程序中常见的交互类型。本章简要介绍了每个示例,以便您专注于最重要的组件。这些示例均基于 Maven,因此您应该能够将它们直接导入任何支持 Maven 的 IDE(例如 SpringSource Tool Suite)中。

The “Hello World” Sample

Hello World”示例演示了同步和异步消息接收。你可以将 spring-rabbit-helloworld 示例导入 IDE,然后按照下面的讨论进行。

Synchronous Example

src/main/java 目录中,导航到 org.springframework.amqp.helloworld 包。打开 HelloWorldConfiguration 类,并注意它包含类级别的 @Configuration 注释,并注意一些方法级别的 @Bean 注释。这是基于 Spring 的 Java 配置的一个示例。您可以阅读有关 here 的更多信息。

以下清单显示了如何创建连接工厂:

@Bean
public CachingConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory =
        new CachingConnectionFactory("localhost");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    return connectionFactory;
}

该配置还包含 RabbitAdmin 的实例,它默认情况下查找任何 exchange、queue 或 binding 类型 bean,然后在代理上声明它们。事实上,HelloWorldConfiguration 中生成的 helloWorldQueue bean 是一个示例,因为它是一个 Queue 的实例。

以下清单显示了 helloWorldQueue bean 定义:

@Bean
public Queue helloWorldQueue() {
    return new Queue(this.helloWorldQueueName);
}

回顾 rabbitTemplate bean 配置,您会看到它设置了 helloWorldQueue 名称作为其 queue 属性(用于接收消息)和其 routingKey 属性(用于发送消息)。

现在我们已经探索了配置,我们可以看看实际使用这些组件的代码。首先,在同一个包中打开 Producer 类。它包含一个 main() 方法,其中创建 Spring ApplicationContext

以下清单显示了 main 方法:

public static void main(String[] args) {
    ApplicationContext context =
        new AnnotationConfigApplicationContext(RabbitConfiguration.class);
    AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
    amqpTemplate.convertAndSend("Hello World");
    System.out.println("Sent: Hello World");
}

在上一个示例中,检索 AmqpTemplate bean 并用于发送 Message。由于客户端代码应尽可能依赖接口,因此类型是 AmqpTemplate 而不是 RabbitTemplate。即使 HelloWorldConfiguration 中创建的 bean 是 RabbitTemplate 的实例,依赖于接口也意味着此代码更易于移植(可以独立于代码更改配置)。由于调用了 convertAndSend() 方法,所以模板将其代理到其 MessageConverter 实例。本例中,它使用默认的 SimpleMessageConverter,但可以将不同的实现提供给 rabbitTemplate bean,如 HelloWorldConfiguration 中定义的那样。

现在打开 Consumer 类。它实际上共享相同的配置基类,这意味着它共享 rabbitTemplate bean。这就是为什么我们用 routingKey (用于发送)和 queue (用于接收)配置了该模板。正如我们在 AmqpTemplate 中描述的那样,您可以将“routingKey”参数传递给 send 方法,并将“queue”参数传递给 receive 方法。Consumer 代码基本上是 Producer 的镜像,调用 receiveAndConvert() 而不是 convertAndSend()

以下清单显示 Consumer 的 main 方法:

public static void main(String[] args) {
    ApplicationContext context =
        new AnnotationConfigApplicationContext(RabbitConfiguration.class);
    AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
    System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}

如果您运行 Producer,然后再运行 Consumer,您应该会在控制台输出中看到 Received: Hello World

Asynchronous Example

Synchronous Example 遍历了同步 Hello World 示例。本节介绍了一个稍微高级一些但功能强大的选项。通过一些修改,Hello World 示例可以提供异步接收的示例,也称为消息驱动的 POJO。事实上,存在一个子包可以提供完全相同的功能:org.springframework.amqp.samples.helloworld.async

再次,我们从发送端开始。打开 ProducerConfiguration 类并注意它创建了 connectionFactoryrabbitTemplate bean。这次,由于配置专用于消息发送端,我们甚至不需要任何队列定义,并且 RabbitTemplate 仅设置了“routingKey”属性。回想一下,消息被发送到交换机,而不是直接发送到队列。AMQP 默认交换机是一个没有名称的直接交换机。所有队列都使用其名称作为路由键绑定到该默认交换机。这就是我们只需要在此提供路由键的原因。

以下清单显示了 rabbitTemplate 定义:

public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    template.setRoutingKey(this.helloWorldQueueName);
    return template;
}

由于本示例演示了异步消息接收,因此生产端的目的是连续发送消息(如果它是一个消息-每个执行模型,如同步版本),则不太明显,它实际上是一个消息驱动的消费者。负责连续发送消息的组件被定义为 ProducerConfiguration 中的内部类。它被配置为每三秒运行一次。

以下清单显示了组件:

static class ScheduledProducer {

    @Autowired
    private volatile RabbitTemplate rabbitTemplate;

    private final AtomicInteger counter = new AtomicInteger();

    @Scheduled(fixedRate = 3000)
    public void sendMessage() {
        rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
    }
}

您不需要了解所有详细信息,因为真正的重点应该是接收方(我们在接下来会介绍)。但是,如果您还不熟悉 Spring 任务调度支持,可以了解更多 here。简而言之,ProducerConfiguration 中的 postProcessor bean 向调度程序注册任务。

现在我们可以转向接收端。为了强调消息驱动的 POJO 行为,我们从对消息做出反应的组件开始。该类称为 HelloWorldHandler,并显示在以下清单中:

public class HelloWorldHandler {

    public void handleMessage(String text) {
        System.out.println("Received: " + text);
    }

}

该类是一个 POJO。它不扩展任何基类,不实现任何接口,甚至不包含任何导入。它正在被 Spring AMQP MessageListenerAdapter “适应”到 MessageListener 接口。然后,您可以在 SimpleMessageListenerContainer 上配置该适配器。对于本示例,容器在 ConsumerConfiguration 类中创建。您可以在其中看到封装在适配器中的 POJO。

以下清单显示了如何定义 listenerContainer

@Bean
public SimpleMessageListenerContainer listenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setQueueName(this.helloWorldQueueName);
    container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
    return container;
}

SimpleMessageListenerContainer 是一个 Spring 生命周期组件,默认情况下会自动启动。如果您查看 Consumer 类,您会看到它的 main() 方法只包含一个启动程序,用于创建 ApplicationContext。Producer 的 main() 方法也是一个单行启动程序,因为方法使用 @Scheduled 注释的组件也会自动启动。您可以按任何顺序启动 ProducerConsumer,您应该会看到每三秒发送和接收一次消息。

Stock Trading

股票交易示例演示了比 the Hello World sample 更高级的消息传递场景。但是,如果涉及更多内容,则配置非常相似。由于我们详细介绍了 Hello World 配置,因此,在这里,我们重点关注是什么让此示例与众不同。有一个服务器将市场数据(股票报价)推送到主题交换。然后,客户端可以通过使用路由模式将队列绑定来订阅市场数据馈送(例如,app.stock.quotes.nasdaq.*)。此演示的另一个主要特点是客户端发起的并由服务器处理的请求-答复 “stock trade” 交互。这涉及一个私有 replyTo 队列,该队列由客户端在订单请求消息中自身发送。

服务器的核心配置位于`org.springframework.amqp.rabbit.stocks.config.server`包中的`RabbitServerConfiguration`类中。它扩展了`AbstractStockAppRabbitConfiguration`。这是定义服务器和客户端常用资源的地方,包括市场数据主题交换(其名称为“app.stock.marketdata”)和服务器为股票交易公开的队列(其名称为“app.stock.request”)。在该通用配置文件中,您还看到在`RabbitTemplate`上配置了`Jackson2JsonMessageConverter`。

特定于服务器的配置包括两件事。首先,它在`RabbitTemplate`上配置市场数据交换,这样它不需要每次调用以发送`Message`时都提供该交换名称。它在基本配置类中定义的抽象回调方法中执行此操作。以下清单显示该方法:

public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
    rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}

其次,声明股票请求队列。在这种情况下,它不需要任何明确的绑定,因为它已经绑定到默认无名称交换,且其自己的名称作为路由键。如前所述,AMQP 规范定义了该行为。以下清单显示了`stockRequestQueue` bean 的定义:

@Bean
public Queue stockRequestQueue() {
    return new Queue(STOCK_REQUEST_QUEUE_NAME);
}

既然您已经看到了服务器的 AMQP 资源的配置,请导航到`src/test/java`目录下的`org.springframework.amqp.rabbit.stocks`包。在那里,您可以看到提供`main()方法的实际`Server`类。它基于`server-bootstrap.xml`配置文件创建了`ApplicationContext。在那里,您可以看到发布虚拟市场数据的调度任务。该配置依赖于 Spring 的`task`命名空间支持。自举配置文件还导入其他几个文件。最有趣的文件是直接位于`src/main/resources`下的`server-messaging.xml`。在那里,您可以看到负责处理股票交易请求的`messageListenerContainer` bean。最后,查看在`server-handlers.xml`中定义的`serverHandler` bean(也在“src/main/resources”中)。该 bean 是`ServerHandler`类的实例,是可发送回复消息的消息驱动的 POJO 的一个好示例。请注意,它本身不与框架或任何 AMQP 概念耦合。它接受`TradeRequest`并返回`TradeResponse`。以下清单显示了`handleMessage`方法的定义:

public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}

现在,我们已经看到了服务器最重要的配置和代码,可以转向客户端。最好的起点可能是`org.springframework.amqp.rabbit.stocks.config.client`包中的`RabbitClientConfiguration`。请注意,它声明了两个队列,但不提供显式名称。以下清单显示了这两个队列的 bean 定义:

@Bean
public Queue marketDataQueue() {
    return amqpAdmin().declareQueue();
}

@Bean
public Queue traderJoeQueue() {
    return amqpAdmin().declareQueue();
}

这些是私有队列,并且会自动生成唯一名称。第一个生成的队列由客户端用于绑定到服务器公开的市场数据交换。回想一下,在 AMQP 中,消费者与队列交互,而生产者与交换交互。队列的“绑定”到交换告诉代理将给定交换中的消息传递(或路由)到队列。由于市场数据交换是一个主题交换,因此可以使用路由模式表示绑定。RabbitClientConfiguration`使用`Binding`对象执行此操作,并且该对象使用 `BindingBuilder`流畅 API 生成。以下清单显示`Binding

@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;

@Bean
public Binding marketDataBinding() {
    return BindingBuilder.bind(
        marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}

请注意,实际值已外置在属性文件(client.properties`位于`src/main/resources`下)中,并且我们使用 Spring 的@Value`注释来注入该值。这通常是个好主意。否则,该值将被硬编码在一个类中并且无法在不重新编译的情况下修改。在这种情况下,在对用于绑定的路由模式进行更改时,运行客户端的多个版本会容易得多。我们现在可以尝试一下。

首先,运行`org.springframework.amqp.rabbit.stocks.Server`,然后运行`org.springframework.amqp.rabbit.stocks.Client`。您应该看到`NASDAQ`股票的虚拟报价,因为client.properties中与“stocks.quote.pattern”键关联的当前值是“app.stock.quotes.nasdaq.”。现在,在保持现有的`Server`和`Client`运行时,将该属性值更改为“app.stock.quotes.nyse.”,然后启动第二个`Client`实例。您应该看到第一个客户端仍然收到纳斯达克报价,而第二个客户端收到纽约证券交易所报价。您还可以更改模式以获取所有股票甚至单个代码。

我们探索的最后一个功能是从客户端角度进行的请求-响应交互。回想一下,我们已经看到了接受`TradeRequest`对象并返回`TradeResponse`对象的`ServerHandler`。Client`侧的相应代码是`org.springframework.amqp.rabbit.stocks.gateway`包中的`RabbitStockServiceGateway。它委托`RabbitTemplate`发送消息。以下清单显示`send`方法:

public void send(TradeRequest tradeRequest) {
    getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
            try {
                message.getMessageProperties().setCorrelationId(
                    UUID.randomUUID().toString().getBytes("UTF-8"));
            }
            catch (UnsupportedEncodingException e) {
                throw new AmqpException(e);
            }
            return message;
        }
    });
}

请注意,在发送消息之前,它设置了`replyTo`地址。它提供了由`traderJoeQueue` bean 定义生成的队列(前面已显示)。以下清单显示了StockServiceGateway类本身的`@Bean`定义:

@Bean
public StockServiceGateway stockServiceGateway() {
    RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
    gateway.setRabbitTemplate(rabbitTemplate());
    gateway.setDefaultReplyToQueue(traderJoeQueue());
    return gateway;
}

如果您不再运行服务器和客户端,请立即启动它们。尝试发送格式为“100 TCKR”的请求。在模拟请求“处理”的短暂人工延迟后,您应该会看到确认消息显示在客户端上。

Receiving JSON from Non-Spring Applications

发送 JSON 时,Spring 应用程序将`TypeId`头设置为完全限定类名,以帮助接收应用程序将 JSON 转换回 Java 对象。

spring-rabbit-json 样例探索了多种从非 Spring 应用程序转换 JSON 的技术。