Outbound Channel Adapter
以下示例显示了 AMQP 出站通道适配器的可用属性:
-
Java DSL
-
Java
-
XML
@Bean
public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate,
MessageChannel amqpOutboundChannel) {
return IntegrationFlow.from(amqpOutboundChannel)
.handle(Amqp.outboundAdapter(amqpTemplate)
.routingKey("queue1")) // default exchange - route to queue 'queue1'
.get();
}
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
outbound.setRoutingKey("queue1"); // default exchange - route to queue 'queue1'
return outbound;
}
@Bean
public MessageChannel amqpOutboundChannel() {
return new DirectChannel();
}
<int-amqp:outbound-channel-adapter id="outboundAmqp" 1
channel="outboundChannel" 2
amqp-template="myAmqpTemplate" 3
exchange-name="" 4
exchange-name-expression="" 5
order="1" 6
routing-key="" 7
routing-key-expression="" 8
default-delivery-mode"" 9
confirm-correlation-expression="" 10
confirm-ack-channel="" 11
confirm-nack-channel="" 12
confirm-timeout="" 13
wait-for-confirm="" 14
return-channel="" 15
error-message-strategy="" 16
header-mapper="" 17
mapped-request-headers="" 18
lazy-connect="true" 19
multi-send="false"/> 20
1 | 此适配器的唯一 ID。可选。 |
2 | 用于将消息发送到该通道的邮件通道,以便将邮件转换为 AMQP 交换并发布到该通道。必需。 |
3 | 配置的 AMQP 模板的 Bean 引用。可选(默认为 amqpTemplate )。 |
4 | 将消息发送到的 AMQP 交换的名称。如果未提供,将把消息发送到默认的无名称交换。与“exchange-name-expression”互斥。可选。 |
5 | 一个 SpEL 表达式,评估后确定发送消息的 AMQP 交换的名称,其中消息为根对象。如果没有提供,则消息将发送到默认的无名交换。与“exchange-name”互斥。可选。 |
6 | 当多个使用者已注册时的此使用者顺序,从而启用负载平衡和故障转移。可选(默认为 Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE] )。 |
7 | 将消息发送时使用的固定的路由键。默认情况下,这是一个空的 String .与“routing-key-expression”互斥。可选。 |
8 | 一个 SpEL 表达式,在发送消息时求值以确定要使用的路由键,并将消息作为根对象(例如,“payload.key”)。默认情况下,这是一个空的 String .与“routing-key”互斥。可选。 |
9 | 消息的默认传递模式:PERSISTENT 或 NON_PERSISTENT 。如果 header-mapper 设置了传递模式,则会覆盖。如果存在 Spring Integration 消息头 amqp_deliveryMode ,则 DefaultHeaderMapper 会设置值。如果没有提供此属性并且头映射器未将其设置,则默认值取决于 RabbitTemplate 使用的底层 Spring AMQP MessagePropertiesConverter 。如果根本没有对其进行自定义,则默认值为 PERSISTENT 。可选。 |
10 | 一个定义相关数据的表达式。一旦提供,将配置基础 AMQP 模板以接收发布确认。需要一个专用的 RabbitTemplate`和一个将 `publisherConfirms`属性设置为 `true`的 `CachingConnectionFactory .收到发布确认且提供相关数据时,将其写入 confirm-ack-channel`或 `confirm-nack-channel ,具体取决于确认类型。确认的有效内容是相关数据,由该表达式定义。消息设置一个“amqp_publishConfirm”头,将该头设置为 true (ack ) 或 false (nack )。示例: headers['myCorrelationData']`和 `payload .4.1 版引入了 amqp_publishConfirmNackCause`消息头。它包含“publisher confirmation”中“nack”的 `cause .从 4.2 版开始,如果表达式解析为 Message<?>`实例(例如 `#this ),则在 ack /`nack`通道上发出的消息将基于该消息,并添加一个或多个附加头。之前,无论类型为何,都会创建一个新的消息,其有效内容为相关数据。另请参阅 Alternative Mechanism for Publisher Confirms and Returns.可选。 |
11 | 将正向 (ack ) 发布确认发送到的通道。有效内容是 confirm-correlation-expression`定义的相关数据。如果表达式是 `#root`或 `#this ,将从原始消息构建消息,并将 amqp_publishConfirm`头设置为 `true 。另请参阅 Alternative Mechanism for Publisher Confirms and Returns.可选(默认设置为 `nullChannel )。 |
12 | 将负向 (nack ) 发布确认发送到的通道。有效内容是 confirm-correlation-expression`定义的相关数据(如果没有配置 `ErrorMessageStrategy )。如果表达式是 #root`或 `#this ,将从原始消息构建消息,并将 amqp_publishConfirm`头设置为 `false 。如果存在 ErrorMessageStrategy ,则该消息是一个 ErrorMessage ,其有效内容为 NackedAmqpMessageException 。另请参阅 Alternative Mechanism for Publisher Confirms and Returns.可选(默认设置为 nullChannel )。 |
13 | 如果在毫秒内未收到发布确认,则设置时,适配器会综合生成一个负向确认 (nack)。每过 50% 的该值就会检查待定的确认,因此发送 nack 的实际时间将在 1 倍到 1.5 倍该值之间。另请参阅 Alternative Mechanism for Publisher Confirms and Returns.默认无(不会生成 nack)。 |
14 | 设置为 true 时,调用线程将阻塞,等待发布确认。这需要配置用于确认的 RabbitTemplate`以及 `confirm-correlation-expression .该线程将最多阻塞 confirm-timeout`秒(默认 5 秒)。如果超时,将抛出 `MessageTimeoutException .如果启用返回并且消息返回,或者在等待确认时发生任何其他异常,将抛出 MessageHandlingException ,并附带相应的消息。 |
15 | 将返回的消息发送到的通道。一旦提供,将配置基础 AMQP 模板以将无法传递的消息返回给适配器。如果没有配置 ErrorMessageStrategy ,消息将使用从 AMQP 接收的数据构建,并带有以下附加头: amqp_returnReplyCode ,amqp_returnReplyText ,amqp_returnExchange ,amqp_returnRoutingKey 。如果存在 ErrorMessageStrategy ,则该消息是一个 ErrorMessage ,其有效内容为 ReturnedAmqpMessageException 。另请参阅 Alternative Mechanism for Publisher Confirms and Returns.可选。 |
16 | 对 ErrorMessageStrategy 实现的引用,用于在发送返回的消息或未被证实的消息时构建 ErrorMessage 实例。 |
17 | 在发送 AMQP 消息时使用的 AmqpHeaderMapper`的引用。默认情况下,只有标准 AMQP 属性(例如 `contentType )会复制到 Spring Integration MessageHeaders 。默认的 DefaultAmqpHeaderMapper 不会将任何用户定义的头复制到消息。如果提供了“request-header-names”,则不允许。可选。 |
18 | 要从 MessageHeaders`映射到 AMQP 消息的 AMQP 标头名称的逗号分隔列表。如果提供了“header-mapper”引用,则不允许。此列表中的值也可以是与标头名称匹配的简单模式(例如 `"" or "thing1, thing2"`或 `"*thing1" )。 |
19 | 设置为 `false`时,端点将在应用程序上下文初始化期间尝试连接到代理。这允许 "`fail fast`"检测错误配置,但也会导致在代理关闭时初始化失败。设置为 `true`时(默认),连接会在发送第一条消息时建立(如果它还未存在,因为其他组件已建立它)。 |
20 | 设置为 true`时,类型为 `Iterable<Message<?>>`的有效内容将作为离散消息在单个 `RabbitTemplate`调用范围内同一个通道上发送。需要 `RabbitTemplate 。当 wait-for-confirms`为 true 时,在发送完消息后将调用 `RabbitTemplate.waitForConfirmsOrDie() 。使用事务模板时,发送将在新事务中执行,或者在已存在的事务中执行(如果存在)。 |
Example 1. return-channel
使用 return-channel
需要将 RabbitTemplate
的 mandatory
属性设置为 true
,并将 CachingConnectionFactory
的 publisherReturns
属性设置为 true
。当使用带有返回的多个出站端点时,每个端点都需要一个单独的 RabbitTemplate
。