Record serialization and deserialization
Kafka Streams binder allows you to serialize and deserialize records in two ways: through the native serialization and deserialization facilities provided by Kafka, or through the message conversion capabilities of the Spring Cloud Stream framework. Let's look at the details.
Inbound deserialization
Keys are always deserialized using native Serdes.
For values, by default, deserialization on the inbound is natively performed by Kafka. Please note that this is a major change in default behavior compared to previous versions of the Kafka Streams binder, where the deserialization was done by the framework.
Kafka Streams binder will try to infer matching Serde types by looking at the type signature of java.util.function.Function|Consumer. Here is the order in which it matches the Serdes.
-
If the application provides a bean of type Serde and its return type is parameterized with the actual type of the incoming key or value, then it will use that Serde for inbound deserialization. For example, if you have the following in the application, the binder detects that the incoming value type for the KStream matches a type that is parameterized on a Serde bean. It will use that for inbound deserialization.
@Bean
public Serde<Foo> customSerde() {
...
}
@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
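A complete version of such a Serde bean could, for instance, delegate to Spring Kafka's JsonSerde. This is only a sketch; Foo stands in for your own JSON-friendly value type:
@Bean
public Serde<Foo> customSerde() {
    // org.springframework.kafka.support.serializer.JsonSerde
    // serializes and deserializes Foo as JSON via Jackson
    return new JsonSerde<>(Foo.class);
}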
-
Next, it looks at the types to see whether they are among the types exposed by Kafka Streams. If so, it uses them. Here are the Serde types that the binder will try to match from Kafka Streams: Integer, Long, Short, Double, Float, byte[], UUID and String.
-
If none of the Serdes provided by Kafka Streams match the types, then it will use the JsonSerde provided by Spring Kafka. In this case, the binder assumes that the types are JSON friendly. This is useful if you have multiple value objects as inputs, since the binder will internally infer them to the correct Java types. Before falling back to the JsonSerde though, the binder checks the default Serdes set in the Kafka Streams configuration to see if there is a Serde that it can match with the incoming KStream's types.
If none of the above strategies worked, then the applications must provide the Serdes through configuration. This can be configured in two ways - binding or default.
First, the binder will look to see whether a Serde is provided at the binding level. For example, if you have the following processor,
@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}
then you can provide a binding level Serde using the following:
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
If you want the default key/value Serdes to be used for inbound deserialization, you can do so at the binder level.
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
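For example, assuming String keys and JSON-friendly values, the binder level defaults could be set as follows. The Serde class names below are from Apache Kafka and Spring Kafka respectively; adjust them to your actual types:
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde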
If you don’t want the native decoding provided by Kafka, you can rely on the message conversion features that Spring Cloud Stream provides. Since native decoding is the default, in order to let Spring Cloud Stream deserialize the inbound value object, you need to explicitly disable native decoding.
For example, if you have the same BiFunction processor as above, then: spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false
You need to disable native decoding for all the inputs individually. Otherwise, native decoding will still be applied for those you do not disable.
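For the BiFunction processor above, which has two inputs, that means disabling both input bindings:
spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false
spring.cloud.stream.bindings.process-in-1.consumer.nativeDecoding: false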
By default, Spring Cloud Stream will use application/json as the content type and an appropriate JSON message converter. You can use custom message converters by using the following property and an appropriate MessageConverter bean.
spring.cloud.stream.bindings.process-in-0.contentType
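For instance, you could set a custom content type on the input, matched by a MessageConverter bean in the application that declares support for it. The content type below is a purely hypothetical example:
spring.cloud.stream.bindings.process-in-0.contentType: application/vnd.example.v1+json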
Outbound serialization
Outbound serialization pretty much follows the same rules as above for inbound deserialization. As with the inbound deserialization, one major change from the previous versions of Spring Cloud Stream is that the serialization on the outbound is handled by Kafka natively. Before 3.0 versions of the binder, this was done by the framework itself.
Keys on the outbound are always serialized by Kafka using a matching Serde that is inferred by the binder. If it can't infer the type of the key, then that needs to be specified using configuration.
Value serdes are inferred using the same rules used for inbound deserialization. First it matches to see if the outbound type is from a provided bean in the application. If not, it checks to see if it matches a Serde exposed by Kafka, such as Integer, Long, Short, Double, Float, byte[], UUID and String. If that doesn't work, then it falls back to the JsonSerde provided by the Spring Kafka project, but first looks at the default Serde configuration to see if there is a match. Keep in mind that all of this happens transparently to the application. If none of these work, then the user has to provide the Serde to use through configuration.
Let's say you are using the same BiFunction processor as above. Then you can configure outbound key/value Serdes as follows.
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
If Serde inference fails, and no binding level Serdes are provided, then the binder falls back to the JsonSerde, but first looks at the default Serdes for a match.
Default serdes are configured in the same way as described above under inbound deserialization.
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
If your application uses the branching feature and has multiple output bindings, then these have to be configured per binding. Once again, if the binder is capable of inferring the Serde types, you don't need to do this configuration.
If you don't want the native encoding provided by Kafka, but want to use the framework-provided message conversion, then you need to explicitly disable native encoding, since native encoding is the default. For example, if you have the same BiFunction processor as above, then: spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false
You need to disable native encoding for all the outputs individually in the case of branching. Otherwise, native encoding will still be applied for those you don't disable.
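For example, with a branched processor that produces two outputs, both would need to be disabled explicitly. The binding names below assume the default naming convention:
spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false
spring.cloud.stream.bindings.process-out-1.producer.nativeEncoding: false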
When conversion is done by Spring Cloud Stream, by default, it will use application/json as the content type and an appropriate JSON message converter. You can use custom message converters by using the following property and a corresponding MessageConverter bean.
spring.cloud.stream.bindings.process-out-0.contentType
When native encoding/decoding is disabled, the binder will not do any inference, as it does in the case of native Serdes. Applications need to explicitly provide all the configuration options. For that reason, it is generally advised to stay with the default options for de/serialization and stick with the native de/serialization provided by Kafka Streams when you write Spring Cloud Stream Kafka Streams applications. The one scenario in which you must use the message conversion capabilities provided by the framework is when your upstream producer uses a specific serialization strategy. In that case, you want to use a matching deserialization strategy, as native mechanisms may fail. When relying on the default Serde mechanism, the applications must ensure that the binder has a way to correctly map the inbound and outbound with a proper Serde, as otherwise things might fail.
It is worth mentioning that the data de/serialization approaches outlined above are only applicable at the edges of your processors, i.e. inbound and outbound. Your business logic might still need to call Kafka Streams APIs that explicitly need Serde objects. Those are still the responsibility of the application and must be handled accordingly by the developer.