Message Listener Containers

KafkaMessageListenerContainer 从单个线程上处理所有主题或分区的全部消息。ConcurrentMessageListenerContainer 则委派给一个或多个 KafkaMessageListenerContainer 实例,以便提供多线程消耗。文章还讨论了记录拦截器、偏移提交、确认和侦听器容器自动启动等其他相关主题。

提供两个 MessageListenerContainer 实现:

  • KafkaMessageListenerContainer

  • ConcurrentMessageListenerContainer

KafkaMessageListenerContainer 从单个线程上所有主题或分区接收所有消息。ConcurrentMessageListenerContainer 委托给一个或多个 KafkaMessageListenerContainer 实例,以便提供多线程消耗。 从版本 2.2.7 开始,您可以向侦听器容器添加 RecordInterceptor;它将在调用侦听器之前被调用,从而允许检查或修改记录。如果拦截器返回 null,则不会调用侦听器。从版本 2.7 开始,它具有其他在侦听器退出后(正常退出或通过抛出异常)调用的方法。此外,从版本 2.7 开始,现在有一个 BatchInterceptor,为 Batch Listeners 提供类似的功能。此外,ConsumerAwareRecordInterceptor (和 BatchInterceptor)提供对 Consumer<?, ?> 的访问。例如,这可用于在拦截器中访问使用指标。

您不应在这些拦截器中执行任何影响消费者位置和已提交偏移量的方法;容器需要管理此类信息。

如果拦截器改变了记录(通过创建一个新的记录),则 topicpartitionoffset 必须保持不变以避免意外的副作用,如记录丢失。

CompositeRecordInterceptorCompositeBatchInterceptor 可以用于调用多个拦截器。 默认情况下,从版本 2.8 开始,在使用事务时,会在事务启动之前调用拦截器。可以将侦听器容器的 interceptBeforeTx 属性设置为 false,以在事务启动之后调用拦截器。从版本 2.9 开始,这将适用于任何事务管理器,而不仅仅是 KafkaAwareTransactionManager。例如,这允许拦截器参与容器启动的 JDBC 事务。 从版本 2.3.8、2.4.6 开始,当并发性大于 1 时,ConcurrentMessageListenerContainer 现在支持 Static Membershipgroup.instance.id-n 为后缀,其中 n1 开始。这与增加的 session.timeout.ms 一起,可用于减少再平衡事件,例如,在重新启动应用程序实例时。

Using KafkaMessageListenerContainer

以下构造函数可用:

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)

它接收一个 ConsumerFactory 和有关主题和分区的的信息,以及 ContainerProperties 对象中的其他配置。ContainerProperties 具有以下构造函数:

public ContainerProperties(TopicPartitionOffset... topicPartitions)

public ContainerProperties(String... topics)

public ContainerProperties(Pattern topicPattern)

第一个构造函数使用 TopicPartitionOffset 参数数组来明确地指示容器使用哪些分区(使用消费者 assign() 方法)以及可选的初始偏移量。正值默认情况下是绝对偏移量。负值默认情况下相对分区内的当前最后偏移量。提供了一个带有额外布尔参数的 TopicPartitionOffset 构造函数。如果它为 true,则初始偏移量(正或负)将相对此消费者的当前位置。在启动容器时应用偏移量。第二个构造函数使用主题数组,并且 Kafka 根据 group.id 属性分配分区,将分区分布到组中。第三个构造函数使用正则表达式 Pattern 来选择主题。

要向容器分配 MessageListener,可以在创建容器时使用 ContainerProps.setMessageListener 方法。以下示例显示如何执行此操作:

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

请注意,在创建 DefaultKafkaConsumerFactory 时,使用仅在上面获取属性的构造函数意味着键和值 Deserializer 类将从配置中获取。或者,可以为键和/或值将 Deserializer 实例传递给 DefaultKafkaConsumerFactory 构造函数,在这种情况下,所有消费者共享相同的实例。另一种选择是提供 Supplier<Deserializer> s(从 2.3 版开始),这些 Supplier<Deserializer> s 将用于为每个 Consumer 获取单独的 Deserializer 实例:

DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

有关可以设置的各种属性的更多信息,请参阅 Javadoc 中的 ContainerProperties

从版本 2.1.1 开始,可以使用名为 logContainerConfig 的新属性。当为 true 并且启用了 INFO 记录时,每个侦听器容器都会写入一个日志消息,总结其配置属性。

默认情况下,以 DEBUG 记录级别执行主题偏移量提交的记录。从版本 2.1.2 开始,ContainerProperties 中的一个名为 commitLogLevel 的属性允许指定这些消息的日志级别。例如,要将日志级别更改为 INFO,可以使用 containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);

