Apache Kafka Support
Apache Kafka is supported by providing auto-configuration of the spring-kafka
project.
Kafka configuration is controlled by external configuration properties in spring.kafka.*.
For example, you might declare the following section in application.properties:
spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup"
To create a topic on startup, add a bean of type NewTopic.
If the topic already exists, the bean is ignored.
See {code-spring-boot-autoconfigure-src}/kafka/KafkaProperties.java[KafkaProperties] for more supported options.
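A startup topic bean might look like the following sketch; the topic name, partition count, and replication factor here are illustrative:

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaTopicConfiguration {

    // Creates the topic on startup if it does not already exist.
    // Name, partition count, and replication factor are illustrative.
    @Bean
    public NewTopic myTopic() {
        return new NewTopic("myTopic", 3, (short) 1);
    }
}
```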
Sending a Message
Spring’s KafkaTemplate is auto-configured, and you can autowire it directly in your own beans, as shown in the following example:
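A minimal sketch of such a bean, using constructor injection; the class name, topic, and payload are illustrative:

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class MessageSender {

    private final KafkaTemplate<String, String> kafkaTemplate;

    // The auto-configured KafkaTemplate is injected by Spring.
    public MessageSender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendGreeting() {
        this.kafkaTemplate.send("someTopic", "Hello");
    }
}
```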
If the property configprop:spring.kafka.producer.transaction-id-prefix[] is defined, a KafkaTransactionManager is automatically configured.
Also, if a RecordMessageConverter bean is defined, it is automatically associated to the auto-configured KafkaTemplate.
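Assuming the transaction-id prefix is set so that the auto-configured KafkaTemplate is transactional, sends can be grouped atomically, as in this sketch (topic and payload names are illustrative):

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class TransactionalSender {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public TransactionalSender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendAtomically() {
        // Both sends commit or roll back together as one Kafka transaction.
        this.kafkaTemplate.executeInTransaction(operations -> {
            operations.send("someTopic", "first");
            operations.send("someTopic", "second");
            return null;
        });
    }
}
```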
Receiving a Message
When the Apache Kafka infrastructure is present, any bean can be annotated with @KafkaListener to create a listener endpoint.
If no KafkaListenerContainerFactory has been defined, a default one is automatically configured with keys defined in spring.kafka.listener.*.
The following component creates a listener endpoint on the someTopic topic:
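A sketch of such a component; the class and method names are illustrative:

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class SomeTopicListener {

    // Invoked for each record received from the someTopic topic.
    @KafkaListener(topics = "someTopic")
    public void processMessage(String content) {
        System.out.println("Received: " + content);
    }
}
```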
If a KafkaTransactionManager bean is defined, it is automatically associated to the container factory.
Similarly, if a RecordFilterStrategy, CommonErrorHandler, AfterRollbackProcessor, or ConsumerAwareRebalanceListener bean is defined, it is automatically associated to the default factory.
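For example, a RecordFilterStrategy bean could be declared as in this sketch; the filtering rule is illustrative, and returning true discards the record:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;

@Configuration
public class KafkaFilterConfiguration {

    // Picked up by the auto-configured default container factory.
    // Returning true means the record is discarded before the listener sees it.
    @Bean
    public RecordFilterStrategy<Object, Object> recordFilterStrategy() {
        return consumerRecord -> consumerRecord.value() == null;
    }
}
```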
Depending on the listener type, a RecordMessageConverter or BatchMessageConverter bean is associated to the default factory.
If only a RecordMessageConverter bean is present for a batch listener, it is wrapped in a BatchMessageConverter.
A custom ChainedKafkaTransactionManager must be marked @Primary as it usually references the auto-configured KafkaTransactionManager bean.
Kafka Streams
Spring for Apache Kafka provides a factory bean to create a StreamsBuilder object and manage the lifecycle of its streams.
Spring Boot auto-configures the required KafkaStreamsConfiguration bean as long as kafka-streams is on the classpath and Kafka Streams is enabled by the @EnableKafkaStreams annotation.
Enabling Kafka Streams means that the application id and bootstrap servers must be set.
The former can be configured using spring.kafka.streams.application-id, defaulting to spring.application.name if not set.
The latter can be set globally or specifically overridden only for streams.
Several additional properties are available using dedicated properties; other arbitrary Kafka properties can be set using the spring.kafka.streams.properties namespace.
See also Additional Kafka Properties for more information.
To use the factory bean, wire StreamsBuilder into your @Bean as shown in the following example:
By default, the streams managed by the StreamsBuilder object are started automatically.
You can customize this behavior using the configprop:spring.kafka.streams.auto-startup[] property.
Additional Kafka Properties
The properties supported by auto-configuration are shown in the “Integration Properties” section of the Appendix.
Note that, for the most part, these properties (hyphenated or camelCase) map directly to the Apache Kafka dotted properties.
See the Apache Kafka documentation for details.
Properties that don’t include a client type (producer, consumer, admin, or streams) in their name are considered to be common and apply to all clients.
Most of these common properties can be overridden for one or more of the client types, if needed.
Apache Kafka designates properties with an importance of HIGH, MEDIUM, or LOW. Spring Boot auto-configuration supports all HIGH importance properties, some selected MEDIUM and LOW properties, and any properties that do not have a default value.
Only a subset of the properties supported by Kafka are available directly through the KafkaProperties class.
If you wish to configure the individual client types with additional properties that are not directly supported, use the following properties:
spring:
  kafka:
    properties:
      "[prop.one]": "first"
    admin:
      properties:
        "[prop.two]": "second"
    consumer:
      properties:
        "[prop.three]": "third"
    producer:
      properties:
        "[prop.four]": "fourth"
    streams:
      properties:
        "[prop.five]": "fifth"
This sets the common prop.one Kafka property to first (applies to producers, consumers, admins, and streams), the prop.two admin property to second, the prop.three consumer property to third, the prop.four producer property to fourth, and the prop.five streams property to fifth.
You can also configure the Spring Kafka JsonDeserializer as follows:
spring:
  kafka:
    consumer:
      value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      properties:
        "[spring.json.value.default.type]": "com.example.Invoice"
        "[spring.json.trusted.packages]": "com.example.main,com.example.another"
Similarly, you can disable the JsonSerializer default behavior of sending type information in headers:
spring:
  kafka:
    producer:
      value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
      properties:
        "[spring.json.add.type.headers]": false
Properties set in this way override any configuration item that Spring Boot explicitly supports.
Testing with Embedded Kafka
Spring for Apache Kafka provides a convenient way to test projects with an embedded Apache Kafka broker.
To use this feature, annotate a test class with @EmbeddedKafka from the spring-kafka-test module.
For more information, see the Spring for Apache Kafka {url-spring-kafka-docs}/testing.html#ekb[reference manual].
To make Spring Boot auto-configuration work with the aforementioned embedded Apache Kafka broker, you need to remap a system property for embedded broker addresses (populated by the EmbeddedKafkaBroker) into the Spring Boot configuration property for Apache Kafka.
There are several ways to do that:
- Provide a system property to map embedded broker addresses into configprop:spring.kafka.bootstrap-servers[] in the test class:
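One way to do this, assuming the EmbeddedKafkaBroker.BROKER_LIST_PROPERTY constant is available in your spring-kafka-test version, is a static initializer in the test class:

```java
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka
class MyKafkaTests {

    static {
        // Tell the embedded broker to publish its addresses under the
        // property name that Spring Boot's auto-configuration reads.
        System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY,
                "spring.kafka.bootstrap-servers");
    }
}
```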
- Configure a property name on the @EmbeddedKafka annotation:
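For example, a sketch using the bootstrapServersProperty attribute; the topic name is illustrative:

```java
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka(topics = "someTopic",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyKafkaTests {
    // Tests can now rely on the auto-configured Kafka beans
    // pointing at the embedded broker.
}
```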
- Use a placeholder in configuration properties:
spring:
  kafka:
    bootstrap-servers: "${spring.embedded.kafka.brokers}"