Connecting to Kafka
-
KafkaAdmin
- see Configuring Topics -
ProducerFactory
- see Sending Messages -
ConsumerFactory
- see Receiving Messages
从 2.5 版本开始,每个版本都扩展了 KafkaResourceFactory
。这允许在运行时通过向其配置添加 Supplier<String>
来更改引导服务器: setBootstrapServersSupplier(() → …)
。这将针对所有新连接调用以获取服务器列表。使用者和生产者通常是长期存在的。若要关闭现有的生产者,请在 DefaultKafkaProducerFactory
上调用 reset()
。若要关闭现有的使用者,请在 KafkaListenerEndpointRegistry
上调用 stop()
(然后调用 start()
),并/或在其他侦听器容器 bean 上调用 stop()
和 start()
。
Starting with version 2.5, each of these extends KafkaResourceFactory
.
This allows changing the bootstrap servers at runtime by adding a Supplier<String>
to their configuration: setBootstrapServersSupplier(() → …)
.
This will be called for all new connections to get the list of servers.
Consumers and Producers are generally long-lived.
To close existing Producers, call reset()
on the DefaultKafkaProducerFactory
.
To close existing Consumers, call stop()
(and then start()
) on the KafkaListenerEndpointRegistry
and/or stop()
and start()
on any other listener container beans.
为方便起见,该框架还提供了一个 ABSwitchCluster
,它支持两组引导服务器;其中一组在任何时间点都是处于活动状态。配置 ABSwitchCluster
并将其添加到生产者和使用者工厂,以及 KafkaAdmin
,通过调用 setBootstrapServersSupplier()
。当您想要切换时,请调用 primary()
或 secondary()
,并在生产者工厂上调用 reset()
以建立新连接;对于使用者,请 stop()
并 start()
所有侦听器容器。当使用 @KafkaListener
时,请 stop()
并 start()
KafkaListenerEndpointRegistry
bean。
For convenience, the framework also provides an ABSwitchCluster
which supports two sets of bootstrap servers; one of which is active at any time.
Configure the ABSwitchCluster
and add it to the producer and consumer factories, and the KafkaAdmin
, by calling setBootstrapServersSupplier()
.
When you want to switch, call primary()
or secondary()
and call reset()
on the producer factory to establish new connection(s); for consumers, stop()
and start()
all listener containers.
When using @KafkaListener`s, `stop()
and start()
the KafkaListenerEndpointRegistry
bean.
有关更多信息,请参见 Javadoc。
See the Javadocs for more information.
Factory Listeners
从 2.5 版本开始,可以为 DefaultKafkaProducerFactory
和 DefaultKafkaConsumerFactory
配置一个 Listener
,以便在创建或关闭生产者或使用者时接收通知。
Starting with version 2.5, the DefaultKafkaProducerFactory
and DefaultKafkaConsumerFactory
can be configured with a Listener
to receive notifications whenever a producer or consumer is created or closed.
interface Listener<K, V> {
default void producerAdded(String id, Producer<K, V> producer) {
}
default void producerRemoved(String id, Producer<K, V> producer) {
}
}
interface Listener<K, V> {
default void consumerAdded(String id, Consumer<K, V> consumer) {
}
default void consumerRemoved(String id, Consumer<K, V> consumer) {
}
}
在每种情况下,id
都通过将 client-id
属性(从创建之后的 metrics()
获得)追加到工厂 beanName
属性(用“.”分隔)来创建。
In each case, the id
is created by appending the client-id
property (obtained from the metrics()
after creation) to the factory beanName
property, separated by .
.
例如,这些侦听器可用于在创建新客户端时创建一个 Micrometer KafkaClientMetrics
实例并绑定该实例(并在客户端关闭时关闭该实例)。
These listeners can be used, for example, to create and bind a Micrometer KafkaClientMetrics
instance when a new client is created (and close it when the client is closed).
框架提供确切做到此的侦听器;请参见 Micrometer Native Metrics。
The framework provides listeners that do exactly that; see Micrometer Native Metrics.
Default client ID prefixes
从 3.2 版本开始,对于使用 spring.application.name
属性定义应用程序名称的 Spring Boot 应用程序,此名称现在用作以下客户端类型的自动生成客户端 ID 的默认前缀:
Starting with version 3.2, for Spring Boot applications which define an application name using the spring.application.name
property, this name is now used
as a default prefix for auto-generated client IDs for these client types:
-
consumer clients which don’t use a consumer group
-
producer clients
-
admin clients
这使得在服务器端更轻松地识别这些客户端,以便进行故障排除或应用配额。
This makes it easier to identify these clients at server side for troubleshooting or applying quotas.
Client Type | Without application name | With application name |
---|---|---|
consumer without consumer group |
consumer-null-1 |
myapp-consumer-1 |
consumer with consumer group "mygroup" |
consumer-mygroup-1 |
consumer-mygroup-1 |
producer |
producer-1 |
myapp-producer-1 |
admin |
adminclient-1 |
myapp-admin-1 |