Serialization, Deserialization, and Message Conversion
Overview
Apache Kafka 提供了一个高级 API,用于序列化和反序列化记录值及其键。它与 org.apache.kafka.common.serialization.Serializer<T>
和 org.apache.kafka.common.serialization.Deserializer<T>
抽象(以及一些内置实现)一起使用。同时,我们可以使用 Producer
或 Consumer
配置属性指定序列化器和反序列化器类。以下示例显示了如何这么做:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
对于更复杂或特殊的情况,KafkaConsumer
(因此,KafkaProducer
)提供重载的构造函数分别接受用于`keys`和`values`的`Serializer`和`Deserializer`实例。
使用此 API 时,DefaultKafkaProducerFactory`和`DefaultKafkaConsumerFactory`还提供属性(通过构造函数或 setter 方法)将自定义`Serializer`和`Deserializer`实例注入目标`Producer`或`Consumer`中。此外,您还可以通过构造函数传入 `Supplier<Serializer>
或 Supplier<Deserializer>
实例 - 当创建每个`Producer`或`Consumer`时调用这些 Supplier
。
String serialization
自 2.5 版本以来,Spring for Apache Kafka 提供使用实体字符串表示的 ToStringSerializer
和 ParseStringDeserializer
类。它们依赖方法 toString
和一些 Function<String>
或 BiFunction<String, Headers>
解析字符串并填充实例的属性。通常,这会调用类中的一些静态方法,例如 parse
:
ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);
默认情况下,ToStringSerializer
配置为在记录`Headers`中传达序列化实体的类型信息。您可以通过将`addTypeInfo`属性设置为`false`来禁用此功能。ParseStringDeserializer
可以在接收端使用此信息。
-
ToStringSerializer.ADD_TYPE_INFO_HEADERS
(默认true
):可以将其设置为false
,以禁用ToStringSerializer
上的此功能(设置addTypeInfo
属性)。
ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
String entityType = new String(header);
if (entityType.contains("Thing")) {
return Thing.parse(str);
}
else {
// ...parsing logic
}
});
您可以使用默认值为`UTF-8`来配置用于将`String`转换为`byte[]/从`byte[]`转换回来的`Charset
。
您可以使用`ConsumerConfig`属性配置使用解析器方法名称的解串器:
-
ParseStringDeserializer.KEY_PARSER
-
ParseStringDeserializer.VALUE_PARSER
这些属性必须包含类的完全限定名,后跟由句点`.分隔的方法名。该方法必须是静态方法并且具有
(String、Headers)或
(String)`的签名。
还为 Kafka 流提供了`ToFromStringSerde`。
JSON
Spring for Apache Kafka还提供了基于Jackson JSON对象映射程序的`JsonSerializer`和`JsonDeserializer`实现。JsonSerializer`允许将任何Java对象写入为JSON `byte[]
。要将使用`byte[]消费的`Class<?> targetType`正确反序列化为目标对象,`JsonDeserializer`需要一个附加的`Class<?> targetType`参数。以下示例演示如何创建`JsonDeserializer
:
JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);
您可以用`ObjectMapper`自定义`JsonSerializer`和`JsonDeserializer`。您还可以扩展它们以在`configure(Map<String, ?> configs, boolean isKey)`方法中实现一些特定的配置逻辑。
从版本 2.3 开始,所有支持 JSON 的组件都默认配置 JacksonUtils.enhancedObjectMapper()
实例,该实例携带有已禁用的 MapperFeature.DEFAULT_VIEW_INCLUSION
和 DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES
功能。此外,此实例还随附了用于自定义数据类型(例如 Java 时间和 Kotlin 支持)的众所周知的模块。详见 JacksonUtils.enhancedObjectMapper()
JavaDoc 以获取更多信息。此方法还注册一个 org.springframework.kafka.support.JacksonMimeTypeModule
,用于将 org.springframework.util.MimeType
对象序列化为普通字符串,以实现网络上的跨平台兼容性。可以将 JacksonMimeTypeModule
作为 bean 注册在应用程序上下文中,它将自动配置到 link:https://docs.spring.io/spring-boot/docs/current/reference/html/howto.html#howto.spring-mvc.customize-jackson-objectmapper[Spring Boot ObjectMapper
实例中。
从 2.3 版本开始,`JsonDeserializer`还提供了基于`TypeReference`的构造函数,以更好地处理目标泛型容器类型。
从 2.1 版本开始,您可以在记录`Headers`中传达类型信息,从而处理多种类型。此外,您可以使用以下 Kafka 属性配置序列化器和反序列化器。如果您已为 KafkaConsumer
和 KafkaProducer
分别提供了 Serializer
和 Deserializer
实例,它们不会产生任何影响。
Configuration Properties
-
JsonSerializer.ADD_TYPE_INFO_HEADERS
(默认true
):可以将其设置为false
,以禁用JsonSerializer
上的此功能(设置addTypeInfo
属性)。 -
JsonSerializer.TYPE_MAPPINGS
(默认empty
):参见 Mapping Types。 -
JsonDeserializer.USE_TYPE_INFO_HEADERS
(默认true
):可以将其设置为false
,以忽略序列器设置的标头。 -
JsonDeserializer.REMOVE_TYPE_INFO_HEADERS
(默认true
):可以将其设置为false
,以保留序列器设置的标头。 -
JsonDeserializer.KEY_DEFAULT_TYPE
:如果不存在标头信息,则为键的反序列化设置回退类型。 -
JsonDeserializer.VALUE_DEFAULT_TYPE
:如果不存在标头信息,则为值的反序列化设置回退类型。 -
JsonDeserializer.TRUSTED_PACKAGES
(默认java.util
,java.lang
):允许反序列化的包模式的逗号分隔列表。*
表示反序列化所有。 -
JsonDeserializer.TYPE_MAPPINGS
(默认empty
):参见 Mapping Types。 -
JsonDeserializer.KEY_TYPE_METHOD
(默认empty
):参见 Using Methods to Determine Types。 -
JsonDeserializer.VALUE_TYPE_METHOD
(默认empty
):参见 Using Methods to Determine Types。
从 2.2 版本开始,反序列化器会删除类型信息头(如果由序列化器添加)。您可以在反序列化器上或使用前面所述的配置属性直接将`removeTypeHeaders`属性设置为`false`,以恢复到之前的行为。
从版本 2.8 开始,如果你按 Programmatic Construction 中所示以编程方式构建序列化器或反序列化器,那么只要你未显式设置任何属性(使用 set*()
方法或使用流利 API),以上属性将由工厂应用。以前,在以编程方式创建时,配置属性永远不会被应用;如果你直接在对象上显式设置属性,则情况仍然如此。
Mapping Types
从 2.2 版本开始,当使用 JSON 时,您现在可以使用前一个列表中的属性提供类型映射。以前,您必须在序列化器和反序列化器中自定义类型映射器。映射由逗号分隔的`token:className`对列表组成。在出站时,有效负载的类名映射到相应的令牌。在入站时,类型头中的令牌映射到相应的类名。
以下示例创建一组映射:
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeSerializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.Hat");
相应的对象必须兼容。
如果您使用“ Spring Boot”,可以在“application.properties
”(或 yaml)文件中提供这些属性。以下示例显示了如何执行此操作:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat
您只能使用属性执行简单配置。对于更高级的配置(例如在序列化器和反序列化器中使用自定义`ObjectMapper`),您应该使用接受预构建序列化器和反序列化器的生产者和消费者工厂构造函数。以下 Spring Boot 示例覆盖默认工厂:
@Bean
public ConsumerFactory<String, Thing> kafkaConsumerFactory(JsonDeserializer customValueDeserializer) {
Map<String, Object> properties = new HashMap<>();
// properties.put(..., ...)
// ...
return new DefaultKafkaConsumerFactory<>(properties,
new StringDeserializer(), customValueDeserializer);
}
@Bean
public ProducerFactory<String, Thing> kafkaProducerFactory(JsonSerializer customValueSerializer) {
return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties(),
new StringSerializer(), customValueSerializer);
}
还提供了 setter,作为使用这些构造函数的替代方法。
从版本 2.2 开始,您可以通过使用带有布尔型 useHeadersIfPresent
参数(默认情况下为 true
)的重载构造函数之一,明确配置反序列化器以使用提供的目标类型并忽略头信息中的类型信息。以下示例演示了如何执行此操作:
DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
Using Methods to Determine Types
从2.5版本开始,您现在可以通过属性配置反序列化程序,调用方法来确定目标类型。如果存在,这将覆盖上述任何其他技术。如果数据是由不使用Spring序列化程序的应用程序发布的,并且您需要根据数据或其他标头反序列化为不同的类型,这可能会很有用。将这些属性设置为方法名——一个完全限定的类名,后跟方法名,由句点`.分隔。该方法必须声明为`public static
,具有三个签名之一`(String topic, byte[] data, Headers headers)`(byte[] data, Headers headers)`或
(byte[] data),并返回Jackson `JavaType
。
-
JsonDeserializer.KEY_TYPE_METHOD
:spring.json.key.type.method
-
JsonDeserializer.VALUE_TYPE_METHOD
:spring.json.value.type.method
您可以使用任意的标题或检查数据来确定类型。
JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);
JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);
public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
// {"thisIsAFieldInThing1":"value", ...
if (data[21] == '1') {
return thing1Type;
}
else {
return thing2Type;
}
}
对于更复杂的 data 检查,请考虑使用 JsonPath
或类似工具,但是,确定类型的测试越简单,该过程的效率就越高。
以下是在以编程方式创建反序列化器(在构造函数中向消费者工厂提供反序列化器时)的一个示例:
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);
...
public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
...
}
Programmatic Construction
从 2.3 版开始,在生产者/消费者工厂中以编程方式构造序列化器/反序列化器时,可以使用流畅的 API,这简化了配置。
@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
new JsonSerializer<MyKeyType>()
.forKeys()
.noTypeInfo(),
new JsonSerializer<MyValueType>()
.noTypeInfo());
return pf;
}
@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
new JsonDeserializer<>(MyKeyType.class)
.forKeys()
.ignoreTypeHeaders(),
new JsonDeserializer<>(MyValueType.class)
.ignoreTypeHeaders());
return cf;
}
要以类似于“Using Methods to Determine Types”的方式提供类型映射,请使用“typeFunction
”属性。
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeFunction(MyUtils::thingOneOrThingTwo);
或者,只要您不使用流畅 API 配置属性,或者不使用“set*()
”方法设置属性,工厂就会使用配置属性配置序列化程序/反序列化程序;请参阅“Configuration Properties”。
Delegating Serializer and Deserializer
Using Headers
2.3 版引入了 DelegatingSerializer
和 DelegatingDeserializer
,允许生成和使用具有不同键和/或值类型的记录。生产者必须将标题 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR
设置为用于选择用于该值序列化器的选择器值,将 DelegatingSerializer.KEY_SERIALIZATION_SELECTOR
设置为用于键的选择器值;如果找不到匹配项,则会引发 IllegalStateException
。
对于传入的记录,反序列化程序使用相同的标头来选择要使用的反序列化程序;如果没有找到匹配项或不存在标头,则返回原始`byte[]`。
您可以通过构造函数配置选择器到 Serializer
/ Deserializer
的映射,或者可以通过带有键 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG
和 DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG
的 Kafka 生产者/消费者属性进行配置。对于序列化程序,生产者属性可以是一个 Map<String, Object>
,其中键是选择器,值是 Serializer
实例、序列化程序 Class
或类名。该属性还可以是逗号分隔的地图项字符串,如下所示。
对于反序列化程序,消费者属性可以是一个 Map<String, Object>
,其中键是选择器,值是 Deserializer
实例、反序列化程序 Class
或类名。该属性还可以是逗号分隔的地图项字符串,如下所示。
要使用属性进行配置,请使用以下语法:
producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")
consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")
然后,生产者会将 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR
标头设置为 thing1
或 thing2
。
此技术支持将不同类型发送到同一主题(或不同主题)。
从版本 2.5.1 开始,如果类型(键或值)是 |
有关将不同类型发送到不同主题的另一种技术,请参见 xref:kafka/sending-messages.adoc#routing-template[Using RoutingKafkaTemplate
。
By Type
2.8 版引入了 DelegatingByTypeSerializer
。
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
null, new DelegatingByTypeSerializer(Map.of(
byte[].class, new ByteArraySerializer(),
Bytes.class, new BytesSerializer(),
String.class, new StringSerializer())));
}
从版本 2.8.3 开始,您可以配置序列化程序来检查地图键是否可从目标对象赋值,这在委托序列化程序能够序列化子类时非常有用。在这种情况下,如果存在不明确的匹配项,则应提供一个有序 Map
,例如 LinkedHashMap
。
By Topic
从版本 2.8 开始,DelegatingByTopicSerializer
和 DelegatingByTopicDeserializer
允许基于主题名称选择序列化程序/反序列化程序。Regex Pattern
用于查找要使用的实例。该映射可以使用构造函数配置,也可以通过属性来配置(以逗号分隔的 pattern:serializer
列表)。
producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArraySerializer.class.getName()
+ ", topic[5-9]:" + StringSerializer.class.getName());
...
ConsumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArrayDeserializer.class.getName()
+ ", topic[5-9]:" + StringDeserializer.class.getName());
将此用于键时,请使用 KEY_SERIALIZATION_TOPIC_CONFIG
。
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
new IntegerSerializer(),
new DelegatingByTopicSerializer(Map.of(
Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
Pattern.compile("topic[5-9]"), new StringSerializer())),
new JsonSerializer<Object>()); // default
}
您可以使用 DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULT
和 DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT
指定在没有模式匹配时使用的默认序列化程序/反序列化程序。
当设置为 false
时,一个附加属性 DelegatingByTopicSerialization.CASE_SENSITIVE
(默认 true
)会使主题查找不区分大小写。
Retrying Deserializer
RetryingDeserializer
使用委托 Deserializer
和 RetryTemplate
在委托在反序列化期间可能出现瞬态错误(例如网络问题)时重试反序列化。
ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
从版本 3.1.2
开始,RecoveryCallback
可以选择性地在 RetryingDeserializer
上设置。
有关配置具有重试策略、回退策略等功能的“RetryTemplate
”,请参阅“ spring-retry”项目。
Spring Messaging Message Conversion
虽然从底层 Kafka“Consumer
”和“Producer
”角度来看,“Serializer
”和“Deserializer
”API 非常简单而灵活,但当使用“@KafkaListener
”或“ Spring Integration’s Apache Kafka Support”时,您可能需要 Spring Messaging 层面的更大灵活性。为了让您轻松地转换到或从“org.springframework.messaging.Message
”,Spring for Apache Kafka 提供了“MessageConverter
”抽象,带有“MessagingMessageConverter
”实现及其“JsonMessageConverter
”(及子类)自定义项。您可以将“MessageConverter
”直接注入“KafkaTemplate
”实例,并使用“AbstractKafkaListenerContainerFactory
”bean 定义为“@KafkaListener.containerFactory()
”属性。以下示例显示了如何执行此操作:
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordMessageConverter(new JsonMessageConverter());
return factory;
}
...
@KafkaListener(topics = "jsonData",
containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}
使用 Spring Boot 时,只需将转换器定义为 @Bean
,Spring Boot 自动配置就会将其连接到自动配置的模板和容器工厂中。
当你使用 @KafkaListener
时,参数类型会提供给消息转换器以协助转换。
只当在方法级别声明 |
在消费者方面,您可以配置`JsonMessageConverter`,它可以处理类型为`byte[]
同样,使用`byte[] |
从版本 2.7.1 开始,消息有效载荷转换可以委托给 spring-messaging
SmartMessageConverter
;例如,这使得转换可以基于 MessageHeaders.CONTENT_TYPE
头进行。
KafkaMessageConverter.fromMessage()
方法用于将消息有效载荷转换为 ProducerRecord
的出站转换,其中 ProducerRecord.value()
属性包含消息有效载荷。KafkaMessageConverter.toMessage()
方法用于将负载为 ConsumerRecord
的入站消息转换为 ConsumerRecord.value()
属性。SmartMessageConverter.toMessage()
方法用于使用传递给 fromMessage()
的 Message
(通常由 KafkaTemplate.send(Message<?> msg)
传递)创建一个新的出站 Message<?>
。同样,在 KafkaMessageConverter.toMessage()
方法中,在转换器从 ConsumerRecord
创建了一个新的 Message<?>
之后,调用 SmartMessageConverter.fromMessage()
方法,然后使用新转换的有效载荷创建最终入站消息。在任何一种情况下,如果 SmartMessageConverter
返回 null
,则使用原始消息。
当在 KafkaTemplate
和监听器容器工厂中使用默认转换器时,你可以通过在模板上调用 setMessagingConverter()
以及在 @KafkaListener
方法上使用 contentMessageConverter
属性来配置 SmartMessageConverter
。
示例:
template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
...
}
Using Spring Data Projection Interfaces
从版本 2.1.1 开始,你可以将 JSON 转换成 Spring Data Projection 接口,而不是具体类型。这允许非常有选择性地将数据进行低耦合绑定,包括查找 JSON 文档内部多个地方的值。例如,可以将以下接口定义为消息有效载荷类型:
interface SomeSample {
@JsonPath({ "$.username", "$.user.name" })
String getUsername();
}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}
访问器方法将默认使用字段在接收到的 JSON 文档中查找属性名称。@JsonPath
表达式允许自定义值查找,甚至定义多个 JSON 路径表达式,以便从多个地方查找值,直到表达式返回实际值。
要启用此功能,请使用配置有适当委托转换器(用于出站转换和转换非投影接口)的 ProjectingMessageConverter
。你还必须将 spring-data:spring-data-commons
和 com.jayway.jsonpath:json-path
添加到类路径中。
当用作 @KafkaListener
方法的参数时,接口类型会像往常一样自动传给转换器。
Using ErrorHandlingDeserializer
当反序列化器未能够反序列化消息时,Spring 没有办法处理这个问题,因为它发生在 poll()
返回之前。为了解决这个问题,引入了 ErrorHandlingDeserializer
。此反序列化器委托给真正的反序列化器(键或值)。如果委托未能够反序列化记录内容,则 ErrorHandlingDeserializer
会返回一个 null
值和一个包含原因和原始字节的头部中的 DeserializationException
。当你使用记录级别的 MessageListener
时,如果 ConsumerRecord
包含键或值任一者的 DeserializationException
头部,容器的 ErrorHandler
会使用失败的 ConsumerRecord
调用。记录不会传递给监听器。
或者,您可以配置“ErrorHandlingDeserializer
”以使用“failedDeserializationFunction
”创建自定义值,该值是“Function<FailedDeserializationInfo, T>
”。调用此函数可创建“T
”的实例,该实例以通常方式传递给侦听器。系统会向函数提供包含所有上下文信息的对象,类型为“FailedDeserializationInfo
”。您可以在标头中找到“DeserializationException
”(作为序列化的 Java 对象)。有关详情,请参阅“ErrorHandlingDeserializer
”的“ Javadoc”。
你可以使用 DefaultKafkaConsumerFactory
构造器,该构造器采用键和值 Deserializer
对象,并连接上使用适当委托配置的合适的 ErrorHandlingDeserializer
实例。或者,你可以使用消费者配置属性(由 ErrorHandlingDeserializer
使用)来实例化委托。属性名称是 ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS
和 ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS
。属性值可以是类或类名。以下示例展示了如何设置这些属性:
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);
以下示例使用了 failedDeserializationFunction
。
public class BadThing extends Thing {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}
}
public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {
@Override
public Thing apply(FailedDeserializationInfo info) {
return new BadThing(info);
}
}
前面的示例使用了以下配置:
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
...
如果消费者通过 ErrorHandlingDeserializer
配置,则务必使用能够处理常规对象以及反序列化异常产生的原始 byte[]
值的序列化器来配置 KafkaTemplate
及其生成器。模板的泛型值类型应为 Object
。一种方法是使用 DelegatingByTypeSerializer
;示例如下:
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
当使用具有批处理侦听器的 ErrorHandlingDeserializer
时,你必须检查消息头中的反序列化异常。当与 DefaultBatchErrorHandler
配合使用时,你可以使用该头来确定异常失败的记录,并通过 BatchListenerFailedException
传达给错误处理程序。
@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
for (int i = 0; i < in.size(); i++) {
Thing thing = in.get(i);
if (thing == null
&& headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
try {
DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
if (deserEx != null) {
logger.error(deserEx, "Record at index " + i + " could not be deserialized");
}
}
catch (Exception ex) {
logger.error(ex, "Record at index " + i + " could not be deserialized");
}
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
process(thing);
}
}
SerializationUtils.byteArrayToDeserializationException()
可用于将头转换为 DeserializationException
。
当使用 List<ConsumerRecord<?, ?>
时,使用 SerializationUtils.getExceptionFromHeader()
替代:
@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
for (int i = 0; i < in.size(); i++) {
ConsumerRecord<String, Thing> rec = in.get(i);
if (rec.value() == null) {
DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
if (deserEx != null) {
logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
}
process(rec.value());
}
}
如果你也正在使用 DeadLetterPublishingRecoverer
,则为 DeserializationException
发布的记录将具有类型为 byte[]
的 record.value()
;此类型不应被序列化。考虑使用配置为对 byte[]
使用 ByteArraySerializer
以及对所有其他类型使用普通序列化器(Json、Avro 等)的 DelegatingByTypeSerializer
。
从 3.1 版本开始,你可以在 ErrorHandlingDeserializer
中添加一个 Validator
。如果委托 Deserializer
成功反序列化对象,但该对象验证失败,则会抛出一个类似于发生反序列化异常的异常。这允许将原始原始数据传递给错误处理程序。在创建反序列化器时,只需调用 setValidator
;如果你使用属性配置序列化器,请将消费者配置属性 ErrorHandlingDeserializer.VALIDATOR_CLASS
设置为 Validator
的类或完全限定类名。在使用 Spring Boot 时,此属性名称为 spring.kafka.consumer.properties.spring.deserializer.validator.class
。
Payload Conversion with Batch Listeners
当您使用批处理侦听器容器工厂时,您还可以在“BatchMessagingMessageConverter
”中使用“JsonMessageConverter
”来转换批处理消息。有关详情,请参阅“Serialization, Deserialization, and Message Conversion”和“Spring Messaging Message Conversion”。
默认情况下,转换的类型从侦听器参数中推断出来。如果您使用其`TypePrecedence`设置为`TYPE_ID`(而不是默认的`INFERRED`)的`DefaultJackson2TypeMapper`,配置`JsonMessageConverter`,则转换器代わりに使用标头中的类型信息(如果存在)。例如,这允许将侦听器方法用接口而不是具体类声明。此外,类型转换器支持映射,因此可以将反序列化为与源不同的类型(只要数据兼容)。当您使用class-level @KafkaListener
instances时,这也很有用,在这种情况下,有效负载必须已经转换为用于确定要调用的方法。以下示例创建使用此方法的bean:
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
return factory;
}
@Bean
public JsonMessageConverter converter() {
return new JsonMessageConverter();
}
请注意,为了执行此操作,转换目标的方法签名必须是带有单个泛型参数类型的容器对象,如下所示:
@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
请注意,你仍然可以访问批处理头。
如果批处理转换器具有支持它的记录转换器,你还可以接收有效负载根据泛型类型转换的消息列表。以下示例展示了如何执行此操作:
@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen(List<Message<Foo>> fooMessages) {
...
}
ConversionService
Customization
从 2.1.1 版本开始,默认的 org.springframework.core.convert.ConversionService
(由 org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory
用于解析侦听器方法调用的参数)提供给实现了以下任何接口的所有 bean:
-
org.springframework.core.convert.converter.Converter
-
org.springframework.core.convert.converter.GenericConverter
-
org.springframework.format.Formatter
这允许你在不更改 ConsumerFactory
和 KafkaListenerContainerFactory
的默认配置的情况下进一步自定义侦听器反序列化。
通过 KafkaListenerConfigurer
Bean 在 KafkaListenerEndpointRegistrar
上设置自定义 MessageHandlerMethodFactory
将禁用此特性。
Adding Custom HandlerMethodArgumentResolver
to @KafkaListener
从 2.4.2 版本开始,你可以添加自己的 HandlerMethodArgumentResolver
和解析自定义方法参数。你需要做的就是实现 KafkaListenerConfigurer
并从 KafkaListenerEndpointRegistrar
类使用 setCustomMethodArgumentResolvers()
方法。
@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setCustomMethodArgumentResolvers(
new HandlerMethodArgumentResolver() {
@Override
public boolean supportsParameter(MethodParameter parameter) {
return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
}
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) {
return new CustomMethodArgument(
message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
);
}
}
);
}
}
你还可以通过向 KafkaListenerEndpointRegistrar
bean 添加自定义 MessageHandlerMethodFactory
完全替换框架的参数解析。如果你这样做,并且你的应用程序需要处理带有 null
value()
(例如,来自压缩主题)的墓碑记录,则应该向工厂中添加 KafkaNullAwarePayloadArgumentResolver
;它必须是最后一个解析器,因为它支持所有类型并且可以匹配没有 @Payload
注解的参数。如果你正在使用 DefaultMessageHandlerMethodFactory
,请将此解析器设置为最后一个自定义解析器;工厂将确保在标准 PayloadMethodArgumentResolver
之前使用此解析器,后者不了解 KafkaNull
有效负载。