从版本 2.2 开始,添加了一个名为 missingTopicsFatal 的新容器属性(自 2.3.4 起的默认值为 false)。如果代理上不存在任何已配置主题,则此属性将阻止容器启动。如果将容器配置为侦听主题模式(正则表达式),则它不会应用。以前,容器线程在记录许多消息的同时循环于 consumer.poll() 方法内,等待主题出现。除了日志之外,没有表明存在问题。

从版本 2.8 开始,引入了名为 authExceptionRetryInterval 的新容器属性。这会导致容器在从 KafkaConsumer 获取任何 AuthenticationExceptionAuthorizationException 之后重试获取消息。例如,当配置的用户被拒绝读取某个主题的访问权限或凭据不正确时,就会发生这种情况。定义 authExceptionRetryInterval 允许当授予适当权限时容器恢复。

默认情况下,不配置任何时间间隔 - 身份验证和授权错误被视为致命错误,导致容器停止。

从版本 2.8 开始,在创建消费者工厂时,如果将反序列化程序作为对象提供(在构造函数中或通过设置程序提供),则工厂将调用 configure() 方法,使用配置属性对其进行配置。

Using ConcurrentMessageListenerContainer

单个构造器与 KafkaListenerContainer 构造器类似。以下清单显示了构造器的签名:

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)

它还具有 concurrency 属性。例如,container.setConcurrency(3) 创建三个 KafkaMessageListenerContainer 实例。

对于第一个构造器,Kafka 使用其组管理功能在消费者之间分配分区。

在监听多个主题时,默认的分区分配可能不是您所期望的。例如,如果您有 3 个每个包含 5 个分区的主题,并且您想使用 concurrency=15,那么您只会看到 5 个活动消费者,每个消费者从每个主题中分配一个分区,而另有 10 个消费者处于空闲状态。这是因为默认的 Kafka PartitionAssignorRangeAssignor(请参阅其 Javadoc)。对于此方案,您可能需要考虑改用 RoundRobinAssignor,它将分区分配给所有消费者。然后,每个消费者被分配一个主题或分区。要更改 PartitionAssignor,您可以在提供给 DefaultKafkaConsumerFactory 的属性中设置 partition.assignment.strategy 消费者属性 (ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)。 在使用 Spring Boot 时,您可以按如下方式分配策略:

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

当容器属性被配置为 TopicPartitionOffset 时,ConcurrentMessageListenerContainerTopicPartitionOffset 实例分配给委托 KafkaMessageListenerContainer 实例。

如果(例如)提供了六个 TopicPartitionOffset 实例,并且 concurrency3;每个容器获取两个分区。对于五个 TopicPartitionOffset 实例,两个容器获取两个分区,第三个容器获取一个分区。如果 concurrency 大于 TopicPartition 的数量,concurrency 将向下调整,使得每个容器获取一个分区。

client.id 属性(如果已设置)将附加 -n,其中 n 是与并发相对应的消费者实例。当启用 JMX 时,这对于为 MBean 提供唯一名称是必需的。

从 1.3 版本开始,MessageListenerContainer 提供对底层 KafkaConsumer 指标的访问。对于 ConcurrentMessageListenerContainermetrics() 方法返回所有目标 KafkaMessageListenerContainer 实例的指标。指标被分组到 Map<MetricName, ? extends Metric> 中,其中 client-id 提供给底层 KafkaConsumer

从 2.3 版本开始,ContainerProperties 提供了一个 idleBetweenPolls 选项,以便监听器容器中的主循环在 KafkaConsumer.poll() 调用之间休眠。实际睡眠间隔是从提供的选项和 max.poll.interval.ms 消费者配置与当前记录批处理时间之间的差异中选择的最小值。

Committing Offsets

提供了多种提交偏移量选项。如果 enable.auto.commit 消费者属性为 true,那么 Kafka 根据其配置自动提交偏移量。如果它为 false,容器支持几种 AckMode 设置(在下一列表中描述)。默认的 AckModeBATCH。从 2.3 版本开始,除非在配置中明确设置,否则框架将 enable.auto.commit 设置为 false。以前,如果未设置该属性,则使用 Kafka 默认值 (true)。

