Configuring Topics
如果在应用程序上下文中定义了 KafkaAdmin
bean,则它可以自动向代理添加主题。为此,可以为每个主题向应用程序上下文添加一个 NewTopic
@Bean
。版本 2.3 引入了一个新类 TopicBuilder
,以使创建此类豆更方便。以下示例演示如何操作:
If you define a KafkaAdmin
bean in your application context, it can automatically add topics to the broker.
To do so, you can add a NewTopic
@Bean
for each topic to the application context.
Version 2.3 introduced a new class TopicBuilder
to make creation of such beans more convenient.
The following example shows how to do so:
-
Java
-
Kotlin
Unresolved include directive in modules/ROOT/pages/kafka/configuring-topics.adoc - include::example$java-examples/org/springframework/kafka/jdocs/topics/Config.java[]
Unresolved include directive in modules/ROOT/pages/kafka/configuring-topics.adoc - include::example$kotlin-examples/org/springframework/kafka/kdocs/topics/Config.kt[]
从版本 2.6 开始,你可以省去 partitions()
和/或 replicas()
,而代理默认值将应用于这些属性。代理版本必须至少为 2.4.0 才能支持此功能 - 请参阅 KIP-464。
Starting with version 2.6, you can omit partitions()
and/or replicas()
and the broker defaults will be applied to those properties.
The broker version must be at least 2.4.0 to support this feature - see KIP-464.
-
Java
-
Kotlin
Unresolved include directive in modules/ROOT/pages/kafka/configuring-topics.adoc - include::example$java-examples/org/springframework/kafka/jdocs/topics/Config.java[]
Unresolved include directive in modules/ROOT/pages/kafka/configuring-topics.adoc - include::example$kotlin-examples/org/springframework/kafka/kdocs/topics/Config.kt[]
从版本 2.7 开始,可以在单个 KafkaAdmin.NewTopics
bean 定义中声明多个 NewTopic
:
Starting with version 2.7, you can declare multiple NewTopic`s in a single `KafkaAdmin.NewTopics
bean definition:
-
Java
-
Kotlin
Unresolved include directive in modules/ROOT/pages/kafka/configuring-topics.adoc - include::example$java-examples/org/springframework/kafka/jdocs/topics/Config.java[]
Unresolved include directive in modules/ROOT/pages/kafka/configuring-topics.adoc - include::example$kotlin-examples/org/springframework/kafka/kdocs/topics/Config.kt[]
使用 Spring Boot 时,会自动注册 KafkaAdmin
Bean,因此您只需要 NewTopic
(和/或 NewTopics
)@Bean
。
When using Spring Boot, a KafkaAdmin
bean is automatically registered so you only need the NewTopic
(and/or NewTopics
) `@Bean`s.
默认情况下,如果代理不可用,则会记录一条消息,但上下文将继续加载。可以以编程方式调用管理员的 initialize()
方法稍后再试。如果希望将此条件视为致命条件,请将管理员的 fatalIfBrokerNotAvailable
属性设置为 true
。然后,上下文无法初始化。
By default, if the broker is not available, a message is logged, but the context continues to load.
You can programmatically invoke the admin’s initialize()
method to try again later.
If you wish this condition to be considered fatal, set the admin’s fatalIfBrokerNotAvailable
property to true
.
The context then fails to initialize.
如果代理支持(1.0.0 或更高版本),如果发现现有主题的分区少于 |
If the broker supports it (1.0.0 or higher), the admin increases the number of partitions if it is found that an existing topic has fewer partitions than the |
从版本 2.7 开始,KafkaAdmin
提供了在运行时创建和检查主题的方法。
Starting with version 2.7, the KafkaAdmin
provides methods to create and examine topics at runtime.
-
createOrModifyTopics
-
describeTopics
对于更高级的功能,可以使用 AdminClient
直接操作。以下示例演示如何操作:
For more advanced features, you can use the AdminClient
directly.
The following example shows how to do so:
@Autowired
private KafkaAdmin admin;
...
AdminClient client = AdminClient.create(admin.getConfigurationProperties());
...
client.close();
从版本 2.9.10、3.0.9 开始,可以提供一个 Predicate<NewTopic>
,该谓词可用于确定是否应考虑创建或修改特定的 NewTopic
bean。这很有用,例如,如果你有多个 KafkaAdmin
实例指向不同的群集,并且希望选择每个管理员应创建或修改的主题。
Starting with versions 2.9.10, 3.0.9, you can provide a Predicate<NewTopic>
which can be used to determine whether a particular NewTopic
bean should be considered for creation or modification.
This is useful, for example, if you have multiple KafkaAdmin
instances pointing to different clusters and you wish to select those topics that should be created or modified by each admin.
admin.setCreateOrModifyTopic(nt -> !nt.name().equals("dontCreateThisOne"));