Enable Listener Endpoint Annotations

若要启用对 @RabbitListener 注解的支持,你可以将 @EnableRabbit 添加到 @Configuration 类中。以下示例展示了如何操作:

@Configuration
@EnableRabbit
public class AppConfig {

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        factory.setContainerCustomizer(container -> /* customize the container */);
        return factory;
    }
}

从版本 2.0 开始,DirectMessageListenerContainerFactory 也可用。它创建 DirectMessageListenerContainer 实例。

有关帮助您在 SimpleRabbitListenerContainerFactoryDirectRabbitListenerContainerFactory 之间进行选择的的信息,请参阅 Choosing a Container

从版本 2.2.2 开始,你可以提供 ContainerCustomizer 实施(如上所示)。这可用于在容器创建和配置后对其进行进一步配置;例如,你可以使用它来设置容器工厂未公开的属性。

版本 2.4.8 提供 CompositeContainerCustomizer,用于希望应用多个定制程序的情况。

默认情况下,基础架构会查找名为 rabbitListenerContainerFactory 的 Bean,将其作为用于创建消息侦听器容器的工厂的源。在这种情况下,且忽略 RabbitMQ 基础架构设置,processOrder 方法可以使用三个线程的核心轮询大小和十个线程的最大池大小进行调用。

您可以自定义要用于每个注释的侦听器容器工厂,或者可以通过实现 RabbitListenerConfigurer 接口配置明确的默认值。只有在未注册任何特定容器工厂的情况下才需要使用该默认值。有关完整详细信息和示例,请参阅 Javadoc

容器工厂提供添加 MessagePostProcessor 实例的方法,这些实例在接收到消息(调用侦听器之前)和发送回复之前应用。

请参阅 Reply Management以了解有关答复的信息。

从版本 2.0.6 开始,你可以向侦听器容器工厂添加 RetryTemplateRecoveryCallback。当发送回复时会使用它。在用尽重试次数时调用 RecoveryCallback。你可以使用 SendRetryContextAccessor 来获取上下文中的信息。以下示例展示了如何操作:

factory.setRetryTemplate(retryTemplate);
factory.setReplyRecoveryCallback(ctx -> {
    Message failed = SendRetryContextAccessor.getMessage(ctx);
    Address replyTo = SendRetryContextAccessor.getAddress(ctx);
    Throwable t = ctx.getLastThrowable();
    ...
    return null;
});

如果你更喜欢 XML 配置,可以使用 <rabbit:annotation-driven> 元素。检测到使用 @RabbitListener 注解的任何 Bean。

对于 SimpleRabbitListenerContainer 实例,你可以使用类似于以下内容的 XML:

<rabbit:annotation-driven/>

<bean id="rabbitListenerContainerFactory"
      class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="concurrentConsumers" value="3"/>
    <property name="maxConcurrentConsumers" value="10"/>
</bean>

对于 DirectMessageListenerContainer 实例,你可以使用类似于以下内容的 XML:

<rabbit:annotation-driven/>

<bean id="rabbitListenerContainerFactory"
      class="org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="consumersPerQueue" value="3"/>
</bean>

从版本 2.0 开始,@RabbitListener 注解具有 concurrency 属性。它支持 SpEL 表达式(#{…​})和属性占位符(${…​})。其含义和允许的值取决于容器类型,如下所示:

  • 对于 DirectMessageListenerContainer,该值必须是单个整数,将 consumersPerQueue 属性设置在容器上。

  • 对于 SimpleRabbitListenerContainer,该值可以是单个整数,将 concurrentConsumers 属性设置在容器上,或者可以采用以下形式:m-n,其中 mconcurrentConsumers 属性,nmaxConcurrentConsumers 属性。

在任何一种情况下,此设置都会覆盖工厂中的设置。以前,如果你的侦听器需要不同的并发性,则必须定义不同的容器工厂。

该注解还允许通过 autoStartupexecutor(从 2.2 开始)注解属性覆盖工厂的 autoStartuptaskExecutor 属性。为每个属性使用不同的执行器有助于在日志和线程转储中识别与每个侦听器关联的线程。

版本 2.2 还添加了 ackMode 属性,它允许你覆盖容器工厂的 acknowledgeMode 属性。

@RabbitListener(id = "manual.acks.1", queues = "manual.acks.1", ackMode = "MANUAL")
public void manual1(String in, Channel channel,
    @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {

    ...
    channel.basicAck(tag, false);
}