Consuming Records
在上述 upppercase
函数中,我们以 Flux<String>
的方式消耗记录,然后以 Flux<String>
方式生成它。有时,您可能需要按原始接收格式(ReceiverRecord
)接收记录。这里有一个这样的函数。
@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
return s -> s.map(rec -> new String(rec.value()).toLowerCase());
}
请注意,在此函数中,我们将记录作为 Flux<ReceiverRecord<byte[], byte[]>>
消费,然后作为 Flux<String>
生产。ReceiverRecord
是基本接收到的记录,它是 Reactor Kafka 中一个专门的 Kafka ConsumerRecord
。使用反应性 Kafka 粘合剂时,上述函数将使您能够访问每个传入记录的 ReceiverRecord
类型。但是,在这种情况下,您需要为 RecordMessageConverter 提供一个自定义实现。默认情况下,反应性 Kafka 粘合剂会使用一个将其负载和标头从 ConsumerRecord
转换的 MessagingMessageConverter。因此,当您的处理方法接收到记录时,负载已从接收到的记录中抽取出来,并作为上述的第一个函数的情况传递到该方法中。通过在应用中提供自定义 RecordMessageConverter
实现,您可以覆盖默认响应。例如,如果您想要将记录作为未经处理的 Flux<ReceiverRecord<byte[], byte[]>>
消费,则可以在应用中提供以下 bean 定义。
@Bean
RecordMessageConverter fullRawReceivedRecord() {
return new RecordMessageConverter() {
private final RecordMessageConverter converter = new MessagingMessageConverter();
@Override
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
Consumer<?, ?> consumer, Type payloadType) {
return MessageBuilder.withPayload(record).build();
}
@Override
public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
return this.converter.fromMessage(message, defaultTopic);
}
};
}
然后,您需要指示框架为必需的绑定使用此转换器。这里有一个基于我们的 lowercase
函数的示例。
spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord"
lowercase-in-0
是 lowercase
函数的输入绑定名称。对于 outbound(lowecase-out-0
),我们仍然使用常规 MessagingMessageConverter
。
在上述 toMessage
实现中,我们接收原始 ConsumerRecord
(由于我们处于反应式绑定程序上下文中,所以为 ReceiverRecord
),然后将其包装在 Message
中。然后,将该消息有效负载(即 ReceiverRecord
)提供给用户方法。
如果 reactiveAutoCommit
为 false
(默认值),则调用 rec.receiverOffset().acknowledge()
(或 commit()
)以导致提交偏移量;如果 reactiveAutoCommit
为 true
,则 flux 提供 ConsumerRecord
。有关更多信息,请参阅 reactor-kafka
文档和 javadoc。