Listener Container Properties

Table 1. ContainerProperties Properties
Property Default Description

[id="ackCount"]<<`ackCount`,ackCount>>

1

提交挂起偏移量之前记录的数量,当 ackModeCOUNTCOUNT_TIME 时。

[id="adviceChain"]<<`adviceChain`,adviceChain>>

null

围绕建议,包装消息侦听器的一个 Advice 对象链(例如 MethodInterceptor ),按顺序调用。

[id="ackMode"]<<`ackMode`,ackMode>>

BATCH

控制对偏移量进行提交的频率——参见 Committing Offsets

[id="ackTime"]<<`ackTime`,ackTime>>

5000

ackModeTIMECOUNT_TIME 时,提交挂起偏移量后的毫秒数。

[id="assignmentCommitOption"]<<`assignmentCommitOption`,assignmentCommitOption>>

LATEST_ONLY _NO_TX

是否在分配时提交初始位置;默认情况下,仅当 ConsumerConfig.AUTO_OFFSET_RESET_CONFIGlatest 时才会提交初始偏移量,并且即使存在事务管理器,它也不会在事务中运行。有关可用选项的更多信息,请参阅 ContainerProperties.AssignmentCommitOption 的 JavaDoc。

[id="asyncAcks"]<<`asyncAcks`,asyncAcks>>

false

启用乱序提交(参见 Manually Committing Offsets);将暂停使用者并且提交延迟直到填满缺口。

[id="authExceptionRetryInterval"]<<`authExceptionRetryInterval`,authExceptionRetryInterval>>

null

当 Kafka 客户端抛出 AuthenticationExceptionAuthorizationException 时,如果非空,则在轮询之间休眠的 Duration。如果为 null,则此类异常被视为致命错误,并且容器将停止。

[id="batchRecoverAfterRollback"]<<`batchRecoverAfterRollback`,batchRecoverAfterRollback>>

false

设置为 `true`以启用批处理恢复,参见 After Rollback Processor

[id="clientId"]<<`clientId`,clientId>>

(empty string)

client.id 使用者属性的前缀。覆盖使用者工厂 client.id 属性;在并发容器中,为每个使用者实例添加 -n 作为后缀。

[id="checkDeserExWhenKeyNull"]<<`checkDeserExWhenKeyNull`,checkDeserExWhenKeyNull>>

false

设置为 true,以在收到 null key 时始终检查是否存在 DeserializationException 标头。当使用者代码无法确定已配置的 ErrorHandlingDeserializer 时很有用,例如使用委派反序列化器时。

[id="checkDeserExWhenValueNull"]<<`checkDeserExWhenValueNull`,checkDeserExWhenValueNull>>

false

设置为 true,以在收到 null value 时始终检查是否存在 DeserializationException 标头。当使用者代码无法确定已配置的 ErrorHandlingDeserializer 时很有用,例如使用委派反序列化器时。

[id="commitCallback"]<<`commitCallback`,commitCallback>>

null

syncCommitsfalse 时,如果存在,则在提交完成后调用回调。

[id="commitLogLevel"]<<`commitLogLevel`,commitLogLevel>>

DEBUG

用于提交偏移量的日志的日志记录级别。

[id="consumerRebalanceListener"]<<`consumerRebalanceListener`,consumerRebalanceListener>>

null

再平衡监听器;参见 Rebalancing Listeners

[id="commitRetries"]<<`commitRetries`,commitRetries>>

3

使用 syncCommits 设置为 true 时设置重试次数 RetriableCommitFailedException。默认值为 3(共尝试 4 次)。

[id="consumerStartTimeout"]<<`consumerStartTimeout`,consumerStartTimeout>>

30s

在记录错误之前等待消费者启动的时间;例如,如果您使用的是线程不足的任务执行器,则可能会发生这种情况。

[id="deliveryAttemptHeader"]<<`deliveryAttemptHeader`,deliveryAttemptHeader>>

false

See Delivery Attempts Header.

[id="eosMode"]<<`eosMode`,eosMode>>

V2

仅一次语义模式;参见 Exactly Once Semantics

