Application Events

侦听器容器及其消费者会发布以下 Spring 应用事件:

The following Spring application events are published by listener containers and their consumers:

  • ConsumerStartingEvent: published when a consumer thread is first started, before it starts polling.

  • ConsumerStartedEvent: published when a consumer is about to start polling.

  • ConsumerFailedToStartEvent: published if no ConsumerStartingEvent is published within the consumerStartTimeout container property. This event might signal that the configured task executor has insufficient threads to support the containers it is used in and their concurrency. An error message is also logged when this condition occurs.

  • ListenerContainerIdleEvent: published when no messages have been received in idleInterval (if configured).

  • ListenerContainerNoLongerIdleEvent: published when a record is consumed after previously publishing a ListenerContainerIdleEvent.

  • ListenerContainerPartitionIdleEvent: published when no messages have been received from that partition in idlePartitionEventInterval (if configured).

  • ListenerContainerPartitionNoLongerIdleEvent: published when a record is consumed from a partition that has previously published a ListenerContainerPartitionIdleEvent.

  • NonResponsiveConsumerEvent: published when the consumer appears to be blocked in the poll method.

  • ConsumerPartitionPausedEvent: published by each consumer when a partition is paused.

  • ConsumerPartitionResumedEvent: published by each consumer when a partition is resumed.

  • ConsumerPausedEvent: published by each consumer when the container is paused.

  • ConsumerResumedEvent: published by each consumer when the container is resumed.

  • ConsumerStoppingEvent: published by each consumer just before stopping.

  • ConsumerStoppedEvent: published after the consumer is closed. See Thread Safety.

  • ConsumerRetryAuthEvent: published when authentication or authorization of a consumer fails and is being retried.

  • ConsumerRetryAuthSuccessfulEvent: published when authentication or authorization has been retried successfully. Can only occur when there has been a ConsumerRetryAuthEvent before.

  • ContainerStoppedEvent: published when all consumers have stopped.

默认情况下,应用程序上下文的事件多播器将在调用线程上调用事件侦听器。如果您更改多播器以使用异步执行器,则当事件包含对消费者的引用时,您不得调用任何 Consumer 方法。

By default, the application context’s event multicaster invokes event listeners on the calling thread. If you change the multicaster to use an async executor, you must not invoke any Consumer methods when the event contains a reference to the consumer.

ListenerContainerIdleEvent 具有以下属性:

The ListenerContainerIdleEvent has the following properties:

  • source: The listener container instance that published the event.

  • container: The listener container or the parent listener container, if the source container is a child.

  • id: The listener ID (or container bean name).

  • idleTime: The time the container had been idle when the event was published.

  • topicPartitions: The topics and partitions that the container was assigned at the time the event was generated.

  • consumer: A reference to the Kafka Consumer object. For example, if the consumer’s pause() method was previously called, it can resume() when the event is received.

  • paused: Whether the container is currently paused. See Pausing and Resuming Listener Containers for more information.

ListenerContainerNoLongerIdleEvent 具有相同的属性,但 idleTimepaused 除外。

The ListenerContainerNoLongerIdleEvent has the same properties, except idleTime and paused.

ListenerContainerPartitionIdleEvent 具有以下属性:

The ListenerContainerPartitionIdleEvent has the following properties:

  • source: The listener container instance that published the event.

  • container: The listener container or the parent listener container, if the source container is a child.

  • id: The listener ID (or container bean name).

  • idleTime: The time partition consumption had been idle when the event was published.

  • topicPartition: The topic and partition that triggered the event.

  • consumer: A reference to the Kafka Consumer object. For example, if the consumer’s pause() method was previously called, it can resume() when the event is received.

  • paused: Whether that partition consumption is currently paused for that consumer. See Pausing and Resuming Listener Containers for more information.

ListenerContainerPartitionNoLongerIdleEvent 具有相同的属性,但 idleTimepaused 除外。

The ListenerContainerPartitionNoLongerIdleEvent has the same properties, except idleTime and paused.

NonResponsiveConsumerEvent 具有以下属性:

The NonResponsiveConsumerEvent has the following properties:

  • source: The listener container instance that published the event.

  • container: The listener container or the parent listener container, if the source container is a child.

  • id: The listener ID (or container bean name).

  • timeSinceLastPoll: The time just before the container last called poll().

  • topicPartitions: The topics and partitions that the container was assigned at the time the event was generated.

  • consumer: A reference to the Kafka Consumer object. For example, if the consumer’s pause() method was previously called, it can resume() when the event is received.

  • paused: Whether the container is currently paused. See Pausing and Resuming Listener Containers for more information.

