Outbound Channel Adapter

以下示例显示了 AMQP 出站通道适配器的可用属性:

  • Java DSL

  • Java

  • XML

@Bean
public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate,
        MessageChannel amqpOutboundChannel) {
    return IntegrationFlow.from(amqpOutboundChannel)
            .handle(Amqp.outboundAdapter(amqpTemplate)
                        .routingKey("queue1")) // default exchange - route to queue 'queue1'
            .get();
}
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
    AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
    outbound.setRoutingKey("queue1"); // default exchange - route to queue 'queue1'
    return outbound;
}

@Bean
public MessageChannel amqpOutboundChannel() {
    return new DirectChannel();
}
<int-amqp:outbound-channel-adapter id="outboundAmqp"             1
                               channel="outboundChannel"         2
                               amqp-template="myAmqpTemplate"    3
                               exchange-name=""                  4
                               exchange-name-expression=""       5
                               order="1"                         6
                               routing-key=""                    7
                               routing-key-expression=""         8
                               default-delivery-mode""           9
                               confirm-correlation-expression="" 10
                               confirm-ack-channel=""            11
                               confirm-nack-channel=""           12
                               confirm-timeout=""                13
                               wait-for-confirm=""               14
                               return-channel=""                 15
                               error-message-strategy=""         16
                               header-mapper=""                  17
                               mapped-request-headers=""         18
                               lazy-connect="true"               19
                               multi-send="false"/>              20
