Apache Kafka Support
Overview
适用于 Apache Kafka 的 Spring 集成基于 Spring for Apache Kafka project。
你需要将此依赖项包含在你的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-kafka:{project-version}"
它提供以下组件:
Outbound Channel Adapter
出站通道适配器用于将消息从 Spring Integration 通道发布到 Apache Kafka 主题。通道在应用程序上下文中定义,然后连接到将消息发送到 Apache Kafka 的应用程序。发送者应用程序可以使用 Spring Integration 消息(由出站通道适配器在内部转换为 Kafka 记录)发布到 Apache Kafka,如下所示:
-
Spring Integration 消息的有效负载用于填充 Kafka 记录的有效负载。
-
默认情况下,Spring Integration 消息的
kafka_messageKey
标头用于填充 Kafka 记录的键。
你可以通过 kafka_topic
和 kafka_partitionId
头分别自定义用于发布消息的目标主题和分区。
此外,<int-kafka:outbound-channel-adapter>
提供了通过对出站消息应用 SpEL 表达式来提取密钥、目标主题和目标分区的功能。为此,它支持三对互斥的属性:
-
topic
andtopic-expression
-
message-key
andmessage-key-expression
-
partition-id
andpartition-id-expression
这些属性分别让你可以在适配器上将 topic
、message-key
和 partition-id
指定为静态值,或者在运行时根据请求消息动态计算其值。
KafkaHeaders
接口(由 spring-kafka
提供)包含用于与头信息交互的常量。messageKey
和 topic
默认头信息现在需要 kafka_
前缀。在从使用旧头信息的早期版本迁移时,您需要在 <int-kafka:outbound-channel-adapter>
上指定 message-key-expression="headers['messageKey']"
和 topic-expression="headers['topic']"
。您还可以使用 <header-enricher>
或 MessageBuilder
将上游头信息更改为来自 KafkaHeaders
的新头信息。如果您使用常量值,也可以使用 topic
和 message-key
来在适配器上配置它们。
注意:如果适配器配置有主题或消息密钥(使用常量或表达式),则将使用这些密钥,并且将忽略相应头。如果你希望头覆盖配置,则需要在表达式中进行配置,例如以下表达式:
topic-expression="headers['topic'] != null ? headers['topic'] : 'myTopic'"
适配器需要一个 KafkaTemplate
,而 KafkaTemplate
又需要一个配置适当的 KafkaProducerFactory
。
如果提供了 send-failure-channel
(sendFailureChannel
),并且接收到了 send()
故障(同步或异步),则会向该通道发送一个 ErrorMessage
。有效负载是一个带有 failedMessage
、record
(即 ProducerRecord
)和 cause
属性的 KafkaSendFailureException
。你可以通过设置 error-message-strategy
属性来覆盖 DefaultErrorMessageStrategy
。
如果提供了 send-success-channel
(sendSuccessChannel
),则在发送成功后会发送一个有效负载类型为 org.apache.kafka.clients.producer.RecordMetadata
的消息。
如果您的应用程序使用事务,并且同一个通道适配器用于在侦听器容器启动事务的情况下发布消息,以及在不存在现有事务的情况下发布消息,您必须在 KafkaTemplate
上配置一个 transactionIdPrefix
来覆盖容器或事务管理器使用的前缀。由容器发起的交易(生产者工厂或交易管理器属性)使用的前缀在所有应用程序实例上必须相同。对于仅生产者的事务,使用的前缀必须在所有应用程序实例上都是唯一的。
你可以配置一个 flushExpression
,它必须解析为一个布尔值。如果你使用的是 linger.ms
和 batch.size
Kafka 生产者属性,那么在发送几条消息后刷新可能有用;表达式应该在最后一条消息上计算为 Boolean.TRUE
,并且一个不完整的批处理将立即发送。默认情况下,表达式在 KafkaIntegrationHeaders.FLUSH
头(kafka_flush
)中查找一个 Boolean
值。如果该值为 true
则会刷新,如果该值是 false
或头不存在则不会刷新。
KafkaProducerMessageHandler.sendTimeoutExpression
默认值已从 10 秒更改为 delivery.timeout.ms
Kafka 生产者属性 + 5000
,以便将超时后的实际 Kafka 错误传播到应用程序,而不是由该框架生成超时错误。这对于保持一致性而言已被更改,因为你可能会得到意外的行为(Spring 可能会对发送进行超时,而它实际上最终会成功)。重要提示:该超时在默认情况下为 120 秒,因此你可能希望缩短该超时以更及时地获得故障。
Configuration
以下示例展示了如何为 Apache Kafka 配置出站通道适配器:
-
Java DSL
-
Java
-
XML
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
}
@Bean
public IntegrationFlow sendToKafkaFlow() {
return f -> f
.splitWith(s -> s.<String>function(p -> Stream.generate(() -> p).limit(101).iterator()))
.publishSubscribeChannel(c -> c
.subscribe(sf -> sf.handle(
kafkaMessageHandler(producerFactory(), TEST_TOPIC1)
.timestampExpression("T(Long).valueOf('1487694048633')"),
e -> e.id("kafkaProducer1")))
.subscribe(sf -> sf.handle(
kafkaMessageHandler(producerFactory(), TEST_TOPIC2)
.timestamp(m -> 1487694048644L),
e -> e.id("kafkaProducer2")))
);
}
@Bean
public DefaultKafkaHeaderMapper mapper() {
return new DefaultKafkaHeaderMapper();
}
private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler(
ProducerFactory<Integer, String> producerFactory, String topic) {
return Kafka
.outboundChannelAdapter(producerFactory)
.messageKey(m -> m
.getHeaders()
.get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
.headerMapper(mapper())
.partitionId(m -> 10)
.topicExpression("headers[kafka_topic] ?: '" + topic + "'")
.configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
}
@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler handler() throws Exception {
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setTopicExpression(new LiteralExpression("someTopic"));
handler.setMessageKeyExpression(new LiteralExpression("someKey"));
handler.setSuccessChannel(successes());
handler.setFailureChannel(failures());
return handler;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
// set more properties
return new DefaultKafkaProducerFactory<>(props);
}
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
kafka-template="template"
auto-startup="false"
channel="inputToKafka"
topic="foo"
sync="false"
message-key-expression="'bar'"
send-failure-channel="failures"
send-success-channel="successes"
error-message-strategy="ems"
partition-id-expression="2">
</int-kafka:outbound-channel-adapter>
<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
... <!-- more producer properties -->
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
Message-driven Channel Adapter
KafkaMessageDrivenChannelAdapter
(<int-kafka:message-driven-channel-adapter>
) 使用一个 spring-kafka
KafkaMessageListenerContainer
或 ConcurrentListenerContainer
。
此外,还提供 mode
属性。它可以接受 record
或 batch
值(默认值:record
)。对于 record
模式,每个消息有效负载都从单个 ConsumerRecord
转换而来。对于 batch
模式,有效负载是一个对象列表,这些对象是从使用者轮询返回的所有 ConsumerRecord
实例转换而来的。与批处理 @KafkaListener
一样,KafkaHeaders.RECEIVED_KEY
、KafkaHeaders.RECEIVED_PARTITION
、KafkaHeaders.RECEIVED_TOPIC
和 KafkaHeaders.OFFSET
头也是列表,其位置对应于有效负载中的位置。
接收到的消息已填充了某些标头。有关详细信息,请参见 KafkaHeaders
class。
kafka_consumer
标头中的 Consumer
对象不是线程安全的。您必须仅在适配器内调用侦听器的线程上调用它的方法。如果您将消息传递给另一个线程,您不得调用它的方法。
在提供了 retry-template
的情况下,会根据其重试策略重试交付故障。如果还提供了 error-channel
,则在重试次数用尽后将使用一个默认的 ErrorMessageSendingRecoverer
作为恢复回调。你还可以使用 recovery-callback
来指定在该情况下采取的其他操作,或者将其设置为空值以将最终异常抛给监听器容器,以便在其中处理该异常。
构建 ErrorMessage
(用于 error-channel
或 recovery-callback
)时,您可以通过设置 error-message-strategy
属性自定义错误消息。默认情况下,使用 RawRecordHeaderErrorMessageStrategy
,从而允许访问转换后的消息以及原始 ConsumerRecord
。
这种形式的重试是阻塞的,并且如果所有轮询记录的重试延迟总和超过 consumer max.poll.interval.ms
属性,则可能导致重新平衡。相反,请考虑向侦听器容器添加一个 DefaultErrorHandler
,配置一个 KafkaErrorSendingMessageRecoverer
。
Configuration
以下示例展示如何配置一个消息驱动的通道适配器:
-
Java DSL
-
Java
-
XML
@Bean
public IntegrationFlow topic1ListenerFromKafkaFlow() {
return IntegrationFlow
.from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC1)
.configureListenerContainer(c ->
c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL)
.id("topic1ListenerContainer"))
.recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel(),
new RawRecordHeaderErrorMessageStrategy()))
.retryTemplate(new RetryTemplate())
.filterInRetry(true))
.filter(Message.class, m ->
m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101,
f -> f.throwExceptionOnRejection(true))
.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("listeningFromKafkaResults1"))
.get();
}
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
adapter(KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
return kafkaMessageDrivenChannelAdapter;
}
@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
ContainerProperties properties = new ContainerProperties(this.topic);
// set more properties
return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
// set more properties
return new DefaultKafkaConsumerFactory<>(props);
}
<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
auto-startup="false"
phase="100"
send-timeout="5000"
mode="record"
retry-template="template"
recovery-callback="callback"
error-message-strategy="ems"
channel="someChannel"
error-channel="errorChannel" />
<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
...
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics" value="foo" />
</bean>
</constructor-arg>
</bean>
您还可以使用容器工厂(用于 @KafkaListener
注解)来为其他目的创建 ConcurrentMessageListenerContainer
实例。有关示例,请参见 the Spring for Apache Kafka documentation。
使用 Java DSL,不需要将容器配置为 @Bean
,因为 DSL 将容器注册为 Bean。以下示例展示如何执行此操作:
@Bean
public IntegrationFlow topic2ListenerFromKafkaFlow() {
return IntegrationFlow
.from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory().createContainer(TEST_TOPIC2),
KafkaMessageDrivenChannelAdapter.ListenerMode.record)
.id("topic2Adapter"))
...
get();
}
请注意,在这种情况下,为适配器指定了一个 id
(topic2Adapter
)。容器在应用程序上下文中以名称 topic2Adapter.container
注册。如果适配器没有 id
属性,则容器的 Bean 名称是容器的完全限定类名加上 #n
,其中 n
为每个容器递增。
Inbound Channel Adapter
KafkaMessageSource
提供了一个可轮询的通道适配器实现。
Configuration
-
Java DSL
-
Kotlin
-
Java
-
XML
@Bean
public IntegrationFlow flow(ConsumerFactory<String, String> cf) {
return IntegrationFlow.from(Kafka.inboundChannelAdapter(cf, new ConsumerProperties("myTopic")),
e -> e.poller(Pollers.fixedDelay(5000)))
.handle(System.out::println)
.get();
}
@Bean
fun sourceFlow(cf: ConsumerFactory<String, String>) =
integrationFlow(Kafka.inboundChannelAdapter(cf,
ConsumerProperties(TEST_TOPIC3).also {
it.groupId = "kotlinMessageSourceGroup"
}),
{ poller(Pollers.fixedDelay(100)) }) {
handle { m ->
}
}
@InboundChannelAdapter(channel = "fromKafka", poller = @Poller(fixedDelay = "5000"))
@Bean
public KafkaMessageSource<String, String> source(ConsumerFactory<String, String> cf) {
ConsumerProperties consumerProperties = new ConsumerProperties("myTopic");
consumerProperties.setGroupId("myGroupId");
consumerProperties.setClientId("myClientId");
retunr new KafkaMessageSource<>(cf, consumerProperties);
}
<int-kafka:inbound-channel-adapter
id="adapter1"
consumer-factory="consumerFactory"
consumer-properties="consumerProperties1"
ack-factory="ackFactory"
channel="inbound"
message-converter="converter"
payload-type="java.lang.String"
raw-header="true"
auto-startup="false">
<int:poller fixed-delay="5000"/>
</int-kafka:inbound-channel-adapter>
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="max.poll.records" value="1"/>
</map>
</constructor-arg>
</bean>
<bean id="consumerProperties1" class="org.springframework.kafka.listener.ConsumerProperties">
<constructor-arg name="topics" value="topic1"/>
<property name="groupId" value="group"/>
<property name="clientId" value="client"/>
</bean>
请参阅 javadoc 以了解可用的属性。
默认情况下,max.poll.records
必须在消费者工厂中显式设置,或者如果消费者工厂是 DefaultKafkaConsumerFactory
,则将强制其为 1。您可以将属性 allowMultiFetch
设置为 true
以覆盖此行为。
您必须在 max.poll.interval.ms
内轮询消费者以避免重新平衡。如果您将 allowMultiFetch
设置为 true
,您必须在 max.poll.interval.ms
内处理所有检索到的记录,并重新轮询。
该适配器发出的消息包含一个标头 kafka_remainingRecords
,其中包含上次轮询中剩余的记录数。
从版本 6.2
开始,KafkaMessageSource
支持客户机属性中提供的 ErrorHandlingDeserializer
。DeserializationException
从记录标头中提取并抛给被调用者。使用 SourcePollingChannelAdapter
,此异常将被包装到 ErrorMessage
中并发布到其 errorChannel
。有关详细信息,请参见 ErrorHandlingDeserializer
文档。
Outbound Gateway
出站网关用于请求/回复操作。它不同于大多数 Spring Integration 网关,原因在于发送线程不会封锁在网关中,而回复将在回复侦听器容器线程上进行处理。如果你的代码在同步 Messaging Gateway 代码之后调用网关,则用户线程会一直就此封锁,直至收到回复(或超时)。
在应答容器被分配主题和分区之前,网关不接受请求。建议您向模板的应答容器属性添加一个 ConsumerRebalanceListener
,并在向网关发送消息之前等待 onPartitionsAssigned
调用。
KafkaProducerMessageHandler
sendTimeoutExpression
默认值为 delivery.timeout.ms
Kafka 生产者属性 + 5000
,以便在超时后将实际的 Kafka 错误传播到应用程序,而不是由该框架生成超时。由于您可能会遇到意外行为(Spring 可能会让 send()
超时,而它实际上最终是成功的),因此进行了这种更改以保持一致性。重要提示:该超时时间默认是 120 秒,因此您可能希望将其减少,以便更及时地获取失败通知。
Configuration
以下示例展示如何配置一个网关:
-
Java DSL
-
Java
-
XML
@Bean
public IntegrationFlow outboundGateFlow(
ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
return IntegrationFlow.from("kafkaRequests")
.handle(Kafka.outboundGateway(kafkaTemplate))
.channel("kafkaReplies")
.get();
}
@Bean
@ServiceActivator(inputChannel = "kafkaRequests", outputChannel = "kafkaReplies")
public KafkaProducerMessageHandler<String, String> outGateway(
ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
return new KafkaProducerMessageHandler<>(kafkaTemplate);
}
<int-kafka:outbound-gateway
id="allProps"
error-message-strategy="ems"
kafka-template="template"
message-key-expression="'key'"
order="23"
partition-id-expression="2"
reply-channel="replies"
reply-timeout="43"
request-channel="requests"
requires-reply="false"
send-success-channel="successes"
send-failure-channel="failures"
send-timeout-expression="44"
sync="true"
timestamp-expression="T(System).currentTimeMillis()"
topic-expression="'topic'"/>
请参阅 javadoc 以了解可用的属性。
请注意,与 outbound channel adapter 相同的类正在被使用,唯一的区别是传递到构造函数中的 KafkaTemplate
是一个 ReplyingKafkaTemplate
。有关详细信息,请参见 the Spring for Apache Kafka documentation。
出站主题、分区、密钥等是由与出站适配器相同的方式确定的。答复主题的确定如下:
-
名为
KafkaHeaders.REPLY_TOPIC
的消息标头(如果存在,它必须具有String
或byte[]
值)会根据模板的回复容器订阅的主题进行验证。 -
如果模板的
replyContainer
仅订阅了一个主题,将会使用此主题。
您还可以指定一个 KafkaHeaders.REPLY_PARTITION
标头来确定用于答复的特定分区。同样,这将根据模板的答复容器的订阅进行验证。
或者,您还可以使用类似以下 Bean 的配置:
@Bean
public IntegrationFlow outboundGateFlow() {
return IntegrationFlow.from("kafkaRequests")
.handle(Kafka.outboundGateway(producerFactory(), replyContainer())
.configureKafkaTemplate(t -> t.replyTimeout(30_000)))
.channel("kafkaReplies")
.get();
}
Inbound Gateway
入站网关用于请求/响应操作。
Configuration
以下示例展示如何配置一个入站网关:
-
Java DSL
-
Java
-
XML
@Bean
public IntegrationFlow serverGateway(
ConcurrentMessageListenerContainer<Integer, String> container,
KafkaTemplate<Integer, String> replyTemplate) {
return IntegrationFlow
.from(Kafka.inboundGateway(container, replyTemplate)
.replyTimeout(30_000))
.<String, String>transform(String::toUpperCase)
.get();
}
@Bean
public KafkaInboundGateway<Integer, String, String> inboundGateway(
AbstractMessageListenerContainer<Integer, String>container,
KafkaTemplate<Integer, String> replyTemplate) {
KafkaInboundGateway<Integer, String, String> gateway =
new KafkaInboundGateway<>(container, replyTemplate);
gateway.setRequestChannel(requests);
gateway.setReplyChannel(replies);
gateway.setReplyTimeout(30_000);
return gateway;
}
<int-kafka:inbound-gateway
id="gateway1"
listener-container="container1"
kafka-template="template"
auto-startup="false"
phase="100"
request-timeout="5000"
request-channel="nullChannel"
reply-channel="errorChannel"
reply-timeout="43"
message-converter="messageConverter"
payload-type="java.lang.String"
error-message-strategy="ems"
retry-template="retryTemplate"
recovery-callback="recoveryCallback"/>
请参阅 javadoc 以了解可用的属性。
当提供 RetryTemplate
时,会根据其重试策略重试传递失败。如果还提供了 error-channel
,则在重试用尽后,ErrorMessageSendingRecoverer
将被用作恢复回调。你还可以使用 recovery-callback
在那种情况下指定要执行的其他操作,或将其设置为 null
,以向侦听器容器抛出最终的异常,以便在那里进行处理。
构建 ErrorMessage
(用于 error-channel
或 recovery-callback
)时,您可以通过设置 error-message-strategy
属性自定义错误消息。默认情况下,使用 RawRecordHeaderErrorMessageStrategy
,从而允许访问转换后的消息以及原始 ConsumerRecord
。
这种形式的重试是阻塞的,并且如果所有轮询记录的重试延迟总和超过 consumer max.poll.interval.ms
属性,则可能导致重新平衡。相反,请考虑向侦听器容器添加一个 DefaultErrorHandler
,配置一个 KafkaErrorSendingMessageRecoverer
。
下面的示例展示了如何使用 Java DSL 配置一个简单的转换器,将消息转换为大写:
或者,你可以使用类似于以下的代码来配置一个转换器,将消息转换为大写:
@Bean
public IntegrationFlow serverGateway() {
return IntegrationFlow
.from(Kafka.inboundGateway(consumerFactory(), containerProperties(),
producerFactory())
.replyTimeout(30_000))
.<String, String>transform(String::toUpperCase)
.get();
}
您还可以使用容器工厂(用于 @KafkaListener
注解)来为其他目的创建 ConcurrentMessageListenerContainer
实例。有关示例,请参见 the Spring for Apache Kafka documentation 和 Message-driven Channel Adapter。
Channels Backed by Apache Kafka Topics
Spring Integration 具有受 Apache Kafka 主题支持的 MessageChannel
实现,以保证持久性。
每个通道都需要一个 KafkaTemplate
用于发送方,以及一个侦听器容器工厂(对于可订阅的通道)或一个 KafkaMessageSource
用于可轮询的通道。
Java DSL Configuration
-
Java DSL
-
Java
-
XML
@Bean
public IntegrationFlow flowWithSubscribable(KafkaTemplate<Integer, String> template,
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
return IntegrationFlow.from(...)
...
.channel(Kafka.channel(template, containerFactory, "someTopic1").groupId("group1"))
...
.get();
}
@Bean
public IntegrationFlow flowWithPubSub(KafkaTemplate<Integer, String> template,
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
return IntegrationFlow.from(...)
...
.publishSubscribeChannel(pubSub(template, containerFactory),
pubsub -> pubsub
.subscribe(subflow -> ...)
.subscribe(subflow -> ...))
.get();
}
@Bean
public BroadcastCapableChannel pubSub(KafkaTemplate<Integer, String> template,
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
return Kafka.publishSubscribeChannel(template, containerFactory, "someTopic2")
.groupId("group2")
.get();
}
@Bean
public IntegrationFlow flowWithPollable(KafkaTemplate<Integer, String> template,
KafkaMessageSource<Integer, String> source) {
return IntegrationFlow.from(...)
...
.channel(Kafka.pollableChannel(template, source, "someTopic3").groupId("group3"))
.handle(..., e -> e.poller(...))
...
.get();
}
/**
* Channel for a single subscriber.
**/
@Bean
SubscribableKafkaChannel pointToPoint(KafkaTemplate<String, String> template,
KafkaListenerContainerFactory<String, String> factory)
SubscribableKafkaChannel channel =
new SubscribableKafkaChannel(template, factory, "topicA");
channel.setGroupId("group1");
return channel;
}
/**
* Channel for multiple subscribers.
**/
@Bean
SubscribableKafkaChannel pubsub(KafkaTemplate<String, String> template,
KafkaListenerContainerFactory<String, String> factory)
SubscribableKafkaChannel channel =
new SubscribableKafkaChannel(template, factory, "topicB", true);
channel.setGroupId("group2");
return channel;
}
/**
* Pollable channel (topic is configured on the source)
**/
@Bean
PollableKafkaChannel pollable(KafkaTemplate<String, String> template,
KafkaMessageSource<String, String> source)
PollableKafkaChannel channel =
new PollableKafkaChannel(template, source);
channel.setGroupId("group3");
return channel;
}
<int-kafka:channel kafka-template="template" id="ptp" topic="ptpTopic" group-id="ptpGroup"
container-factory="containerFactory" />
<int-kafka:pollable-channel kafka-template="template" id="pollable" message-source="source"
group-id = "pollableGroup"/>
<int-kafka:publish-subscribe-channel kafka-template="template" id="pubSub" topic="pubSubTopic"
group-id="pubSubGroup" container-factory="containerFactory" />
Message Conversion
StringJsonMessageConverter
提供。有关详细信息,请参见 the Spring for Apache Kafka documentation。
将此转换器与消息驱动通道适配器一起使用时,您可以指定希望将传入有效负载转换为何类型。这是通过在适配器上设置 payload-type
属性(payloadType
属性)来实现的。以下示例演示如何在 XML 配置中执行此操作:
<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
auto-startup="false"
phase="100"
send-timeout="5000"
channel="nullChannel"
message-converter="messageConverter"
payload-type="com.example.Thing"
error-channel="errorChannel" />
<bean id="messageConverter"
class="org.springframework.kafka.support.converter.MessagingMessageConverter"/>
以下示例演示如何在 Java 配置中为适配器设置 payload-type
属性(payloadType
属性):
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
adapter(KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
kafkaMessageDrivenChannelAdapter.setMessageConverter(converter());
kafkaMessageDrivenChannelAdapter.setPayloadType(Thing.class);
return kafkaMessageDrivenChannelAdapter;
}
Null Payloads and Log Compaction 'Tombstone' Records
Spring 消息传递 Message<?>
对象不能有 null
有效载荷。当您将端点用于 Apache Kafka 时,null
有效载荷(也称为墓碑记录)由类型为 KafkaNull
的有效载荷表示。有关详细信息,请参见 the Spring for Apache Kafka documentation。
Spring Integration 端点的 POJO 方法可以使用 true null
值,而不是 KafkaNull
。为此,请使用 @Payload(required = false)
标记参数。以下示例演示如何执行此操作:
@ServiceActivator(inputChannel = "fromSomeKafkaInboundEndpoint")
public void in(@Header(KafkaHeaders.RECEIVED_KEY) String key,
@Payload(required = false) Customer customer) {
// customer is null if a tombstone record
...
}
Calling a Spring Integration flow from a KStream
您可以使用 MessagingTransformer
调用来自 KStream
的集成流:
@Bean
public KStream<byte[], byte[]> kStream(StreamsBuilder kStreamBuilder,
MessagingTransformer<byte[], byte[], byte[]> transformer) transformer) {
KStream<byte[], byte[]> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
stream.mapValues((ValueMapper<byte[], byte[]>) String::toUpperCase)
...
.transform(() -> transformer)
.to(streamingTopic2);
stream.print(Printed.toSysOut());
return stream;
}
@Bean
@DependsOn("flow")
public MessagingTransformer<byte[], byte[], String> transformer(
MessagingFunction function) {
MessagingMessageConverter converter = new MessagingMessageConverter();
converter.setHeaderMapper(new SimpleKafkaHeaderMapper("*"));
return new MessagingTransformer<>(function, converter);
}
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from(MessagingFunction.class)
...
.get();
}
当集成流从接口开始时,所创建的代理具有流 Bean 的名称,后跟 “.gateway”,因此如果需要,可以使用此 Bean 名称作为 @Qualifier
。
Performance Considerations for read/process/write Scenarios
许多应用程序从主题中消耗,执行一些处理并写入另一个主题。在大多数情况下,如果 write
失败,应用程序将抛出一个异常,以便重试传入请求和/或将其发送到死信主题。此功能由基础消息侦听器容器和配置合理的错误处理程序支持。但是,为了支持这一点,我们需要在写操作成功(或失败)之前阻塞侦听器线程,以便可以将任何异常抛出到容器中。在使用单个记录时,通过在出站适配器上设置 sync
属性来实现此操作。但是,在使用批处理时,使用 sync
会导致严重的性能下降,因为应用程序将在发送下一条消息之前等待每次发送的结果。您还可以执行多次发送,然后等待这些发送的结果。这是通过向消息处理程序中添加 futuresChannel
来实现的。要启用此功能,请将 KafkaIntegrationHeaders.FUTURE_TOKEN
添加到出站消息;然后,这可用于将 Future
与特定已发送消息相关联。以下是如何使用此功能的一个示例:
@SpringBootApplication
public class FuturesChannelApplication {
public static void main(String[] args) {
SpringApplication.run(FuturesChannelApplication.class, args);
}
@Bean
IntegrationFlow inbound(ConsumerFactory<String, String> consumerFactory, Handler handler) {
return IntegrationFlow.from(Kafka.messageDrivenChannelAdapter(consumerFactory,
ListenerMode.batch, "inTopic"))
.handle(handler)
.get();
}
@Bean
IntegrationFlow outbound(KafkaTemplate<String, String> kafkaTemplate) {
return IntegrationFlow.from(Gate.class)
.enrichHeaders(h -> h
.header(KafkaHeaders.TOPIC, "outTopic")
.headerExpression(KafkaIntegrationHeaders.FUTURE_TOKEN, "headers[id]"))
.handle(Kafka.outboundChannelAdapter(kafkaTemplate)
.futuresChannel("futures"))
.get();
}
@Bean
PollableChannel futures() {
return new QueueChannel();
}
}
@Component
@DependsOn("outbound")
class Handler {
@Autowired
Gate gate;
@Autowired
PollableChannel futures;
public void handle(List<String> input) throws Exception {
System.out.println(input);
input.forEach(str -> this.gate.send(str.toUpperCase()));
for (int i = 0; i < input.size(); i++) {
Message<?> future = this.futures.receive(10000);
((Future<?>) future.getPayload()).get(10, TimeUnit.SECONDS);
}
}
}
interface Gate {
void send(String out);
}