[id="fixTxOffsets"]<<`fixTxOffsets`,fixTxOffsets>>

false

在使用事务性生产者生成记录时,并且使用者定位于分区的末尾,则由于用于指示事务提交/回滚以及可能存在回滚记录的伪记录,可能会错误地将滞后报告为大于零。这在功能上不会影响使用者,但是一些用户对“滞后”非零表示担忧。将此属性设置为 true,容器将更正这种错误报告的偏移量。该检查在下次轮询之前执行,以避免向提交处理添加显著的复杂性。在撰写本文时,仅当使用者使用 `isolation.level=read_committed`进行配置且 `max.poll.records`大于 1 时,才会更正滞后。如需了解更多信息,请参阅 KAFKA-10683

[id="groupId"]<<`groupId`,groupId>>

null

覆盖消费者 group.id 属性;自动由 @KafkaListener idgroupId 属性设置。

[id="idleBeforeDataMultiplier"]<<`idleBeforeDataMultiplier`,idleBeforeDataMultiplier>>

5.0

在收到任何记录之前,对 idleEventInterval 应用的乘数。在收到记录后,将不再应用乘数。从 2.8 版本开始提供。

[id="idleBetweenPolls"]<<`idleBetweenPolls`,idleBetweenPolls>>

0

用于通过在轮询之间使线程休眠来减慢交付速度。处理一批记录的时间加上这个值必须小于 max.poll.interval.ms 消费者属性。

[id="idleEventInterval"]<<`idleEventInterval`,idleEventInterval>>

null

设置后,将启用 `ListenerContainerIdleEvent`s, see Application Events and Detecting Idle and Non-Responsive Consumers. Also see `idleBeforeDataMultiplier`的发布。

[id="idlePartitionEventInterval"]<<`idlePartitionEventInterval`,idlePartitionEventInterval>>

null

设置后,将启用 ListenerContainerIdlePartitionEvent 的发布,参见 Application EventsDetecting Idle and Non-Responsive Consumers

[id="kafkaConsumerProperties"]<<`kafkaConsumerProperties`,kafkaConsumerProperties>>

None

用于覆盖在消费者工厂上配置的任何任意消费者属性。

[id="kafkaAwareTransactionManager"]<<`kafkaAwareTransactionManager`,kafkaAwareTransactionManager>>

null

See Transactions.

[id="listenerTaskExecutor"]<<`listenerTaskExecutor`,listenerTaskExecutor>>

SimpleAsyncTaskExecutor

用于运行消费者线程的任务执行器。默认执行器创建名为 &lt;name&gt;-C-n 的线程;在 KafkaMessageListenerContainer 中,该名称是 Bean 名称;在 ConcurrentMessageListenerContainer 中,除了 -n,名称是 Bean 名称的后缀,其中 n 是为每个子容器递增的。

[id="logContainerConfig"]<<`logContainerConfig`,logContainerConfig>>

false

设置为 true 以 INFO 级别记录所有容器属性。

[id="messageListener"]<<`messageListener`,messageListener>>

null

The message listener.

[id="micrometerEnabled"]<<`micrometerEnabled`,micrometerEnabled>>

true

是否为使用者线程维持 Micrometer 定时器。

[id="micrometerTags"]<<`micrometerTags`,micrometerTags>>

empty

要添加到 Micrometer 度量的静态标记图。

[id="micrometerTagsProvider"]<<`micrometerTagsProvider`,micrometerTagsProvider>>

null

一个根据使用者记录提供动态标记的函数。

[id="missingTopicsFatal"]<<`missingTopicsFatal`,missingTopicsFatal>>

false

如果某些主题不存在于代理中,则当该值为 true 时会阻止容器启动。

[id="monitorInterval"]<<`monitorInterval`,monitorInterval>>

30s

检查使用者线程状态的频率(以秒为单位)。请参阅 noPollThresholdpollTimeout

[id="noPollThreshold"]<<`noPollThreshold`,noPollThreshold>>

3.0

乘以 pollTimeOut 来决定是否发布 NonResponsiveConsumerEvent。请参阅 monitorInterval

