Inbound Channel Adapter

以下清单显示了 AMQP 入站通道适配器可能的配置选项:

  • Java DSL

  • Java

  • XML

@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "aName"))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
}
@Bean
public MessageChannel amqpInputChannel() {
    return new DirectChannel();
}

@Bean
public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
        @Qualifier("amqpInputChannel") MessageChannel channel) {
    AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
    adapter.setOutputChannel(channel);
    return adapter;
}

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container =
                               new SimpleMessageListenerContainer(connectionFactory);
    container.setQueueNames("aName");
    container.setConcurrentConsumers(2);
    // ...
    return container;
}

@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
    return new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println(message.getPayload());
        }

    };
}
<int-amqp:inbound-channel-adapter
                                  id="inboundAmqp"                1
                                  channel="inboundChannel"        2
                                  queue-names="si.test.queue"     3
                                  acknowledge-mode="AUTO"         4
                                  advice-chain=""                 5
                                  channel-transacted=""           6
                                  concurrent-consumers=""         7
                                  connection-factory=""           8
                                  error-channel=""                9
                                  expose-listener-channel=""      10
                                  header-mapper=""                11
                                  mapped-request-headers=""       12
                                  listener-container=""           13
                                  message-converter=""            14
                                  message-properties-converter="" 15
                                  phase=""                        16
                                  prefetch-count=""               17
                                  receive-timeout=""              18
                                  recovery-interval=""            19
                                  missing-queues-fatal=""         20
                                  shutdown-timeout=""             21
                                  task-executor=""                22
                                  transaction-attribute=""        23
                                  transaction-manager=""          24
                                  tx-size=""                      25
                                  consumers-per-queue             26
                                  batch-mode="MESSAGES"/>         27