ConsumerPausedEventConsumerResumedEventConsumerStopping 事件具有以下属性:

The ConsumerPausedEvent, ConsumerResumedEvent, and ConsumerStopping events have the following properties:

  • source: The listener container instance that published the event.

  • container: The listener container or the parent listener container, if the source container is a child.

  • partitions: The TopicPartition instances involved.

ConsumerPartitionPausedEventConsumerPartitionResumedEvent 事件具有以下属性:

The ConsumerPartitionPausedEvent, ConsumerPartitionResumedEvent events have the following properties:

  • source: The listener container instance that published the event.

  • container: The listener container or the parent listener container, if the source container is a child.

  • partition: The TopicPartition instance involved.

ConsumerRetryAuthEvent 事件具有以下属性:

The ConsumerRetryAuthEvent event has the following properties:

  • source: The listener container instance that published the event.

  • container: The listener container or the parent listener container, if the source container is a child.

  • reason:

    • AUTHENTICATION - the event was published because of an authentication exception.

    • AUTHORIZATION - the event was published because of an authorization exception.

ConsumerStartingEventConsumerStartingEventConsumerFailedToStartEventConsumerStoppedEventConsumerRetryAuthSuccessfulEventContainerStoppedEvent 事件具有以下属性:

The ConsumerStartingEvent, ConsumerStartingEvent, ConsumerFailedToStartEvent, ConsumerStoppedEvent, ConsumerRetryAuthSuccessfulEvent and ContainerStoppedEvent events have the following properties:

  • source: The listener container instance that published the event.

  • container: The listener container or the parent listener container, if the source container is a child.

所有容器(包括子级或父级)都发布 ContainerStoppedEvent。对于父级容器,source 和 container 属性是相同的。

All containers (whether a child or a parent) publish ContainerStoppedEvent. For a parent container, the source and container properties are identical.

此外,ConsumerStoppedEvent 还具有以下额外的属性:

In addition, the ConsumerStoppedEvent has the following additional property:

  • reason:

    • NORMAL - the consumer stopped normally (container was stopped).

    • ERROR - a java.lang.Error was thrown.

    • FENCED - the transactional producer was fenced and the stopContainerWhenFenced container property is true.

    • AUTH - an AuthenticationException or AuthorizationException was thrown and the authExceptionRetryInterval is not configured.

    • NO_OFFSET - there is no offset for a partition and the auto.offset.reset policy is none.

可以在出现此类情况后使用此事件重新启动容器:

You can use this event to restart the container after such a condition:

if (event.getReason.equals(Reason.FENCED)) {
    event.getSource(MessageListenerContainer.class).start();
}

Detecting Idle and Non-Responsive Consumers

尽管效率很高,但异步消费者检测何时处于空闲状态时存在一个问题。如果在一段时间内没有消息到达,您可能需要采取一些措施。

While efficient, one problem with asynchronous consumers is detecting when they are idle. You might want to take some action if no messages arrive for some period of time.

您可以将侦听器容器配置为在一段时间没有消息传送到时发布 ListenerContainerIdleEvent。在容器空闲时,每隔 idleEventInterval 毫秒发布一个事件。

You can configure the listener container to publish a ListenerContainerIdleEvent when some time passes with no message delivery. While the container is idle, an event is published every idleEventInterval milliseconds.

要配置此功能,请在容器上设置 idleEventInterval。以下示例演示如何执行此操作:

To configure this feature, set the idleEventInterval on the container. The following example shows how to do so:

@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    ...
    containerProps.setIdleEventInterval(60000L);
    ...
    KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
    return container;
}

以下示例展示了如何为 @KafkaListener 设置 idleEventInterval

The following example shows how to set the idleEventInterval for a @KafkaListener:

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.getContainerProperties().setIdleEventInterval(60000L);
    ...
    return factory;
}

在每个案例中,容器空闲时每分钟发布一次事件。

In each of these cases, an event is published once per minute while the container is idle.

