Configuration Options

  • Binder-level configuration (prefix: spring.cloud.stream.kafka.streams.binder), covering Kafka Streams configuration, the application ID, and state store retries.

  • Producer configuration (prefix: spring.cloud.stream.kafka.streams.bindings.<binding-name>.producer), covering serialization, partitioning, and encoding.

  • Consumer configuration (prefix: spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer), covering serialization, materialization, error handling, and concurrency.

This section contains the configuration options used by the Kafka Streams binder. For common configuration options and properties pertaining to the binder, see the core documentation.

Kafka Streams Binder Properties

The following properties are available at the binder level and must be prefixed with spring.cloud.stream.kafka.streams.binder.. Any Kafka binder properties reused by the Kafka Streams binder must be prefixed with spring.cloud.stream.kafka.streams.binder instead of spring.cloud.stream.kafka.binder. The only exception to this rule is when defining the Kafka bootstrap servers property, in which case either prefix works.
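For example, assuming a broker reachable at kafka-1:9092 (a placeholder address), either of the following lines sets the bootstrap servers, per the exception noted above, while every other reused Kafka binder property must use the Kafka Streams prefix:

spring.cloud.stream.kafka.streams.binder.brokers=kafka-1:9092
spring.cloud.stream.kafka.binder.brokers=kafka-1:9092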

configuration

Map with a key/value pair containing properties pertaining to Apache Kafka Streams API. This property must be prefixed with spring.cloud.stream.kafka.streams.binder.. Following are some examples of using this property.

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000

For more information about all the properties that may go into streams configuration, see the StreamsConfig JavaDocs in the Apache Kafka Streams documentation. All configuration that you can set from StreamsConfig can be set through this property. When using this property, since it is a binder-level property, it applies to the entire application. If the application has more than one processor, they all acquire these properties. For properties such as application.id, this becomes problematic, so you have to carefully examine how the properties from StreamsConfig are mapped through this binder-level configuration property.

functions.<function-bean-name>.applicationId

Applicable only for functional style processors. This can be used for setting application ID per function in the application. In the case of multiple functions, this is a handy way to set the application ID.
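As a brief sketch, assuming two function beans named process and analyze (hypothetical bean names), each function can be given its own application ID:

spring.cloud.stream.kafka.streams.binder.functions.process.applicationId=process-applicationId
spring.cloud.stream.kafka.streams.binder.functions.analyze.applicationId=analyze-applicationId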

functions.<function-bean-name>.configuration

Applicable only for functional style processors. Map with a key/value pair containing properties pertaining to Apache Kafka Streams API. This is similar to the binder level configuration property described above, but this level of configuration property is restricted only to the named function. When you have multiple processors and you want to restrict access to the configuration based on particular functions, you might want to use this. All StreamsConfig properties can be used here.
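For instance, assuming a function bean named process (a hypothetical name), a StreamsConfig property such as commit.interval.ms can be scoped to that function alone:

spring.cloud.stream.kafka.streams.binder.functions.process.configuration.commit.interval.ms=10000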

brokers

Broker URL

Default: localhost

zkNodes

Zookeeper URL

Default: localhost

deserializationExceptionHandler

Deserialization error handler type. This handler is applied at the binder level and thus applied against all input bindings in the application. There is a way to control it in a more fine-grained way at the consumer binding level. Possible values are logAndContinue, logAndFail, skipAndContinue, or sendToDlq.

Default: logAndFail
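For example, to log and skip records that fail deserialization across the whole application instead of failing:

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=logAndContinue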

applicationId

Convenient way to set the application.id for the Kafka Streams application globally at the binder level. If the application contains multiple functions, then the application id should be set differently. See above where setting the application id is discussed in detail.

Default: The application will generate a static application ID. See the application ID section for more details.

stateStoreRetry.maxAttempts

Max attempts for trying to connect to a state store.

Default: 1

stateStoreRetry.backoffPeriod

Backoff period when trying to connect to a state store on a retry.

Default: 1000 ms
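As a sketch, the two retry properties are typically tuned together; the values below are arbitrary:

spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts=5
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backoffPeriod=5000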

consumerProperties

Arbitrary consumer properties at the binder level.

producerProperties

Arbitrary producer properties at the binder level.

includeStoppedProcessorsForHealthCheck

When bindings for processors are stopped through the actuator, those processors do not participate in the health check by default. Set this property to true to enable the health check for all processors, including the ones that are currently stopped through the bindings actuator endpoint.

Default: false

Kafka Streams Producer Properties

The following properties are available for Kafka Streams producers only and must be prefixed with spring.cloud.stream.kafka.streams.bindings.<binding-name>.producer.. For convenience, if there are multiple output bindings and they all require a common value, that can be configured by using the prefix spring.cloud.stream.kafka.streams.default.producer..
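For example, assuming an output binding named process-out-0 (a placeholder name), a per-binding value and a default shared by all output bindings look like this:

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.default.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$StringSerde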

keySerde

key serde to use

Default: See the discussion above on message de/serialization.

valueSerde

value serde to use

Default: See the discussion above on message de/serialization.

useNativeEncoding

flag to enable/disable native encoding

Default: true

streamPartitionerBeanName

Custom outbound partitioner bean name to be used at the producer. Applications can provide a custom StreamPartitioner as a Spring bean, and the name of this bean can be provided to the producer to use instead of the default one.

Default: See the discussion above on outbound partition support.
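A minimal sketch of such a bean, assuming String keys and values and a bean named streamPartitioner (both are assumptions made for illustration):

import org.apache.kafka.streams.processor.StreamPartitioner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class PartitionerConfig {

    // Illustrative only: routes each record to a partition derived from the key's hash.
    @Bean
    public StreamPartitioner<String, String> streamPartitioner() {
        return (topic, key, value, numPartitions) ->
                key == null ? 0 : Math.floorMod(key.hashCode(), numPartitions);
    }
}

The bean name is then supplied on the output binding (process-out-0 is a placeholder):

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName=streamPartitioner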

producedAs

Custom name for the sink component to which the processor is producing.

Default: none (generated by Kafka Streams)

Kafka Streams Consumer Properties

The following properties are available for Kafka Streams consumers and must be prefixed with spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.. For convenience, if there are multiple input bindings and they all require a common value, that can be configured by using the prefix spring.cloud.stream.kafka.streams.default.consumer..

applicationId

Setting application.id per input binding.

Default: See above.

keySerde

key serde to use

Default: See the discussion above on message de/serialization.

valueSerde

value serde to use

Default: See the discussion above on message de/serialization.

materializedAs

state store to materialize when using incoming KTable types

Default: none
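For example, assuming an input binding named process-in-0 that receives a KTable (a placeholder binding name), the incoming table can be materialized into a named state store:

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.materializedAs=incoming-store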

useNativeDecoding

flag to enable/disable native decoding

Default: true

dlqName

DLQ topic name.

Default: See the discussion above on error handling and DLQ.

startOffset

Offset to start from if there is no committed offset to consume from. This is mostly used when the consumer is consuming from a topic for the first time. Kafka Streams uses earliest as the default strategy and the binder uses the same default. This can be overridden to latest using this property.

Default: earliest

Note: resetOffsets on the consumer has no effect on the Kafka Streams binder. Unlike the message channel based binder, the Kafka Streams binder does not seek to the beginning or end on demand.
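For example, to have a first-time consumer on a binding start from the latest offset instead of the default earliest (process-in-0 is a placeholder binding name):

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.startOffset=latest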

deserializationExceptionHandler

Deserialization error handler type. This handler is applied per consumer binding as opposed to the binder level property described before. Possible values are logAndContinue, logAndFail, skipAndContinue, or sendToDlq.

Default: logAndFail
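For instance, to send records that fail deserialization on one binding to a DLQ while other bindings keep the binder-level behavior (the binding and topic names below are placeholders):

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler=sendToDlq
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName=process-in-0-dlq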

timestampExtractorBeanName

Specific time stamp extractor bean name to be used at the consumer. Applications can provide TimestampExtractor as a Spring bean and the name of this bean can be provided to the consumer to use instead of the default one.

Default: See the discussion above on timestamp extractors.
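A minimal sketch of such a bean, assuming the producer writes the event time into a custom event-time header as an 8-byte epoch-millis value (the header name, bean name, and encoding are assumptions); it falls back to the record's own timestamp when the header is missing:

import java.nio.ByteBuffer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TimestampExtractorConfig {

    // Illustrative only: reads an 8-byte epoch-millis value from the "event-time" header,
    // or falls back to the record timestamp when the header is absent.
    @Bean
    public TimestampExtractor eventTimeExtractor() {
        return (ConsumerRecord<Object, Object> record, long partitionTime) -> {
            Header header = record.headers().lastHeader("event-time");
            return header != null ? ByteBuffer.wrap(header.value()).getLong() : record.timestamp();
        };
    }
}

The bean name is then supplied on the input binding (process-in-0 is a placeholder):

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=eventTimeExtractor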

eventTypes

Comma separated list of supported event types for this binding.

Default: none

eventTypeHeaderKey

Event type header key on each incoming record through this binding.

Default: event_type
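For example, to accept only two event types on a binding and read the type from a custom header key (the binding name, event types, and header key below are placeholders):

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=order_created,order_updated
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event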

consumedAs

Custom name for the source component from which the processor is consuming.

Default: none (generated by Kafka Streams)

Special note on concurrency

In Kafka Streams, you can control the number of threads a processor can create by using the num.stream.threads property. You can do this through the various configuration options described above at the binder, function, producer, or consumer level. You can also use the concurrency property that core Spring Cloud Stream provides for this purpose. When using it, you need to set this property on the consumer. When there is more than one input binding, set it on the first input binding. For example, when setting spring.cloud.stream.bindings.process-in-0.consumer.concurrency, the binder translates it to num.stream.threads. If you have multiple processors and one processor defines binding-level concurrency while the others do not, those without binding-level concurrency fall back to the binder-wide property specified through spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads. If this binder configuration is not available, the application uses the default set by Kafka Streams.
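As a sketch of the translation described above (the binding name and thread counts are placeholders), the first property becomes num.stream.threads for that processor, while the second serves as the binder-wide fallback for processors without binding-level concurrency:

spring.cloud.stream.bindings.process-in-0.consumer.concurrency=3
spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads=2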