<1> The unique ID for this adapter.
Optional.
<2> Message channel to which converted messages should be sent.
Required.
<3> Names of the AMQP queues (comma-separated list) from which messages should be consumed.
Required.
<4> Acknowledge mode for the `MessageListenerContainer`.
When set to `MANUAL`, the delivery tag and channel are provided in message headers `amqp_deliveryTag` and `amqp_channel`, respectively.
The user application is responsible for acknowledgement.
`NONE` means no acknowledgements (`autoAck`).
`AUTO` means the adapter's container acknowledges when the downstream flow completes.
Optional (defaults to AUTO).
See xref:amqp/inbound-ack.adoc[Inbound Endpoint Acknowledge Mode].
<5> Extra AOP Advices to handle cross-cutting behavior associated with this inbound channel adapter.
Optional.
<6> Flag to indicate that channels created by this component are transactional.
If true, it tells the framework to use a transactional channel and to end all operations (send or receive) with a commit or rollback, depending on the outcome, with an exception that signals a rollback.
Optional (Defaults to false).
<7> Specify the number of concurrent consumers to create.
The default is `1`.
We recommend raising the number of concurrent consumers to scale the consumption of messages coming in from a queue.
However, note that any ordering guarantees are lost once multiple consumers are registered.
In general, use one consumer for low-volume queues.
Not allowed when 'consumers-per-queue' is set.
Optional.
<8> Bean reference to the RabbitMQ `ConnectionFactory`.
Optional (defaults to `connectionFactory`).
<9> Message channel to which error messages should be sent.
Optional.
<10> Whether the listener channel (com.rabbitmq.client.Channel) is exposed to a registered `ChannelAwareMessageListener`.
Optional (defaults to true).
<11> A reference to an `AmqpHeaderMapper` to use when receiving AMQP Messages.
Optional.
By default, only standard AMQP properties (such as `contentType`) are copied to Spring Integration `MessageHeaders`.
Any user-defined headers within the AMQP `MessageProperties` are NOT copied to the message by the default `DefaultAmqpHeaderMapper`.
Not allowed if 'request-header-names' is provided.
<12> Comma-separated list of the names of AMQP Headers to be mapped from the AMQP request into the `MessageHeaders`.
This can only be provided if the 'header-mapper' reference is not provided.
The values in this list can also be simple patterns to be matched against the header names (such as "\*" or "thing1*, thing2" or "*something").
<13> Reference to the `AbstractMessageListenerContainer` to use for receiving AMQP Messages.
If this attribute is provided, no other attribute related to the listener container configuration should be provided.
In other words, by setting this reference, you must take full responsibility for the listener container configuration.
The only exception is the `MessageListener` itself.
Since that is actually the core responsibility of this channel adapter implementation, the referenced listener container must not already have its own `MessageListener`.
Optional.
<14> The `MessageConverter` to use when receiving AMQP messages.
Optional.
<15> The `MessagePropertiesConverter` to use when receiving AMQP messages.
Optional.
<16> Specifies the phase in which the underlying `AbstractMessageListenerContainer` should be started and stopped.
The startup order proceeds from lowest to highest, and the shutdown order is the reverse of that.
By default, this value is `Integer.MAX_VALUE`, meaning that this container starts as late as possible and stops as soon as possible.
Optional.
<17> Tells the AMQP broker how many messages to send to each consumer in a single request.
Often, you can set this value high to improve throughput.
It should be greater than or equal to the transaction size (see the `tx-size` attribute, later in this list).
Optional (defaults to `1`).
<18> Receive timeout in milliseconds.
Optional (defaults to `1000`).
<19> Specifies the interval between recovery attempts of the underlying `AbstractMessageListenerContainer` (in milliseconds).
Optional (defaults to `5000`).
<20> If 'true' and none of the queues are available on the broker, the container throws a fatal exception during startup and stops if the queues are deleted when the container is running (after making three attempts to passively declare the queues).
If `false`, the container does not throw an exception and goes into recovery mode, attempting to restart according to the `recovery-interval`.
Optional (defaults to `true`).
<21> The time to wait for workers (in milliseconds) after the underlying `AbstractMessageListenerContainer` is stopped and before the AMQP connection is forced closed.
If any workers are active when the shutdown signal comes, they are allowed to finish processing as long as they can finish within this timeout.
Otherwise, the connection is closed and messages remain unacknowledged (if the channel is transactional).
Optional (defaults to `5000`).
<22> By default, the underlying `AbstractMessageListenerContainer` uses a `SimpleAsyncTaskExecutor` implementation, that fires up a new thread for each task, running it asynchronously.
By default, the number of concurrent threads is unlimited.
Note that this implementation does not reuse threads.
Consider using a thread-pooling `TaskExecutor` implementation as an alternative.
Optional (defaults to `SimpleAsyncTaskExecutor`).
<23> By default, the underlying `AbstractMessageListenerContainer` creates a new instance of the `DefaultTransactionAttribute` (it takes the EJB approach to rolling back on runtime but not checked exceptions).
Optional (defaults to `DefaultTransactionAttribute`).
<24> Sets a bean reference to an external `PlatformTransactionManager` on the underlying `AbstractMessageListenerContainer`.
The transaction manager works in conjunction with the `channel-transacted` attribute.
If there is already a transaction in progress when the framework is sending or receiving a message and the `channelTransacted` flag is `true`, the commit or rollback of the messaging transaction is deferred until the end of the current transaction.
If the `channelTransacted` flag is `false`, no transaction semantics apply to the messaging operation (it is auto-acked).
For further information, see
https://docs.spring.io/spring-amqp/reference/html/%255Freference.html#%5Ftransactions[Transactions with Spring AMQP].
Optional.
<25> Tells the `SimpleMessageListenerContainer` how many messages to process in a single transaction (if the channel is transactional).
For best results, it should be less than or equal to the value set in `prefetch-count`.
Not allowed when 'consumers-per-queue' is set.
Optional (defaults to `1`).
<26> Indicates that the underlying listener container should be a `DirectMessageListenerContainer` instead of the default `SimpleMessageListenerContainer`.
See the https://docs.spring.io/spring-amqp/reference/html/[Spring AMQP Reference Manual] for more information.
<27> When the container's `consumerBatchEnabled` is `true`, determines how the adapter presents the batch of messages in the message payload.
When set to `MESSAGES` (default), the payload is a `List<Message<?>>` where each message has headers mapped from the incoming AMQP `Message` and the payload is the converted `body`.
When set to `EXTRACT_PAYLOADS`, the payload is a `List<?>` where the elements are converted from the AMQP `Message` body.
`EXTRACT_PAYLOADS_WITH_HEADERS` is similar to `EXTRACT_PAYLOADS` but, in addition, the headers from each message are mapped from the `MessageProperties` into a `List<Map<String, Object>` at the corresponding index; the header name is `AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS`.
container

