Apache Kafka Support
Overview
Spring Integration for Apache Kafka is based on the Spring for Apache Kafka project.
You need to include this dependency in your project:
- Maven
- Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-kafka:{project-version}"
It provides the following components:
Outbound Channel Adapter
The Outbound channel adapter is used to publish messages from a Spring Integration channel to Apache Kafka topics.
The channel is defined in the application context and then wired into the application that sends messages to Apache Kafka.
Sender applications can publish to Apache Kafka by using Spring Integration messages, which are internally converted to Kafka records by the outbound channel adapter, as follows:
- The payload of the Spring Integration message is used to populate the payload of the Kafka record.
- By default, the kafka_messageKey header of the Spring Integration message is used to populate the key of the Kafka record.
You can customize the target topic and partition for publishing the message through the kafka_topic and kafka_partitionId headers, respectively.
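For example, a sender can set these headers with a MessageBuilder before handing the message to the adapter's input channel.
The following is a minimal sketch, not part of the original samples; it assumes the adapter defines no topic-expression or partition-id-expression, and uses the KafkaHeaders.TOPIC and KafkaHeaders.PARTITION constants (the latter was named PARTITION_ID in older spring-kafka versions):

import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

public final class KafkaHeaderRouting {

    // Routes the record via the kafka_topic and kafka_partitionId headers,
    // which the outbound adapter reads when no explicit expressions are set.
    public static void sendTo(MessageChannel inputToKafka, String payload) {
        Message<String> message = MessageBuilder.withPayload(payload)
                .setHeader(KafkaHeaders.TOPIC, "someTopic")   // kafka_topic
                .setHeader(KafkaHeaders.PARTITION, 0)         // kafka_partitionId
                .build();
        inputToKafka.send(message);
    }

}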
In addition, the <int-kafka:outbound-channel-adapter> provides the ability to extract the key, target topic, and target partition by applying SpEL expressions on the outbound message.
To that end, it supports three mutually exclusive pairs of attributes:
- topic and topic-expression
- message-key and message-key-expression
- partition-id and partition-id-expression
These let you specify topic, message-key, and partition-id, respectively, as static values on the adapter or to dynamically evaluate their values at runtime against the request message.
The KafkaHeaders interface (provided by spring-kafka) contains constants used for interacting with headers.
The messageKey and topic default headers now require a kafka_ prefix.
When migrating from an earlier version that used the old headers, you need to specify message-key-expression="headers['messageKey']" and topic-expression="headers['topic']" on the <int-kafka:outbound-channel-adapter>.
Alternatively, you can change the headers upstream to the new headers from KafkaHeaders by using a <header-enricher> or a MessageBuilder.
If you use constant values, you can also configure them on the adapter by using topic and message-key.
NOTE: If the adapter is configured with a topic or message key (either with a constant or expression), those are used and the corresponding header is ignored. If you wish the header to override the configuration, you need to configure it in an expression, such as the following:
topic-expression="headers['topic'] != null ? headers['topic'] : 'myTopic'"
The adapter requires a KafkaTemplate, which, in turn, requires a suitably configured KafkaProducerFactory.
If a send-failure-channel (sendFailureChannel) is provided and a send() failure (sync or async) is received, an ErrorMessage is sent to the channel.
The payload is a KafkaSendFailureException with failedMessage, record (the ProducerRecord), and cause properties.
You can override the DefaultErrorMessageStrategy by setting the error-message-strategy property.
If a send-success-channel (sendSuccessChannel) is provided, a message with a payload of type org.apache.kafka.clients.producer.RecordMetadata is sent after a successful send.
If your application uses transactions and the same channel adapter is used to publish messages where the transaction is started by a listener container, as well as publishing where there is no existing transaction, you must configure a transactionIdPrefix on the KafkaTemplate to override the prefix used by the container or transaction manager.
The prefix used by container-initiated transactions (the producer factory or transaction manager property) must be the same on all application instances.
The prefix used for producer-only transactions must be unique on all application instances.
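A minimal sketch of the producer-only case follows; the prefix value and the "instance-1" suffix are placeholders you would derive from your environment, and the producer factory is assumed to be configured for transactions:

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

public class TransactionalTemplateConfig {

    // Template used only for producer-only transactions; its prefix must be
    // unique per application instance (e.g. derived from an ordinal or pod name),
    // while container-initiated transactions keep the factory's shared prefix.
    @Bean
    public KafkaTemplate<String, String> producerOnlyTemplate(ProducerFactory<String, String> pf) {
        KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
        template.setTransactionIdPrefix("producerOnly.tx.instance-1.");
        return template;
    }

}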
You can configure a flushExpression, which must resolve to a boolean value.
Flushing after sending several messages might be useful if you are using the linger.ms and batch.size Kafka producer properties; the expression should evaluate to Boolean.TRUE on the last message, and an incomplete batch is then sent immediately.
By default, the expression looks for a Boolean value in the KafkaIntegrationHeaders.FLUSH header (kafka_flush).
The flush occurs if the value is true, but not if it is false or the header is absent.
The KafkaProducerMessageHandler.sendTimeoutExpression default has changed from 10 seconds to the delivery.timeout.ms Kafka producer property + 5000, so that the actual Kafka error after a timeout is propagated to the application, instead of a timeout generated by this framework.
This was changed for consistency, because you might otherwise see unexpected behavior (Spring may time out the send, while it is actually, eventually, successful).
IMPORTANT: That timeout is 120 seconds by default, so you may wish to reduce it to get more timely failures.
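If you want failures to surface sooner, you can lower delivery.timeout.ms on the producer; a sketch follows (the 30-second value and broker address are illustrative only):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;

public class TimeoutTuningConfig {

    // Reduces delivery.timeout.ms from its 120-second default so a failed
    // send is reported (and propagated through the adapter) in ~35 seconds
    // (30 seconds plus the 5000 ms the adapter adds to its send timeout).
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 30_000);
        return new DefaultKafkaProducerFactory<>(props);
    }

}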
Configuration
The following example shows how to configure the outbound channel adapter for Apache Kafka:
- Java DSL
- Java
- XML
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
}
@Bean
public IntegrationFlow sendToKafkaFlow() {
return f -> f
.splitWith(s -> s.<String>function(p -> Stream.generate(() -> p).limit(101).iterator()))
.publishSubscribeChannel(c -> c
.subscribe(sf -> sf.handle(
kafkaMessageHandler(producerFactory(), TEST_TOPIC1)
.timestampExpression("T(Long).valueOf('1487694048633')"),
e -> e.id("kafkaProducer1")))
.subscribe(sf -> sf.handle(
kafkaMessageHandler(producerFactory(), TEST_TOPIC2)
.timestamp(m -> 1487694048644L),
e -> e.id("kafkaProducer2")))
);
}
@Bean
public DefaultKafkaHeaderMapper mapper() {
return new DefaultKafkaHeaderMapper();
}
private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler(
ProducerFactory<Integer, String> producerFactory, String topic) {
return Kafka
.outboundChannelAdapter(producerFactory)
.messageKey(m -> m
.getHeaders()
.get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
.headerMapper(mapper())
.partitionId(m -> 10)
.topicExpression("headers[kafka_topic] ?: '" + topic + "'")
.configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
}
@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler handler() throws Exception {
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setTopicExpression(new LiteralExpression("someTopic"));
handler.setMessageKeyExpression(new LiteralExpression("someKey"));
handler.setSuccessChannel(successes());
handler.setFailureChannel(failures());
return handler;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
// set more properties
return new DefaultKafkaProducerFactory<>(props);
}
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
kafka-template="template"
auto-startup="false"
channel="inputToKafka"
topic="foo"
sync="false"
message-key-expression="'bar'"
send-failure-channel="failures"
send-success-channel="successes"
error-message-strategy="ems"
partition-id-expression="2">
</int-kafka:outbound-channel-adapter>
<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
... <!-- more producer properties -->
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
Message-driven Channel Adapter
The KafkaMessageDrivenChannelAdapter (<int-kafka:message-driven-channel-adapter>) uses a spring-kafka KafkaMessageListenerContainer or ConcurrentMessageListenerContainer.
Also, the mode attribute is available.
It can accept values of record or batch (default: record).
For record mode, each message payload is converted from a single ConsumerRecord.
For batch mode, the payload is a list of objects that are converted from all the ConsumerRecord instances returned by the consumer poll.
As with the batched @KafkaListener, the KafkaHeaders.RECEIVED_KEY, KafkaHeaders.RECEIVED_PARTITION, KafkaHeaders.RECEIVED_TOPIC, and KafkaHeaders.OFFSET headers are also lists, with positions corresponding to the position in the payload.
Received messages have certain headers populated.
See the KafkaHeaders class for more information.
The Consumer object (in the kafka_consumer header) is not thread-safe.
You must invoke its methods only on the thread that calls the listener within the adapter.
If you hand off the message to another thread, you must not call its methods.
When a retry-template is provided, delivery failures are retried according to its retry policy.
If an error-channel is also supplied, a default ErrorMessageSendingRecoverer is used as the recovery callback after retries are exhausted.
You can also use the recovery-callback to specify some other action to take in that case, or set it to null to throw the final exception to the listener container so that it is handled there.
When building an ErrorMessage (for use in the error-channel or recovery-callback), you can customize the error message by setting the error-message-strategy property.
By default, a RawRecordHeaderErrorMessageStrategy is used, to provide access to the converted message as well as the raw ConsumerRecord.
This form of retry is blocking and could cause a rebalance if the aggregate retry delays across all polled records exceed the max.poll.interval.ms consumer property.
Instead, consider adding a DefaultErrorHandler to the listener container, configured with a KafkaErrorSendingMessageRecoverer.
Configuration
The following example shows how to configure a message-driven channel adapter:
- Java DSL
- Java
- XML
@Bean
public IntegrationFlow topic1ListenerFromKafkaFlow() {
return IntegrationFlow
.from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC1)
.configureListenerContainer(c ->
c.ackMode(ContainerProperties.AckMode.MANUAL)
.id("topic1ListenerContainer"))
.recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel(),
new RawRecordHeaderErrorMessageStrategy()))
.retryTemplate(new RetryTemplate())
.filterInRetry(true))
.filter(Message.class, m ->
m.getHeaders().get(KafkaHeaders.RECEIVED_KEY, Integer.class) < 101,
f -> f.throwExceptionOnRejection(true))
.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("listeningFromKafkaResults1"))
.get();
}
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
adapter(KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
return kafkaMessageDrivenChannelAdapter;
}
@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
ContainerProperties properties = new ContainerProperties(this.topic);
// set more properties
return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
// set more properties
return new DefaultKafkaConsumerFactory<>(props);
}
<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
auto-startup="false"
phase="100"
send-timeout="5000"
mode="record"
retry-template="template"
recovery-callback="callback"
error-message-strategy="ems"
channel="someChannel"
error-channel="errorChannel" />
<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
...
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics" value="foo" />
</bean>
</constructor-arg>
</bean>
You can also use the container factory that is used for @KafkaListener annotations to create ConcurrentMessageListenerContainer instances for other purposes.
See the Spring for Apache Kafka documentation for an example.
With the Java DSL, the container does not have to be configured as a @Bean, because the DSL registers the container as a bean.
The following example shows how to do so:
@Bean
public IntegrationFlow topic2ListenerFromKafkaFlow() {
return IntegrationFlow
.from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory().createContainer(TEST_TOPIC2),
KafkaMessageDrivenChannelAdapter.ListenerMode.record)
.id("topic2Adapter"))
...
.get();
}
Notice that, in this case, the adapter is given an id (topic2Adapter).
The container is registered in the application context with a name of topic2Adapter.container.
If the adapter does not have an id property, the container's bean name is the container's fully qualified class name plus #n, where n is incremented for each container.
Inbound Channel Adapter
The KafkaMessageSource provides a pollable channel adapter implementation.
Configuration
- Java DSL
- Kotlin
- Java
- XML
@Bean
public IntegrationFlow flow(ConsumerFactory<String, String> cf) {
return IntegrationFlow.from(Kafka.inboundChannelAdapter(cf, new ConsumerProperties("myTopic")),
e -> e.poller(Pollers.fixedDelay(5000)))
.handle(System.out::println)
.get();
}
@Bean
fun sourceFlow(cf: ConsumerFactory<String, String>) =
integrationFlow(Kafka.inboundChannelAdapter(cf,
ConsumerProperties(TEST_TOPIC3).also {
it.groupId = "kotlinMessageSourceGroup"
}),
{ poller(Pollers.fixedDelay(100)) }) {
handle { m ->
}
}
@InboundChannelAdapter(channel = "fromKafka", poller = @Poller(fixedDelay = "5000"))
@Bean
public KafkaMessageSource<String, String> source(ConsumerFactory<String, String> cf) {
ConsumerProperties consumerProperties = new ConsumerProperties("myTopic");
consumerProperties.setGroupId("myGroupId");
consumerProperties.setClientId("myClientId");
return new KafkaMessageSource<>(cf, consumerProperties);
}
<int-kafka:inbound-channel-adapter
id="adapter1"
consumer-factory="consumerFactory"
consumer-properties="consumerProperties1"
ack-factory="ackFactory"
channel="inbound"
message-converter="converter"
payload-type="java.lang.String"
raw-header="true"
auto-startup="false">
<int:poller fixed-delay="5000"/>
</int-kafka:inbound-channel-adapter>
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="max.poll.records" value="1"/>
</map>
</constructor-arg>
</bean>
<bean id="consumerProperties1" class="org.springframework.kafka.listener.ConsumerProperties">
<constructor-arg name="topics" value="topic1"/>
<property name="groupId" value="group"/>
<property name="clientId" value="client"/>
</bean>
Refer to the javadocs for available properties.
By default, max.poll.records must be explicitly set in the consumer factory, or it is forced to 1 if the consumer factory is a DefaultKafkaConsumerFactory.
You can set the property allowMultiFetch to true to override this behavior.
You must poll the consumer within max.poll.interval.ms to avoid a rebalance.
If you set allowMultiFetch to true, you must process all the retrieved records, and poll again, within max.poll.interval.ms.
Messages emitted by this adapter contain a header kafka_remainingRecords with a count of records remaining from the previous poll.
Starting with version 6.2, the KafkaMessageSource supports an ErrorHandlingDeserializer provided in the consumer properties.
A DeserializationException is extracted from the record headers and thrown to the caller.
With a SourcePollingChannelAdapter, this exception is wrapped into an ErrorMessage and published to its errorChannel.
See the ErrorHandlingDeserializer documentation for more information.
Outbound Gateway
The outbound gateway is for request/reply operations.
It differs from most Spring Integration gateways in that the sending thread does not block in the gateway, and the reply is processed on the reply listener container thread.
If your code invokes the gateway behind a synchronous Messaging Gateway, the user thread blocks there until the reply is received (or a timeout occurs).
The gateway does not accept requests until the reply container has been assigned its topics and partitions.
It is suggested that you add a ConsumerRebalanceListener to the template's reply container properties and wait for the onPartitionsAssigned call before sending messages to the gateway.
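One hedged way to do that (the latch wiring, topic, and bean shapes are assumptions) is to register the listener on the reply container's ContainerProperties and count down a latch in onPartitionsAssigned, awaiting that latch before the first request:

import java.util.Collection;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;

public class ReplyContainerReadiness {

    // Counted down once the reply container has its partitions; callers
    // await() this latch before sending the first request to the gateway.
    @Bean
    public CountDownLatch repliesAssignedLatch() {
        return new CountDownLatch(1);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> factory,
            CountDownLatch repliesAssignedLatch) {

        ConcurrentMessageListenerContainer<String, String> container =
                factory.createContainer("kafkaReplyTopic");
        container.getContainerProperties().setGroupId("replies");
        container.getContainerProperties().setConsumerRebalanceListener(
                new ConsumerRebalanceListener() {

                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    }

                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        repliesAssignedLatch.countDown(); // safe to send requests now
                    }

                });
        return container;
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> pf,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {
        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }

}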
The KafkaProducerMessageHandler sendTimeoutExpression default is the delivery.timeout.ms Kafka producer property + 5000, so that the actual Kafka error after a timeout is propagated to the application, instead of a timeout generated by this framework.
This was changed for consistency, because you might otherwise see unexpected behavior (Spring may time out the send(), while it is actually, eventually, successful).
IMPORTANT: That timeout is 120 seconds by default, so you may wish to reduce it to get more timely failures.
Configuration
The following example shows how to configure a gateway:
- Java DSL
- Java
- XML
@Bean
public IntegrationFlow outboundGateFlow(
ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
return IntegrationFlow.from("kafkaRequests")
.handle(Kafka.outboundGateway(kafkaTemplate))
.channel("kafkaReplies")
.get();
}
@Bean
@ServiceActivator(inputChannel = "kafkaRequests", outputChannel = "kafkaReplies")
public KafkaProducerMessageHandler<String, String> outGateway(
ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
return new KafkaProducerMessageHandler<>(kafkaTemplate);
}
<int-kafka:outbound-gateway
id="allProps"
error-message-strategy="ems"
kafka-template="template"
message-key-expression="'key'"
order="23"
partition-id-expression="2"
reply-channel="replies"
reply-timeout="43"
request-channel="requests"
requires-reply="false"
send-success-channel="successes"
send-failure-channel="failures"
send-timeout-expression="44"
sync="true"
timestamp-expression="T(System).currentTimeMillis()"
topic-expression="'topic'"/>
Refer to the javadocs for available properties.
Notice that the same class as the outbound channel adapter is used, the only difference being that the KafkaTemplate passed into the constructor is a ReplyingKafkaTemplate.
See the Spring for Apache Kafka documentation for more information.
The outbound topic, partition, key, and so on are determined in the same way as for the outbound adapter.
The reply topic is determined as follows:
- A message header named KafkaHeaders.REPLY_TOPIC (if present, it must have a String or byte[] value) is validated against the template's reply container's subscribed topics.
- If the template's replyContainer is subscribed to only one topic, it is used.
You can also specify a KafkaHeaders.REPLY_PARTITION header to determine a specific partition to be used for replies.
Again, this is validated against the template's reply container's subscriptions.
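As a sketch (the topic name, partition, and flow wiring are illustrative), you could set these headers just before the gateway:

import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;

public class ReplyRoutingConfig {

    // Sets the reply topic (and a specific reply partition) as headers; both
    // are validated against the template's reply container subscriptions.
    @Bean
    public IntegrationFlow gateFlow(ReplyingKafkaTemplate<String, String, String> template) {
        return f -> f
                .enrichHeaders(h -> h
                        .header(KafkaHeaders.REPLY_TOPIC, "replyTopic")
                        .header(KafkaHeaders.REPLY_PARTITION, 0))
                .handle(Kafka.outboundGateway(template))
                .channel("kafkaReplies");
    }

}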
Alternatively, you can also use a configuration similar to the following bean:
@Bean
public IntegrationFlow outboundGateFlow() {
return IntegrationFlow.from("kafkaRequests")
.handle(Kafka.outboundGateway(producerFactory(), replyContainer())
.configureKafkaTemplate(t -> t.replyTimeout(30_000)))
.channel("kafkaReplies")
.get();
}
Inbound Gateway
The inbound gateway is for request/reply operations.
Configuration
The following example shows how to configure an inbound gateway:
- Java DSL
- Java
- XML
@Bean
public IntegrationFlow serverGateway(
ConcurrentMessageListenerContainer<Integer, String> container,
KafkaTemplate<Integer, String> replyTemplate) {
return IntegrationFlow
.from(Kafka.inboundGateway(container, replyTemplate)
.replyTimeout(30_000))
.<String, String>transform(String::toUpperCase)
.get();
}
@Bean
public KafkaInboundGateway<Integer, String, String> inboundGateway(
AbstractMessageListenerContainer<Integer, String> container,
KafkaTemplate<Integer, String> replyTemplate) {
KafkaInboundGateway<Integer, String, String> gateway =
new KafkaInboundGateway<>(container, replyTemplate);
gateway.setRequestChannel(requests);
gateway.setReplyChannel(replies);
gateway.setReplyTimeout(30_000);
return gateway;
}
<int-kafka:inbound-gateway
id="gateway1"
listener-container="container1"
kafka-template="template"
auto-startup="false"
phase="100"
request-timeout="5000"
request-channel="nullChannel"
reply-channel="errorChannel"
reply-timeout="43"
message-converter="messageConverter"
payload-type="java.lang.String"
error-message-strategy="ems"
retry-template="retryTemplate"
recovery-callback="recoveryCallback"/>
Refer to the javadocs for available properties.
When a RetryTemplate is provided, delivery failures are retried according to its retry policy.
If an error-channel is also supplied, a default ErrorMessageSendingRecoverer is used as the recovery callback after retries are exhausted.
You can also use the recovery-callback to specify some other action to take in that case, or set it to null to throw the final exception to the listener container so that it is handled there.
When building an ErrorMessage (for use in the error-channel or recovery-callback), you can customize the error message by setting the error-message-strategy property.
By default, a RawRecordHeaderErrorMessageStrategy is used, to provide access to the converted message as well as the raw ConsumerRecord.
This form of retry is blocking and could cause a rebalance if the aggregate retry delays across all polled records exceed the max.poll.interval.ms consumer property.
Instead, consider adding a DefaultErrorHandler to the listener container, configured with a KafkaErrorSendingMessageRecoverer, as shown in the sketch in the Message-driven Channel Adapter section above.
Alternatively, you could configure an upper-case converter by using code similar to the following:
@Bean
public IntegrationFlow serverGateway() {
return IntegrationFlow
.from(Kafka.inboundGateway(consumerFactory(), containerProperties(),
producerFactory())
.replyTimeout(30_000))
.<String, String>transform(String::toUpperCase)
.get();
}
You can also use the container factory that is used for @KafkaListener annotations to create ConcurrentMessageListenerContainer instances for other purposes.
See the Spring for Apache Kafka documentation and Message-driven Channel Adapter for examples.
Channels Backed by Apache Kafka Topics
Spring Integration has MessageChannel implementations backed by an Apache Kafka topic for persistence.
Each channel requires a KafkaTemplate for the sending side and either a listener container factory (for subscribable channels) or a KafkaMessageSource (for a pollable channel).
Java DSL Configuration
- Java DSL
- Java
- XML
@Bean
public IntegrationFlow flowWithSubscribable(KafkaTemplate<Integer, String> template,
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
return IntegrationFlow.from(...)
...
.channel(Kafka.channel(template, containerFactory, "someTopic1").groupId("group1"))
...
.get();
}
@Bean
public IntegrationFlow flowWithPubSub(KafkaTemplate<Integer, String> template,
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
return IntegrationFlow.from(...)
...
.publishSubscribeChannel(pubSub(template, containerFactory),
pubsub -> pubsub
.subscribe(subflow -> ...)
.subscribe(subflow -> ...))
.get();
}
@Bean
public BroadcastCapableChannel pubSub(KafkaTemplate<Integer, String> template,
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
return Kafka.publishSubscribeChannel(template, containerFactory, "someTopic2")
.groupId("group2")
.get();
}
@Bean
public IntegrationFlow flowWithPollable(KafkaTemplate<Integer, String> template,
KafkaMessageSource<Integer, String> source) {
return IntegrationFlow.from(...)
...
.channel(Kafka.pollableChannel(template, source, "someTopic3").groupId("group3"))
.handle(..., e -> e.poller(...))
...
.get();
}
/**
* Channel for a single subscriber.
**/
@Bean
SubscribableKafkaChannel pointToPoint(KafkaTemplate<String, String> template,
        KafkaListenerContainerFactory<String, String> factory) {

    SubscribableKafkaChannel channel =
            new SubscribableKafkaChannel(template, factory, "topicA");
    channel.setGroupId("group1");
    return channel;
}
/**
* Channel for multiple subscribers.
**/
@Bean
SubscribableKafkaChannel pubsub(KafkaTemplate<String, String> template,
        KafkaListenerContainerFactory<String, String> factory) {

    SubscribableKafkaChannel channel =
            new SubscribableKafkaChannel(template, factory, "topicB", true);
    channel.setGroupId("group2");
    return channel;
}
/**
* Pollable channel (topic is configured on the source)
**/
@Bean
PollableKafkaChannel pollable(KafkaTemplate<String, String> template,
        KafkaMessageSource<String, String> source) {

    PollableKafkaChannel channel =
            new PollableKafkaChannel(template, source);
    channel.setGroupId("group3");
    return channel;
}
<int-kafka:channel kafka-template="template" id="ptp" topic="ptpTopic" group-id="ptpGroup"
container-factory="containerFactory" />
<int-kafka:pollable-channel kafka-template="template" id="pollable" message-source="source"
group-id="pollableGroup"/>
<int-kafka:publish-subscribe-channel kafka-template="template" id="pubSub" topic="pubSubTopic"
group-id="pubSubGroup" container-factory="containerFactory" />
Message Conversion
A StringJsonMessageConverter is provided.
See the Spring for Apache Kafka documentation for more information.
When using this converter with a message-driven channel adapter, you can specify the type to which you want the incoming payload to be converted.
This is achieved by setting the payload-type attribute (payloadType property) on the adapter.
The following example shows how to do so in XML configuration:
<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
auto-startup="false"
phase="100"
send-timeout="5000"
channel="nullChannel"
message-converter="messageConverter"
payload-type="com.example.Thing"
error-channel="errorChannel" />
<bean id="messageConverter"
class="org.springframework.kafka.support.converter.MessagingMessageConverter"/>
The following example shows how to set the payload-type attribute (payloadType property) on the adapter in Java configuration:
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
adapter(KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
kafkaMessageDrivenChannelAdapter.setMessageConverter(converter());
kafkaMessageDrivenChannelAdapter.setPayloadType(Thing.class);
return kafkaMessageDrivenChannelAdapter;
}
Null Payloads and Log Compaction 'Tombstone' Records
Spring Messaging Message<?> objects cannot have null payloads.
When you use the endpoints for Apache Kafka, null payloads (also known as tombstone records) are represented by a payload of type KafkaNull.
See the Spring for Apache Kafka documentation for more information.
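For example, a producing application can publish a tombstone by sending a KafkaNull payload; the following sketch uses a placeholder channel and key, and assumes the KafkaHeaders.KEY constant (MESSAGE_KEY in older spring-kafka versions):

import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

public final class TombstoneSender {

    // Publishes a tombstone for the given key: the KafkaNull payload becomes
    // a null value in the Kafka record, which log compaction treats as a delete.
    public static void deleteKey(MessageChannel toKafka, String key) {
        Message<KafkaNull> tombstone = MessageBuilder.withPayload(KafkaNull.INSTANCE)
                .setHeader(KafkaHeaders.KEY, key)
                .build();
        toKafka.send(tombstone);
    }

}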
The POJO methods for Spring Integration endpoints can use a true null value instead of KafkaNull.
To do so, mark the parameter with @Payload(required = false).
The following example shows how to do so:
@ServiceActivator(inputChannel = "fromSomeKafkaInboundEndpoint")
public void in(@Header(KafkaHeaders.RECEIVED_KEY) String key,
@Payload(required = false) Customer customer) {
// customer is null if a tombstone record
...
}
Calling a Spring Integration flow from a KStream
You can use a MessagingTransformer to invoke an integration flow from a KStream:
@Bean
public KStream<byte[], byte[]> kStream(StreamsBuilder kStreamBuilder,
MessagingTransformer<byte[], byte[], byte[]> transformer) {
KStream<byte[], byte[]> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
stream.mapValues((ValueMapper<byte[], byte[]>) String::toUpperCase)
...
.transform(() -> transformer)
.to(streamingTopic2);
stream.print(Printed.toSysOut());
return stream;
}
@Bean
@DependsOn("flow")
public MessagingTransformer<byte[], byte[], String> transformer(
MessagingFunction function) {
MessagingMessageConverter converter = new MessagingMessageConverter();
converter.setHeaderMapper(new SimpleKafkaHeaderMapper("*"));
return new MessagingTransformer<>(function, converter);
}
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from(MessagingFunction.class)
...
.get();
}
When an integration flow starts with an interface, the proxy that is created has the name of the flow bean, appended with ".gateway", so this bean name can be used as a @Qualifier if needed.
Performance Considerations for read/process/write Scenarios
Many applications consume from a topic, perform some processing, and write to another topic.
In most cases, if the write fails, the application would want to throw an exception so that the incoming request can be retried and/or sent to a dead-letter topic.
This functionality is supported by the underlying message listener container, together with a suitably configured error handler.
However, in order to support this, we need to block the listener thread until the success (or failure) of the write operation, so that any exceptions can be thrown to the container.
When consuming single records, this is achieved by setting the sync property on the outbound adapter.
However, when consuming batches, using sync causes a significant performance degradation, because the application would wait for the result of each send before sending the next message.
You can also perform multiple sends and then wait for the results of those sends afterwards.
This is achieved by adding a futuresChannel to the message handler.
To enable the feature, add KafkaIntegrationHeaders.FUTURE_TOKEN to the outbound messages; this can then be used to correlate a Future to a particular sent message.
Here is an example of how you might use this feature:
@SpringBootApplication
public class FuturesChannelApplication {
public static void main(String[] args) {
SpringApplication.run(FuturesChannelApplication.class, args);
}
@Bean
IntegrationFlow inbound(ConsumerFactory<String, String> consumerFactory, Handler handler) {
return IntegrationFlow.from(Kafka.messageDrivenChannelAdapter(consumerFactory,
ListenerMode.batch, "inTopic"))
.handle(handler)
.get();
}
@Bean
IntegrationFlow outbound(KafkaTemplate<String, String> kafkaTemplate) {
return IntegrationFlow.from(Gate.class)
.enrichHeaders(h -> h
.header(KafkaHeaders.TOPIC, "outTopic")
.headerExpression(KafkaIntegrationHeaders.FUTURE_TOKEN, "headers[id]"))
.handle(Kafka.outboundChannelAdapter(kafkaTemplate)
.futuresChannel("futures"))
.get();
}
@Bean
PollableChannel futures() {
return new QueueChannel();
}
}
@Component
@DependsOn("outbound")
class Handler {
@Autowired
Gate gate;
@Autowired
PollableChannel futures;
public void handle(List<String> input) throws Exception {
System.out.println(input);
input.forEach(str -> this.gate.send(str.toUpperCase()));
for (int i = 0; i < input.size(); i++) {
Message<?> future = this.futures.receive(10000);
((Future<?>) future.getPayload()).get(10, TimeUnit.SECONDS);
}
}
}
interface Gate {
void send(String out);
}