Configuring Topics

如果在应用程序上下文中定义了 KafkaAdmin bean,则它可以自动向代理添加主题。为此,可以为每个主题向应用程序上下文添加一个 NewTopic @Bean。版本 2.3 引入了一个新类 TopicBuilder,以使创建此类豆更方便。以下示例演示如何操作:

  • Java

  • Kotlin

link:{java-examples}/topics/Config.java[role=include]
link:{kotlin-examples}/topics/Config.kt[role=include]

从版本 2.6 开始,你可以省去 partitions() 和/或 replicas(),而代理默认值将应用于这些属性。代理版本必须至少为 2.4.0 才能支持此功能 - 请参阅 KIP-464

  • Java

  • Kotlin

link:{java-examples}/topics/Config.java[role=include]
link:{kotlin-examples}/topics/Config.kt[role=include]

从版本 2.7 开始,可以在单个 KafkaAdmin.NewTopics bean 定义中声明多个 NewTopic

  • Java

  • Kotlin

link:{java-examples}/topics/Config.java[role=include]
link:{kotlin-examples}/topics/Config.kt[role=include]

使用 Spring Boot 时,会自动注册 KafkaAdmin Bean,因此您只需要 NewTopic(和/或 NewTopics@Bean

默认情况下,如果代理不可用,则会记录一条消息,但上下文将继续加载。可以以编程方式调用管理员的 initialize() 方法稍后再试。如果希望将此条件视为致命条件,请将管理员的 fatalIfBrokerNotAvailable 属性设置为 true。然后,上下文无法初始化。

如果代理支持(1.0.0 或更高版本),如果发现现有主题的分区少于 NewTopic.numPartitions,管理端会增加分区数量。

从版本 2.7 开始,KafkaAdmin 提供了在运行时创建和检查主题的方法。

  • createOrModifyTopics

  • describeTopics

对于更高级的功能,可以使用 AdminClient 直接操作。以下示例演示如何操作:

@Autowired
private KafkaAdmin admin;

...

    AdminClient client = AdminClient.create(admin.getConfigurationProperties());
    ...
    client.close();

从版本 2.9.10、3.0.9 开始,可以提供一个 Predicate<NewTopic>,该谓词可用于确定是否应考虑创建或修改特定的 NewTopic bean。这很有用,例如,如果你有多个 KafkaAdmin 实例指向不同的群集,并且希望选择每个管理员应创建或修改的主题。

admin.setCreateOrModifyTopic(nt -> !nt.name().equals("dontCreateThisOne"));