Change History
What’s New in 3.1 Since 3.0
This section covers the changes made from version 3.0 to version 3.1. For changes in earlier versions, see Change History.
EmbeddedKafkaBroker
An additional implementation is now provided to use KRaft instead of ZooKeeper. See Embedded Kafka Broker for more information.
JsonDeserializer
When a deserialization exception occurs, the SerializationException message no longer contains the data in the form Can’t deserialize data [[123, 34, 98, 97, 122, …; an array of numerical values for each data byte is not useful and can be verbose for large data. When used with an ErrorHandlingDeserializer, the DeserializationException sent to the error handler contains the data property, which holds the raw data that could not be deserialized. When not used with an ErrorHandlingDeserializer, the KafkaConsumer will continually emit exceptions for the same record, showing the topic/partition/offset and the cause thrown by Jackson.
ContainerPostProcessor
Post-processing can be applied on a listener container by specifying the bean name of a ContainerPostProcessor on the @KafkaListener annotation. This occurs after the container has been created and after any ContainerCustomizer configured on the container factory. See Container Factory for more information.
ErrorHandlingDeserializer
You can now add a Validator to this deserializer; if the delegate Deserializer successfully deserializes the object, but that object fails validation, an exception is thrown, similar to a deserialization exception occurring. This allows the original raw data to be passed to the error handler. See Using ErrorHandlingDeserializer for more information.
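A configuration sketch, assuming the setValidator method described above; MyEvent is a hypothetical payload type with JSR-303 constraints:

```java
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.validation.Validator;
import org.springframework.validation.beanvalidation.LocalValidatorFactoryBean;

// A JSR-303 Validator; in a Spring application this would usually be a bean
Validator validator = new LocalValidatorFactoryBean();

ErrorHandlingDeserializer<MyEvent> deserializer =
        new ErrorHandlingDeserializer<>(new JsonDeserializer<>(MyEvent.class));
// Validation failures are surfaced like deserialization exceptions,
// so the raw data still reaches the error handler
deserializer.setValidator(validator);
```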
Retryable Topics
The suffix -retry-5000 is changed to -retry when @RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC) is used. If you want to keep the suffix -retry-5000, use @RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2"). See Topic Naming for more information.
Listener Container Changes
When manually assigning partitions with a null consumer group.id, the AckMode is now automatically coerced to MANUAL. See Manually Assigning All Partitions for more information.
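A sketch of such a listener; the topic, id, and offsets are illustrative, and the null group.id comes from the consumer factory configuration (for example, by not setting ConsumerConfig.GROUP_ID_CONFIG):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;

class ManualAssignListener {

    // Partitions are assigned explicitly; with no consumer group.id,
    // the container now coerces the AckMode to MANUAL automatically.
    @KafkaListener(id = "manual", topicPartitions = @TopicPartition(
            topic = "events",
            partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0")))
    void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
        // ... process the record ...
        ack.acknowledge(); // manual acknowledgment
    }
}
```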
What’s New in 3.0 Since 2.9
Exactly Once Semantics
EOSMode.V1 (aka ALPHA) is no longer supported.
When using transactions, the minimum broker version is 2.5.
See Exactly Once Semantics and KIP-447 for more information.
Observation
Enabling observation for timers and tracing using Micrometer is now supported. See Observation for more information.
Native Images
提供了创建本机映像的支持。有关更多信息,请参见 Native Images。
Support for creating native images is provided. See Native Images for more information.
Global Single Embedded Kafka
The embedded Kafka broker (EmbeddedKafkaBroker) can now be started as a single global instance for the whole test plan. See Using the Same Broker(s) for Multiple Test Classes for more information.
Retryable Topics Changes
This feature is no longer considered experimental (as far as its API is concerned); the feature itself has been supported since 2.7, but with a greater-than-normal possibility of breaking API changes.
The bootstrapping of the Non-Blocking Retries infrastructure beans has changed in this release, to avoid some timing problems that some applications experienced with application initialization.
You can now set a different concurrency for the retry containers; by default, the concurrency is the same as the main container.
@RetryableTopic can now be used as a meta-annotation on custom annotations, including support for @AliasFor properties.
See Configuration for more information.
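A hypothetical composed annotation built on @RetryableTopic might look like this (the annotation name, defaults, and exposed attributes are all illustrative):

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.core.annotation.AliasFor;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;

// Custom meta-annotation: listeners annotated with @RetryFourTimes behave
// as if annotated with @RetryableTopic, with overridable attributes
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
public @interface RetryFourTimes {

    @AliasFor(annotation = RetryableTopic.class, attribute = "attempts")
    String attempts() default "4";

    @AliasFor(annotation = RetryableTopic.class, attribute = "backoff")
    Backoff backoff() default @Backoff(delay = 1000);
}
```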
The default replication factor for the retry topics is now -1 (use the broker default). If your broker is earlier than version 2.4, you will now need to explicitly set the property.
You can now configure multiple @RetryableTopic listeners on the same topic in the same application context. Previously, this was not possible. See Multiple Listeners, Same Topic(s) for more information.
There are breaking API changes in RetryTopicConfigurationSupport; specifically, if you override the bean definition methods for destinationTopicResolver, kafkaConsumerBackoffManager, and/or retryTopicConfigurer, these methods now require an ObjectProvider&lt;RetryTopicComponentFactory&gt; parameter.
Listener Container Changes
Events related to consumer authentication and authorization failures are now published by the container. See Application Events for more information.
You can now customize the thread names used by consumer threads. See Container Thread Naming for more information.
The container property restartAfterAuthException has been added. See Listener Container Properties for more information.
KafkaTemplate Changes
The futures returned by this class are now CompletableFutures instead of ListenableFutures. See Using KafkaTemplate.
ReplyingKafkaTemplate Changes
The futures returned by this class are now CompletableFutures instead of ListenableFutures. See Using ReplyingKafkaTemplate and Request/Reply with Message&lt;?&gt;s.
@KafkaListener Changes
You can now use a custom correlation header, which will be echoed in any reply message. See the note at the end of Using ReplyingKafkaTemplate for more information.
You can now manually commit parts of a batch before the entire batch is processed. See Committing Offsets for more information.
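A sketch of such a listener, assuming the Acknowledgment.acknowledge(int index) variant described in the referenced section (the topic, id, and split point are illustrative, and a MANUAL ack mode is required):

```java
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;

class PartialBatchListener {

    @KafkaListener(id = "partial", topics = "events")
    void listen(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        int half = records.size() / 2;
        // ... process the first part of the batch ...
        ack.acknowledge(half); // commit the first part of the batch
                               // (exact index semantics per Committing Offsets)
        // ... process the remainder, then commit the rest ...
        ack.acknowledge();
    }
}
```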
KafkaHeaders Changes
Four constants in KafkaHeaders that were deprecated in 2.9.x have now been removed.

- Instead of MESSAGE_KEY, use KEY.
- Instead of PARTITION_ID, use PARTITION.
Similarly, RECEIVED_MESSAGE_KEY is replaced by RECEIVED_KEY and RECEIVED_PARTITION_ID is replaced by RECEIVED_PARTITION.
Testing Changes
Version 3.0.7 introduced a MockConsumerFactory and MockProducerFactory. See Mock Consumer and Producer for more information.
Starting with version 3.0.10, the embedded Kafka broker, by default, sets the Spring Boot property spring.kafka.bootstrap-servers to the address(es) of the embedded broker(s).
What’s New in 2.9 since 2.8
Error Handler Changes
The DefaultErrorHandler can now be configured to pause the container for one poll and use the remaining results from the previous poll, instead of seeking to the offsets of the remaining records. See DefaultErrorHandler for more information.
The DefaultErrorHandler now has a BackOffHandler property. See Back Off Handlers for more information.
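A configuration sketch for the pause-and-replay behavior described above; the back-off values are illustrative, and seekAfterError is the assumed property name for switching off seek-based replay:

```java
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Retry a failed record twice, 1 second apart, before giving up
DefaultErrorHandler errorHandler = new DefaultErrorHandler(new FixedBackOff(1000L, 2L));

// Pause the container for one poll and replay the remaining records from the
// previous poll, instead of seeking back to their offsets
errorHandler.setSeekAfterError(false);
```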
Listener Container Changes
interceptBeforeTx now works with all transaction managers (previously it was applied only when a KafkaAwareTransactionManager was used). See [interceptBeforeTx].
A new container property, pauseImmediate, is provided, which allows the container to pause the consumer after the current record is processed, instead of after all the records from the previous poll have been processed. See [pauseImmediate].
Events related to consumer authentication and authorization
Header Mapper Changes
You can now configure which inbound headers should be mapped. Also available in version 2.8.8 or later. See Message Headers for more information.
KafkaTemplate Changes
In 3.0, the futures returned by this class will be CompletableFutures instead of ListenableFutures. See Using KafkaTemplate for assistance in transitioning when using this release.
ReplyingKafkaTemplate Changes
The template now provides a method to wait for assignment on the reply container, to avoid a race when sending a request before the reply container is initialized. Also available in version 2.8.8 or later. See Using ReplyingKafkaTemplate.
In 3.0, the futures returned by this class will be CompletableFutures instead of ListenableFutures. See Using ReplyingKafkaTemplate and Request/Reply with Message&lt;?&gt;s for assistance in transitioning when using this release.
What’s New in 2.8 Since 2.7
This section covers the changes made from version 2.7 to version 2.8. For changes in earlier versions, see Change History.
Package Changes
Classes and interfaces related to type mapping have been moved from …support.converter to …support.mapping.

- AbstractJavaTypeMapper
- ClassMapper
- DefaultJackson2JavaTypeMapper
- Jackson2JavaTypeMapper
Out of Order Manual Commits
The listener container can now be configured to accept manual offset commits out of order (usually asynchronously). The container will defer the commit until the missing offset is acknowledged. See Manually Committing Offsets for more information.
@KafkaListener Changes
It is now possible to specify whether the listener method is a batch listener on the method itself. This allows the same container factory to be used for both record and batch listeners.
See [batch-listeners] for more information.
Batch listeners can now handle conversion exceptions.
See Conversion Errors with Batch Error Handlers for more information.
RecordFilterStrategy, when used with batch listeners, can now filter the entire batch in one call. See the note at the end of [batch-listeners] for more information.
The @KafkaListener annotation now has the filter attribute, to override the container factory’s RecordFilterStrategy for just this listener.
The @KafkaListener annotation now has the info attribute; this is used to populate the new listener container property listenerInfo. This is then used to populate a KafkaHeaders.LISTENER_INFO header in each record, which can be used in RecordInterceptor, RecordFilterStrategy, or the listener itself. See Listener Info Header and Abstract Listener Container Properties for more information.
KafkaTemplate Changes
You can now receive a single record, given the topic, partition, and offset. See Using KafkaTemplate to Receive for more information.
CommonErrorHandler Added
The legacy GenericErrorHandler and its sub-interface hierarchies for record and batch listeners have been replaced by a new single interface, CommonErrorHandler, with implementations corresponding to most legacy implementations of GenericErrorHandler. See Container Error Handlers and Migrating Custom Legacy Error Handler Implementations to CommonErrorHandler for more information.
Listener Container Changes
The interceptBeforeTx container property is now true by default.
The authorizationExceptionRetryInterval property has been renamed to authExceptionRetryInterval and now applies to AuthenticationExceptions in addition to AuthorizationExceptions. Both exceptions are considered fatal and the container will stop by default, unless this property is set. See Using KafkaMessageListenerContainer and Listener Container Properties for more information.
Serializer/Deserializer Changes
The DelegatingByTopicSerializer and DelegatingByTopicDeserializer are now provided. See Delegating Serializer and Deserializer for more information.
DeadLetterPublishingRecoverer Changes
The property stripPreviousExceptionHeaders is now true by default.
There are now several techniques to customize which headers are added to the output record.
See Managing Dead Letter Record Headers for more information.
Retryable Topics Changes
Now you can use the same factory for retryable and non-retryable topics. See Specifying a ListenerContainerFactory for more information.
There’s now a manageable global list of fatal exceptions that will make the failed record go straight to the DLT. Refer to Exception Classifier to see how to manage it.
You can now use blocking and non-blocking retries in conjunction. See Combining Blocking and Non-Blocking Retries for more information.
The KafkaBackOffException thrown when using the retryable topics feature is now logged at DEBUG level. See Changing KafkaBackOffException Logging Level if you need to change the logging level back to WARN or set it to any other level.
Changes between 2.6 and 2.7
Kafka Client Version
This version requires the 2.7.0 kafka-clients. It is also compatible with the 2.8.0 clients, since version 2.7.1; see Override Spring Boot Dependencies.
Non-Blocking Delayed Retries Using Topics
This significant new feature is added in this release. When strict ordering is not important, failed deliveries can be sent to another topic to be consumed later. A series of such retry topics can be configured, with increasing delays. See Non-Blocking Retries for more information.
Listener Container Changes
The onlyLogRecordMetadata container property is now true by default.
A new container property, stopImmediate, is now available.
See Listener Container Properties for more information.
Error handlers that use a BackOff between delivery attempts (e.g., SeekToCurrentErrorHandler and DefaultAfterRollbackProcessor) will now exit the back-off interval soon after the container is stopped, rather than delaying the stop.
Error handlers and after-rollback processors that extend FailedRecordProcessor can now be configured with one or more RetryListeners to receive information about retry and recovery progress.
The RecordInterceptor now has additional methods called after the listener returns (normally, or by throwing an exception). It also has a sub-interface, ConsumerAwareRecordInterceptor. In addition, there is now a BatchInterceptor for batch listeners. See Message Listener Containers for more information.
@KafkaListener Changes
You can now validate the payload parameter of @KafkaHandler methods (class-level listeners). See @KafkaListener @Payload Validation for more information.
You can now set the rawRecordHeader property on the MessagingMessageConverter and BatchMessagingMessageConverter, which causes the raw ConsumerRecord to be added to the converted Message&lt;?&gt;. This is useful, for example, if you wish to use a DeadLetterPublishingRecoverer in a listener error handler. See Listener Error Handlers for more information.
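A minimal configuration sketch, assuming the setRawRecordHeader setter that the property name suggests:

```java
import org.springframework.kafka.support.converter.MessagingMessageConverter;

MessagingMessageConverter converter = new MessagingMessageConverter();
// Adds the raw ConsumerRecord to the converted Message<?> as a header,
// so an error handler can still access the original record
converter.setRawRecordHeader(true);
```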
You can now modify @KafkaListener annotations during application initialization. See @KafkaListener Attribute Modification for more information.
DeadLetterPublishingRecoverer Changes
Now, if both the key and value fail deserialization, the original values are published to the DLT. Previously, the value was populated but the key DeserializationException remained in the headers. There is a breaking API change if you subclassed the recoverer and overrode the createProducerRecord method.
In addition, the recoverer verifies that the partition selected by the destination resolver actually exists before publishing to it.
See Publishing Dead-letter Records for more information.
ChainedKafkaTransactionManager is Deprecated
See Transactions for more information.
ReplyingKafkaTemplate Changes
There is now a mechanism to examine a reply and fail the future exceptionally if some condition exists.
Support for sending and receiving spring-messaging Message&lt;?&gt;s has been added.
See Using ReplyingKafkaTemplate for more information.
Kafka Streams Changes
By default, the StreamsBuilderFactoryBean is now configured to not clean up local state. See Configuration for more information.
KafkaAdmin Changes
New methods createOrModifyTopics and describeTopics have been added. KafkaAdmin.NewTopics has been added to facilitate configuring multiple topics in a single bean. See [configuring-topics] for more information.
MessageConverter Changes
It is now possible to add a spring-messaging SmartMessageConverter to the MessagingMessageConverter, allowing content negotiation based on the contentType header. See Spring Messaging Message Conversion for more information.
Sequencing @KafkaListeners
See Starting @KafkaListeners in Sequence for more information.
ExponentialBackOffWithMaxRetries
A new BackOff implementation is provided, making it more convenient to configure the max retries. See ExponentialBackOffWithMaxRetries Implementation for more information.
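A configuration sketch; the interval values are illustrative, and the package location of the class is assumed:

```java
import org.springframework.kafka.support.ExponentialBackOffWithMaxRetries;

// 4 retries with exponentially increasing intervals: 1s, 2s, 4s, 8s,
// each capped at maxInterval
ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(4);
backOff.setInitialInterval(1_000L);
backOff.setMultiplier(2.0);
backOff.setMaxInterval(10_000L);
```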
Conditional Delegating Error Handlers
These new error handlers can be configured to delegate to different error handlers, depending on the exception type. See Delegating Error Handler for more information.
Changes between 2.5 and 2.6
Listener Container Changes
The default EOSMode is now BETA. See Exactly Once Semantics for more information.
Various error handlers (that extend FailedRecordProcessor) and the DefaultAfterRollbackProcessor now reset the BackOff if recovery fails. In addition, you can now select the BackOff to use based on the failed record and/or exception.
You can now configure an adviceChain in the container properties. See Listener Container Properties for more information.
When the container is configured to publish ListenerContainerIdleEvents, it now publishes a ListenerContainerNoLongerIdleEvent when a record is received after publishing an idle event. See Application Events and Detecting Idle and Non-Responsive Consumers for more information.
@KafkaListener Changes
When using manual partition assignment, you can now specify a wildcard for determining which partitions should be reset to the initial offset. In addition, if the listener implements ConsumerSeekAware, onPartitionsAssigned() is called after the manual assignment. (Also added in version 2.5.5.) See Explicit Partition Assignment for more information.
Convenience methods have been added to AbstractConsumerSeekAware to make seeking easier. See [seek] for more information.
ErrorHandler Changes
Subclasses of FailedRecordProcessor (e.g., SeekToCurrentErrorHandler, DefaultAfterRollbackProcessor, RecoveringBatchErrorHandler) can now be configured to reset the retry state if the exception is a different type to that which occurred previously with this record.
Producer Factory Changes
You can now set a maximum age for producers, after which they will be closed and recreated. See Transactions for more information.
You can now update the configuration map after the DefaultKafkaProducerFactory has been created. This might be useful, for example, if you have to update SSL key/trust store locations after a credentials change. See Using DefaultKafkaProducerFactory for more information.
Changes between 2.4 and 2.5
This section covers the changes made from version 2.4 to version 2.5. For changes in earlier versions, see Change History.
Consumer/Producer Factory Changes
The default consumer and producer factories can now invoke a callback whenever a consumer or producer is created or closed. Implementations for native Micrometer metrics are provided. See Factory Listeners for more information.
You can now change bootstrap server properties at runtime, enabling failover to another Kafka cluster. See Connecting to Kafka for more information.
StreamsBuilderFactoryBean Changes
The factory bean can now invoke a callback whenever a KafkaStreams is created or destroyed. An implementation for native Micrometer metrics is provided. See KafkaStreams Micrometer Support for more information.
Class/Package Changes
SeekUtils has been moved from the o.s.k.support package to o.s.k.listener.
Delivery Attempts Header
There is now an option to add a header which tracks delivery attempts when using certain error handlers and after rollback processors. See Delivery Attempts Header for more information.
@KafkaListener Changes
Default reply headers will now be populated automatically if needed when a @KafkaListener return type is Message&lt;?&gt;. See Reply Type Message&lt;?&gt; for more information.
The KafkaHeaders.RECEIVED_MESSAGE_KEY header is no longer populated with a null value when the incoming record has a null key; the header is omitted altogether.
@KafkaListener methods can now specify a ConsumerRecordMetadata parameter instead of using discrete headers for metadata such as topic, partition, and so on. See Consumer Record Metadata for more information.
Listener Container Changes
The assignmentCommitOption container property is now LATEST_ONLY_NO_TX by default. See Listener Container Properties for more information.
The subBatchPerPartition container property is now true by default when using transactions. See Transactions for more information.
A new RecoveringBatchErrorHandler is now provided.
Static group membership is now supported. See Message Listener Containers for more information.
When incremental/cooperative rebalancing is configured, if offsets fail to commit with a non-fatal RebalanceInProgressException, the container will attempt to re-commit the offsets for the partitions that remain assigned to this instance after the rebalance is completed.
The default error handler is now the SeekToCurrentErrorHandler for record listeners and the RecoveringBatchErrorHandler for batch listeners. See Container Error Handlers for more information.
You can now control the level at which exceptions intentionally thrown by standard error handlers are logged. See Container Error Handlers for more information.
The getAssignmentsByClientId() method has been added, making it easier to determine which consumers in a concurrent container are assigned which partition(s). See Listener Container Properties for more information.
You can now suppress logging entire ConsumerRecords in error and debug logs. See onlyLogRecordMetadata in Listener Container Properties.
KafkaTemplate Changes
The KafkaTemplate can now maintain Micrometer timers. See Monitoring for more information.
The KafkaTemplate can now be configured with ProducerConfig properties to override those in the producer factory. See Using KafkaTemplate for more information.
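A configuration sketch; the override values are illustrative, and an existing ProducerFactory bean named pf is assumed:

```java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

// 'pf' stands in for an existing ProducerFactory<String, String> bean;
// the override map takes precedence over the factory's matching properties
ProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(
        Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"));

KafkaTemplate<String, String> template = new KafkaTemplate<>(pf,
        Map.of(ProducerConfig.ACKS_CONFIG, "all",
               ProducerConfig.LINGER_MS_CONFIG, 10));
```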
A RoutingKafkaTemplate has now been provided. See Using RoutingKafkaTemplate for more information.
You can now use KafkaSendCallback instead of ListenerFutureCallback to get a narrower exception, making it easier to extract the failed ProducerRecord. See Using KafkaTemplate for more information.
Kafka String Serializer/Deserializer
New ToStringSerializer/StringDeserializers, as well as an associated SerDe, are now provided. See String serialization for more information.
JsonDeserializer
The `JsonDeserializer` now has more flexibility to determine the deserialization type.
See Using Methods to Determine Types for more information.
Delegating Serializer/Deserializer
The `DelegatingSerializer` can now handle "standard" types, when the outbound record has no header.
See Delegating Serializer and Deserializer for more information.
Changes between 2.3 and 2.4
Kafka Client Version
This version requires the 2.4.0 `kafka-clients` or higher and supports the new incremental rebalancing feature.
ConsumerAwareRebalanceListener
Like `ConsumerRebalanceListener`, this interface now has an additional method, `onPartitionsLost`.
Refer to the Apache Kafka documentation for more information.
Unlike the `ConsumerRebalanceListener`, the default implementation does not call `onPartitionsRevoked`.
Instead, the listener container will call that method after it has called `onPartitionsLost`; you should not, therefore, do the same when implementing `ConsumerAwareRebalanceListener`.
See the IMPORTANT note at the end of Rebalancing Listeners for more information.
GenericErrorHandler
The `isAckAfterHandle()` default implementation now returns true by default.
KafkaTemplate
The `KafkaTemplate` now supports non-transactional publishing alongside transactional.
See `KafkaTemplate` Transactional and non-Transactional Publishing for more information.
AggregatingReplyingKafkaTemplate
The `releaseStrategy` is now a `BiConsumer`.
It is now called after a timeout (as well as when records arrive); the second parameter is `true` in the case of a call after a timeout.
See Aggregating Multiple Replies for more information.
Listener Container
The `ContainerProperties` provides an `authorizationExceptionRetryInterval` option to let the listener container retry after any `AuthorizationException` is thrown by the `KafkaConsumer`.
See its JavaDocs and Using `KafkaMessageListenerContainer` for more information.
@KafkaListener
The `@KafkaListener` annotation has a new property, `splitIterables`; default `true`.
When a replying listener returns an `Iterable`, this property controls whether the return result is sent as a single record or a record is sent for each element.
See Forwarding Listener Results using `@SendTo` for more information.
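A minimal sketch of how this attribute might be used; the listener id, topic names, and method body are hypothetical, not from the source:

```java
// Hypothetical replying listener: with splitIterables = false, the whole
// collection below is sent as ONE reply record; with the default (true),
// each element would be sent as its own record.
@KafkaListener(id = "collectionReply", topics = "requests", splitIterables = false)
@SendTo("replies")
public Collection<String> reply(String in) {
    return List.of(in.toUpperCase(), in.toLowerCase());
}
```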
Batch listeners can now be configured with a `BatchToRecordAdapter`; this allows, for example, the batch to be processed in a transaction while the listener gets one record at a time.
With the default implementation, a `ConsumerRecordRecoverer` can be used to handle errors within the batch, without stopping the processing of the entire batch; this might be useful when using transactions.
See Transactions with Batch Listeners for more information.
Kafka Streams
The `StreamsBuilderFactoryBean` accepts a new property, `KafkaStreamsInfrastructureCustomizer`.
This allows configuration of the builder and/or topology before the stream is created.
See Spring Management for more information.
Changes Between 2.2 and 2.3
This section covers the changes made from version 2.2 to version 2.3.
Tips, Tricks and Examples
A new chapter Tips, Tricks and Examples has been added. Please submit GitHub issues and/or pull requests for additional entries in that chapter.
Kafka Client Version
This version requires the 2.3.0 `kafka-clients` or higher.
Class/Package Changes
`TopicPartitionInitialOffset` is deprecated in favor of `TopicPartitionOffset`.
Configuration Changes
Starting with version 2.3.4, the `missingTopicsFatal` container property is false by default.
When this is true, the application fails to start if the broker is down.
Many users were affected by this change; given that Kafka is a high-availability platform, we did not anticipate that starting an application with no active brokers would be a common use case.
Producer and Consumer Factory Changes
The `DefaultKafkaProducerFactory` can now be configured to create a producer per thread.
You can also provide `Supplier<Serializer>` instances in the constructor as an alternative to either configured classes (which require no-arg constructors), or constructing with `Serializer` instances, which are then shared between all producers.
See Using `DefaultKafkaProducerFactory` for more information.
The same option is available with `Supplier<Deserializer>` instances in `DefaultKafkaConsumerFactory`.
See Using `KafkaMessageListenerContainer` for more information.
Listener Container Changes
Previously, error handlers received a `ListenerExecutionFailedException` (with the actual listener exception as the `cause`) when the listener was invoked using a listener adapter (such as `@KafkaListener`s).
Exceptions thrown by native `GenericMessageListener`s were passed to the error handler unchanged.
Now a `ListenerExecutionFailedException` is always the argument (with the actual listener exception as the `cause`), which provides access to the container's `group.id` property.
Because the listener container has its own mechanism for committing offsets, it prefers the Kafka `ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG` to be `false`.
It now sets it to false automatically unless specifically set in the consumer factory or the container's consumer property overrides.
The `ackOnError` property is now `false` by default.
It is now possible to obtain the consumer's `group.id` property in the listener method.
See Obtaining the Consumer `group.id` for more information.
The container has a new property, `recordInterceptor`, allowing records to be inspected or modified before invoking the listener.
A `CompositeRecordInterceptor` is also provided in case you need to invoke multiple interceptors.
See Message Listener Containers for more information.
`ConsumerSeekAware` has new methods allowing you to perform seeks relative to the beginning, end, or current position, and to seek to the first offset greater than or equal to a timestamp.
See [seek] for more information.
A convenience class `AbstractConsumerSeekAware` is now provided to simplify seeking.
See [seek] for more information.
The `ContainerProperties` provides an `idleBetweenPolls` option to let the main loop in the listener container sleep between `KafkaConsumer.poll()` calls.
See its JavaDocs and Using `KafkaMessageListenerContainer` for more information.
When using `AckMode.MANUAL` (or `MANUAL_IMMEDIATE`), you can now cause a redelivery by calling `nack` on the `Acknowledgment`.
See Committing Offsets for more information.
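As a sketch of how a negative acknowledgment might look; the listener id, topic, and `process` method are hypothetical:

```java
@KafkaListener(id = "manualAck", topics = "topic1")
public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
    try {
        process(record); // hypothetical business logic
        ack.acknowledge();
    } catch (Exception e) {
        // Negatively acknowledge: the unprocessed records from this poll are
        // re-fetched and this record is redelivered after the sleep interval.
        ack.nack(1000); // newer versions prefer an overload taking a Duration
    }
}
```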
Listener performance can now be monitored using Micrometer `Timer`s.
See Monitoring for more information.
The containers now publish additional consumer lifecycle events relating to startup. See Application Events for more information.
Transactional batch listeners can now support zombie fencing. See Transactions for more information.
The listener container factory can now be configured with a `ContainerCustomizer` to further configure each container after it has been created and configured.
See Container factory for more information.
ErrorHandler Changes
The `SeekToCurrentErrorHandler` now treats certain exceptions as fatal and disables retry for those, invoking the recoverer on first failure.
The `SeekToCurrentErrorHandler` and `SeekToCurrentBatchErrorHandler` can now be configured to apply a `BackOff` (thread sleep) between delivery attempts.
Starting with version 2.3.2, recovered records' offsets will be committed when the error handler returns after recovering a failed record.
The `DeadLetterPublishingRecoverer`, when used in conjunction with an `ErrorHandlingDeserializer`, now sets the payload of the message sent to the dead-letter topic to the original value that could not be deserialized.
Previously, it was `null` and user code needed to extract the `DeserializationException` from the message headers.
See Publishing Dead-letter Records for more information.
TopicBuilder
A new class, `TopicBuilder`, is provided for more convenient creation of `NewTopic` `@Bean`s for automatic topic provisioning.
See [configuring-topics] for more information.
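A sketch of what provisioning a `NewTopic` bean with the builder might look like; the topic name and settings are illustrative:

```java
@Bean
public NewTopic ordersTopic() {
    // Fluently declares a compacted topic with 3 partitions and 1 replica;
    // KafkaAdmin picks up NewTopic beans and creates them on the broker.
    return TopicBuilder.name("orders")
            .partitions(3)
            .replicas(1)
            .compact()
            .build();
}
```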
Kafka Streams Changes
You can now perform additional configuration of the `StreamsBuilderFactoryBean` created by `@EnableKafkaStreams`.
See Streams Configuration for more information.
A `RecoveringDeserializationExceptionHandler` is now provided, which allows records with deserialization errors to be recovered.
It can be used in conjunction with a `DeadLetterPublishingRecoverer` to send these records to a dead-letter topic.
See Recovery from Deserialization Exceptions for more information.
The `HeaderEnricher` transformer has been provided, using SpEL to generate the header values.
See Header Enricher for more information.
The `MessagingTransformer` has been provided.
This allows a Kafka Streams topology to interact with a spring-messaging component, such as a Spring Integration flow.
See `MessagingProcessor` and [Calling a Spring Integration Flow from a `KStream`] for more information.
JSON Component Changes
All the JSON-aware components are now configured by default with a Jackson `ObjectMapper` produced by `JacksonUtils.enhancedObjectMapper()`.
The `JsonDeserializer` now provides `TypeReference`-based constructors for better handling of target generic container types.
Also, a `JacksonMimeTypeModule` has been introduced for serialization of `org.springframework.util.MimeType` to plain string.
See its JavaDocs and Serialization, Deserialization, and Message Conversion for more information.
A `ByteArrayJsonMessageConverter` has been provided, as well as a new super class for all JSON converters, `JsonMessageConverter`.
Also, a `StringOrBytesSerializer` is now available; it can serialize `byte[]`, `Bytes`, and `String` values in `ProducerRecord`s.
See Spring Messaging Message Conversion for more information.
The `JsonSerializer`, `JsonDeserializer`, and `JsonSerde` now have fluent APIs to make programmatic configuration simpler.
See the javadocs, Serialization, Deserialization, and Message Conversion, and Streams JSON Serialization and Deserialization for more information.
ReplyingKafkaTemplate
When a reply times out, the future is completed exceptionally with a `KafkaReplyTimeoutException` instead of a `KafkaException`.
Also, an overloaded `sendAndReceive` method is now provided that allows specifying the reply timeout on a per-message basis.
AggregatingReplyingKafkaTemplate
Extends the `ReplyingKafkaTemplate` by aggregating replies from multiple receivers.
See Aggregating Multiple Replies for more information.
Transaction Changes
You can now override the producer factory's `transactionIdPrefix` on the `KafkaTemplate` and `KafkaTransactionManager`.
See `transactionIdPrefix` for more information.
New Delegating Serializer/Deserializer
The framework now provides a delegating `Serializer` and `Deserializer`, utilizing a header to enable producing and consuming records with multiple key/value types.
See Delegating Serializer and Deserializer for more information.
New Retrying Deserializer
The framework now provides a delegating `RetryingDeserializer`, to retry deserialization when transient errors such as network problems might occur.
See Retrying Deserializer for more information.
Changes Between 2.1 and 2.2
Kafka Client Version
This version requires the 2.0.0 `kafka-clients` or higher.
Class and Package Changes
The `ContainerProperties` class has been moved from `org.springframework.kafka.listener.config` to `org.springframework.kafka.listener`.
The `AckMode` enum has been moved from `AbstractMessageListenerContainer` to `ContainerProperties`.
The `setBatchErrorHandler()` and `setErrorHandler()` methods have been moved from `ContainerProperties` to both `AbstractMessageListenerContainer` and `AbstractKafkaListenerContainerFactory`.
After Rollback Processing
A new `AfterRollbackProcessor` strategy is provided.
See After-rollback Processor for more information.
`ConcurrentKafkaListenerContainerFactory` Changes
You can now use the `ConcurrentKafkaListenerContainerFactory` to create and configure any `ConcurrentMessageListenerContainer`, not only those for `@KafkaListener` annotations.
See Container factory for more information.
Listener Container Changes
A new container property (`missingTopicsFatal`) has been added.
See Using `KafkaMessageListenerContainer` for more information.
A `ConsumerStoppedEvent` is now emitted when a consumer stops.
See Thread Safety for more information.
Batch listeners can optionally receive the complete `ConsumerRecords<?, ?>` object instead of a `List<ConsumerRecord<?, ?>>`.
See [batch-listeners] for more information.
The `DefaultAfterRollbackProcessor` and `SeekToCurrentErrorHandler` can now recover (skip) records that keep failing, and, by default, do so after 10 failures.
They can be configured to publish failed records to a dead-letter topic.
Starting with version 2.2.4, the consumer’s group ID can be used while selecting the dead letter topic name.
The `ConsumerStoppingEvent` has been added.
See Application Events for more information.
The `SeekToCurrentErrorHandler` can now be configured to commit the offset of a recovered record when the container is configured with `AckMode.MANUAL_IMMEDIATE` (since 2.2.4).
@KafkaListener Changes
You can now override the `concurrency` and `autoStartup` properties of the listener container factory by setting properties on the annotation.
You can now add configuration to determine which headers (if any) are copied to a reply message.
See `@KafkaListener` Annotation for more information.
You can now use `@KafkaListener` as a meta-annotation on your own annotations.
See `@KafkaListener` as a Meta Annotation for more information.
It is now easier to configure a `Validator` for `@Payload` validation.
See `@KafkaListener` `@Payload` Validation for more information.
You can now specify Kafka consumer properties directly on the annotation; these will override any properties with the same name defined in the consumer factory (since version 2.2.4).
See Annotation Properties for more information.
Header Mapping Changes
Headers of type `MimeType` and `MediaType` are now mapped as simple strings in the `RecordHeader` value.
Previously, they were mapped as JSON and only `MimeType` was decoded.
`MediaType` could not be decoded.
They are now simple strings for interoperability.
Also, the `DefaultKafkaHeaderMapper` has a new `addToStringClasses` method, allowing the specification of types that should be mapped by using `toString()` instead of JSON.
See Message Headers for more information.
Embedded Kafka Changes
The `KafkaEmbedded` class and its `KafkaRule` interface have been deprecated in favor of the `EmbeddedKafkaBroker` and its JUnit 4 `EmbeddedKafkaRule` wrapper.
The `@EmbeddedKafka` annotation now populates an `EmbeddedKafkaBroker` bean instead of the deprecated `KafkaEmbedded`.
This change allows the use of `@EmbeddedKafka` in JUnit 5 tests.
The `@EmbeddedKafka` annotation now has the attribute `ports` to specify the port that populates the `EmbeddedKafkaBroker`.
See Testing Applications for more information.
JsonSerializer/Deserializer Enhancements
You can now provide type mapping information by using producer and consumer properties.
New constructors are available on the deserializer to allow overriding the type header information with the supplied target type.
The `JsonDeserializer` now removes any type information headers by default.
You can now configure the `JsonDeserializer` to ignore type information headers by using a Kafka property (since 2.2.3).
See Serialization, Deserialization, and Message Conversion for more information.
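A minimal sketch of such consumer properties; the property names (`spring.json.use.type.headers`, `spring.json.value.default.type`) and the target class `com.example.Order` are assumptions to verify against your spring-kafka version:

```java
import java.util.HashMap;
import java.util.Map;

public class JsonConsumerProps {
    // Builds consumer properties asking JsonDeserializer to ignore type
    // information headers and deserialize into a fixed default type instead.
    public static Map<String, Object> build() {
        Map<String, Object> props = new HashMap<>();
        props.put("spring.json.use.type.headers", false);
        props.put("spring.json.value.default.type", "com.example.Order"); // hypothetical class
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```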
Kafka Streams Changes
The streams configuration bean must now be a `KafkaStreamsConfiguration` object instead of a `StreamsConfig` object.
The `StreamsBuilderFactoryBean` has been moved from package `…core` to `…config`.
The `KafkaStreamBrancher` has been introduced for a better end-user experience when conditional branches are built on top of a `KStream` instance.
See Apache Kafka Streams Support and Configuration for more information.
Transactional ID
When a transaction is started by the listener container, the `transactional.id` is now the `transactionIdPrefix` appended with `<group.id>.<topic>.<partition>`.
This change allows proper fencing of zombies, as described here.
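The naming rule above amounts to simple string composition; a sketch, illustrative only (the container computes this internally):

```java
public class TransactionalIds {
    // transactional.id = prefix + <group.id>.<topic>.<partition>, giving each
    // group/topic/partition a distinct, stable id so zombie producers can be fenced.
    public static String forPartition(String prefix, String groupId, String topic, int partition) {
        return prefix + groupId + "." + topic + "." + partition;
    }

    public static void main(String[] args) {
        System.out.println(forPartition("tx-", "orders-group", "orders", 0)); // tx-orders-group.orders.0
    }
}
```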
Changes Between 2.0 and 2.1
Kafka Client Version
This version requires the 1.0.0 `kafka-clients` or higher.
The 1.1.x client is supported natively in version 2.2.
JSON Improvements
The `StringJsonMessageConverter` and `JsonSerializer` now add type information in `Headers`, letting the converter and `JsonDeserializer` create specific types on reception, based on the message itself rather than a fixed configured type.
See Serialization, Deserialization, and Message Conversion for more information.
Container Stopping Error Handlers
Container error handlers are now provided for both record and batch listeners that treat any exceptions thrown by the listener as fatal; they stop the container.
See Handling Exceptions for more information.
Pausing and Resuming Containers
The listener containers now have `pause()` and `resume()` methods (since version 2.1.3).
See Pausing and Resuming Listener Containers for more information.
Stateful Retry
Starting with version 2.1.3, you can configure stateful retry. See Stateful Retry for more information.
Client ID
Starting with version 2.1.1, you can now set the `client.id` prefix on `@KafkaListener`.
Previously, to customize the client ID, you needed a separate consumer factory (and container factory) per listener.
The prefix is suffixed with `-n` to provide unique client IDs when you use concurrency.
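The suffixing rule can be sketched as plain string logic; illustrative only, since the container applies it internally when concurrency is greater than 1:

```java
public class ClientIds {
    // Each of the container's consumers gets the configured prefix plus "-index".
    public static String forConsumer(String prefix, int index) {
        return prefix + "-" + index;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            System.out.println(forConsumer("myListener", i)); // myListener-0, myListener-1, myListener-2
        }
    }
}
```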
Logging Offset Commits
By default, logging of topic offset commits is performed with the `DEBUG` logging level.
Starting with version 2.1.2, a new property in `ContainerProperties` called `commitLogLevel` lets you specify the log level for these messages.
See Using `KafkaMessageListenerContainer` for more information.
Default @KafkaHandler
Starting with version 2.1.3, you can designate one of the `@KafkaHandler` annotations on a class-level `@KafkaListener` as the default.
See `@KafkaListener` on a Class for more information.
ReplyingKafkaTemplate
Starting with version 2.1.3, a subclass of `KafkaTemplate` is provided to support request/reply semantics.
See Using `ReplyingKafkaTemplate` for more information.
Changes Between 1.3 and 2.0
Spring Framework and Java Versions
The Spring for Apache Kafka project now requires Spring Framework 5.0 and Java 8.
`@KafkaListener` Changes
You can now annotate `@KafkaListener` methods (and classes and `@KafkaHandler` methods) with `@SendTo`.
If the method returns a result, it is forwarded to the specified topic.
See Forwarding Listener Results using `@SendTo` for more information.
Message Listeners
Message listeners can now be aware of the `Consumer` object.
See [message-listeners] for more information.
Using ConsumerAwareRebalanceListener
Rebalance listeners can now access the `Consumer` object during rebalance notifications.
See Rebalancing Listeners for more information.
Changes Between 1.2 and 1.3
Support for Transactions
The 0.11.0.0 client library added support for transactions.
The `KafkaTransactionManager` and other support for transactions have been added.
See Transactions for more information.
Support for Headers
The 0.11.0.0 client library added support for message headers.
These can now be mapped to and from `spring-messaging` `MessageHeaders`.
See Message Headers for more information.
Creating Topics
The 0.11.0.0 client library provides an `AdminClient`, which you can use to create topics.
The `KafkaAdmin` uses this client to automatically add topics defined as `@Bean` instances.
Support for Kafka Timestamps
`KafkaTemplate` now supports an API to add records with timestamps.
New `KafkaHeaders` have been introduced regarding `timestamp` support.
Also, new `KafkaConditions.timestamp()` and `KafkaMatchers.hasTimestamp()` testing utilities have been added.
See Using `KafkaTemplate`, `@KafkaListener` Annotation, and Testing Applications for more details.
`@KafkaListener` Changes
You can now configure a `KafkaListenerErrorHandler` to handle exceptions.
See Handling Exceptions for more information.
By default, the `@KafkaListener` `id` property is now used as the `group.id` property, overriding the property configured in the consumer factory (if present).
Further, you can explicitly configure the `groupId` on the annotation.
Previously, you would have needed a separate container factory (and consumer factory) to use different `group.id` values for listeners.
To restore the previous behavior of using the factory-configured `group.id`, set the `idIsGroup` property on the annotation to `false`.
`@EmbeddedKafka` Annotation
For convenience, a test class-level `@EmbeddedKafka` annotation is provided, to register `KafkaEmbedded` as a bean.
See Testing Applications for more information.
Kerberos Configuration
Support for configuring Kerberos is now provided. See JAAS and Kerberos for more information.
Changes Between 1.0 and 1.1
Batch Listeners
Listeners can be configured to receive the entire batch of messages returned by the `consumer.poll()` operation, rather than one at a time.
Null Payloads
Null payloads are used to “delete” keys when you use log compaction.
Initial Offset
When explicitly assigning partitions, you can now configure the initial offset relative to the current position for the consumer group, rather than absolute or relative to the current end.
Seek
You can now seek the position of each topic or partition. You can use this to set the initial position during initialization when group management is in use and Kafka assigns the partitions. You can also seek when an idle container is detected or at any arbitrary point in your application’s execution. See [seek] for more information.