1 此适配器的唯一 ID。可选。
2 用于将消息发送到该通道的邮件通道,以便将邮件转换为 AMQP 交换并发布到该通道。必需。
3 配置的 AMQP 模板的 Bean 引用。可选(默认为 amqpTemplate)。
4 将消息发送到的 AMQP 交换的名称。如果未提供,将把消息发送到默认的无名称交换。与“exchange-name-expression”互斥。可选。
5 一个 SpEL 表达式,评估后确定发送消息的 AMQP 交换的名称,其中消息为根对象。如果没有提供,则消息将发送到默认的无名交换。与“exchange-name”互斥。可选。
6 当多个使用者已注册时的此使用者顺序,从而启用负载平衡和故障转移。可选(默认为 Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE])。
7 将消息发送时使用的固定的路由键。默认情况下,这是一个空的 String.与“routing-key-expression”互斥。可选。
8 一个 SpEL 表达式,在发送消息时求值以确定要使用的路由键,并将消息作为根对象(例如,“payload.key”)。默认情况下,这是一个空的 String.与“routing-key”互斥。可选。
9 消息的默认传递模式:PERSISTENTNON_PERSISTENT。如果 header-mapper 设置了传递模式,则会覆盖。如果存在 Spring Integration 消息头 amqp_deliveryMode,则 DefaultHeaderMapper 会设置值。如果没有提供此属性并且头映射器未将其设置,则默认值取决于 RabbitTemplate 使用的底层 Spring AMQP MessagePropertiesConverter。如果根本没有对其进行自定义,则默认值为 PERSISTENT。可选。
10 一个定义相关数据的表达式。一旦提供,将配置基础 AMQP 模板以接收发布确认。需要一个专用的 RabbitTemplate`和一个将 `publisherConfirms`属性设置为 `true`的 `CachingConnectionFactory.收到发布确认且提供相关数据时,将其写入 confirm-ack-channel`或 `confirm-nack-channel,具体取决于确认类型。确认的有效内容是相关数据,由该表达式定义。消息设置一个“amqp_publishConfirm”头,将该头设置为 true(ack) 或 false(nack)。示例: headers['myCorrelationData']`和 `payload .4.1 版引入了 amqp_publishConfirmNackCause`消息头。它包含“publisher confirmation”中“nack”的 `cause.从 4.2 版开始,如果表达式解析为 Message&lt;?&gt;`实例(例如 `#this),则在 ack/`nack`通道上发出的消息将基于该消息,并添加一个或多个附加头。之前,无论类型为何,都会创建一个新的消息,其有效内容为相关数据。另请参阅 Alternative Mechanism for Publisher Confirms and Returns.可选。
11 将正向 (ack) 发布确认发送到的通道。有效内容是 confirm-correlation-expression`定义的相关数据。如果表达式是 `#root`或 `#this,将从原始消息构建消息,并将 amqp_publishConfirm`头设置为 `true。另请参阅 Alternative Mechanism for Publisher Confirms and Returns.可选(默认设置为 `nullChannel)。
12 将负向 (nack) 发布确认发送到的通道。有效内容是 confirm-correlation-expression`定义的相关数据(如果没有配置 `ErrorMessageStrategy)。如果表达式是 #root`或 `#this,将从原始消息构建消息,并将 amqp_publishConfirm`头设置为 `false。如果存在 ErrorMessageStrategy,则该消息是一个 ErrorMessage,其有效内容为 NackedAmqpMessageException。另请参阅 Alternative Mechanism for Publisher Confirms and Returns.可选(默认设置为 nullChannel)。
13 如果在毫秒内未收到发布确认,则设置时,适配器会综合生成一个负向确认 (nack)。每过 50% 的该值就会检查待定的确认,因此发送 nack 的实际时间将在 1 倍到 1.5 倍该值之间。另请参阅 Alternative Mechanism for Publisher Confirms and Returns.默认无(不会生成 nack)。
14 设置为 true 时,调用线程将阻塞,等待发布确认。这需要配置用于确认的 RabbitTemplate`以及 `confirm-correlation-expression.该线程将最多阻塞 confirm-timeout`秒(默认 5 秒)。如果超时,将抛出 `MessageTimeoutException.如果启用返回并且消息返回,或者在等待确认时发生任何其他异常,将抛出 MessageHandlingException,并附带相应的消息。
15 将返回的消息发送到的通道。一旦提供,将配置基础 AMQP 模板以将无法传递的消息返回给适配器。如果没有配置 ErrorMessageStrategy,消息将使用从 AMQP 接收的数据构建,并带有以下附加头: amqp_returnReplyCodeamqp_returnReplyTextamqp_returnExchangeamqp_returnRoutingKey。如果存在 ErrorMessageStrategy,则该消息是一个 ErrorMessage,其有效内容为 ReturnedAmqpMessageException。另请参阅 Alternative Mechanism for Publisher Confirms and Returns.可选。
16 ErrorMessageStrategy 实现的引用,用于在发送返回的消息或未被证实的消息时构建 ErrorMessage 实例。
17 在发送 AMQP 消息时使用的 AmqpHeaderMapper`的引用。默认情况下,只有标准 AMQP 属性(例如 `contentType)会复制到 Spring Integration MessageHeaders。默认的 DefaultAmqpHeaderMapper 不会将任何用户定义的头复制到消息。如果提供了“request-header-names”,则不允许。可选。
18 要从 MessageHeaders`映射到 AMQP 消息的 AMQP 标头名称的逗号分隔列表。如果提供了“header-mapper”引用,则不允许。此列表中的值也可以是与标头名称匹配的简单模式(例如 `"" or "thing1, thing2"`或 `"*thing1")。
19 设置为 `false`时,端点将在应用程序上下文初始化期间尝试连接到代理。这允许 "`fail fast`"检测错误配置,但也会导致在代理关闭时初始化失败。设置为 `true`时(默认),连接会在发送第一条消息时建立(如果它还未存在,因为其他组件已建立它)。
20 设置为 true`时,类型为 `Iterable&lt;Message&lt;?&gt;&gt;`的有效内容将作为离散消息在单个 `RabbitTemplate`调用范围内同一个通道上发送。需要 `RabbitTemplate。当 wait-for-confirms`为 true 时,在发送完消息后将调用 `RabbitTemplate.waitForConfirmsOrDie()。使用事务模板时,发送将在新事务中执行,或者在已存在的事务中执行(如果存在)。
Example 1. return-channel

使用 return-channel 需要将 RabbitTemplatemandatory 属性设置为 true,并将 CachingConnectionFactorypublisherReturns 属性设置为 true。当使用带有返回的多个出站端点时,每个端点都需要一个单独的 RabbitTemplate