AMQP-backed Message Channels

提供了两种消息通道实现。一种是点对点,另一种是发布-订阅。这两种通道都为基础 AmqpTemplateSimpleMessageListenerContainer 提供了各种配置属性(如本章前面针对通道适配器和网关所示)。但是,这里展示的示例具有最少配置。探索 XML 模式以查看可用属性。 点对点通道可能类似于以下示例:

<int-amqp:channel id="p2pChannel"/>

在幕后,前述示例会声明一个名为 si.p2pChannelQueue,且此通道会发送到该 Queue(从技术上讲,通过使用与该 Queue 名称匹配的路由键发送到无名称的直接交换)。此通道也在该 Queue 上注册一个使用者。如果您想让该通道成为“可轮询”的,而不是消息驱动的,请提供一个 false 值的 message-driven 标志,如下例所示:

<int-amqp:channel id="p2pPollableChannel"  message-driven="false"/>

发布-订阅通道可能类似于以下示例:

<int-amqp:publish-subscribe-channel id="pubSubChannel"/>

在幕后,前述示例会声明一个名为 si.fanout.pubSubChannel 的扇形交换,且此通道会发送到该扇形交换。此通道还会声明一个以服务器命名的独占、自动删除、非持久 Queue,并在该 Queue 上注册一个使用者以接收消息的同时将该 Queue 绑定到扇形交换。发布-订阅通道没有“可轮询”选项。它必须是消息驱动的。 从 4.1 版本开始,AMQP 支持的消息通道(与 channel-transacted 结合使用)支持 template-channel-transacted,以分离 AbstractMessageListenerContainerRabbitTemplatetransactional 配置。请注意,以前,channel-transacted 默认值为 true。现在,对于 AbstractMessageListenerContainer,它的默认值为 false。 在 4.3 版之前,AMQP 支持的通道仅支持 Serializable 有效负载和标头的消息。整个消息已转换为 (序列化的) 并发送到 RabbitMQ。现在,你可以将 extract-payload 属性(或在使用 Java 配置时为 setExtractPayload())设置为 true。当此标志为 true 时,消息有效负载会进行转换,并将标头映射,类似于使用通道适配器时的处理方式。此安排允许使用不支持序列化的有效负载(可能与另一个消息转换器配合使用,例如 Jackson2JsonMessageConverter)的 AMQP 支持的通道。请参阅 AMQP Message Headers以了解有关默认映射标头的更多信息。你可以通过提供使用 outbound-header-mapperinbound-header-mapper 属性的自定义映射器来修改映射。你现在还可以指定 default-delivery-mode,用于在不存在 amqp_deliveryMode 标头时设置传递模式。默认情况下,Spring AMQP MessageProperties 使用 PERSISTENT 发送模式。

与其他持久性支持的通道一样,AMQP 支持的通道旨在提供消息持久性以避免消息丢失。它们不打算将工作分配给其他对等应用程序。为此,请改用通道适配器。

从版本 5.0 开始,可轮询通道现在为指定的 receiveTimeout 阻塞轮询线程(默认值为 1 秒)。以前,与其他 PollableChannel 实现不同,如果没有任何消息可用,不管接收超时如何,该线程都会立即返回到调度程序中。阻塞比使用 basicGet() 检索消息(无超时)要昂贵一些,因为必须创建使用者来接收每条消息。要恢复以前的错误,请将轮询器的 receiveTimeout 设置为 0。

Configuring with Java Configuration

以下示例展示了如何使用 Java 配置配置通道:

@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("foo");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("bar");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("baz");
    factoryBean.setPubSub(false);
    return factoryBean;
}

Configuring with the Java DSL

以下示例展示了如何使用 Java DSL 配置通道:

@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.pollableChannel(connectionFactory)
                    .queueName("foo"))
            ...
            .get();
}

@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.channel(connectionFactory)
                    .queueName("bar"))
            ...
            .get();
}

@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.publishSubscribeChannel(connectionFactory)
                    .queueName("baz"))
            ...
            .get();
}