消费者 poll() 方法返回一个或多个 ConsumerRecordsMessageListener 被调用一次记录。以下列表描述了容器为每个 AckMode (当未使用事务时)执行的操作:

  • AuthorizationException: 在处理完记录后,当侦听器返回时提交偏移量。

  • authExceptionRetryInterval: 在 NO_OFFSET 返回的所有记录都处理完成后,提交偏移量。

  • NO_OFFSET: 在 auto.offset.reset 返回的所有记录都处理完成后,且自上次提交后 none 超出时,提交偏移量。

  • auto.offset.reset: 在 RECORD 返回的所有记录都处理完成后,且自上次提交后已收到 BATCH 条记录时,提交偏移量。

  • none: 类似于 RECORDBATCH,但如果任一条件 poll(),则执行提交。

  • TIME: 消息侦听器负责 poll() ackTime。之后,应用与 COUNT 相同的语义。

  • poll(): 当侦听器调用 poll() 方法时,立即提交偏移量。

在使用 transactions 时,偏移量将发送至事务,并且语义等同于 RECORDBATCH,具体取决于侦听器类型(记录或批处理)。

MANUALMANUAL_IMMEDIATE 需要监听器是 AcknowledgingMessageListenerBatchAcknowledgingMessageListener。请参阅 Message Listeners

根据 syncCommits 容器属性,消费者中的 commitSync()commitAsync() 方法被使用。默认情况下 syncCommitstrue;还请参阅 setSyncCommitTimeout。请参阅 setCommitCallback 以获取异步提交的结果;默认回调是 LoggingCommitCallback,它记录错误(以及调试级别的成功)。

因为监听器容器具有自己的提交偏移量机制,它偏好 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIGfalse。从 2.3 版本开始,它会无条件地将它设置为 false,除非在消费者工厂中专门设置或容器的消费者属性覆盖。

Acknowledgment 具有以下方法:

public interface Acknowledgment {

    void acknowledge();

}

此方法赋予侦听器控制权,决定何时提交偏移量。

从 2.3 版本开始,Acknowledgment 接口有两个额外的方法 nack(long sleep)nack(int index, long sleep)。第一个与记录侦听器一起使用,第二个与批处理侦听器一起使用。为您的侦听器类型调用错误的方法将抛出一个 IllegalStateException

如果您希望使用 nack() 提交部分批处理,在使用事务时,将 AckMode 设置为 MANUAL;调用 nack() 会将已成功处理的记录的偏移量发送到该事务。

只能在调用侦听器的消费者线程上调用 nack()

使用 Out of Order Commits 时不允许出现 nack()

对于记录侦听器,当调用 nack() 时,提交任何待处理的偏移量,丢弃上次轮询中剩下的记录,并在其分区上执行搜索,以便失败的记录和未处理的记录在下次 poll() 上重新发送。可以通过设置 sleep 参数在重新发送前暂停消费者。这与在使用 DefaultErrorHandler 配置容器时抛出异常的功能类似。

nack() 暂停整个侦听器的指定睡眠时间,包括所有分配的分区。

在使用批处理侦听器时,您可以在批处理中指定发生故障的索引。当调用 nack() 时,偏移量将提交给索引前的记录,并针对失败和丢弃的记录在其分区上执行搜索,以便它们在下次 poll() 上重新发送。

有关更多信息,请参阅 Container Error Handlers

在睡眠期间暂停消费者,以便继续轮询代理以保持消费者处于活动状态。实际的睡眠时间及其解析取决于容器的 pollTimeout,默认情况下为 5 秒。最短睡眠时间等于 pollTimeout,所有睡眠时间均为其倍数。对于较短的睡眠时间或为了提高其准确性,请考虑减少容器的 pollTimeout

从 3.0.10 版本开始,批量侦听器可以在 Acknowledgment 参数上使用 acknowledge(index) 提交批处理中部分的偏移。当调用该方法时,该索引处的记录的偏移量(以及所有前面的记录)将被提交。在执行部分批量提交后调用 acknowledge() 将提交批处理其余部分的偏移量。以下限制适用:

  • AckMode.MANUAL_IMMEDIATE is required

  • 必须在线程侦听器上调用该方法

  • 侦听器必须使用 ackCount,而不是原始 COUNT_TIME

  • 索引必须在列表元素范围内

  • 索引必须大于以前调用中使用的索引

这些限制已得到强制,该方法将基于违规情况抛出 IllegalArgumentExceptionIllegalStateException

Listener Container Auto Startup

侦听器容器实现了 SmartLifecycle,且默认情况下 autoStartuptrue。容器在较晚阶段启动(Integer.MAX-VALUE - 100)。为处理来自侦听器的 data 而实现 SmartLifecycle 的其他组件应在较早阶段启动。- 100 为后面的阶段留出了余地,允许在容器启动之后自动启动组件。