[id="observationConvention"]<<`observationConvention`,observationConvention>>

null

设置后,根据使用者记录中的信息将动态标记添加到定时器和跟踪中。

[id="observationEnabled"]<<`observationEnabled`,observationEnabled>>

false

设置为 true 以通过 Micrometer 启用观察。

[id="offsetAndMetadataProvider"]<<`offsetAndMetadataProvider`,offsetAndMetadataProvider>>

null

OffsetAndMetadata 的提供者;默认情况下,该提供者会创建一个没有元数据的偏移量和元数据。该提供者提供一种自定义元数据的方法。

[id="onlyLogRecordMetadata"]<<`onlyLogRecordMetadata`,onlyLogRecordMetadata>>

false

设置为 false 以记录完整使用者记录(在错误、调试日志等中),而不仅仅是 topic-partition@offset

[id="pauseImmediate"]<<`pauseImmediate`,pauseImmediate>>

false

在容器暂停的情况下,在处理完当前记录后停止处理,而不是处理完前一次轮询中的所有记录后停止处理;其余的记录将保留在内存中并在容器恢复后传递到使用者。

[id="pollTimeout"]<<`pollTimeout`,pollTimeout>>

5000

传入 Consumer.poll() 中的超时,单位为毫秒。

[id="pollTimeoutWhilePaused"]<<`pollTimeoutWhilePaused`,pollTimeoutWhilePaused>>

100

在容器处于暂停状态时传入 Consumer.poll() 中的超时(单位为毫秒)。

[id="restartAfterAuthExceptions"]<<`restartAfterAuthExceptions`,restartAfterAuthExceptions>>

false

如果容器因认证/授权例外而停止,则重新启动容器。

[id="scheduler"]<<`scheduler`,scheduler>>

ThreadPoolTaskScheduler

运行消费者监控器任务的计划程序。

[id="shutdownTimeout"]<<`shutdownTimeout`,shutdownTimeout>>

10000

stop() 方法被阻止的最长时间(以毫秒为单位),直到所有消费者停止并在发布容器停止事件之前。

[id="stopContainerWhenFenced"]<<`stopContainerWhenFenced`,stopContainerWhenFenced>>

false

如果抛出 ProducerFencedException,则停止监听器容器。如需了解更多信息,请参阅 After-rollback Processor

[id="stopImmediate"]<<`stopImmediate`,stopImmediate>>

false

当容器停止时,在处理完当前记录后停止处理,而不是在处理完前一次轮询的所有记录后停止处理。

[id="subBatchPerPartition"]<<`subBatchPerPartition`,subBatchPerPartition>>

See desc.

当使用批处理侦听器时,如果这是 true,则会以拆分成一个批处理结果调用侦听器,每个分区一个。默认 false

[id="syncCommitTimeout"]<<`syncCommitTimeout`,syncCommitTimeout>>

null

syncCommitstrue 时使用的超时。如果没有设置,容器将尝试确定 default.api.timeout.ms 消费者属性并使用该属性;否则,它将使用 60 秒。

[id="syncCommits"]<<`syncCommits`,syncCommits>>

true

是否对偏移量使用同步或异步提交;请参见 commitCallback

[id="topics"]<<`topics` topicPattern topicPartitions,topics>>

n/a

已配置的主题、主题模式或明确分配的主题/分区。相互排斥;至少提供一个;由 ContainerProperties 构造函数强制执行。

[id="transactionManager"]<<`transactionManager`,transactionManager>>

null

自 3.2 版开始弃用,参见 [kafkaAwareTransactionManager]Other transaction managers

Table 2. AbstractListenerContainer Properties
Property Default Description

[id="afterRollbackProcessor"]<<`afterRollbackProcessor`,afterRollbackProcessor>>

DefaultAfterRollbackProcessor

在事务回滚后调用的 AfterRollbackProcessor

[id="applicationEventPublisher"]<<`applicationEventPublisher`,applicationEventPublisher>>

application context

The event publisher.

[id="batchErrorHandler"]<<`batchErrorHandler`,batchErrorHandler>>

See desc.

Deprecated - see commonErrorHandler.