如果由于某种原因,使用者 poll() 方法没有退出,没有接收到任何消息,也无法生成空闲事件(当代理不可访问时,早期版本的 kafka-clients 存在此问题)。在这种情况下,如果轮回没有在 3xpollTimeout 属性内返回,则容器会发布 NonResponsiveConsumerEvent。默认情况下,在每个容器中每 30 秒执行一次此检查。可以在配置侦听器容器时,在 ContainerProperties 中设置 monitorInterval(默认 30 秒)和 noPollThreshold(默认 3.0)属性来修改此行为。noPollThreshold 应大于 1.0,以避免因竞争条件而获得虚假事件。收到此类事件后,可以停止容器,从而唤醒使用者以便它停止。

If, for some reason, the consumer poll() method does not exit, no messages are received and idle events cannot be generated (this was a problem with early versions of the kafka-clients when the broker wasn’t reachable). In this case, the container publishes a NonResponsiveConsumerEvent if a poll does not return within 3x the pollTimeout property. By default, this check is performed once every 30 seconds in each container. You can modify this behavior by setting the monitorInterval (default 30 seconds) and noPollThreshold (default 3.0) properties in the ContainerProperties when configuring the listener container. The noPollThreshold should be greater than 1.0 to avoid getting spurious events due to a race condition. Receiving such an event lets you stop the containers, thus waking the consumer so that it can stop.

从 2.6.2 版本开始,如果容器已发布 ListenerContainerIdleEvent,则在随后收到记录时将发布 ListenerContainerNoLongerIdleEvent

Starting with version 2.6.2, if a container has published a ListenerContainerIdleEvent, it will publish a ListenerContainerNoLongerIdleEvent when a record is subsequently received.

Event Consumption

可以通过实现 ApplicationListener 来捕获这些事件,既可以是通用侦听器,也可以是仅接收此特定事件的窄化侦听器。您还可以使用 Spring Framework 4.2 中引入的 @EventListener

You can capture these events by implementing ApplicationListener — either a general listener or one narrowed to only receive this specific event. You can also use @EventListener, introduced in Spring Framework 4.2.

下一个示例将 @KafkaListener@EventListener 组合到一个类中。您应该明白,应用程序侦听器获取所有容器的事件,因此如果您想根据空闲的特定容器采取具体措施,您可能需要检查侦听器 ID。您还可以为此目的使用 @EventListenercondition

The next example combines @KafkaListener and @EventListener into a single class. You should understand that the application listener gets events for all containers, so you may need to check the listener ID if you want to take specific action based on which container is idle. You can also use the @EventListener’s `condition for this purpose.

请参阅 Application Events 来了解事件属性的信息。

See Application Events for information about event properties.

该事件通常在使用者线程上发布,因此与 Consumer 对象进行交互是安全的。

The event is normally published on the consumer thread, so it is safe to interact with the Consumer object.

以下示例同时使用了 @KafkaListener@EventListener

The following example uses both @KafkaListener and @EventListener:

public class Listener {

    @KafkaListener(id = "qux", topics = "annotated")
    public void listen4(@Payload String foo, Acknowledgment ack) {
        ...
    }

    @EventListener(condition = "event.listenerId.startsWith('qux-')")
    public void eventHandler(ListenerContainerIdleEvent event) {
        ...
    }

}

事件侦听器可以看到所有容器的事件。因此,在下例中,我们会根据侦听器 ID 缩小接收的事件范围。由于为 @KafkaListener 创建的容器支持并发,所以实际容器会命名为 id-n,其中 n 是用于支持并发的每个实例的唯一值。这就是我们在条件中使用 startsWith 的原因。

Event listeners see events for all containers. Consequently, in the preceding example, we narrow the events received based on the listener ID. Since containers created for the @KafkaListener support concurrency, the actual containers are named id-n where the n is a unique value for each instance to support the concurrency. That is why we use startsWith in the condition.

如果您希望使用空闲事件停止侦听器容器,则不应在调用侦听器的线程上调用 container.stop()。这会导致延迟和不必要的日志消息。相反,您应该将事件传递给可以停止容器的其他线程。此外,如果它是子容器,则您不应 stop() 容器实例。您应该停止并发容器。

If you wish to use the idle event to stop the lister container, you should not call container.stop() on the thread that calls the listener. Doing so causes delays and unnecessary log messages. Instead, you should hand off the event to a different thread that can then stop the container. Also, you should not stop() the container instance if it is a child container. You should stop the concurrent container instead.

Current Positions when Idle

请注意,你可以在空闲状态被检测到时,通过在侦听器中实现 ConsumerSeekAware 来获得当前的位置。请参阅 seek 中的 onIdleContainer()

Note that you can obtain the current positions when idle is detected by implementing ConsumerSeekAware in your listener. See onIdleContainer() in seek.