Apache Kafka Support

Apache Kafka 通过提供 spring-kafka 项目的自动配置来支持。

Apache Kafka is supported by providing auto-configuration of the spring-kafka project.

Kafka 配置由 spring.kafka.* 中的外部配置属性控制。例如,你可以在 application.properties 中声明以下部分:

Kafka configuration is controlled by external configuration properties in spring.kafka.*. For example, you might declare the following section in application.properties:

spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup"

要创建启动主题,请添加类型为 NewTopic 的 Bean。如果主题已经存在,则 Bean 被忽略。

To create a topic on startup, add a bean of type NewTopic. If the topic already exists, the bean is ignored.

有关更多支持的选项,请参阅 {code-spring-boot-autoconfigure-src}/kafka/KafkaProperties.java[KafkaProperties]。

See {code-spring-boot-autoconfigure-src}/kafka/KafkaProperties.java[KafkaProperties] for more supported options.

Sending a Message

Spring 的 KafkaTemplate 是自动配置的,你可以直接在你的 Bean 中自动连线它,如下例所示:

Spring’s KafkaTemplate is auto-configured, and you can autowire it directly in your own beans, as shown in the following example:

如果 configprop:spring.kafka.producer.transaction-id-prefix[] 属性已定义,则自动配置一个 KafkaTransactionManager。此外,如果定义了一个 RecordMessageConverter Bean,它会自动关联到自动配置的 KafkaTemplate

If the property configprop:spring.kafka.producer.transaction-id-prefix[] is defined, a KafkaTransactionManager is automatically configured. Also, if a RecordMessageConverter bean is defined, it is automatically associated to the auto-configured KafkaTemplate.

Receiving a Message

当 Apache Kafka 基础设施存在时,任何 Bean 都可以使用 @KafkaListener 进行注释以创建侦听器终结点。如果没有定义 KafkaListenerContainerFactory,则会自动配置一个默认的 KafkaListenerContainerFactory,并由 spring.kafka.listener.* 中定义的键定义。

When the Apache Kafka infrastructure is present, any bean can be annotated with @KafkaListener to create a listener endpoint. If no KafkaListenerContainerFactory has been defined, a default one is automatically configured with keys defined in spring.kafka.listener.*.

以下组件在 someTopic 主题上创建了一个侦听器终结点:

The following component creates a listener endpoint on the someTopic topic:

如果定义了一个 KafkaTransactionManager Bean,它会自动关联到容器工厂。同样,如果定义了 RecordFilterStrategyCommonErrorHandlerAfterRollbackProcessorConsumerAwareRebalanceListener Bean,它会自动关联到默认工厂。

If a KafkaTransactionManager bean is defined, it is automatically associated to the container factory. Similarly, if a RecordFilterStrategy, CommonErrorHandler, AfterRollbackProcessor or ConsumerAwareRebalanceListener bean is defined, it is automatically associated to the default factory.

根据侦听器类型,一个 RecordMessageConverterBatchMessageConverter Bean 会关联到默认工厂。如果一个批处理侦听器仅存在一个 RecordMessageConverter Bean,它将被包装在一个 BatchMessageConverter 中。

Depending on the listener type, a RecordMessageConverter or BatchMessageConverter bean is associated to the default factory. If only a RecordMessageConverter bean is present for a batch listener, it is wrapped in a BatchMessageConverter.

自定义 ChainedKafkaTransactionManager 必须标记为 @Primary,因为它通常引用自动配置的 KafkaTransactionManager bean。

A custom ChainedKafkaTransactionManager must be marked @Primary as it usually references the auto-configured KafkaTransactionManager bean.

Kafka Streams

面向 Apache Kafka 的 Spring 提供一个工厂 bean 用于创建一个 StreamsBuilder 对象并管理其流的生命周期。只要 kafka-streams 位于类路径中且 Kafka 流已由 @EnableKafkaStreams 注解启用,Spring Boot 就会自动配置所需的 KafkaStreamsConfiguration bean。

Spring for Apache Kafka provides a factory bean to create a StreamsBuilder object and manage the lifecycle of its streams. Spring Boot auto-configures the required KafkaStreamsConfiguration bean as long as kafka-streams is on the classpath and Kafka Streams is enabled by the @EnableKafkaStreams annotation.

启用 Kafka 流意味着必须设置应用程序 id 和引导服务器。前者可以使用 spring.kafka.streams.application-id 进行配置,如果未设置,则默认为 spring.application.name。后者可以全局设置或仅为流进行特定覆盖。

Enabling Kafka Streams means that the application id and bootstrap servers must be set. The former can be configured using spring.kafka.streams.application-id, defaulting to spring.application.name if not set. The latter can be set globally or specifically overridden only for streams.

可以使用专用的属性来设置多个其他属性;其他任意 Kafka 属性可以使用 spring.kafka.streams.properties 命名范围来设置。有关详细信息,请参阅 Additional Kafka Properties

Several additional properties are available using dedicated properties; other arbitrary Kafka properties can be set using the spring.kafka.streams.properties namespace. See also Additional Kafka Properties for more information.

要使用工厂 bean,请像以下示例中所示,将 StreamsBuilder 连接到您的 @Bean

