Configuration Options

  • Binder level configuration (prefix: spring.cloud.stream.kafka.streams.binder), which covers Kafka Streams configuration, the application ID, and state store retries.

  • Producer configuration (prefix: spring.cloud.stream.kafka.streams.bindings.<binding name>.producer), which covers serialization, partitioning, and encoding.

  • Consumer configuration (prefix: spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer), which covers serialization, materialization, error handling, and concurrency.

This section contains the configuration options used by the Kafka Streams binder.

For common configuration options and properties pertaining to the binder, refer to the core documentation.

Kafka Streams Binder Properties

The following properties are available at the binder level and must be prefixed with spring.cloud.stream.kafka.streams.binder. Any properties provided by the Kafka binder that are re-used in the Kafka Streams binder must be prefixed with spring.cloud.stream.kafka.streams.binder instead of spring.cloud.stream.kafka.binder. The only exception to this rule is when defining the Kafka bootstrap server property, in which case either prefix works.

configuration

Map with a key/value pair containing properties pertaining to the Apache Kafka Streams API. This property must be prefixed with spring.cloud.stream.kafka.streams.binder.. The following are some examples of using this property.

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000

For more information about all the properties that may go into the streams configuration, see the StreamsConfig JavaDocs in the Apache Kafka Streams documentation. All configuration that you can set from StreamsConfig can be set through this property. Since this is a binder level property, it applies to the entire application: if there is more than one processor in the application, all of them acquire these properties. For properties like application.id this becomes problematic, so you have to carefully examine how the properties from StreamsConfig are mapped when using this binder level configuration property.

functions.<function-bean-name>.applicationId

Applicable only for functional style processors. This can be used for setting the application ID per function in the application. In the case of multiple functions, this is a handy way to set the application ID.

functions.<function-bean-name>.configuration

Applicable only for functional style processors. Map with a key/value pair containing properties pertaining to the Apache Kafka Streams API. This is similar to the binder level configuration property described above, but this level of configuration applies only to the named function. It is useful when you have multiple processors and want to restrict configuration to particular functions. All StreamsConfig properties can be used here.
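
As a quick sketch, the two function-scoped properties above can be combined in application.properties; the function bean name process used below is a hypothetical example:

```properties
# Hypothetical function bean named "process"
spring.cloud.stream.kafka.streams.binder.functions.process.applicationId=process-app-id
# StreamsConfig properties scoped to the "process" function only
spring.cloud.stream.kafka.streams.binder.functions.process.configuration.num.stream.threads=2
```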

brokers

Broker URL

Default: localhost

zkNodes

Zookeeper URL

Default: localhost

deserializationExceptionHandler

Deserialization error handler type. This handler is applied at the binder level and thus applies to all input bindings in the application. There is a way to control it in a more fine-grained way at the consumer binding level. Possible values are logAndContinue, logAndFail, skipAndContinue, or sendToDlq.

Default: logAndFail
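
For example, a binder-wide handler that logs and skips records that fail deserialization could be configured as:

```properties
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=logAndContinue
```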

applicationId

Convenient way to set the application.id for the Kafka Streams application globally at the binder level. If the application contains multiple functions, the application ID should be set differently for each of them. See the detailed discussion above on setting the application ID.

Default: application will generate a static application ID. See the application ID section for more details.

stateStoreRetry.maxAttempts

Max attempts for trying to connect to a state store.

Default: 1

stateStoreRetry.backoffPeriod

Backoff period when trying to connect to a state store on a retry.

Default: 1000 ms
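
Taken together, the two state store retry properties above might be tuned as follows (the values shown are illustrative, not recommendations):

```properties
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts=3
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backoffPeriod=2000
```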

consumerProperties

Arbitrary consumer properties at the binder level.

producerProperties

Arbitrary producer properties at the binder level.

includeStoppedProcessorsForHealthCheck

When bindings for processors are stopped through the actuator, those processors do not participate in the health check by default. Set this property to true to enable the health check for all processors, including the ones that are currently stopped through the bindings actuator endpoint.

Default: false

Kafka Streams Producer Properties

The following properties are only available for Kafka Streams producers and must be prefixed with spring.cloud.stream.kafka.streams.bindings.<binding name>.producer. For convenience, if there are multiple output bindings and they all require a common value, that can be configured by using the prefix spring.cloud.stream.kafka.streams.default.producer..
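
As a sketch, a per-binding producer property next to its default-prefix equivalent; the binding name process-out-0 is a hypothetical example:

```properties
# Applies only to the binding process-out-0
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$StringSerde
# Applies to all output bindings that do not override it
spring.cloud.stream.kafka.streams.default.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$StringSerde
```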

keySerde

key serde to use

Default: See the above discussion on message de/serialization

valueSerde

value serde to use

Default: See the above discussion on message de/serialization

useNativeEncoding

flag to enable/disable native encoding

Default: true.

streamPartitionerBeanName

Custom outbound partitioner bean name to be used at the producer. Applications can provide a custom StreamPartitioner as a Spring bean, and the name of this bean can be provided to the producer to use instead of the default one.

Default: See the discussion above on outbound partition support.
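
Assuming a StreamPartitioner registered as a Spring bean named myStreamPartitioner (the bean name and the binding name process-out-0 are hypothetical), it could be wired to a binding as:

```properties
# "myStreamPartitioner" must match the bean name in the application context
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName=myStreamPartitioner
```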

producedAs

Custom name for the sink component to which the processor is producing.

Default: none (generated by Kafka Streams)

Kafka Streams Consumer Properties

The following properties are available for Kafka Streams consumers and must be prefixed with spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer. For convenience, if there are multiple input bindings and they all require a common value, that can be configured by using the prefix spring.cloud.stream.kafka.streams.default.consumer..

applicationId

Setting application.id per input binding.

Default: See above.

keySerde

key serde to use

Default: See the above discussion on message de/serialization

valueSerde

value serde to use

Default: See the above discussion on message de/serialization

materializedAs

state store to materialize when using incoming KTable types

Default: none.
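
For example, to materialize an incoming KTable into a named state store (the binding name process-in-0 and store name incoming-store are hypothetical examples):

```properties
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.materializedAs=incoming-store
```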

useNativeDecoding

flag to enable/disable native decoding

Default: true.

dlqName

DLQ topic name.

Default: See above on the discussion of error handling and DLQ.

startOffset

Offset to start from if there is no committed offset to consume from. This is mostly used when the consumer is consuming from a topic for the first time. Kafka Streams uses earliest as the default strategy and the binder uses the same default. This can be overridden to latest using this property.

Default: earliest.

Note: Using resetOffsets on the consumer has no effect on the Kafka Streams binder. Unlike the message channel based binder, the Kafka Streams binder does not seek to the beginning or end on demand.
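
For example, the default can be overridden to latest on a single binding (the binding name process-in-0 is a hypothetical example):

```properties
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.startOffset=latest
```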

deserializationExceptionHandler

Deserialization error handler type. This handler is applied per consumer binding, as opposed to the binder level property described before. Possible values are logAndContinue, logAndFail, skipAndContinue, or sendToDlq.

Default: logAndFail

timestampExtractorBeanName

Specific timestamp extractor bean name to be used at the consumer. Applications can provide a TimestampExtractor as a Spring bean, and the name of this bean can be provided to the consumer to use instead of the default one.

Default: See the discussion above on timestamp extractors.

eventTypes

Comma separated list of supported event types for this binding.

Default: none

eventTypeHeaderKey

Event type header key on each incoming record through this binding.

Default: event_type
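
The two event-type properties above might be combined as follows; the binding name, event type names, and custom header key are hypothetical examples:

```properties
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=created,updated
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=type
```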

consumedAs

Custom name for the source component from which the processor is consuming.

Default: none (generated by Kafka Streams)

Special note on concurrency

In Kafka Streams, you can control the number of threads a processor can create by using the num.stream.threads property. You can do this using the various configuration options described above at the binder, function, producer, or consumer level. You can also use the concurrency property that core Spring Cloud Stream provides for this purpose. When using it, set this property on the consumer; when you have more than one input binding, set it on the first input binding. For example, when you set spring.cloud.stream.bindings.process-in-0.consumer.concurrency, the binder translates it to num.stream.threads. If you have multiple processors and one processor defines binding level concurrency but the others do not, those without binding level concurrency fall back to the binder wide property specified through spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads. If this binder configuration is not available, the application uses the default set by Kafka Streams.
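
A minimal sketch of the two levels described above (values are illustrative):

```properties
# Binding-level concurrency on the first input binding; the binder translates this to num.stream.threads
spring.cloud.stream.bindings.process-in-0.consumer.concurrency=3
# Binder-wide fallback for processors that do not set binding-level concurrency
spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads=2
```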