[id="batchInterceptor"]<<`batchInterceptor`,batchInterceptor>>

null

设置一个 BatchInterceptor,以便在调用批处理侦听器之前调用;不适用于记录侦听器。另请参见 interceptBeforeTx

[id="beanName"]<<`beanName`,beanName>>

bean name

容器的 Bean 名称;对于子容器,后缀为 -n

[id="commonErrorHandler"]<<`commonErrorHandler`,commonErrorHandler>>

See desc.

当在使用 DefaultAfterRollbackProcessor`时提供 `transactionManager`时,为 `DefaultErrorHandler`或 `null。参见 Container Error Handlers

[id="containerProperties"]<<`containerProperties`,containerProperties>>

ContainerProperties

The container properties instance.

[id="groupId2"]<<`groupId`,groupId2>>

See desc.

如果存在,则为 containerProperties.groupId,否则为消费器工厂中的 group.id 属性。

[id="interceptBeforeTx"]<<`interceptBeforeTx`,interceptBeforeTx>>

true

确定在事务开始之前或之后调用 recordInterceptor

[id="listenerId"]<<`listenerId`,listenerId>>

See desc.

用户配置容器的 Bean 名称或 @KafkaListenerid 属性。

[id="listenerInfo"]<<`listenerInfo`,listenerInfo>>

null

KafkaHeaders.LISTENER_INFO 标头中填充的值。有了 @KafkaListener,此值将从 info 中获取此属性。此标头可用于多个位置,例如 RecordInterceptorRecordFilterStrategy 和侦听器代码本身。

[id="pauseRequested"]<<`pauseRequested`,pauseRequested>>

(read only)

如果请求消费者暂停,则为 True。

[id="recordInterceptor"]<<`recordInterceptor`,recordInterceptor>>

null

在调用记录侦听器之前设置一个要调用的 RecordInterceptor;不适用于批处理侦听器。另请参阅 interceptBeforeTx

[id="topicCheckTimeout"]<<`topicCheckTimeout`,topicCheckTimeout>>

30s

missingTopicsFatal 的容器属性为 true 时,等待 describeTopics 操作完成的时间(以秒为单位)。

Table 3. KafkaMessageListenerContainer Properties
Property Default Description

[id="assignedPartitions"]<<`assignedPartitions`,assignedPartitions>>

(read only)

当前分配给此容器的分区(显示或不显示)。

[id="assignedPartitionsByClientId"]<<`assignedPartitionsByClientId`,assignedPartitionsByClientId>>

(read only)

当前分配给此容器的分区(显示或不显示)。

[id="clientIdSuffix"]<<`clientIdSuffix`,clientIdSuffix>>

null

并发容器用来给每个子容器的消费者一个唯一 client.id

[id="containerPaused"]<<`containerPaused`,containerPaused>>

n/a

如果暂停已被请求且消费者实际上已暂停,则为 True。

Table 4. ConcurrentMessageListenerContainer Properties
Property Default Description

[id="alwaysClientIdSuffix"]<<`alwaysClientIdSuffix`,alwaysClientIdSuffix>>

true

concurrency 仅为 1 时,将此项设置为 false 以禁止向 client.id 消费者属性添加后缀。

[id="assignedPartitions2"]<<`assignedPartitions`,assignedPartitions2>>

(read only)

当前分配给这个容器的子 KafkaMessageListenerContainer 的分区的集合(明示或默示)。

[id="assignedPartitionsByClientId2"]<<`assignedPartitionsByClientId`,assignedPartitionsByClientId2>>

(read only)

当前分配给此容器的子 KafkaMessageListenerContainer`s (explicitly or not), keyed by the child container&#8217;s consumer&#8217;s `client.id 属性的分区。

[id="concurrency"]<<`concurrency`,concurrency>>

1

要管理的子 KafkaMessageListenerContainer 数量。

[id="containerPaused2"]<<`containerPaused`,containerPaused2>>

n/a

如果已请求暂停且所有子容器的使用者均实际已暂停,则为 True。

[id="containers"]<<`containers`,containers>>

n/a

指向所有子 KafkaMessageListenerContainer 的引用。