Connecting to Kafka

从 2.5 版本开始,每个版本都扩展了 KafkaResourceFactory。这允许在运行时通过向其配置添加 Supplier<String> 来更改引导服务器: setBootstrapServersSupplier(() → …​)。这将针对所有新连接调用以获取服务器列表。使用者和生产者通常是长期存在的。若要关闭现有的生产者,请在 DefaultKafkaProducerFactory 上调用 reset()。若要关闭现有的使用者,请在 KafkaListenerEndpointRegistry 上调用 stop()(然后调用 start()),并/或在其他侦听器容器 bean 上调用 stop()start()

Starting with version 2.5, each of these extends KafkaResourceFactory. This allows changing the bootstrap servers at runtime by adding a Supplier<String> to their configuration: setBootstrapServersSupplier(() → …​). This will be called for all new connections to get the list of servers. Consumers and Producers are generally long-lived. To close existing Producers, call reset() on the DefaultKafkaProducerFactory. To close existing Consumers, call stop() (and then start()) on the KafkaListenerEndpointRegistry and/or stop() and start() on any other listener container beans.

为方便起见,该框架还提供了一个 ABSwitchCluster,它支持两组引导服务器;其中一组在任何时间点都是处于活动状态。配置 ABSwitchCluster 并将其添加到生产者和使用者工厂,以及 KafkaAdmin,通过调用 setBootstrapServersSupplier()。当您想要切换时,请调用 primary()secondary(),并在生产者工厂上调用 reset() 以建立新连接;对于使用者,请 stop()start() 所有侦听器容器。当使用 @KafkaListener 时,请 stop()start() KafkaListenerEndpointRegistry bean。

For convenience, the framework also provides an ABSwitchCluster which supports two sets of bootstrap servers; one of which is active at any time. Configure the ABSwitchCluster and add it to the producer and consumer factories, and the KafkaAdmin, by calling setBootstrapServersSupplier(). When you want to switch, call primary() or secondary() and call reset() on the producer factory to establish new connection(s); for consumers, stop() and start() all listener containers. When using @KafkaListener`s, `stop() and start() the KafkaListenerEndpointRegistry bean.

有关更多信息,请参见 Javadoc。

See the Javadocs for more information.

Factory Listeners

从 2.5 版本开始,可以为 DefaultKafkaProducerFactoryDefaultKafkaConsumerFactory 配置一个 Listener,以便在创建或关闭生产者或使用者时接收通知。

Starting with version 2.5, the DefaultKafkaProducerFactory and DefaultKafkaConsumerFactory can be configured with a Listener to receive notifications whenever a producer or consumer is created or closed.

Producer Factory Listener
interface Listener<K, V> {

    default void producerAdded(String id, Producer<K, V> producer) {
    }

    default void producerRemoved(String id, Producer<K, V> producer) {
    }

}
Consumer Factory Listener
interface Listener<K, V> {

    default void consumerAdded(String id, Consumer<K, V> consumer) {
    }

    default void consumerRemoved(String id, Consumer<K, V> consumer) {
    }

}

在每种情况下,id 都通过将 client-id 属性(从创建之后的 metrics() 获得)追加到工厂 beanName 属性(用“.”分隔)来创建。

In each case, the id is created by appending the client-id property (obtained from the metrics() after creation) to the factory beanName property, separated by ..

例如,这些侦听器可用于在创建新客户端时创建一个 Micrometer KafkaClientMetrics 实例并绑定该实例(并在客户端关闭时关闭该实例)。

These listeners can be used, for example, to create and bind a Micrometer KafkaClientMetrics instance when a new client is created (and close it when the client is closed).

框架提供确切做到此的侦听器;请参见 Micrometer Native Metrics

The framework provides listeners that do exactly that; see Micrometer Native Metrics.

Default client ID prefixes

从 3.2 版本开始,对于使用 spring.application.name 属性定义应用程序名称的 Spring Boot 应用程序,此名称现在用作以下客户端类型的自动生成客户端 ID 的默认前缀:

Starting with version 3.2, for Spring Boot applications which define an application name using the spring.application.name property, this name is now used as a default prefix for auto-generated client IDs for these client types:

  • consumer clients which don’t use a consumer group

  • producer clients

  • admin clients

这使得在服务器端更轻松地识别这些客户端,以便进行故障排除或应用配额。

This makes it easier to identify these clients at server side for troubleshooting or applying quotas.

Table 1. Example client ids resulting for a Spring Boot application with spring.application.name=myapp
Client Type Without application name With application name

consumer without consumer group

consumer-null-1

myapp-consumer-1

consumer with consumer group "mygroup"

consumer-mygroup-1

consumer-mygroup-1

producer

producer-1

myapp-producer-1

admin

adminclient-1

myapp-admin-1