请注意,使用 XML 配置外部容器时,不能使用 Spring AMQP 命名空间来定义容器。这是因为该命名空间至少需要一个 <listener/> 元素。在此环境中,侦听器内部属于适配器。出于此原因,必须使用普通的 Spring <bean/> 定义来定义容器,如下面的示例所示:

<bean id="container"
 class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueNames" value="aName.queue" />
    <property name="defaultRequeueRejected" value="false"/>
</bean>

尽管 Spring Integration JMS 和 AMQP 支持类似,但仍存在重要差异。JMS 入站通道适配器在内部使用 JmsDestinationPollingSource 预设并且需要一个已配置轮询器。AMQP 入站通道适配器使用 AbstractMessageListenerContainer 预设并且受消息驱动。在这方面,它更类似于 JMS 消息驱动通道适配器。

从 5.5 版本开始,AmqpInboundChannelAdapter 可以用 org.springframework.amqp.rabbit.retry.MessageRecoverer 策略进行配置,该策略在内部调用重试操作时会在 RecoveryCallback 中使用。有关更多信息,请参阅 setMessageRecoverer() JavaDoc。 @Publisher 批注还可以结合 @RabbitListener 使用:

@Configuration
@EnableIntegration
@EnableRabbit
@EnablePublisher
public static class ContextConfiguration {

    @Bean
    QueueChannel fromRabbitViaPublisher() {
        return new QueueChannel();
    }

    @RabbitListener(queuesToDeclare = @Queue("publisherQueue"))
    @Publisher("fromRabbitViaPublisher")
    @Payload("#args.payload.toUpperCase()")
    public void consumeForPublisher(String payload) {

    }

}

默认情况下,@Publisher AOP 拦截器处理来自方法调用的返回值。但是,@RabbitListener 的返回值被视为 AMQP 回复消息。因此,这种方法不能和 @Publisher 一起使用,所以针对方法参数带有相应 SpEL 表达式的 @Payload 注释便是此组合的推荐方式。有关 @Publisher 的更多信息,请参阅 Annotation-driven Configuration 章节。

在侦听器容器中使用独占或单一活动使用者时,建议您将容器属性 forceStop 设置为 true。这会防止竞争条件,在该条件下,在停止容器后,另一个使用者可能在此实例完全停止之前开始使用消息。

Batched Messages

有关批处理消息的更多信息,请参见 Spring AMQP 文档:https://docs.spring.io/spring-amqp/docs/current/reference/html/#template-batching。

要使用 Spring Integration 生成批处理消息,只需使用 BatchingRabbitTemplate 配置输出端点。

在接收批处理消息时,默认情况下,监听器容器会提取每个片段消息,适配器会为每个片段生成一个 Message<?>。从 5.2 版开始,如果容器的 deBatchingEnabled 属性设置为 false,则反批处理将由适配器执行,并且会生成一个 Message<List<?>>,其有效负载是片段有效负载的列表(如果适用,在转换之后)。

默认的 BatchingStrategySimpleBatchingStrategy,但可以在适配器上覆盖此策略。

在需要恢复重试操作的批次时,必须使用 org.springframework.amqp.rabbit.retry.MessageBatchRecoverer 预设。