Configuring Topics
如果在应用程序上下文中定义了 KafkaAdmin
bean,则它可以自动向代理添加主题。为此,可以为每个主题向应用程序上下文添加一个 NewTopic
@Bean
。版本 2.3 引入了一个新类 TopicBuilder
,以使创建此类豆更方便。以下示例演示如何操作:
-
Java
-
Kotlin
Unresolved include directive in modules/ROOT/pages/kafka/configuring-topics.adoc - include::example$java-examples/org/springframework/kafka/jdocs/topics/Config.java[]
Unresolved include directive in modules/ROOT/pages/kafka/configuring-topics.adoc - include::example$kotlin-examples/org/springframework/kafka/kdocs/topics/Config.kt[]
从版本 2.6 开始,你可以省去 partitions()
和/或 replicas()
,而代理默认值将应用于这些属性。代理版本必须至少为 2.4.0 才能支持此功能 - 请参阅 KIP-464。
-
Java
-
Kotlin
Unresolved include directive in modules/ROOT/pages/kafka/configuring-topics.adoc - include::example$java-examples/org/springframework/kafka/jdocs/topics/Config.java[]
Unresolved include directive in modules/ROOT/pages/kafka/configuring-topics.adoc - include::example$kotlin-examples/org/springframework/kafka/kdocs/topics/Config.kt[]
从版本 2.7 开始,可以在单个 KafkaAdmin.NewTopics
bean 定义中声明多个 NewTopic
:
-
Java
-
Kotlin
Unresolved include directive in modules/ROOT/pages/kafka/configuring-topics.adoc - include::example$java-examples/org/springframework/kafka/jdocs/topics/Config.java[]
Unresolved include directive in modules/ROOT/pages/kafka/configuring-topics.adoc - include::example$kotlin-examples/org/springframework/kafka/kdocs/topics/Config.kt[]
使用 Spring Boot 时,会自动注册 KafkaAdmin
Bean,因此您只需要 NewTopic
(和/或 NewTopics
)@Bean
。
默认情况下,如果代理不可用,则会记录一条消息,但上下文将继续加载。可以以编程方式调用管理员的 initialize()
方法稍后再试。如果希望将此条件视为致命条件,请将管理员的 fatalIfBrokerNotAvailable
属性设置为 true
。然后,上下文无法初始化。
如果代理支持(1.0.0 或更高版本),如果发现现有主题的分区少于 |
从版本 2.7 开始,KafkaAdmin
提供了在运行时创建和检查主题的方法。
-
createOrModifyTopics
-
describeTopics
对于更高级的功能,可以使用 AdminClient
直接操作。以下示例演示如何操作:
@Autowired
private KafkaAdmin admin;
...
AdminClient client = AdminClient.create(admin.getConfigurationProperties());
...
client.close();
从版本 2.9.10、3.0.9 开始,可以提供一个 Predicate<NewTopic>
,该谓词可用于确定是否应考虑创建或修改特定的 NewTopic
bean。这很有用,例如,如果你有多个 KafkaAdmin
实例指向不同的群集,并且希望选择每个管理员应创建或修改的主题。
admin.setCreateOrModifyTopic(nt -> !nt.name().equals("dontCreateThisOne"));