AMQP-backed Message Channels

提供了两种消息通道实现。一种是点对点,另一种是发布-订阅。这两种通道都为基础 AmqpTemplateSimpleMessageListenerContainer 提供了各种配置属性(如本章前面针对通道适配器和网关所示)。但是,这里展示的示例具有最少配置。探索 XML 模式以查看可用属性。

There are two message channel implementations available. One is point-to-point, and the other is publish-subscribe. Both of these channels provide a wide range of configuration attributes for the underlying AmqpTemplate and SimpleMessageListenerContainer (as shown earlier in this chapter for the channel adapters and gateways). However, the examples we show here have minimal configuration. Explore the XML schema to view the available attributes.

点对点通道可能类似于以下示例:

A point-to-point channel might look like the following example:

<int-amqp:channel id="p2pChannel"/>

在幕后,前述示例会声明一个名为 si.p2pChannelQueue,且此通道会发送到该 Queue(从技术上讲,通过使用与该 Queue 名称匹配的路由键发送到无名称的直接交换)。此通道也在该 Queue 上注册一个使用者。如果您想让该通道成为“可轮询”的,而不是消息驱动的,请提供一个 false 值的 message-driven 标志,如下例所示:

Under the covers, the preceding example causes a Queue named si.p2pChannel to be declared, and this channel sends to that Queue (technically, by sending to the no-name direct exchange with a routing key that matches the name of this Queue). This channel also registers a consumer on that Queue. If you want the channel to be “pollable” instead of message-driven, provide the message-driven flag with a value of false, as the following example shows:

<int-amqp:channel id="p2pPollableChannel"  message-driven="false"/>

发布-订阅通道可能类似于以下示例:

A publish-subscribe channel might look like the following:

<int-amqp:publish-subscribe-channel id="pubSubChannel"/>

在幕后,前述示例会声明一个名为 si.fanout.pubSubChannel 的扇形交换,且此通道会发送到该扇形交换。此通道还会声明一个以服务器命名的独占、自动删除、非持久 Queue,并在该 Queue 上注册一个使用者以接收消息的同时将该 Queue 绑定到扇形交换。发布-订阅通道没有“可轮询”选项。它必须是消息驱动的。

Under the covers, the preceding example causes a fanout exchange named si.fanout.pubSubChannel to be declared, and this channel sends to that fanout exchange. This channel also declares a server-named exclusive, auto-delete, non-durable Queue and binds that to the fanout exchange while registering a consumer on that Queue to receive messages. There is no “pollable” option for a publish-subscribe-channel. It must be message-driven.

从 4.1 版本开始,AMQP 支持的消息通道(与 channel-transacted 结合使用)支持 template-channel-transacted,以分离 AbstractMessageListenerContainerRabbitTemplatetransactional 配置。请注意,以前,channel-transacted 默认值为 true。现在,对于 AbstractMessageListenerContainer,它的默认值为 false

Starting with version 4.1, AMQP-backed message channels (in conjunction with channel-transacted) support template-channel-transacted to separate transactional configuration for the AbstractMessageListenerContainer and for the RabbitTemplate. Note that, previously, channel-transacted was true by default. Now, by default, it is false for the AbstractMessageListenerContainer.

在 4.3 版之前,AMQP 支持的通道仅支持 Serializable 有效负载和标头的消息。整个消息已转换为 (序列化的) 并发送到 RabbitMQ。现在,你可以将 extract-payload 属性(或在使用 Java 配置时为 setExtractPayload())设置为 true。当此标志为 true 时,消息有效负载会进行转换,并将标头映射,类似于使用通道适配器时的处理方式。此安排允许使用不支持序列化的有效负载(可能与另一个消息转换器配合使用,例如 Jackson2JsonMessageConverter)的 AMQP 支持的通道。请参阅 AMQP Message Headers以了解有关默认映射标头的更多信息。你可以通过提供使用 outbound-header-mapperinbound-header-mapper 属性的自定义映射器来修改映射。你现在还可以指定 default-delivery-mode,用于在不存在 amqp_deliveryMode 标头时设置传递模式。默认情况下,Spring AMQP MessageProperties 使用 PERSISTENT 发送模式。

Prior to version 4.3, AMQP-backed channels only supported messages with Serializable payloads and headers. The entire message was converted (serialized) and sent to RabbitMQ. Now, you can set the extract-payload attribute (or setExtractPayload() when using Java configuration) to true. When this flag is true, the message payload is converted and the headers are mapped, in a manner similar to when you use channel adapters. This arrangement lets AMQP-backed channels be used with non-serializable payloads (perhaps with another message converter, such as the Jackson2JsonMessageConverter). See AMQP Message Headers for more about the default mapped headers. You can modify the mapping by providing custom mappers that use the outbound-header-mapper and inbound-header-mapper attributes. You can now also specify a default-delivery-mode, which is used to set the delivery mode when there is no amqp_deliveryMode header. By default, Spring AMQP MessageProperties uses PERSISTENT delivery mode.

与其他持久性支持的通道一样,AMQP 支持的通道旨在提供消息持久性以避免消息丢失。它们不打算将工作分配给其他对等应用程序。为此,请改用通道适配器。

As with other persistence-backed channels, AMQP-backed channels are intended to provide message persistence to avoid message loss. They are not intended to distribute work to other peer applications. For that purpose, use channel adapters instead.

从版本 5.0 开始,可轮询通道现在为指定的 receiveTimeout 阻塞轮询线程(默认值为 1 秒)。以前,与其他 PollableChannel 实现不同,如果没有任何消息可用,不管接收超时如何,该线程都会立即返回到调度程序中。阻塞比使用 basicGet() 检索消息(无超时)要昂贵一些,因为必须创建使用者来接收每条消息。要恢复以前的错误,请将轮询器的 receiveTimeout 设置为 0。

Starting with version 5.0, the pollable channel now blocks the poller thread for the specified receiveTimeout (the default is 1 second). Previously, unlike other PollableChannel implementations, the thread returned immediately to the scheduler if no message was available, regardless of the receive timeout. Blocking is a little more expensive than using a basicGet() to retrieve a message (with no timeout), because a consumer has to be created to receive each message. To restore the previous behavior, set the poller’s receiveTimeout to 0.

Configuring with Java Configuration

以下示例展示了如何使用 Java 配置配置通道:

The following example shows how to configure the channels with Java configuration:

@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("foo");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("bar");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("baz");
    factoryBean.setPubSub(false);
    return factoryBean;
}

Configuring with the Java DSL

以下示例展示了如何使用 Java DSL 配置通道:

The following example shows how to configure the channels with the Java DSL:

@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.pollableChannel(connectionFactory)
                    .queueName("foo"))
            ...
            .get();
}

@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.channel(connectionFactory)
                    .queueName("bar"))
            ...
            .get();
}

@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.publishSubscribeChannel(connectionFactory)
                    .queueName("baz"))
            ...
            .get();
}