Basic Example using the Reactive Kafka Binder
在本节中,我们展示了一些使用反应式 binder 编写反应式 Kafka 应用程序的基本代码片段及其详细信息。
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
return s -> s.map(String::toUpperCase);
}
你可以将上述 upppercase
函数与基于消息通道的 Kafka binder (spring-cloud-stream-binder-kafka
) 以及此部分讨论的主题反应式 Kafka binder (spring-cloud-stream-binder-kafka-reactive
) 一起使用。与常规 Kafka binder 一起使用此函数时,即使你在应用程序中使用反应式类型(即在 uppercase
函数中),你只在函数执行时获得反应式流。在函数的执行上下文之外,没有反应式优点,因为底层 binder 不是基于反应式堆栈的。因此,虽然这可能看起来像是在提供一个完整的端到端反应式堆栈,但此应用程序只是部分反应式的。
现在假设您正在将合适的 Kafka 反应性粘合剂 - spring-cloud-stream-binder-kafka-reactive
用在上述函数的应用中。此粘合剂实现将发挥全面的反应性优势,从顶端的消费到链条底端的发布。这是因为底层粘合剂建立在 Reactor Kafka 的核心 API 之上。在使用者端,它利用 KafkaReceiver,这是 Kafka 使用者的一个反应实现。类似地,在生产者端,它使用 KafkaSender API,这是 Kafka 生产者的反应实现。由于反应性 Kafka 粘合剂的基础是建立在合适的反应性 Kafka API 之上,因此应用可以充分利用反应式技术。在使用此反应性 Kafka 粘合剂时,自动反压等其他反应能力已为应用程序内建。
从版本 4.0.2 开始,你可以分别通过提供一个或多个 ReceiverOptionsCustomizer
或 SenderOptionsCustomizer
bean 来定制 ReceiverOptions
和 SenderOptions
。它们是 BiFunction
,接收绑定名称和初始选项,返回定制的选项。这些接口扩展了 Ordered
,因此当存在多个定制器时,将按所需的顺序应用这些定制器。
默认情况下,黏合剂不会提交偏移。从 4.0.2 版开始,KafkaHeaders.ACKNOWLEDGMENT
头包含一个 ReceiverOffset
对象,使您可以通过调用其 acknowledge()
或 commit()
方法来提交偏移。
@Bean
public Consumer<Flux<Message<String>> consume() {
return msg -> {
process(msg.getPayload());
msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
}
}
有关更多信息,请参阅 reactor-kafka
文档和 javadoc。
此外,从版本 4.0.3 开始,可以将 Kafka consumer 属性 reactiveAtmostOnce
设置为 true
,绑定程序将在处理每次投票返回的记录之前自动提交偏移量。此外,从版本 4.0.3 开始,可以将 consumer 属性 reactiveAutoCommit
设置为 true
,绑定程序将在处理每次投票返回的记录之后自动提交偏移量。在这些情况下,不会出现确认标头。
4.0.2 还提供了 reactiveAutoCommit
,但实现有误,其行为类似于 reactiveAtMostOnce
。
以下示例演示如何使用 reaciveAutoCommit
。
@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
return flux -> flux
.doOnNext(inner -> inner
.doOnNext(val -> {
log.info(val.value());
})
.subscribe())
.subscribe();
}
请注意,在使用自动提交时,reactor-kafka
会返回一个 Flux<Flux<ConsumerRecord<?, ?>>>
。由于 Spring 无法访问内部 flux 的内容,因此应用程序必须处理本机 ConsumerRecord
;没有消息转换或转换服务应用于内容。这需要使用本机解码(通过在配置中指定适当类型的 Deserializer
)才能返回所需类型的记录密钥/值。