Apache Kafka Support

Apache Kafka 通过提供 spring-kafka 项目的自动配置来支持。 Kafka 配置由 spring.kafka.* 中的外部配置属性控制。例如,你可以在 application.properties 中声明以下部分:

spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup"

要创建启动主题,请添加类型为 NewTopic 的 Bean。如果主题已经存在,则 Bean 被忽略。

有关更多支持的选项,请参阅 {code-spring-boot-autoconfigure-src}/kafka/KafkaProperties.java[KafkaProperties]。

Sending a Message

Spring 的 KafkaTemplate 是自动配置的,你可以直接在你的 Bean 中自动连线它,如下例所示:

如果 configprop:spring.kafka.producer.transaction-id-prefix[] 属性已定义,则自动配置一个 KafkaTransactionManager。此外,如果定义了一个 RecordMessageConverter Bean,它会自动关联到自动配置的 KafkaTemplate

Receiving a Message

当 Apache Kafka 基础设施存在时,任何 Bean 都可以使用 @KafkaListener 进行注释以创建侦听器终结点。如果没有定义 KafkaListenerContainerFactory,则会自动配置一个默认的 KafkaListenerContainerFactory,并由 spring.kafka.listener.* 中定义的键定义。

以下组件在 someTopic 主题上创建了一个侦听器终结点:

如果定义了一个 KafkaTransactionManager Bean,它会自动关联到容器工厂。同样,如果定义了 RecordFilterStrategyCommonErrorHandlerAfterRollbackProcessorConsumerAwareRebalanceListener Bean,它会自动关联到默认工厂。

根据侦听器类型,一个 RecordMessageConverterBatchMessageConverter Bean 会关联到默认工厂。如果一个批处理侦听器仅存在一个 RecordMessageConverter Bean,它将被包装在一个 BatchMessageConverter 中。

自定义 ChainedKafkaTransactionManager 必须标记为 @Primary,因为它通常引用自动配置的 KafkaTransactionManager bean。

Kafka Streams

面向 Apache Kafka 的 Spring 提供一个工厂 bean 用于创建一个 StreamsBuilder 对象并管理其流的生命周期。只要 kafka-streams 位于类路径中且 Kafka 流已由 @EnableKafkaStreams 注解启用,Spring Boot 就会自动配置所需的 KafkaStreamsConfiguration bean。

启用 Kafka 流意味着必须设置应用程序 id 和引导服务器。前者可以使用 spring.kafka.streams.application-id 进行配置,如果未设置,则默认为 spring.application.name。后者可以全局设置或仅为流进行特定覆盖。

可以使用专用的属性来设置多个其他属性;其他任意 Kafka 属性可以使用 spring.kafka.streams.properties 命名范围来设置。有关详细信息,请参阅 Additional Kafka Properties

要使用工厂 bean,请像以下示例中所示,将 StreamsBuilder 连接到您的 @Bean

默认情况下,由 StreamBuilder 对象管理的流会自动启动。您可以使用 configprop:spring.kafka.streams.auto-startup[] 属性来自定义此行为。

Additional Kafka Properties

自动配置支持的属性显示在附录的 “Integration Properties” 部分中。请注意,在大多数情况下,这些属性(带连字符或驼峰式大小写)直接映射到 Apache Kafka 点状属性。有关详细信息,请参阅 Apache Kafka 文档。

名称中不包含客户端类型(producerconsumeradmin`或 `streams)的属性都被认为是通用的,适用于所有客户端。大多数通用属性可以根据需要为一个或多个客户端类型覆盖。

Apache Kafka 指定了重要性为 HIGH、MEDIUM 或 LOW 的属性。Spring Boot 自动配置支持所有 HIGH 重要性属性、一些选定的 MEDIUM 和 LOW 属性以及任何没有默认值属性。

只有 Kafka 支持的属性的一个子集才可以通过 KafkaProperties 类直接提供。如果您希望使用未直接支持的附加属性来配置单个客户端类型,请使用以下属性:

spring:
  kafka:
    properties:
      "[prop.one]": "first"
    admin:
      properties:
        "[prop.two]": "second"
    consumer:
      properties:
        "[prop.three]": "third"
    producer:
      properties:
        "[prop.four]": "fourth"
    streams:
      properties:
        "[prop.five]": "fifth"

这会将通用的 prop.one Kafka 属性设置为 first(适用于生产者、消费者、管理者和流),将 prop.two 管理属性设置为 second,将 prop.three 消费者属性设置为 third、将 prop.four 生产者属性设置为 fourth,并将 prop.five 流属性设置为 fifth

您还可以这样配置 Spring Kafka JsonDeserializer

spring:
  kafka:
    consumer:
      value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      properties:
        "[spring.json.value.default.type]": "com.example.Invoice"
        "[spring.json.trusted.packages]": "com.example.main,com.example.another"

同样,您可以禁用 JsonSerializer 在标头中发送类型信息的默认行为:

spring:
  kafka:
    producer:
      value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
      properties:
        "[spring.json.add.type.headers]": false

这样设置的属性会覆盖 Spring Boot 明确支持的任何配置项。

Testing with Embedded Kafka

面向 Apache Kafka 的 Spring 提供了一种使用嵌入式 Apache Kafka 代理测试项目的方法。要使用此功能,请使用 spring-kafka-test 模块中的 @EmbeddedKafka 为测试类添加注释。有关详细信息,请参阅面向 Apache Kafka 的 Spring 的 {url-spring-kafka-docs}/testing.html#ekb[参考手册]。

为了让 Spring Boot 自动配置与前面提到的嵌入式 Apache Kafka 代理一起工作,您需要将嵌入式代理地址的系统属性(EmbeddedKafkaBroker 填充)重新映射到 Apache Kafka 的 Spring Boot 配置属性。有几种方法可以做到这一点:

  • 在测试类中提供一个系统属性,将嵌入式代理地址映射到 configprop:spring.kafka.bootstrap-servers[]:

  • `@EmbeddedKafka`注释中配置属性名称:

  • 在配置属性中使用占位符:

spring:
  kafka:
    bootstrap-servers: "${spring.embedded.kafka.brokers}"