Record serialization and deserialization
Kafka Streams 绑定允许您以两种方式序列化和反序列化记录。一种是 Kafka 提供的本机序列化和反序列化工具,另一种是 Spring Cloud Stream 框架的消息转换功能。我们来看看一些详细信息。
Inbound deserialization
键总是使用本机 Serdes 进行反序列化。
对于值,在入站过程中,默认情况下,反序列化由 Kafka 本机执行。请注意,这是与 Kafka Streams binder 的先前版本的默认行为相比的一个重大改变,在先前版本中,由框架完成反序列化。
Kafka Streams binder 会通过查看 java.util.function.Function|Consumer
的类型签名来尝试推断匹配的 Serde
类型。以下是其匹配 Serdes 的顺序。
-
如果应用程序提供类型为
Serde
的 Bean,并且如果返回类型已使用传入键或值类型的实际类型进行了参数化,那么它将使用该Serde
进行入站反序列化,例如,如果在应用程序中进行了以下操作,绑定程序检测到KStream
的传入值类型与参数化为Serde
Bean 的类型匹配,则它将使用该类型进行入站反序列化。
@Bean
public Serde<Foo> customSerde() {
...
}
@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
-
接下来,它查看类型,看看它们是不是 Kafka Streams 暴露的其中一种类型,如果是,则使用它们,以下是绑定程序将尝试从 Kafka Streams 进行匹配的 Serde 类型。Integer, Long, Short, Double, Float, byte[], UUID and String.
-
如果 Kafka Streams 提供的 Serde 均与这些类型不匹配,那么它将使用 Spring Kafka 提供的 JsonSerde,在这种情况下,绑定程序假定这些类型是 JSON 友好的,如果您拥有多个值对象作为输入,这是非常有用的,因为绑定程序将在内部将其推断为正确的 Java 类型,但是在回退到
JsonSerde
之前,绑定程序将在 Kafka Streams 配置中设置的默认Serde
进行检查,查看它是否是可以与传入 KStream 的类型匹配的Serde
。
如果上述策略均未奏效,则应用程序必须通过配置来提供 Serde
。这可以通过两种方式进行配置——绑定或默认。
首先,binder 将查找是否在绑定级别提供了 Serde
。例如,如果你有以下处理器,
@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}
然后,你可以使用以下内容来提供绑定级别 Serde
:
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果您按每个输入绑定提供 |
如果你想将默认键/值 Serdes 用于入站反序列化,则可以在 binder 级别执行此操作。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果你不想要 Kafka 提供的本机解码,则可以依靠 Spring Cloud Stream 提供的消息转换功能。由于本机解码是默认选项,因此为了让 Spring Cloud Stream 反序列化入站值对象,你需要明确禁用本机解码。
例如,如果你具有与上述相同的 BiFunction 处理器,则 spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false
你需要为所有输入分别禁用本机解码。否则,本机解码仍将适用于未禁用的输入。
默认情况下,Spring Cloud Stream 将 application/json
用作内容类型并使用适当的 json 消息转换器。你可以使用适当的 MessageConverter
bean 和以下属性来使用自定义消息转换器。
spring.cloud.stream.bindings.process-in-0.contentType
Outbound serialization
出站序列化遵循与上述入站反序列化相同的规则。与入站反序列化一样,Spring Cloud Stream 早期版本的一个重大更改是出站序列化由 Kafka 本机处理。在 3.0 版本之前的 binder 中,这是由框架本身完成的。
出站中的键总是由 Kafka 使用 binder 推断出的匹配 Serde
进行序列化。如果它无法推断键的类型,则需要使用配置进行指定。
值序列化反序列化使用与入站反序列化相同的规则推断。首先匹配出站类型是否来自应用程序中的已提供 Bean。如果没有,它会检查是否与 Kafka 公开的 Serde`匹配,例如 - `Integer
、Long
、Short
、Double
、Float
、byte[]
、UUID`和 `String
。如果不行,那么它会退回到 Spring Kafka 项目提供的 JsonSerde
,但首先查看默认的 Serde`配置,看看是否有匹配项。请记住,所有这些操作对应用程序都是透明的。如果这些都行不通,那么用户必须通过配置提供要使用的 `Serde
。
假设你使用的是与上述相同的 BiFunction
处理器。那么你可以按如下方式配置出站键/值 Serdes。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果 Serde 推断失败,并且未提供任何绑定级别 Serdes,则 binder 会回退到 JsonSerde
,但会查看默认 Serdes 寻找匹配项。
默认 serdes 的配置方式与上述反序列化中所述的相同。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde``spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果你的应用程序使用分支功能并有多个输出绑定,则必须为每个绑定配置这些内容。同样,如果 binder 能够推断出 Serde
类型,则无需进行此配置。
如果你不想要 Kafka 提供的本机编码,但希望使用框架提供的消息转换,则你需要明确禁用本机编码,因为本机编码是默认选项。例如,如果你具有与上述相同的 BiFunction 处理器,则 spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false
在分支的情况下,你需要为每个输出分别禁用本机编码。否则,本机编码仍将适用于未禁用的输出。
当 Spring Cloud Stream 执行转换时,它将默认使用 application/json
作为内容类型并使用适当的 json 消息转换器。你可以使用适当的 MessageConverter
bean 和以下属性来使用自定义消息转换器。
spring.cloud.stream.bindings.process-out-0.contentType
当禁用原生编码/解码时,与原生 Serdes 的情况一样,粘合剂不会发挥任何推断作用。应用程序需要明确提供所有配置选项。出于此原因,通常建议坚持默认的序列化/反序列化选项,并在编写 Spring Cloud Stream Kafka Streams 应用程序时坚持 Kafka Streams 提供的原生序列化/反序列化。你必须使用框架提供的消息转换功能的一个场景是,你的上游生产者正在使用特定的序列化策略。在这种情况下,你希望使用匹配的反序列化策略,因为原生机制可能会失败。当依赖于默认的 Serde
机制时,应用程序必须确保粘合剂可以使用正确映射入站和出站与适当的 Serde
的方法,否则可能会出现问题。
值得一提的是,上面概述的数据序列化/反序列化方法仅适用于处理器的边缘,即入站和出站。你的业务逻辑可能仍需要调用明确需要 Serde
对象的 Kafka 流 API。这些仍然是应用程序的责任,并且必须由开发人员相应地处理。