Detecting Idle Asynchronous Consumers

虽然异步消费者很有效,但其中一个问题是在检测到它们空闲时——用户可能需要在一段时间内未收到任何消息时执行一些操作。 从 1.6 版开始,现在可以通过设置监听器容器,以便在没有消息传递时经过一段时间后发布 ListenerContainerIdleEvent 来对其进行配置。在容器空闲时,每隔 idleEventInterval 毫秒发布一次事件。 要配置此功能,请在容器上设置 idleEventInterval。以下示例展示了如何在 XML 和 Java 中执行此操作(适用于 SimpleMessageListenerContainerSimpleRabbitListenerContainerFactory):

<rabbit:listener-container connection-factory="connectionFactory"
        ...
        idle-event-interval="60000"
        ...
        >
    <rabbit:listener id="container1" queue-names="foo" ref="myListener" method="handle" />
</rabbit:listener-container>
@Bean
public SimpleMessageListenerContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    ...
    container.setIdleEventInterval(60000L);
    ...
    return container;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory());
    factory.setIdleEventInterval(60000L);
    ...
    return factory;
}

在每个案例中,容器空闲时每分钟发布一次事件。

Event Consumption

您可以通过实现 ApplicationListener 来捕获空闲事件——它既可以是一个通用监听器,也可以是专门接收此特定事件的监听器。您还可以使用 Spring Framework 4.2 中引入的 @EventListener

以下示例将 @RabbitListener@EventListener 组合到单个类中。您需要理解应用程序监听器可以获取所有容器的事件,因此如果您想根据哪个容器处于空闲状态执行特定操作,您可能需要检查监听器 ID。您还可以为此目的使用 @EventListener condition

这些事件有四个属性:

  • source:侦听器容器实例

  • id:侦听器 ID(或容器 Bean 名称)

  • idleTime:发布事件时容器已处于空闲状态的时间

  • queueNames:容器侦听的队列名称

以下示例演示了如何同时使用 @RabbitListener@EventListener 注释创建监听器:

public class Listener {

    @RabbitListener(id="someId", queues="#{queue.name}")
    public String listen(String foo) {
        return foo.toUpperCase();
    }

    @EventListener(condition = "event.listenerId == 'someId'")
    public void onApplicationEvent(ListenerContainerIdleEvent event) {
        ...
    }

}

事件侦听器会看到所有容器的事件。因此,在前面的示例中,我们根据侦听器 ID 缩小接收的事件范围。

如果你希望使用空闲事件来停止侦听器容器,则不应在调用侦听器的线程上调用 container.stop()。这样做总是会导致延迟和不必要的日志消息。相反,你应该将事件传递给可以停止容器的其他线程。