Configuration Options
本部分包含 Apache Kafka 绑定器使用的配置选项。 有关粘合剂的相关通用配置选项和属性,请参见核心文档中的 binding properties。
Kafka Binder Properties
- spring.cloud.stream.kafka.binder.brokers
-
A list of brokers to which the Kafka binder connects.
默认值:
localhost
。 - spring.cloud.stream.kafka.binder.defaultBrokerPort
-
brokers
allows hosts specified with or without port information (for example,host1,host2:port2
). This sets the default port when no port is configured in the broker list.默认值:
9092
。 - spring.cloud.stream.kafka.binder.configuration
-
Key/Value map of client properties (both producers and consumer) passed to all clients created by the binder. Due to the fact that these properties are used by both producers and consumers, usage should be restricted to common properties — for example, security settings. Unknown Kafka producer or consumer properties provided through this configuration are filtered out and not allowed to propagate. Properties here supersede any properties set in boot.
默认值:空映射。
- spring.cloud.stream.kafka.binder.consumerProperties
-
Key/Value map of arbitrary Kafka client consumer properties. In addition to support known Kafka consumer properties, unknown consumer properties are allowed here as well. Properties here supersede any properties set in boot and in the
configuration
property above.默认值:空映射。
- spring.cloud.stream.kafka.binder.headers
-
The list of custom headers that are transported by the binder. Only required when communicating with older applications (⇐ 1.3.x) with a
kafka-clients
version < 0.11.0.0. Newer versions support headers natively.默认:空。
- spring.cloud.stream.kafka.binder.healthTimeout
-
The time to wait to get partition information, in seconds. Health reports as down if this timer expires.
默认值:10。
- spring.cloud.stream.kafka.binder.requiredAcks
-
The number of required acks on the broker. See the Kafka documentation for the producer
acks
property.默认:
1
. - spring.cloud.stream.kafka.binder.minPartitionCount
-
Effective only if
autoCreateTopics
orautoAddPartitions
is set. The global minimum number of partitions that the binder configures on topics on which it produces or consumes data. It can be superseded by thepartitionCount
setting of the producer or by the value ofinstanceCount * concurrency
settings of the producer (if either is larger).默认:
1
. - spring.cloud.stream.kafka.binder.producerProperties
-
Key/Value map of arbitrary Kafka client producer properties. In addition to support known Kafka producer properties, unknown producer properties are allowed here as well. Properties here supersede any properties set in boot and in the
configuration
property above.默认值:空映射。
- spring.cloud.stream.kafka.binder.replicationFactor
-
The replication factor of auto-created topics if
autoCreateTopics
is active. Can be overridden on each binding.如果您使用的是 2.4 之前的 Kafka 服务器版本,那么此值应至少设置为
1
。从 3.0.8 版本开始,黏合剂使用-1
作为默认值,这意味着将使用服务器 'default.replication.factor' 属性来确定副本数。向您的 Kafka 服务器管理员咨询是否有要求最小复制因子的政策,如果是这种情况,通常情况下,default.replication.factor
将与该值匹配,则应使用-1
,除非您需要大于最小值的复制因子。默认值:
-1
。 - spring.cloud.stream.kafka.binder.autoCreateTopics
-
If set to
true
, the binder creates new topics automatically. If set tofalse
, the binder relies on the topics being already configured. In the latter case, if the topics do not exist, the binder fails to start.此设置独立于服务器的
auto.create.topics.enable
设置且不影响它。如果服务器设置为自动创建主题,则它们可能会作为元数据检索请求的一部分创建,并带有默认服务器设置。默认:“true”。
- spring.cloud.stream.kafka.binder.autoAddPartitions
-
If set to
true
, the binder creates new partitions if required. If set tofalse
, the binder relies on the partition size of the topic being already configured. If the partition count of the target topic is smaller than the expected value, the binder fails to start.默认:
false
. - spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
-
Enables transactions in the binder. See
transaction.id
in the Kafka documentation and Transactions in thespring-kafka
documentation. When transactions are enabled, individualproducer
properties are ignored and all producers use thespring.cloud.stream.kafka.binder.transaction.producer.*
properties.默认值:
null
(无事务)。 - spring.cloud.stream.kafka.binder.transaction.producer.*
-
Global producer properties for producers in a transactional binder. See
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
and Kafka Producer Properties and the general producer properties supported by all binders.默认值:参阅各个生产者属性。
- spring.cloud.stream.kafka.binder.headerMapperBeanName
-
The bean name of a
KafkaHeaderMapper
used for mappingspring-messaging
headers to and from Kafka headers. Use this, for example, if you wish to customize the trusted packages in aBinderHeaderMapper
bean that uses JSON deserialization for the headers. If this customBinderHeaderMapper
bean is not made available to the binder using this property, then the binder will look for a header mapper bean with the namekafkaBinderHeaderMapper
that is of typeBinderHeaderMapper
before falling back to a defaultBinderHeaderMapper
created by the binder.默认值:无。
- spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader
-
Flag to set the binder health as
down
, when any partitions on the topic, regardless of the consumer that is receiving data from it, is found without a leader.默认:“true”。
- spring.cloud.stream.kafka.binder.certificateStoreDirectory
-
When the truststore or keystore certificate location is given as a non-local file system resource (resources supported by org.springframework.core.io.Resource e.g. CLASSPATH, HTTP, etc.), the binder copies the resource from the path (which is convertible to org.springframework.core.io.Resource) to a location on the filesystem. This is true for both broker level certificates (
ssl.truststore.location
andssl.keystore.location
) and certificates intended for schema registry (schema.registry.ssl.truststore.location
andschema.registry.ssl.keystore.location
). Keep in mind that the truststore and keystore location paths must be provided underspring.cloud.stream.kafka.binder.configuration…
. For example,spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location
,spring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location
, etc. The file will be copied to the location specified as the value for this property which must be an existing directory on the filesystem that is writable by the process running the application. If this value is not set and the certificate file is a non-local file system resource, then it will be copied to System’s temp directory as returned bySystem.getProperty("java.io.tmpdir")
. This is also true, if this value is present, but the directory cannot be found on the filesystem or is not writable.默认值:无。
- spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetricsEnabled
-
When set to true, the offset lag metric of each consumer topic is computed whenever the metric is accessed. When set to false only the periodically calculated offset lag is used.
默认值:true
- spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval
-
The interval in which the offset lag for each consumer topic is computed. This value is used whenever
metrics.defaultOffsetLagMetricsEnabled
is disabled or its computation is taking too long.默认值:60 秒
- spring.cloud.stream.kafka.binder.enableObservation
-
Enable Micrometer observation registry on all the bindings in this binder.
默认值:false
Kafka Consumer Properties
以下属性仅适用于 Kafka 使用者,并且必须加前缀 spring.cloud.stream.kafka.bindings.<channelName>.consumer.
。
为了避免重复,Spring Cloud Stream 支持以 |
- admin.configuration
-
Since version 2.1.1, this property is deprecated in favor of
topic.properties
, and support for it will be removed in a future version. - admin.replicas-assignment
-
Since version 2.1.1, this property is deprecated in favor of
topic.replicas-assignment
, and support for it will be removed in a future version. - admin.replication-factor
-
Since version 2.1.1, this property is deprecated in favor of
topic.replication-factor
, and support for it will be removed in a future version. - autoRebalanceEnabled
-
When
true
, topic partitions is automatically rebalanced between the members of a consumer group. Whenfalse
, each consumer is assigned a fixed set of partitions based onspring.cloud.stream.instanceCount
andspring.cloud.stream.instanceIndex
. This requires both thespring.cloud.stream.instanceCount
andspring.cloud.stream.instanceIndex
properties to be set appropriately on each launched instance. The value of thespring.cloud.stream.instanceCount
property must typically be greater than 1 in this case.默认:“true”。
- ackEachRecord
-
When
autoCommitOffset
istrue
, this setting dictates whether to commit the offset after each record is processed. By default, offsets are committed after all records in the batch of records returned byconsumer.poll()
have been processed. The number of records returned by a poll can be controlled with themax.poll.records
Kafka property, which is set through the consumerconfiguration
property. Setting this totrue
may cause a degradation in performance, but doing so reduces the likelihood of redelivered records when a failure occurs. Also, see the binderrequiredAcks
property, which also affects the performance of committing offsets. This property is deprecated as of 3.1 in favor of usingackMode
. If theackMode
is not set and batch mode is not enabled,RECORD
ackMode will be used.默认:
false
. - autoCommitOffset
-
Starting with version 3.1, this property is deprecated. See
ackMode
for more details on alternatives. Whether to autocommit offsets when a message has been processed. If set tofalse
, a header with the keykafka_acknowledgment
of the typeorg.springframework.kafka.support.Acknowledgment
header is present in the inbound message. Applications may use this header for acknowledging messages. See the examples section for details. When this property is set tofalse
, Kafka binder sets the ack mode toorg.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL
and the application is responsible for acknowledging records. Also seeackEachRecord
.默认:“true”。
- ackMode
-
Specify the container ack mode. This is based on the AckMode enumeration defined in Spring Kafka. If
ackEachRecord
property is set totrue
and consumer is not in batch mode, then this will use the ack mode ofRECORD
, otherwise, use the provided ack mode using this property. - autoCommitOnError
-
In pollable consumers, if set to
true
, it always auto commits on error. If not set (the default) or false, it will not auto commit in pollable consumers. Note that this property is only applicable for pollable consumers.默认值:未设置。
- resetOffsets
-
Whether to reset offsets on the consumer to the value provided by startOffset. Must be false if a
KafkaBindingRebalanceListener
is provided; see rebalance listener See reset-offsets for more information about this property.默认:
false
. - startOffset
-
The starting offset for new groups. Allowed values:
earliest
andlatest
. If the consumer group is set explicitly for the consumer 'binding' (throughspring.cloud.stream.bindings.<channelName>.group
), 'startOffset' is set toearliest
. Otherwise, it is set tolatest
for theanonymous
consumer group. See reset-offsets for more information about this property.默认值:null(相当于
earliest
)。 - enableDlq
-
When set to true, it enables DLQ behavior for the consumer. By default, messages that result in errors are forwarded to a topic named
error.<destination>.<group>
. The DLQ topic name can be configurable by setting thedlqName
property or by defining a@Bean
of typeDlqDestinationResolver
. This provides an alternative option to the more common Kafka replay scenario for the case when the number of errors is relatively small and replaying the entire original topic may be too cumbersome. See kafka dlq processing for more information. Starting with version 2.0, messages sent to the DLQ topic are enhanced with the following headers:x-original-topic
,x-exception-message
, andx-exception-stacktrace
asbyte[]
. By default, a failed record is sent to the same partition number in the DLQ topic as the original record. See dlq partition selection for how to change that behavior. Not allowed whendestinationIsPattern
istrue
.默认:
false
. - dlqPartitions
-
When
enableDlq
is true, and this property is not set, a dead letter topic with the same number of partitions as the primary topic(s) is created. Usually, dead-letter records are sent to the same partition in the dead-letter topic as the original record. This behavior can be changed; see dlq partition selection. If this property is set to1
and there is noDqlPartitionFunction
bean, all dead-letter records will be written to partition0
. If this property is greater than1
, you MUST provide aDlqPartitionFunction
bean. Note that the actual partition count is affected by the binder’sminPartitionCount
property.默认值:
none
- configuration
-
Map with a key/value pair containing generic Kafka consumer properties. In addition to having Kafka consumer properties, other configuration properties can be passed here. For example some properties needed by the application such as
spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar
. Thebootstrap.servers
property cannot be set here; use multi-binder support if you need to connect to multiple clusters.默认值:空映射。
- dlqName
-
The name of the DLQ topic to receive the error messages.
默认值:null(如果没有指定,导致错误的消息会被转发到名为
error.<destination>.<group>
的主题)。 - dlqProducerProperties
-
Using this, DLQ-specific producer properties can be set. All the properties available through kafka producer properties can be set through this property. When native decoding is enabled on the consumer (i.e., useNativeDecoding: true) , the application must provide corresponding key/value serializers for DLQ. This must be provided in the form of
dlqProducerProperties.configuration.key.serializer
anddlqProducerProperties.configuration.value.serializer
.默认值:默认 Kafka 生产者属性。
- standardHeaders
-
Indicates which standard headers are populated by the inbound channel adapter. Allowed values:
none
,id
,timestamp
, orboth
. Useful if using native deserialization and the first component to receive a message needs anid
(such as an aggregator that is configured to use a JDBC message store).默认值:
none
- converterBeanName
-
The name of a bean that implements
RecordMessageConverter
. Used in the inbound channel adapter to replace the defaultMessagingMessageConverter
.默认值:
null
- idleEventInterval
-
The interval, in milliseconds, between events indicating that no messages have recently been received. Use an
ApplicationListener<ListenerContainerIdleEvent>
to receive these events. See pause-resume for a usage example.默认值:
30000
- destinationIsPattern
-
When true, the destination is treated as a regular expression
Pattern
used to match topic names by the broker. When true, topics are not provisioned, andenableDlq
is not allowed, because the binder does not know the topic names during the provisioning phase. Note, the time taken to detect new topics that match the pattern is controlled by the consumer propertymetadata.max.age.ms
, which (at the time of writing) defaults to 300,000ms (5 minutes). This can be configured using theconfiguration
property above.默认值:
false
- topic.properties
-
A
Map
of Kafka topic properties used when provisioning new topics — for example,spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0
默认值:无。
- topic.replicas-assignment
-
A Map<Integer, List<Integer>> of replica assignments, with the key being the partition and the value being the assignments. Used when provisioning new topics. See the
NewTopic
Javadocs in thekafka-clients
jar.默认值:无。
- topic.replication-factor
-
The replication factor to use when provisioning topics. Overrides the binder-wide setting. Ignored if
replicas-assignments
is present.默认值:无(使用粘合剂范围内的默认值 -1)。
- pollTimeout
-
Timeout used for polling in pollable consumers.
默认值:5 秒。
- transactionManager
-
Bean name of a
KafkaAwareTransactionManager
used to override the binder’s transaction manager for this binding. Usually needed if you want to synchronize another transaction with the Kafka transaction, using theChainedKafkaTransactionManaager
. To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.默认值:无。
- txCommitRecovered
-
When using a transactional binder, the offset of a recovered record (e.g. when retries are exhausted and the record is sent to a dead letter topic) will be committed via a new transaction, by default. Setting this property to
false
suppresses committing the offset of recovered record.默认值:true。
- commonErrorHandlerBeanName
-
CommonErrorHandler
bean name to use per consumer binding. When present, this user providedCommonErrorHandler
takes precedence over any other error handlers defined by the binder. This is a handy way to express error handlers, if the application does not want to use aListenerContainerCustomizer
and then check the destination/group combination to set an error handler.默认值:无。
Kafka Producer Properties
以下属性仅适用于 Kafka 生产者,并且必须以 spring.cloud.stream.kafka.bindings.<channelName>.producer.
为前缀。
为了避免重复,Spring Cloud Stream 支持以 |
- admin.configuration
-
Since version 2.1.1, this property is deprecated in favor of
topic.properties
, and support for it will be removed in a future version. - admin.replicas-assignment
-
Since version 2.1.1, this property is deprecated in favor of
topic.replicas-assignment
, and support for it will be removed in a future version. - admin.replication-factor
-
Since version 2.1.1, this property is deprecated in favor of
topic.replication-factor
, and support for it will be removed in a future version. - bufferSize
-
Upper limit, in bytes, of how much data the Kafka producer attempts to batch before sending.
默认值:
16384
。 - sync
-
Whether the producer is synchronous.
默认:
false
. - sendTimeoutExpression
-
A SpEL expression evaluated against the outgoing message used to evaluate the time to wait for ack when synchronous publish is enabled — for example,
headers['mySendTimeout']
. The value of the timeout is in milliseconds. With versions before 3.0, the payload could not be used unless native encoding was being used because, by the time this expression was evaluated, the payload was already in the form of abyte[]
. Now, the expression is evaluated before the payload is converted.默认值:
none
。 - batchTimeout
-
How long the producer waits to allow more messages to accumulate in the same batch before sending the messages. (Normally, the producer does not wait at all and simply sends all the messages that accumulated while the previous send was in progress.) A non-zero value may increase throughput at the expense of latency.
默认值:
0
。 - messageKeyExpression
-
A SpEL expression evaluated against the outgoing message used to populate the key of the produced Kafka message — for example,
headers['myKey']
. With versions before 3.0, the payload could not be used unless native encoding was being used because, by the time this expression was evaluated, the payload was already in the form of abyte[]
. Now, the expression is evaluated before the payload is converted. In the case of a regular processor (Function<String, String>
orFunction<Message<?>, Message<?>
), if the produced key needs to be same as the incoming key from the topic, this property can be set as below.spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']
There is an important caveat to keep in mind for reactive functions. In that case, it is up to the application to manually copy the headers from the incoming messages to outbound messages. You can set the header, e.g.myKey
and useheaders['myKey']
as suggested above or, for convenience, simply set theKafkaHeaders.MESSAGE_KEY
header, and you do not need to set this property at all.默认值:
none
。 - headerPatterns
-
A comma-delimited list of simple patterns to match Spring messaging headers to be mapped to the Kafka
Headers
in theProducerRecord
. Patterns can begin or end with the wildcard character (asterisk). Patterns can be negated by prefixing with!
. Matching stops after the first match (positive or negative). For example!ask,as*
will passash
but notask
.id
andtimestamp
are never mapped.默认值:
*
(所有头文件 - 除id
和timestamp
) - configuration
-
Map with a key/value pair containing generic Kafka producer properties. The
bootstrap.servers
property cannot be set here; use multi-binder support if you need to connect to multiple clusters.默认值:空映射。
- topic.properties
-
A
Map
of Kafka topic properties used when provisioning new topics — for example,spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0
- topic.replicas-assignment
-
A Map<Integer, List<Integer>> of replica assignments, with the key being the partition and the value being the assignments. Used when provisioning new topics. See the
NewTopic
Javadocs in thekafka-clients
jar.默认值:无。
- topic.replication-factor
-
The replication factor to use when provisioning topics. Overrides the binder-wide setting. Ignored if
replicas-assignments
is present.默认值:无(使用粘合剂范围内的默认值 -1)。
- useTopicHeader
-
Set to
true
to override the default binding destination (topic name) with the value of theKafkaHeaders.TOPIC
message header in the outbound message. If the header is not present, the default binding destination is used.默认:
false
. - recordMetadataChannel
-
The bean name of a
MessageChannel
to which successful send results should be sent; the bean must exist in the application context. The message sent to the channel is the sent message (after conversion, if any) with an additional headerKafkaHeaders.RECORD_METADATA
. The header contains aRecordMetadata
object provided by the Kafka client; it includes the partition and offset where the record was written in the topic.ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)
发送失败会进入生产者错误通道(如果已配置);请参阅 Kafka error channels。 默认值:null。
Kafka Binder 使用生产者的 |
- compression
-
Set the
compression.type
producer property. Supported values arenone
,gzip
,snappy
,lz4
andzstd
. If you override thekafka-clients
jar to 2.1.0 (or later), as discussed in the Spring for Apache Kafka documentation, and wish to usezstd
compression, usespring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd
.默认值:
none
。 - transactionManager
-
Bean name of a
KafkaAwareTransactionManager
used to override the binder’s transaction manager for this binding. Usually needed if you want to synchronize another transaction with the Kafka transaction, using theChainedKafkaTransactionManaager
. To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.默认值:无。
- closeTimeout
-
Timeout in number of seconds to wait for when closing the producer.
默认:
30
. - allowNonTransactional
-
Normally, all output bindings associated with a transactional binder will publish in a new transaction, if one is not already in process. This property allows you to override that behavior. If set to true, records published to this output binding will not be run in a transaction, unless one is already in process.
默认值:
false