Configuring Topics

如果在应用程序上下文中定义了 KafkaAdmin bean,则它可以自动向代理添加主题。为此,可以为每个主题向应用程序上下文添加一个 NewTopic @Bean。版本 2.3 引入了一个新类 TopicBuilder,以使创建此类豆更方便。以下示例演示如何操作:

If you define a KafkaAdmin bean in your application context, it can automatically add topics to the broker. To do so, you can add a NewTopic @Bean for each topic to the application context. Version 2.3 introduced a new class TopicBuilder to make creation of such beans more convenient. The following example shows how to do so:

  • Java

  • Kotlin

Unresolved include directive in modules/ROOT/pages/kafka/configuring-topics.adoc - include::example$java-examples/org/springframework/kafka/jdocs/topics/Config.java[]
Unresolved include directive in modules/ROOT/pages/kafka/configuring-topics.adoc - include::example$kotlin-examples/org/springframework/kafka/kdocs/topics/Config.kt[]

从版本 2.6 开始,你可以省去 partitions() 和/或 replicas(),而代理默认值将应用于这些属性。代理版本必须至少为 2.4.0 才能支持此功能 - 请参阅 KIP-464

Starting with version 2.6, you can omit partitions() and/or replicas() and the broker defaults will be applied to those properties. The broker version must be at least 2.4.0 to support this feature - see KIP-464.

  • Java

  • Kotlin

Unresolved include directive in modules/ROOT/pages/kafka/configuring-topics.adoc - include::example$java-examples/org/springframework/kafka/jdocs/topics/Config.java[]
Unresolved include directive in modules/ROOT/pages/kafka/configuring-topics.adoc - include::example$kotlin-examples/org/springframework/kafka/kdocs/topics/Config.kt[]

从版本 2.7 开始,可以在单个 KafkaAdmin.NewTopics bean 定义中声明多个 NewTopic

Starting with version 2.7, you can declare multiple NewTopic`s in a single `KafkaAdmin.NewTopics bean definition:

  • Java

  • Kotlin

Unresolved include directive in modules/ROOT/pages/kafka/configuring-topics.adoc - include::example$java-examples/org/springframework/kafka/jdocs/topics/Config.java[]
Unresolved include directive in modules/ROOT/pages/kafka/configuring-topics.adoc - include::example$kotlin-examples/org/springframework/kafka/kdocs/topics/Config.kt[]

使用 Spring Boot 时,会自动注册 KafkaAdmin Bean,因此您只需要 NewTopic(和/或 NewTopics@Bean

When using Spring Boot, a KafkaAdmin bean is automatically registered so you only need the NewTopic (and/or NewTopics) `@Bean`s.

默认情况下,如果代理不可用,则会记录一条消息,但上下文将继续加载。可以以编程方式调用管理员的 initialize() 方法稍后再试。如果希望将此条件视为致命条件,请将管理员的 fatalIfBrokerNotAvailable 属性设置为 true。然后,上下文无法初始化。

By default, if the broker is not available, a message is logged, but the context continues to load. You can programmatically invoke the admin’s initialize() method to try again later. If you wish this condition to be considered fatal, set the admin’s fatalIfBrokerNotAvailable property to true. The context then fails to initialize.

如果代理支持(1.0.0 或更高版本),如果发现现有主题的分区少于 NewTopic.numPartitions,管理端会增加分区数量。

If the broker supports it (1.0.0 or higher), the admin increases the number of partitions if it is found that an existing topic has fewer partitions than the NewTopic.numPartitions.

从版本 2.7 开始,KafkaAdmin 提供了在运行时创建和检查主题的方法。

Starting with version 2.7, the KafkaAdmin provides methods to create and examine topics at runtime.

  • createOrModifyTopics

  • describeTopics

对于更高级的功能,可以使用 AdminClient 直接操作。以下示例演示如何操作:

For more advanced features, you can use the AdminClient directly. The following example shows how to do so:

@Autowired
private KafkaAdmin admin;

...

    AdminClient client = AdminClient.create(admin.getConfigurationProperties());
    ...
    client.close();

从版本 2.9.10、3.0.9 开始,可以提供一个 Predicate<NewTopic>,该谓词可用于确定是否应考虑创建或修改特定的 NewTopic bean。这很有用,例如,如果你有多个 KafkaAdmin 实例指向不同的群集,并且希望选择每个管理员应创建或修改的主题。

Starting with versions 2.9.10, 3.0.9, you can provide a Predicate<NewTopic> which can be used to determine whether a particular NewTopic bean should be considered for creation or modification. This is useful, for example, if you have multiple KafkaAdmin instances pointing to different clusters and you wish to select those topics that should be created or modified by each admin.

admin.setCreateOrModifyTopic(nt -> !nt.name().equals("dontCreateThisOne"));