Basic Example using the Reactive Kafka Binder

在本节中,我们展示了一些使用反应式 binder 编写反应式 Kafka 应用程序的基本代码片段及其详细信息。

@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
    return s -> s.map(String::toUpperCase);
}

你可以将上述 upppercase 函数与基于消息通道的 Kafka binder (spring-cloud-stream-binder-kafka) 以及此部分讨论的主题反应式 Kafka binder (spring-cloud-stream-binder-kafka-reactive) 一起使用。与常规 Kafka binder 一起使用此函数时,即使你在应用程序中使用反应式类型(即在 uppercase 函数中),你只在函数执行时获得反应式流。在函数的执行上下文之外,没有反应式优点,因为底层 binder 不是基于反应式堆栈的。因此,虽然这可能看起来像是在提供一个完整的端到端反应式堆栈,但此应用程序只是部分反应式的。

现在假设您正在将合适的 Kafka 反应性粘合剂 - spring-cloud-stream-binder-kafka-reactive 用在上述函数的应用中。此粘合剂实现将发挥全面的反应性优势,从顶端的消费到链条底端的发布。这是因为底层粘合剂建立在 Reactor Kafka 的核心 API 之上。在使用者端,它利用 KafkaReceiver,这是 Kafka 使用者的一个反应实现。类似地,在生产者端,它使用 KafkaSender API,这是 Kafka 生产者的反应实现。由于反应性 Kafka 粘合剂的基础是建立在合适的反应性 Kafka API 之上,因此应用可以充分利用反应式技术。在使用此反应性 Kafka 粘合剂时,自动反压等其他反应能力已为应用程序内建。

从版本 4.0.2 开始,你可以分别通过提供一个或多个 ReceiverOptionsCustomizerSenderOptionsCustomizer bean 来定制 ReceiverOptionsSenderOptions。它们是 BiFunction,接收绑定名称和初始选项,返回定制的选项。这些接口扩展了 Ordered,因此当存在多个定制器时,将按所需的顺序应用这些定制器。

默认情况下,黏合剂不会提交偏移。从 4.0.2 版开始,KafkaHeaders.ACKNOWLEDGMENT 头包含一个 ReceiverOffset 对象,使您可以通过调用其 acknowledge()commit() 方法来提交偏移。

@Bean
public Consumer<Flux<Message<String>> consume() {
    return msg -> {
        process(msg.getPayload());
        msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
    }
}

有关更多信息,请参阅 reactor-kafka 文档和 javadoc。

此外,从版本 4.0.3 开始,可以将 Kafka consumer 属性 reactiveAtmostOnce 设置为 true,绑定程序将在处理每次投票返回的记录之前自动提交偏移量。此外,从版本 4.0.3 开始,可以将 consumer 属性 reactiveAutoCommit 设置为 true,绑定程序将在处理每次投票返回的记录之后自动提交偏移量。在这些情况下,不会出现确认标头。

4.0.2 还提供了 reactiveAutoCommit,但实现有误,其行为类似于 reactiveAtMostOnce

以下示例演示如何使用 reaciveAutoCommit

@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
	return flux -> flux
			.doOnNext(inner -> inner
				.doOnNext(val -> {
					log.info(val.value());
				})
				.subscribe())
			.subscribe();
}

请注意,在使用自动提交时,reactor-kafka 会返回一个 Flux<Flux<ConsumerRecord<?, ?>>>。由于 Spring 无法访问内部 flux 的内容,因此应用程序必须处理本机 ConsumerRecord;没有消息转换或转换服务应用于内容。这需要使用本机解码(通过在配置中指定适当类型的 Deserializer)才能返回所需类型的记录密钥/值。