To use the factory bean, wire StreamsBuilder into your @Bean as shown in the following example:

默认情况下,由 StreamBuilder 对象管理的流会自动启动。您可以使用 configprop:spring.kafka.streams.auto-startup[] 属性来自定义此行为。

By default, the streams managed by the StreamBuilder object are started automatically. You can customize this behavior using the configprop:spring.kafka.streams.auto-startup[] property.

Additional Kafka Properties

自动配置支持的属性显示在附录的 “Integration Properties” 部分中。请注意,在大多数情况下,这些属性(带连字符或驼峰式大小写)直接映射到 Apache Kafka 点状属性。有关详细信息,请参阅 Apache Kafka 文档。

The properties supported by auto configuration are shown in the “Integration Properties” section of the Appendix. Note that, for the most part, these properties (hyphenated or camelCase) map directly to the Apache Kafka dotted properties. See the Apache Kafka documentation for details.

名称中不包含客户端类型(producerconsumeradmin`或 `streams)的属性都被认为是通用的,适用于所有客户端。大多数通用属性可以根据需要为一个或多个客户端类型覆盖。

Properties that don’t include a client type (producer, consumer, admin, or streams) in their name are considered to be common and apply to all clients. Most of these common properties can be overridden for one or more of the client types, if needed.

Apache Kafka 指定了重要性为 HIGH、MEDIUM 或 LOW 的属性。Spring Boot 自动配置支持所有 HIGH 重要性属性、一些选定的 MEDIUM 和 LOW 属性以及任何没有默认值属性。

Apache Kafka designates properties with an importance of HIGH, MEDIUM, or LOW. Spring Boot auto-configuration supports all HIGH importance properties, some selected MEDIUM and LOW properties, and any properties that do not have a default value.

只有 Kafka 支持的属性的一个子集才可以通过 KafkaProperties 类直接提供。如果您希望使用未直接支持的附加属性来配置单个客户端类型,请使用以下属性:

Only a subset of the properties supported by Kafka are available directly through the KafkaProperties class. If you wish to configure the individual client types with additional properties that are not directly supported, use the following properties:

spring:
  kafka:
    properties:
      "[prop.one]": "first"
    admin:
      properties:
        "[prop.two]": "second"
    consumer:
      properties:
        "[prop.three]": "third"
    producer:
      properties:
        "[prop.four]": "fourth"
    streams:
      properties:
        "[prop.five]": "fifth"

这会将通用的 prop.one Kafka 属性设置为 first(适用于生产者、消费者、管理者和流),将 prop.two 管理属性设置为 second,将 prop.three 消费者属性设置为 third、将 prop.four 生产者属性设置为 fourth,并将 prop.five 流属性设置为 fifth

This sets the common prop.one Kafka property to first (applies to producers, consumers, admins, and streams), the prop.two admin property to second, the prop.three consumer property to third, the prop.four producer property to fourth and the prop.five streams property to fifth.

您还可以这样配置 Spring Kafka JsonDeserializer

You can also configure the Spring Kafka JsonDeserializer as follows:

spring:
  kafka:
    consumer:
      value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      properties:
        "[spring.json.value.default.type]": "com.example.Invoice"
        "[spring.json.trusted.packages]": "com.example.main,com.example.another"

同样,您可以禁用 JsonSerializer 在标头中发送类型信息的默认行为:

Similarly, you can disable the JsonSerializer default behavior of sending type information in headers:

spring:
  kafka:
    producer:
      value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
      properties:
        "[spring.json.add.type.headers]": false

这样设置的属性会覆盖 Spring Boot 明确支持的任何配置项。

Properties set in this way override any configuration item that Spring Boot explicitly supports.

Testing with Embedded Kafka

面向 Apache Kafka 的 Spring 提供了一种使用嵌入式 Apache Kafka 代理测试项目的方法。要使用此功能,请使用 spring-kafka-test 模块中的 @EmbeddedKafka 为测试类添加注释。有关详细信息,请参阅面向 Apache Kafka 的 Spring 的 {url-spring-kafka-docs}/testing.html#ekb[参考手册]。

Spring for Apache Kafka provides a convenient way to test projects with an embedded Apache Kafka broker. To use this feature, annotate a test class with @EmbeddedKafka from the spring-kafka-test module. For more information, please see the Spring for Apache Kafka {url-spring-kafka-docs}/testing.html#ekb[reference manual].

为了让 Spring Boot 自动配置与前面提到的嵌入式 Apache Kafka 代理一起工作,您需要将嵌入式代理地址的系统属性(EmbeddedKafkaBroker 填充)重新映射到 Apache Kafka 的 Spring Boot 配置属性。有几种方法可以做到这一点:

To make Spring Boot auto-configuration work with the aforementioned embedded Apache Kafka broker, you need to remap a system property for embedded broker addresses (populated by the EmbeddedKafkaBroker) into the Spring Boot configuration property for Apache Kafka. There are several ways to do that:

  • Provide a system property to map embedded broker addresses into configprop:spring.kafka.bootstrap-servers[] in the test class:

  • Configure a property name on the @EmbeddedKafka annotation:

  • Use a placeholder in configuration properties:

spring:
  kafka:
    bootstrap-servers: "${spring.embedded.kafka.brokers}"