Change History

What’s New in 3.1 Since 3.0

本部分涵盖了从版本 3.0 到版本 3.1 所做的更改。要了解较早版本的更改,请参阅 Change History

Kafka Client Version

此版本需要 3.6.0“kafka-clients”。

EmbeddedKafkaBroker

现在提供了一个附加的实现,以使用 Kraft 代替 Zookeeper。有关更多信息,请参见 Embedded Kafka Broker

JsonDeserializer

当出现反序列化异常时,“SerializationException”消息不再包含具有“Can’t deserialize data [[123, 34, 98, 97, 122, …”形式的数据;每个数据字节的数值数组没有用,而且对于大体积数据,它可能会过于冗长。与“ErrorHandlingDeserializer”配合使用时,发送到错误处理程序的“DeserializationException”包含“data”属性,其中包含无法反序列化的原始数据。与“ErrorHandlingDeserializer”搭配使用时,“KafkaConsumer”将持续地对同一记录发出异常,显示 Jackson 抛出的主题/分区/偏移和原因。

ContainerPostProcessor

可以通过在 @KafkaListener 注解中指定 ContainerPostProcessor 的 bean 名称,对监听器容器应用后处理。这发生在容器已创建且在容器工厂中配置的任何已配置 ContainerCustomizer 之后。参见 Container Factory 了解更多信息。

ErrorHandlingDeserializer

现在,您可以向此反序列化程序添加 Validator;如果委托 Deserializer 成功反序列化该对象,但该对象无法通过验证,则会抛出一个与反序列化异常发生的类似异常。这样就可以将原始原始数据传递到错误处理程序。有关更多信息,请参见 Using ErrorHandlingDeserializer

Retryable Topics

@RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC) 时,将后缀 -retry-5000 更改为 -retry。如果您想保留后缀 -retry-5000,请使用 @RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2")。参见 Topic Naming 了解更多信息。

Listener Container Changes

使用 null 消费者 group.id 手动分配分区时,AckMode 现在会自动强制转换为 MANUAL。有关更多信息,请参见 Manually Assigning All Partitions

What’s New in 3.0 Since 2.9

Kafka Client Version

此版本需要 3.3.1“kafka-clients”。

Exactly Once Semantics

“EOSMode.V1”(又名“ALPHA”)不再受支持。

使用事务时,最低代理版本是 2.5。

有关更多信息,请参见 Exactly Once SemanticsKIP-447

Observation

现在支持使用 Micrometer 为计时器和跟踪启用观察。有关更多信息,请参见 Observation

Native Images

提供了创建本机映像的支持。有关更多信息,请参见 Native Images

Global Single Embedded Kafka

EmbeddedKafkaBroker 中嵌入的 Kafka 现在可以作为一个单个全局实例为整个测试计划启动。有关更多信息,请参见 Using the Same Broker(s) for Multiple Test Classes

Retryable Topics Changes

此功能不再被视为实验性(就其 API 而言),该功能本身自 2.7 以来一直受支持,但 API 变更中断的可能性高于正常情况。

此次版本中 Non-Blocking Retries 基础设施 bean 的引导已更改,以避免某些应用程序在应用程序初始化方面遇到的一些时序问题。

您现在可以为重试容器设置不同的“concurrency”;默认情况下,“concurrency”与主容器相同。

“@RetryableTopic”现在可用作自定义注释的元注释,包括对“@AliasFor”属性的支持。

参见 Configuration 了解更多信息。

重试主题的默认复制因子现在为“-1”(使用代理默认值)。如果您的代理早于 2.4 版本,您现在需要显式地设置该属性。

你现在可以在同一应用程序上下文中的同一主题上配置多个 @RetryableTopic 监听器。以前,这是不可能的。参见 Multiple Listeners, Same Topic(s) 了解更多信息。

RetryTopicConfigurationSupport 中有重大 API 更改;具体而言,如果你覆盖 destinationTopicResolverkafkaConsumerBackoffManager 和/或 retryTopicConfigurer 的 bean 定义方法;现在这些方法需要一个 ObjectProvider<RetryTopicComponentFactory> 参数。

Listener Container Changes

现在,容器会发布与消费者认证和授权失败相关的事件。参见 Application Events 了解更多信息。

你现在可以自定义消费者线程使用的线程名称。参见 Container Thread Naming 了解更多信息。

容器属性 restartAfterAuthException 已添加。参见 Listener Container Properties 了解更多信息。

KafkaTemplate Changes

此类返回的 future 现在是 CompletableFuture,而不是 ListenableFuture。请参见 Using KafkaTemplate

ReplyingKafkaTemplate Changes

此类返回的 future 现在是 CompletableFuture,而不是 ListenableFuture。请参见 Using ReplyingKafkaTemplateRequest/Reply with Message<?> s

@KafkaListener Changes

你现在可以使用自定义关联标头,它会在任何回复消息中回显。有关更多信息,请参见 Using ReplyingKafkaTemplate结尾处的注释。

你现在可以在处理整个批次之前手动提交批次的部分内容。有关更多信息,请参见 Committing Offsets

KafkaHeaders Changes

KafkaHeaders 中 2.9.x 版本中废弃的四个常量现已移除。

  • MESSAGE_KEY 替换为 KEY

  • PARTITION_ID 替换为 PARTITION

同样,RECEIVED_MESSAGE_KEYRECEIVED_KEY 代替,RECEIVED_PARTITION_IDRECEIVED_PARTITION 代替。

Testing Changes

3.0.7 版本引入了一个 MockConsumerFactory`和 `MockProducerFactory。有关更多信息,请参见 Mock Consumer and Producer

从 3.0.10 版本开始,嵌入式 Kafka 代理默认将 Spring Boot 属性 spring.kafka.bootstrap-servers 设置为嵌入式代理的地址。

What’s New in 2.9 since 2.8

Kafka Client Version

此版本需要 3.2.0 kafka-clients

Error Handler Changes

现在可以将 `DefaultErrorHandler`配置为暂停容器一次轮询,并使用前一次轮询的剩余结果,而不是寻求剩余记录的偏移量。有关更多信息,请参见 DefaultErrorHandler

`DefaultErrorHandler`现在有一个 `BackOffHandler`属性。有关更多信息,请参见 Back Off Handlers

Listener Container Changes

interceptBeforeTx 现在适用于所有事务管理器(以前仅适用于 KafkaAwareTransactionManager)。见 [interceptBeforeTx]

提供了一个新的容器属性 pauseImmediate ,它允许容器在处理当前记录后暂停消费者,而不是在处理上一次轮询的所有记录后暂停消费者。见 [pauseImmediate]

与消费者认证和授权相关的事件

Header Mapper Changes

现在你可以配置要映射哪一个入站标头。也适用于版本 2.8.8 或更高版本。有关更多信息,参见 Message Headers

KafkaTemplate Changes

在 3.0 中,此类返回的 future 将是 CompletableFuture,而不是 ListenableFuture。有关在使用此版本时如何转换的帮助,请参见 Using KafkaTemplate

ReplyingKafkaTemplate Changes

模板现在提供了一种方法来等待回复容器上的分配,以便在回复容器初始化之前发送请求时避免出现竞争。也可以在 2.8.8 或更高版本中使用。请参见 Using ReplyingKafkaTemplate

在 3.0 中,此类返回的 future 将是 CompletableFuture,而不是 ListenableFuture。有关在使用此版本时如何转换的帮助,请参见 Using ReplyingKafkaTemplateRequest/Reply with Message<?> s

What’s New in 2.8 Since 2.7

本节涵盖了从 2.7 版到 2.8 版所做的更改。有关早期版本中的更改,请参见 Change History

Kafka Client Version

此版本要求 3.0.0 kafka-clients

Package Changes

与类型映射相关的类和接口已从 …​support.converter 移至 …​support.mapping

  • AbstractJavaTypeMapper

  • ClassMapper

  • DefaultJackson2JavaTypeMapper

  • Jackson2JavaTypeMapper

Out of Order Manual Commits

现可配置侦听器容器乱序接受手动偏移提交(通常异步)。该容器会延迟提交,直到已确认丢失的偏移量。有关更多信息,参见 Manually Committing Offsets

@KafkaListener Changes

现在可以在方法本身上指定侦听器方法是否是批处理侦听器。这允许将同一容器工厂用于记录和批处理侦听器。

有关更多信息,请参阅 [batch-listeners]

批处理侦听器现在可以处理转换异常。

有关更多信息,请参见 Conversion Errors with Batch Error Handlers

RecordFilterStrategy 在与批处理侦听器一起使用时,现在可以在一个调用中过滤整个批处理。有关更多信息,请参阅 [batch-listeners] 末尾的注释。

@KafkaListener 注释现在具有 filter 属性,用于仅为该侦听器覆盖容器工厂的 RecordFilterStrategy

@KafkaListener`注释现在有 `info`属性;这用于填充新的侦听器容器属性 `listenerInfo。然后将其用于填充每个记录中的 KafkaHeaders.LISTENER_INFO`标头,该标头可在 `RecordInterceptor、`RecordFilterStrategy`或侦听器本身中使用。有关更多信息,请参见 Listener Info HeaderAbstract Listener Container Properties

KafkaTemplate Changes

现在你可以接收单条记录,因为给定了主题、分区和偏移量。有关更多信息,参见 Using KafkaTemplate to Receive

CommonErrorHandler Added

用于记录和批次侦听器的旧版 GenericErrorHandler`及其子接口层次结构已被新的单接口 `CommonErrorHandler`取代,其实现对应于 `GenericErrorHandler`的大多数旧版实现。有关更多信息,请参见 Container Error HandlersMigrating Custom Legacy Error Handler Implementations to `CommonErrorHandler

Listener Container Changes

现在,interceptBeforeTx 容器属性默认情况下为 true

authorizationExceptionRetryInterval 属性已重命名为 authExceptionRetryInterval,现在除了以前的 AuthorizationException 之外,还适用于 AuthenticationException。除非设置此属性,否则这两个异常都被视为致命异常,并且容器会默认停止。

Serializer/Deserializer Changes

现在提供了 DelegatingByTopicSerializer`和 `DelegatingByTopicDeserializer。有关更多信息,请参见 Delegating Serializer and Deserializer

DeadLetterPublishingRecover Changes

现在,stripPreviousExceptionHeaders 属性默认值为 true

现在,可以使用多种技术来自定义添加到输出记录中的标题。

有关更多信息,请参见 Managing Dead Letter Record Headers

Retryable Topics Changes

现在你可以对可重试主题和不可重试主题使用相同的工厂。有关更多信息,参见 Specifying a ListenerContainerFactory

现在有一个可管理的致命异常全局列表,它将使失败的记录直接进入 DLT。请参阅 Exception Classifier以了解如何管理它。

你现可同时使用阻塞和非阻塞重试。有关更多信息,参见 Combining Blocking and Non-Blocking Retries

现在,使用可重试主题功能时引发的 KafkaBackOffException 记录在 DEBUG 级别。如果你需要将日志级别更改回 WARN 或将其设置为任何其他级别,请参见 Changing KafkaBackOffException Logging Level

Changes between 2.6 and 2.7

Kafka Client Version

此版本需要 2.7.0 kafka-clients。自版本 2.7.1 起,它与 2.8.0 客户端兼容;请参见 Override Spring Boot Dependencies

Non-Blocking Delayed Retries Using Topics

此版本添加了此显著的新功能。如果严格顺序并不重要,则可以将失败的传送发送到另一个主题以便稍后使用。可以配置一系列这些重试主题,并增加延迟时间。有关详细信息,请参见 Non-Blocking Retries

Listener Container Changes

现在,onlyLogRecordMetadata 容器属性默认值为 true

现在可以使用新的容器属性 stopImmediate

有关详细信息,请参见 Listener Container Properties

在发送尝试(例如 SeekToCurrentErrorHandlerDefaultAfterRollbackProcessor)之间使用 BackOff 的错误处理程序现在将在容器停止后立即退出后退间隔,而不是延迟停止。

现在可以将扩展了 FailedRecordProcessor 的错误处理程序和回滚后处理程序配置为一个或多个 RetryListener,以接收有关重试和恢复进度的信息。

RecordInterceptor`现在具有监听器返回后(正常返回或通过抛出异常)调用的附加方法。它还具有子接口 `ConsumerAwareRecordInterceptor.此外,现在还存在用于批处理监听器的 BatchInterceptor。有关详细信息,请参见 Message Listener Containers

@KafkaListener Changes

您现在可以验证 @KafkaHandler`方法(类级别监听器)的 payload 参数。有关详细信息,请参见 @KafkaListener` @Payload Validation

现在您可以在 MessagingMessageConverterBatchMessagingMessageConverter 上设置 rawRecordHeader 属性,导致原始 ConsumerRecord 添加到已转换的 Message<?> 中。如果要在侦听器错误处理程序中使用 DeadLetterPublishingRecoverer,这很有用。有关详细信息,请参阅 Listener Error Handlers

您现在可以在应用程序初始化期间修改 @KafkaListener`注释。有关详细信息,请参见 @KafkaListener` Attribute Modification

DeadLetterPublishingRecover Changes

现在,如果键和值都无法反序列化,则原始值将发布到 DLT。以前,该值已填充,但密钥 DeserializationException 仍保留在标题中。如果您对恢复程序进行了子类化并覆盖了 createProducerRecord 方法,就会有破坏性的 API 更改。

此外,恢复程序在发布到目标解析器选择的特定分区之前,将验证该分区实际存在。

有关详细信息,请参阅 Publishing Dead-letter Records

ChainedKafkaTransactionManager is Deprecated

有关详细信息,请参见 Transactions

ReplyingKafkaTemplate Changes

现在有一种检查回复并让将来异常失败的机制,如果某个条件存在。

已添加对发送和接收 spring-messaging Message<?> 的支持。

有关详细信息,请参阅 Using ReplyingKafkaTemplate

Kafka Streams Changes

默认情况下,现在将 StreamsBuilderFactoryBean 配置为不清理本地状态。有关详细信息,请参阅 Configuration

KafkaAdmin Changes

已添加新的方法 createOrModifyTopicsdescribeTopics。已添加 KafkaAdmin.NewTopics 以便于在一个 Bean 中配置多个主题。有关更多信息,请参见 [configuring-topics]

MessageConverter Changes

现在可以向 MessagingMessageConverter 添加 spring-messaging SmartMessageConverter,从而允许基于 contentType 标头的内容协商。有关详细信息,请参阅 Spring Messaging Message Conversion

Sequencing @KafkaListener s

有关详细信息,请参见 Starting @KafkaListener s in Sequence

ExponentialBackOffWithMaxRetries

提供了一个新的 BackOff 实现,可以更方便地配置最大重试次数。有关详细信息,请参阅 ExponentialBackOffWithMaxRetries Implementation

Conditional Delegating Error Handlers

这些新的错误处理程序可以配置为根据异常类型委托给不同的错误处理程序。有关详细信息,请参阅 Delegating Error Handler

Changes between 2.5 and 2.6

Kafka Client Version

此版本需要 2.6.0 kafka-clients

Listener Container Changes

默认 EOSMode`现在是 `BETA。有关详细信息,请参见 Exactly Once Semantics

如果恢复失败,各种错误处理程序(扩展 FailedRecordProcessor)和 DefaultAfterRollbackProcessor 现在会重置 BackOff。此外,现在可以根据失败的记录和/或异常选择要使用的 BackOff

您现在可以在容器属性中配置 adviceChain。有关详细信息,请参见 Listener Container Properties

当容器配置为发布 ListenerContainerIdleEvent 时,现在在发布空闲事件后收到记录时,它将发布 ListenerContainerNoLongerIdleEvent。有关详细信息,请参阅 Application EventsDetecting Idle and Non-Responsive Consumers

@KafkaListener Changes

当使用手动分区分配时,你现在可以指定一个通配符,用于确定哪些分区应重置为初始偏移量。此外,如果侦听器实现 ConsumerSeekAware,将自动分配后调用 onPartitionsAssigned()。(在 2.5.5 版本中也已添加)。有关详细信息,请参阅 Explicit Partition Assignment

已将便捷方法添加到 AbstractConsumerSeekAware 中,以简化查找。有关更多信息,请参见 [seek]

ErrorHandler Changes

FailedRecordProcessor 的子类(例如 SeekToCurrentErrorHandlerDefaultAfterRollbackProcessorRecoveringBatchErrorHandler)现在可以配置为如果异常与此记录之前发生的异常类型不同,则重置重试状态。

Producer Factory Changes

你现可为制作者设置最大年龄,之后将关闭再创建他们。参见 Transactions 了解更多信息。

现在可以在创建 DefaultKafkaProducerFactory 后更新其配置映射。这可能很有用,例如,如果凭据更改后必须更新 SSL 密钥/可信存储位置。有关更多信息,请参阅 xref:kafka/sending-messages.adoc#producer-factory[Using `DefaultKafkaProducerFactory`

Changes between 2.4 and 2.5

本节介绍了从 2.4 版到 2.5 版所做的更改。有关早期版本中的更改,请参阅 Change History

Consumer/Producer Factory Changes

默认消费者和生产者工厂现在可以在创建或关闭消费者或生产者时调用回调。提供了用于本机微米指标的实现。有关详细信息,请参阅 Factory Listeners

你现可以在运行时更改引导服务器属性,实现故障转移到另一个 Kafka 集群。参见 Connecting to Kafka 了解更多信息。

StreamsBuilderFactoryBean Changes

现在,工厂 bean 可以在创建或销毁 KafkaStreams 时调用回调。提供了用于本机微米指标的实现。有关详细信息,请参阅 KafkaStreams Micrometer Support

Kafka Client Version

此版本需要 2.5.0 kafka-clients

Class/Package Changes

SeekUtils 已从 o.s.k.support 包移至 o.s.k.listener

Delivery Attempts Header

现在有一个选项,可以在使用某些错误处理程序和回滚处理器后添加跟踪传递尝试的标头。有关详细信息,请参阅 Delivery Attempts Header

@KafkaListener Changes

如果 @KafkaListener 返回类型为 Message<?>,现在将在需要时自动填充默认答复标头。有关详细信息,请参阅 Reply Type Message<?>

当传入记录的键为 null 时,不再使用 null 值填充 KafkaHeaders.RECEIVED_MESSAGE_KEY ;而是完全省略该标题。

@KafkaListener 方法现在可以指定 ConsumerRecordMetadata 参数,而不是针对元数据(例如主题、分区等)使用离散标头。有关详细信息,请参阅 Consumer Record Metadata

Listener Container Changes

assignmentCommitOption 容器属性现在默认为 LATEST_ONLY_NO_TX。参见 Listener Container Properties 了解更多信息。

在使用事务时,subBatchPerPartition 容器属性现在默认为 true。参见 Transactions 了解更多信息。

现在提供新的 RecoveringBatchErrorHandler

现已支持静态组成员资格。参见 Message Listener Containers 了解更多信息。

如果在配置增量/协作式重新平衡时,使用非致命的 RebalanceInProgressException 提交偏移量失败,容器将在重新平衡完成后尝试为分配给该实例的分区重新提交偏移量。

现在,默认错误处理程序对于记录侦听器是 SeekToCurrentErrorHandler,对于批处理侦听器是 RecoveringBatchErrorHandler。有关详细信息,请参阅 Container Error Handlers

现在,您可以控制标准错误处理程序故意抛出的异常所记录的级别。有关详细信息,请参阅 Container Error Handlers

已添加 getAssignmentsByClientId() 方法,使得更容易确定并发容器中的哪些消费者分配了哪些分区。参见 Listener Container Properties 了解更多信息。

你现可在错误中抑制日志记录整个 ConsumerRecord s、调试日志等。参见 Listener Container Properties 中的 onlyLogRecordMetadata

KafkaTemplate Changes

KafkaTemplate 现可维护微米计时器。参见 Monitoring 了解更多信息。

现在可以使用 ProducerConfig 属性配置 KafkaTemplate 来覆盖生产者工厂中的属性。有关更多信息,请参阅 xref:kafka/sending-messages.adoc#kafka-template[Using `KafkaTemplate`

现在已提供一个 RoutingKafkaTemplate。有关更多信息,请参阅 xref:kafka/sending-messages.adoc#routing-template[Using `RoutingKafkaTemplate`

现在可以使用 KafkaSendCallback 代替 ListenerFutureCallback 获取更窄的异常,这使得提取失败的 ProducerRecord 变得更加容易。有关更多信息,请参阅 xref:kafka/sending-messages.adoc#kafka-template[Using `KafkaTemplate`

Kafka String Serializer/Deserializer

现在提供了新的 ToStringSerializer/StringDeserializer,以及一个关联的 SerDe。有关详细信息,请参阅 String serialization

JsonDeserializer

JsonDeserializer 现在具有确定反序列化类型更大的灵活性。有关详细信息,请参阅 Using Methods to Determine Types

Delegating Serializer/Deserializer

现在,当出站记录没有头时,`DelegatingSerializer`能处理“standard”类型。详见 Delegating Serializer and Deserializer

Testing Changes

现在,`KafkaTestUtils.consumerProps()`帮助器记录将 `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG`设为 `earliest`作为默认设置。详见 JUnit

Changes between 2.3 and 2.4

Kafka Client Version

此版本需要 2.4.0 kafka-clients 或更高版本,并支持新的增量重新平衡功能。

ConsumerAwareRebalanceListener

ConsumerRebalanceListener 一样,此界面现在有一个附加的方法 onPartitionsLost。有关详细信息,请参阅 Apache Kafka 文档。

ConsumerRebalanceListener 不同,默认实现 调用 onPartitionsRevoked。相反,侦听器容器在调用 onPartitionsLost 之后将调用该方法;因此,在实现 ConsumerAwareRebalanceListener 时,您不应执行此操作。

请参阅 Rebalancing Listeners 结尾处的重要说明,了解更详细信息。

GenericErrorHandler

isAckAfterHandle() 默认实现现在默认返回 true。

KafkaTemplate

现在,KafkaTemplate`支持非事务和事务发布。详见 `KafkaTemplate Transactional and non-Transactional Publishing

AggregatingReplyingKafkaTemplate

releaseStrategy 现在是 BiConsumer。它现在在超时后(以及当记录到达时)被调用;第二个参数在超时后调用时为 true

Listener Container

ContainerProperties`提供一个 `authorizationExceptionRetryInterval`选项让监听器容器在 `KafkaConsumer`抛出任何 `AuthorizationException`后重试。详见其 JavaDocs 和 Using `KafkaMessageListenerContainer以了解更多信息。

@KafkaListener

@KafkaListener 注释有一个新属性 splitIterables;默认值为 true。当答复监听器返回 Iterable 时,此属性控制返回结果是作为一个单独的记录发送还是为每个元素发送一个记录。请参见 Forwarding Listener Results using @SendTo 了解更多信息。

现在可以使用 `BatchToRecordAdapter`配置批量监听器;这允许例如批量在一个事务中被处理,而监听器一次获取一个记录。使用默认实现,可以利用 `ConsumerRecordRecoverer`处理批量中的错误,而不用停止整个批量的处理——当使用事务时这可能很有用。详见 Transactions with Batch Listeners以了解更多信息。

Kafka Streams

StreamsBuilderFactoryBean`接受一个新属性 `KafkaStreamsInfrastructureCustomizer。这允许在创建流之前配置生成器和/或拓扑。详见 Spring Management以了解更多信息。

Changes Between 2.2 and 2.3

本节涵盖从 2.2 版到 2.3 版的变更。

Tips, Tricks and Examples

新章节 Tips, Tricks and Examples已经添加。请提交 GitHub 问题和/或针对该章节中的其他条目发出请求。

Kafka Client Version

此版本需要 2.3.0 kafka-clients 或更高版本。

Class/Package Changes

弃用了 TopicPartitionInitialOffset,赞成 TopicPartitionOffset

Configuration Changes

从 2.3.4 版开始,missingTopicsFatal 容器属性默认情况下为 false。当该属性为 true 时,如果代理停机,应用程序将无法启动。此变更影响了许多用户。鉴于 Kafka 是一个高可用性平台,我们并未预期常见使用案例中会启动没有可用代理的应用程序。

Producer and Consumer Factory Changes

现在可以将 DefaultKafkaProducerFactory`配置为为每个线程创建一个生产者。您还可以在构造函数中提供 `Supplier<Serializer>`实例,作为配置类(需要无参数构造函数)或使用 `Serializer`实例构建(它们在所有生产者之间共享)的替代方案。详见 Using `DefaultKafkaProducerFactory以了解更多信息。

相同的选项也可用于 DefaultKafkaConsumerFactory`中的 `Supplier<Deserializer>`实例。详见 Using `KafkaMessageListenerContainer以了解更多信息。

Listener Container Changes

以前,当使用侦听器适配器(如 @KafkaListener s)调用侦听器时,错误处理程序会收到 ListenerExecutionFailedException(将实际侦听器异常作为 cause)。由本机 GenericMessageListener s 引发的异常将原封不动地传递给错误处理程序。现在,ListenerExecutionFailedException 始终是参数(将实际侦听器异常作为 cause),它提供对容器的 group.id 属性的访问。

由于侦听器容器有自己的机制来提交偏移量,因此它希望 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIGfalse。现在,它会自动将其设置为 false,除非在使用者工厂中明确设置它,或者在容器的使用者属性中覆盖它。

ackOnError 属性现在默认情况下为 false

现在可以在监听器方法中获取消费者的 group.id 属性。请参见 Obtaining the Consumer group.id 了解更多信息。

容器有一个新属性 recordInterceptor,允许在调用监听器之前检查或修改记录。如果您需要调用多个拦截器,还可以提供一个 CompositeRecordInterceptor。请参见 Message Listener Containers 了解更多信息。

ConsumerSeekAware 有了新方法,允许你执行相对于开始、结束或当前位置的查询,并查询到大于或等于时间戳的第一个偏移量。有关更多信息,请参阅 [seek]

现在提供了一个便捷类 AbstractConsumerSeekAware 来简化查询。有关更多信息,请参阅 [seek]

ContainerProperties`提供了一个 `idleBetweenPolls`选项,让监听器容器中的主循环可以在 `KafkaConsumer.poll()`调用之间睡眠。详见其 JavaDocs 和 Using `KafkaMessageListenerContainer以了解更多信息。

现在,当使用 AckMode.MANUAL(或 MANUAL_IMMEDIATE)时,您可以通过在 `Acknowledgment`上调用 `nack`来导致重新交付。详见 Committing Offsets以了解更多信息。

现在可使用 Micrometer Timer 来监控侦听器性能。有关更多信息,请参阅 Monitoring

现在,容器发布与启动相关的附加使用者生命周期事件。有关更多信息,请参阅 Application Events

事务批量侦听器现在可支持僵尸隔离。有关更多信息,请参阅 Transactions

侦听器容器 factory 现在可配置 ContainerCustomizer,以便在创建并配置每个容器后进一步配置它。有关更多信息,请参阅 Container factory

ErrorHandler Changes

现在,SeekToCurrentErrorHandler 将某些异常视为致命错误,为这些错误禁用重试,在首次失败时调用恢复程序。

现在可以将 SeekToCurrentErrorHandlerSeekToCurrentBatchErrorHandler 配置为在发送尝试之间应用 BackOff(线程睡眠)。

从版本 2.3.2 开始,在错误处理程序在恢复失败记录后返回时,将提交恢复的记录的偏移量。

DeadLetterPublishingRecoverer,当与 ErrorHandlingDeserializer`结合使用时,现在将发送到失效字母主题的消息有效负载设置成无法反序列化的原始值。在此之前,它是 `null,而用户代码需要从消息头中提取 DeserializationException。详见 Publishing Dead-letter Records以了解更多信息。

TopicBuilder

提供了一个新类 TopicBuilder,用于更方便地为自动主题置备创建`NewTopic` @Bean。有关更多信息,请参见 [configuring-topics]

Kafka Streams Changes

现在,您可以对 `@EnableKafkaStreams`创建的 `StreamsBuilderFactoryBean`执行其他配置。详见 Streams Configuration以了解更多信息。

现在提供了一个 RecoveringDeserializationExceptionHandler,它允许恢复具有反序列化错误的记录。它可以与 `DeadLetterPublishingRecoverer`结合使用,以将这些记录发送到失效字母主题。详见 Recovery from Deserialization Exceptions以了解更多信息。

`HeaderEnricher`转换器已提供,它使用 SpEL 生成头值。详见 Header Enricher以了解更多信息。

MessagingTransformer`已提供。这允许一个 Kafka 流拓扑与 spring-messaging 组件交互,例如 Spring 集成流。详见 `MessagingProcessor和详见 [Calling a Spring Integration Flow from a KStream] 以了解更多信息。

JSON Component Changes

现在,所有 JSON 感知组件均默认通过 JacksonUtils.enhancedObjectMapper() 生成的 Jackson ObjectMapper 配置。JsonDeserializer 现在提供基于 TypeReference 的构造函数,以便更好地处理目标泛型容器类型。此外,已经引入 JacksonMimeTypeModule,以便将 org.springframework.util.MimeType 序列化为纯字符串。有关更多信息,请参阅其 JavaDoc 和 Serialization, Deserialization, and Message Conversion

ByteArrayJsonMessageConverter`已提供,以及一个新的所有 Json 转换器的超级类,`JsonMessageConverter。此外,一个 StringOrBytesSerializer`现在可用;它可以在 `ProducerRecord`s 中序列化 `byte[]、`Bytes`和 `String`值。详见 Spring Messaging Message Conversion以了解更多信息。

JsonSerializer,`JsonDeserializer`和 `JsonSerde`现在具有流畅的 API,以简化编程配置。详见 javadocs、Serialization, Deserialization, and Message ConversionStreams JSON Serialization and Deserialization以了解更多信息。

ReplyingKafkaTemplate

当回复超时时,将来会以 KafkaReplyTimeoutException(而不是`KafkaException`)完成异常处理。

此外,现在提供了重载 sendAndReceive 方法,它允许按消息指定回复超时。

AggregatingReplyingKafkaTemplate

通过汇总多个接收者的答复扩展 ReplyingKafkaTemplate。有关详细信息,请参阅 Aggregating Multiple Replies

Transaction Changes

现在可在 KafkaTemplateKafkaTransactionManager 上覆盖生产者工厂的 transactionIdPrefix。有关详细信息,请参见 xref:kafka/transactions.adoc#transaction-id-prefix[transactionIdPrefix

New Delegating Serializer/Deserializer

框架现提供委托的 Serializer`和 `Deserializer,利用标头支持生成和使用具有多种键/值类型的记录。有关详细信息,请参阅 Delegating Serializer and Deserializer

New Retrying Deserializer

框架现提供委托的 RetryingDeserializer,以便在出现瞬时错误(如网络问题)时重试序列化。有关详细信息,请参阅 Retrying Deserializer

Changes Between 2.1 and 2.2

Kafka Client Version

此版本需要 2.0.0 kafka-clients 或更高版本。

Class and Package Changes

ContainerProperties 类已从 org.springframework.kafka.listener.config 移至 org.springframework.kafka.listener

AckMode 枚举已从 AbstractMessageListenerContainer 移至 ContainerProperties

setBatchErrorHandler()setErrorHandler() 方法已从 ContainerProperties 移至 AbstractMessageListenerContainerAbstractKafkaListenerContainerFactory

After Rollback Processing

提供了一个新的 `AfterRollbackProcessor`策略。有关详细信息,请参阅 After-rollback Processor

ConcurrentKafkaListenerContainerFactory Changes

您现在可以使用 ConcurrentKafkaListenerContainerFactory 来创建和配置任何 ConcurrentMessageListenerContainer,而不仅是用于 @KafkaListener 批注的 ConcurrentMessageListenerContainer。有关更多信息,请参见 Container factory

Listener Container Changes

已添加新的容器属性 (missingTopicsFatal)。有关详细信息,请参见 xref:kafka/receiving-messages/message-listener-container.adoc#kafka-container[Using KafkaMessageListenerContainer

消费者停止时会触发 ConsumerStoppedEvent。有关详细信息,请参阅 Thread Safety

批量使用者可以选择接收完整的 ConsumerRecords<?, ?> 对象,而不是 List<ConsumerRecord<?, ?>。有关更多信息,请参阅 [batch-listeners]

DefaultAfterRollbackProcessorSeekToCurrentErrorHandler 现在可以恢复(跳过)不断失败的记录,并且默认情况下在 10 次失败后这样做。可以将其配置为将失败的记录发布到死信主题。

从版本 2.2.4 开始,消费者的组 ID 可用于选择死信主题名称。

已添加 ConsumerStoppingEvent。有关详细信息,请参阅 Application Events

现在可以将 SeekToCurrentErrorHandler 配置为在使用 AckMode.MANUAL_IMMEDIATE 配置容器时提交已恢复记录的偏移量(自 2.2.4 起)。

@KafkaListener Changes

现在,可以通过在注释中设置属性来覆盖侦听器容器工厂的 concurrencyautoStartup 属性。现在,您可以添加配置来确定将哪些标头(如果有)复制到答复消息。有关详细信息,请参阅 @KafkaListener Annotation

现在,您可以将 @KafkaListener 用作对自己的注释的元注释。有关详细信息,请参阅 @KafkaListener as a Meta Annotation

现在,可以更轻松地为 @Payload 验证配置 Validator。有关详细信息,请参阅 @KafkaListener @Payload Validation

您现在可以在注释上直接指定 kafka 消费者属性;这些属性将覆盖使用消费者工厂(自 2.2.4 版以来)定义的具有相同名称的所有属性。有关详细信息,请参阅 Annotation Properties

Header Mapping Changes

MimeTypeMediaType 类型的标题现在在 RecordHeader 值中映射为简单的字符串。以前,它们被映射为 JSON,并且只有 MimeType 被解码。MediaType 无法被解码,现在它们为简单的字符串,以便于互操作。

此外,DefaultKafkaHeaderMapper 有一个新的 addToStringClasses 方法,允许指定应该使用 toString() 而不是 JSON 映射的类型。有关详细信息,请参阅 Message Headers

Embedded Kafka Changes

KafkaEmbedded 类及其 KafkaRule 接口已弃用,取而代之的是 EmbeddedKafkaBroker 及其 JUnit 4 EmbeddedKafkaRule 包装器。@EmbeddedKafka 注释现在填充 EmbeddedKafkaBroker bean,而不是弃用的 KafkaEmbedded。此更改允许在 JUnit 5 测试中使用 @EmbeddedKafka@EmbeddedKafka 注释现在具有属性 ports,用于指定填充 EmbeddedKafkaBroker 的端口。有关详细信息,请参阅 Testing Applications

JsonSerializer/Deserializer Enhancements

您现在可以通过使用生产者和消费者属性来提供类型映射信息。

反序列化器上提供了新构造函数,允许使用提供的目标类型覆盖类型标题信息。

JsonDeserializer 现在默认删除所有类型信息头。

你现可以通过使用一个 Kafka 属性(自 2.2.3 起)来配置 JsonDeserializer 忽略类型信息头。

有关详细信息,请参阅 Serialization, Deserialization, and Message Conversion

Kafka Streams Changes

流配置 bean 现在必须是一个 KafkaStreamsConfiguration 对象,而不是 StreamsConfig 对象。

StreamsBuilderFactoryBean 已从程序包 …​core 移至 …​config

引入了 KafkaStreamBrancher,以在条件分支建立在 KStream 实例之上时提高终端用户体验。

有关详细信息,请参阅 Apache Kafka Streams SupportConfiguration

Transactional ID

侦听器容器启动事务时,transactional.id 现在是追加了 <group.id>.<topic>.<partition>transactionIdPrefix。此更改允许对僵尸进行适当隔离, as described here

Changes Between 2.0 and 2.1

Kafka Client Version

此版本需要 1.0.0 kafka-clients 或更高版本。

1.1.x 客户端在版本 2.2 中得到本机支持。

JSON Improvements

StringJsonMessageConverterJsonSerializer 现在会在 Headers 中添加类型信息,让转换器和 JsonDeserializer 根据消息本身而不是固定配置的类型在接收时创建特定类型。有关详细信息,请参阅 Serialization, Deserialization, and Message Conversion

Container Stopping Error Handlers

现在为记录和批次侦听器提供了容器错误处理程序,这些错误处理程序将侦听器抛出的任何异常视为致命异常/它们会停止容器。有关详细信息,请参阅 Handling Exceptions

Pausing and Resuming Containers

侦听器容器现在具有 pause()resume() 方法(自版本 2.1.3 起)。有关详细信息,请参阅 Pausing and Resuming Listener Containers

Stateful Retry

从 2.1.3 版开始,您可以配置有状态重试。有关详细信息,请参阅 Stateful Retry

Client ID

从版本 2.1.1 开始,你可以在 @KafkaListener 中设置 client.id 前缀。以前,要自定义客户端 ID,你需要每个监听器一个独立的消费者工厂(及容器工厂)。此前缀带有 -n 后缀,以便在使用并发时提供唯一的客户端 ID。

Logging Offset Commits

默认情况下,使用 DEBUG 记录级别执行主题偏移提交的记录。从版本 2.1.2 开始,ContainerProperties 中一个名为 commitLogLevel 的新属性可让你指定这些消息的记录级别。有关详细信息,请参见 xref:kafka/receiving-messages/message-listener-container.adoc#kafka-container[Using KafkaMessageListenerContainer

Default @KafkaHandler

从版本 2.1.3 开始,您可以将类级别的 @KafkaListener 上的某个 @KafkaHandler 注释指定为默认值。有关详细信息,请参阅 @KafkaListener on a Class

ReplyingKafkaTemplate

从版本 2.1.3 开始,提供 KafkaTemplate 的子类,以支持请求/回答语义。有关详细信息,请参见 xref:kafka/sending-messages.adoc#replying-template[Using ReplyingKafkaTemplate

ChainedKafkaTransactionManager

版本 2.1.3 引入了 ChainedKafkaTransactionManager。(现在它已弃用)。

Migration Guide from 2.0

请参阅 2.0 to 2.1 Migration 指南。

Changes Between 1.3 and 2.0

Spring Framework and Java Versions

Apache Kafka 的 Spring 项目现在需要 Spring Framework 5.0 和 Java 8。

@KafkaListener Changes

你现在可以用 @KafkaListener 方法进行注释(同时还有类和 @KafkaHandler 方法)使用 @SendTo 。如果该方法返回结果,它将被转发到指定主题。更多信息请参阅 Forwarding Listener Results using @SendTo

Message Listeners

消息侦听器现在可以感知 Consumer 对象。请参阅 [message-listeners] 了解更多信息。

Using ConsumerAwareRebalanceListener

重新平衡侦听器现在可以在重新平衡通知期间访问“ Consumer ”对象。更多信息请参阅 Rebalancing Listeners

Changes Between 1.2 and 1.3

Support for Transactions

0.11.0.0 客户端库增加了对事务的支持。已经添加了 KafkaTransactionManager 及其他对事务的支持。更多信息请参阅 Transactions

Support for Headers

0.11.0.0 客户端库增加了对消息标头的支持。这些标头现在可以映射到 spring-messaging MessageHeaders 。更多信息请参阅 Message Headers

Creating Topics

0.11.0.0 客户端库提供了一个 AdminClient,您可以用它来创建主题。KafkaAdmin 使用此客户端自动添加定义为 @Bean 实例的主题。

Support for Kafka Timestamps

KafkaTemplate`现在支持一个用于添加带时间戳的记录的 API。关于 `timestamp`支持,已引入新的 `KafkaHeaders。此外,还添加了新的 KafkaConditions.timestamp()`和 `KafkaMatchers.hasTimestamp()`测试实用程序。有关详细信息,请参阅 Using `KafkaTemplate@KafkaListener AnnotationTesting Applications

@KafkaListener Changes

你现在可以配置“ KafkaListenerErrorHandler ”来处理异常。更多信息请参阅 Handling Exceptions

默认情况下,@KafkaListener id 属性现在用作 group.id 属性,覆盖消费者工厂中配置的属性(如果存在)。此外,您可以在注释中显式配置 groupId。以前,您需要一个单独的容器工厂(和消费者工厂)才能为侦听器使用不同的 group.id 值。要恢复使用工厂配置的 group.id 的先前行为,请将注释中的 idIsGroup 属性设置为 false

@EmbeddedKafka Annotation

为了方便起见,提供了一个测试类级别的 @EmbeddedKafka 注释,将 KafkaEmbedded 注册为 Bean。更多信息请参阅 Testing Applications

Kerberos Configuration

现在提供对配置 Kerberos 的支持。更多信息请参阅 JAAS and Kerberos

Changes Between 1.1 and 1.2

此版本使用 0.10.2.x 客户端。

Changes Between 1.0 and 1.1

Kafka Client

此版本使用 Apache Kafka 0.10.x.x 客户端。

Batch Listeners

可以将侦听器配置为接收 consumer.poll() 操作返回的整个消息块,而不是一次接收一条消息。

Null Payloads

当您使用日志压缩时,空有效负载用于“删除”键。

Initial Offset

在显式分配分区时,您现在可以相对于当前位置配置消费者组的初始偏移量,而不是绝对偏移量或相对于当前结尾的偏移量。

Seek

您现在可以查找每个主题或分区的偏移量。当使用组管理且 Kafka 分配分区时,您可以在初始化期间使用此功能来设置初始位置。您还可以在检测到空闲容器时或应用程序执行中的任意点进行查找。请参阅 [seek] 了解更多信息。