Serialization, Deserialization, and Message Conversion
Overview
Apache Kafka provides a high-level API for serializing and deserializing record values as well as their keys.
It is present with the `org.apache.kafka.common.serialization.Serializer<T>` and `org.apache.kafka.common.serialization.Deserializer<T>` abstractions, along with some built-in implementations.
You can specify serializer and deserializer classes by using `Producer` or `Consumer` configuration properties.
The following example shows how to do so:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
For more complex or particular cases, the `KafkaConsumer` (and, therefore, the `KafkaProducer`) provides overloaded constructors to accept `Serializer` and `Deserializer` instances for keys and values, respectively.
When you use this API, the `DefaultKafkaProducerFactory` and `DefaultKafkaConsumerFactory` also provide properties (through constructors or setter methods) to inject custom `Serializer` and `Deserializer` instances into the target `Producer` or `Consumer`.
Also, you can pass in `Supplier<Serializer>` or `Supplier<Deserializer>` instances through constructors; these suppliers are called on creation of each `Producer` or `Consumer`.
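For example, here is a minimal sketch (configuration values omitted) that supplies a fresh serializer instance for each `Producer` created by the factory:

Map<String, Object> props = new HashMap<>();
// props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(props,
        () -> new IntegerSerializer(),  // each Supplier is invoked when a Producer is created
        () -> new StringSerializer());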
String serialization
Since version 2.5, Spring for Apache Kafka provides `ToStringSerializer` and `ParseStringDeserializer` classes that use the String representation of entities.
They rely on the `toString` method and some `Function<String>` or `BiFunction<String, Headers>` to parse the String and populate properties of an instance.
Usually, this invokes some static method on the class, such as `parse`:
ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);
By default, the `ToStringSerializer` is configured to convey type information about the serialized entity in the record `Headers`.
You can disable this by setting the `addTypeInfo` property to `false`.
This information can be used by a `ParseStringDeserializer` on the receiving side.
- `ToStringSerializer.ADD_TYPE_INFO_HEADERS` (default `true`): You can set it to `false` to disable this feature on the `ToStringSerializer` (sets the `addTypeInfo` property).
ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
String entityType = new String(header);
if (entityType.contains("Thing")) {
return Thing.parse(str);
}
else {
// ...parsing logic
}
});
You can configure the `Charset` used to convert `String` to/from `byte[]`, with the default being `UTF-8`.
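For example, a minimal sketch of overriding the default on the serializer (assuming the `setCharset` setter):

ToStringSerializer<Thing> serializer = new ToStringSerializer<>();
serializer.setCharset(StandardCharsets.UTF_16); // replaces the UTF-8 default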
You can configure the deserializer with the name of the parser method using `ConsumerConfig` properties:
- `ParseStringDeserializer.KEY_PARSER`
- `ParseStringDeserializer.VALUE_PARSER`
These properties must contain the fully qualified name of the class followed by the method name, separated by a period (`.`).
The method must be static and have a signature of either `(String, Headers)` or `(String)`.
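For example, assuming a hypothetical `com.example.Thing` class with a static `parse` method, the value parser could be configured as follows:

consumerProps.put(ParseStringDeserializer.VALUE_PARSER, "com.example.Thing.parse");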
A `ToFromStringSerde` is also provided, for use with Kafka Streams.
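A minimal sketch, assuming a constructor that takes the two delegates:

ToFromStringSerde<Thing> thingSerde = new ToFromStringSerde<>(
        new ToStringSerializer<>(),
        new ParseStringDeserializer<>(Thing::parse));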
JSON
Spring for Apache Kafka also provides `JsonSerializer` and `JsonDeserializer` implementations that are based on the Jackson JSON object mapper.
The `JsonSerializer` allows writing any Java object as a JSON `byte[]`.
The `JsonDeserializer` requires an additional `Class<?> targetType` argument to allow the deserialization of a consumed `byte[]` to the proper target object.
The following example shows how to create a `JsonDeserializer`:
JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);
You can customize both `JsonSerializer` and `JsonDeserializer` with an `ObjectMapper`.
You can also extend them to implement some particular configuration logic in the `configure(Map<String, ?> configs, boolean isKey)` method.
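For example, a sketch that passes a customized `ObjectMapper` to both (assuming the `ObjectMapper`-accepting constructors):

ObjectMapper mapper = JacksonUtils.enhancedObjectMapper();
// further customize the mapper here, if needed
JsonSerializer<Thing> serializer = new JsonSerializer<>(mapper);
JsonDeserializer<Thing> deserializer = new JsonDeserializer<>(Thing.class, mapper);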
Starting with version 2.3, all the JSON-aware components are configured by default with a `JacksonUtils.enhancedObjectMapper()` instance, which comes with the `MapperFeature.DEFAULT_VIEW_INCLUSION` and `DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES` features disabled.
Such an instance is also supplied with well-known modules for custom data types, such as Java time and Kotlin support.
See the `JacksonUtils.enhancedObjectMapper()` JavaDocs for more information.
This method also registers an `org.springframework.kafka.support.JacksonMimeTypeModule`, which serializes `org.springframework.util.MimeType` objects into plain strings for inter-platform compatibility over the network.
A `JacksonMimeTypeModule` can be registered as a bean in the application context, and it will be auto-configured into the Spring Boot `ObjectMapper` instance.
Also starting with version 2.3, the `JsonDeserializer` provides `TypeReference`-based constructors for better handling of target generic container types.
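For example, a generic container type can be captured as follows:

JsonDeserializer<List<Thing>> deserializer =
        new JsonDeserializer<>(new TypeReference<List<Thing>>() { });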
Starting with version 2.1, you can convey type information in record `Headers`, allowing the handling of multiple types.
In addition, you can configure the serializer and deserializer by using the following Kafka properties.
They have no effect if you have provided `Serializer` and `Deserializer` instances for `KafkaConsumer` and `KafkaProducer`, respectively.
Configuration Properties
- `JsonSerializer.ADD_TYPE_INFO_HEADERS` (default `true`): You can set it to `false` to disable this feature on the `JsonSerializer` (sets the `addTypeInfo` property).
- `JsonSerializer.TYPE_MAPPINGS` (default `empty`): See Mapping Types.
- `JsonDeserializer.USE_TYPE_INFO_HEADERS` (default `true`): You can set it to `false` to ignore headers set by the serializer.
- `JsonDeserializer.REMOVE_TYPE_INFO_HEADERS` (default `true`): You can set it to `false` to retain headers set by the serializer.
- `JsonDeserializer.KEY_DEFAULT_TYPE`: Fallback type for deserialization of keys if no header information is present.
- `JsonDeserializer.VALUE_DEFAULT_TYPE`: Fallback type for deserialization of values if no header information is present.
- `JsonDeserializer.TRUSTED_PACKAGES` (default `java.util`, `java.lang`): Comma-delimited list of package patterns allowed for deserialization. `*` means deserialize all.
- `JsonDeserializer.TYPE_MAPPINGS` (default `empty`): See Mapping Types.
- `JsonDeserializer.KEY_TYPE_METHOD` (default `empty`): See Using Methods to Determine Types.
- `JsonDeserializer.VALUE_TYPE_METHOD` (default `empty`): See Using Methods to Determine Types.
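For example, a sketch of setting some of these properties in a consumer configuration (package and type names are placeholders):

consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example");
consumerProps.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.Thing");
consumerProps.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);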
Starting with version 2.2, the type information headers (if added by the serializer) are removed by the deserializer.
You can revert to the previous behavior by setting the `removeTypeHeaders` property to `false`, either directly on the deserializer or with the configuration property described earlier.
Starting with version 2.8, if you construct the serializer or deserializer programmatically as shown in Programmatic Construction, the above properties will be applied by the factories, as long as you have not set any properties explicitly (using `set*()` methods or the fluent API).
Previously, when creating programmatically, the configuration properties were never applied; this is still the case if you explicitly set properties on the object directly.
Mapping Types
Starting with version 2.2, when using JSON, you can now provide type mappings by using the properties in the preceding list.
Previously, you had to customize the type mapper within the serializer and deserializer.
Mappings consist of a comma-delimited list of `token:className` pairs.
On outbound, the payload's class name is mapped to the corresponding token.
On inbound, the token in the type header is mapped to the corresponding class name.
The following example creates a set of mappings:
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.Hat");
The corresponding objects must be compatible.
If you use Spring Boot, you can provide these properties in the `application.properties` (or YAML) file.
The following example shows how to do so:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat
You can perform only simple configuration with properties.
For more advanced configuration (such as using a custom `ObjectMapper` in the serializer and deserializer), you should use the producer and consumer factory constructors that accept a pre-built serializer and deserializer.
The following Spring Boot example overrides the default factories:
@Bean
public ConsumerFactory<String, Thing> kafkaConsumerFactory(JsonDeserializer customValueDeserializer) {
Map<String, Object> properties = new HashMap<>();
// properties.put(..., ...)
// ...
return new DefaultKafkaConsumerFactory<>(properties,
new StringDeserializer(), customValueDeserializer);
}
@Bean
public ProducerFactory<String, Thing> kafkaProducerFactory(JsonSerializer customValueSerializer) {
    Map<String, Object> properties = new HashMap<>();
    // properties.put(..., ...)
    // ...
    return new DefaultKafkaProducerFactory<>(properties,
            new StringSerializer(), customValueSerializer);
}
Setters are also provided, as an alternative to using these constructors.
Starting with version 2.2, you can explicitly configure the deserializer to use the supplied target type and ignore type information in headers by using one of the overloaded constructors that have a boolean `useHeadersIfPresent` argument (which is `true` by default).
The following example shows how to do so:
DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
Using Methods to Determine Types
Starting with version 2.5, you can now configure the deserializer, via properties, to invoke a method to determine the target type.
If present, this will override any of the other techniques discussed above.
This can be useful if the data is published by an application that does not use the Spring serializer and you need to deserialize to different types depending on the data or other headers.
Set these properties to the method name: a fully qualified class name followed by the method name, separated by a period (`.`).
The method must be declared as `public static`, have one of three signatures, `(String topic, byte[] data, Headers headers)`, `(byte[] data, Headers headers)`, or `(byte[] data)`, and return a Jackson `JavaType`.
- `JsonDeserializer.KEY_TYPE_METHOD`: `spring.json.key.type.method`
- `JsonDeserializer.VALUE_TYPE_METHOD`: `spring.json.value.type.method`
You can use arbitrary headers or inspect the data to determine the type.
static JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);

static JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class); // static so the static method below can reference them
public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
// {"thisIsAFieldInThing1":"value", ...
if (data[21] == '1') {
return thing1Type;
}
else {
return thing2Type;
}
}
For more sophisticated data inspection, consider using `JsonPath` or similar, but the simpler the test to determine the type, the more efficient the process will be.
The following is an example of creating the deserializer programmatically (when providing the consumer factory with the deserializer in the constructor):
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);
...
public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
...
}
Programmatic Construction
When constructing the serializer/deserializer programmatically for use in the producer/consumer factory, since version 2.3, you can use the fluent API, which simplifies configuration.
@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
new JsonSerializer<MyKeyType>()
.forKeys()
.noTypeInfo(),
new JsonSerializer<MyValueType>()
.noTypeInfo());
return pf;
}
@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
new JsonDeserializer<>(MyKeyType.class)
.forKeys()
.ignoreTypeHeaders(),
new JsonDeserializer<>(MyValueType.class)
.ignoreTypeHeaders());
return cf;
}
To provide type mapping programmatically, similar to Using Methods to Determine Types, use the `typeFunction` property.
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeFunction(MyUtils::thingOneOrThingTwo);
Alternatively, as long as you don't use the fluent API to configure properties, or set them using `set*()` methods, the factories will configure the serializer/deserializer using the configuration properties; see Configuration Properties.
Delegating Serializer and Deserializer
Using Headers
Version 2.3 introduced the `DelegatingSerializer` and `DelegatingDeserializer`, which allow producing and consuming records with different key and/or value types.
Producers must set a header `DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR` to a selector value that is used to select which serializer to use for the value, and `DelegatingSerializer.KEY_SERIALIZATION_SELECTOR` for the key; if a match is not found, an `IllegalStateException` is thrown.
For incoming records, the deserializer uses the same headers to select the deserializer to use; if a match is not found or the header is not present, the raw `byte[]` is returned.
You can configure the map of selector to `Serializer`/`Deserializer` via a constructor, or you can configure it via Kafka producer/consumer properties with the keys `DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG` and `DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG`.
For the serializer, the producer property can be a `Map<String, Object>` where the key is the selector and the value is a `Serializer` instance, a serializer `Class`, or the class name.
The property can also be a String of comma-delimited map entries, as shown below.
For the deserializer, the consumer property can be a `Map<String, Object>` where the key is the selector and the value is a `Deserializer` instance, a deserializer `Class`, or the class name.
The property can also be a String of comma-delimited map entries, as shown below.
To configure using properties, use the following syntax:
producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
        "thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer");

consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
        "thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer");
Producers would then set the `DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR` header to `thing1` or `thing2`.
This technique supports sending different types to the same topic (or different topics).
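For example, a producer-side sketch that sets the value selector header before sending (assuming a suitably configured `KafkaTemplate` named `template`; topic and selector values are illustrative):

ProducerRecord<String, Object> record = new ProducerRecord<>("things", thing1);
record.headers().add(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR,
        "thing1".getBytes(StandardCharsets.UTF_8));
template.send(record);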
Starting with version 2.5.1, it is not necessary to set the selector header if the type (key or value) is one of the standard types supported by `Serdes`.
For another technique to send different types to different topics, see Using `RoutingKafkaTemplate`.
By Type
Version 2.8 introduced the `DelegatingByTypeSerializer`.
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
null, new DelegatingByTypeSerializer(Map.of(
byte[].class, new ByteArraySerializer(),
Bytes.class, new BytesSerializer(),
String.class, new StringSerializer())));
}
Starting with version 2.8.3, you can configure the serializer to check if the map key is assignable from the target object, which is useful when a delegate serializer can serialize subclasses.
In this case, if there are ambiguous matches, an ordered `Map`, such as a `LinkedHashMap`, should be provided.
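For example, a sketch using an ordered map with the assignability check enabled (assuming the boolean constructor argument controls that check):

Map<Class<?>, Serializer<?>> delegates = new LinkedHashMap<>();
delegates.put(Thing.class, new JsonSerializer<>());  // checked first; matches Thing and its subclasses
delegates.put(Object.class, new JsonSerializer<>()); // fallback for everything else
DelegatingByTypeSerializer serializer = new DelegatingByTypeSerializer(delegates, true);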
By Topic
Starting with version 2.8, the `DelegatingByTopicSerializer` and `DelegatingByTopicDeserializer` allow selection of a serializer/deserializer based on the topic name.
Regex `Pattern`s are used to look up the instance to use.
The map can be configured using a constructor, or via properties (a comma-delimited list of `pattern:serializer` entries).
producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArraySerializer.class.getName()
+ ", topic[5-9]:" + StringSerializer.class.getName());
...
consumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArrayDeserializer.class.getName()
+ ", topic[5-9]:" + StringDeserializer.class.getName());
Use `KEY_SERIALIZATION_TOPIC_CONFIG` when using this for keys.
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
new IntegerSerializer(),
new DelegatingByTopicSerializer(Map.of(
Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
Pattern.compile("topic[5-9]"), new StringSerializer())),
new JsonSerializer<Object>()); // default
}
You can specify a default serializer/deserializer to use when there is no pattern match by using `DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULT` and `DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT`.
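For example, a sketch of configuring a default value serializer via properties (assuming the property accepts a class name, like the pattern map entries):

producerConfigs.put(DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT,
        JsonSerializer.class.getName());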
An additional property, `DelegatingByTopicSerialization.CASE_SENSITIVE` (default `true`), when set to `false`, makes the topic lookup case-insensitive.
Retrying Deserializer
The `RetryingDeserializer` uses a delegate `Deserializer` and a `RetryTemplate` to retry deserialization when the delegate might have transient errors, such as network issues, during deserialization.
ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
Starting with version 3.1.2, a `RecoveryCallback` can optionally be set on the `RetryingDeserializer`.
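For example, a sketch that falls back to `null` when retries are exhausted (assuming the `setRecoveryCallback` setter):

RetryingDeserializer<Thing> retrying =
        new RetryingDeserializer<>(new JsonDeserializer<>(Thing.class), retryTemplate);
retrying.setRecoveryCallback(context -> null); // invoked after retries are exhausted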
Refer to the spring-retry project for configuration of the `RetryTemplate` with a retry policy, back off policy, and so on.
Spring Messaging Message Conversion
Although the `Serializer` and `Deserializer` API is quite simple and flexible from the low-level Kafka `Consumer` and `Producer` perspective, you might need more flexibility at the Spring Messaging level, when using either `@KafkaListener` or Spring Integration's Apache Kafka Support.
To let you easily convert to and from `org.springframework.messaging.Message`, Spring for Apache Kafka provides a `MessageConverter` abstraction, with the `MessagingMessageConverter` implementation and its `JsonMessageConverter` (and subclasses) customization.
You can inject the `MessageConverter` into a `KafkaTemplate` instance directly or by using the `AbstractKafkaListenerContainerFactory` bean definition for the `@KafkaListener.containerFactory()` property.
The following example shows how to do so:
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordMessageConverter(new JsonMessageConverter());
return factory;
}
...
@KafkaListener(topics = "jsonData",
containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}
When using Spring Boot, simply define the converter as a `@Bean`, and Spring Boot auto-configuration will wire it into the auto-configured template and container factory.
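For example, a minimal sketch of such a bean in a Spring Boot application:

@Bean
public RecordMessageConverter converter() {
    return new JsonMessageConverter();
}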
When you use a `@KafkaListener`, the parameter type is provided to the message converter to assist with the conversion.
This type inference can be achieved only when the `@KafkaListener` annotation is declared at the method level.
On the consumer side, you can configure a `JsonMessageConverter`; it can handle `ConsumerRecord` values of type `byte[]`, `Bytes`, and `String`, so it should be used in conjunction with a `ByteArrayDeserializer`, `BytesDeserializer`, or `StringDeserializer`. On the producer side, when you use Spring Integration or the `KafkaTemplate.send(Message<?> message)` method, you must configure a message converter that is compatible with the configured Kafka `Serializer`.
Again, using `byte[]` or `Bytes` is more efficient because they avoid a `String` to `byte[]` conversion. For convenience, starting with version 2.3, the framework also provides a `StringOrBytesSerializer`, which can serialize all three value types, so it can be used with any of the message converters.
Starting with version 2.7.1, message payload conversion can be delegated to a spring-messaging `SmartMessageConverter`; this enables conversion, for example, to be based on the `MessageHeaders.CONTENT_TYPE` header.
The `KafkaMessageConverter.fromMessage()` method is called for outbound conversion to a `ProducerRecord` with the message payload in the `ProducerRecord.value()` property.
The `KafkaMessageConverter.toMessage()` method is called for inbound conversion from a `ConsumerRecord` with the payload being the `ConsumerRecord.value()` property.
The `SmartMessageConverter.toMessage()` method is called to create a new outbound `Message<?>` from the `Message` passed to `fromMessage()` (usually by `KafkaTemplate.send(Message<?> msg)`).
Similarly, in the `KafkaMessageConverter.toMessage()` method, after the converter has created a new `Message<?>` from the `ConsumerRecord`, the `SmartMessageConverter.fromMessage()` method is called, and then the final inbound message is created with the newly converted payload.
In either case, if the `SmartMessageConverter` returns `null`, the original message is used.
When the default converter is used in the `KafkaTemplate` and listener container factory, you configure the `SmartMessageConverter` by calling `setMessagingConverter()` on the template and via the `contentTypeConverter` property on `@KafkaListener` methods.
Examples:
template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
...
}
Using Spring Data Projection Interfaces
Starting with version 2.1.1, you can convert JSON to a Spring Data Projection interface instead of a concrete type. This allows very selective and low-coupled bindings to data, including the lookup of values from multiple places inside the JSON document. For example, the following interface can be defined as a message payload type:
interface SomeSample {
@JsonPath({ "$.username", "$.user.name" })
String getUsername();
}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}
Accessor methods will be used to look up the property name as a field in the received JSON document by default.
The `@JsonPath` expression allows customization of the value lookup, and even allows defining multiple JSON Path expressions, to look up values from multiple places until an expression returns an actual value.
To enable this feature, use a `ProjectingMessageConverter` configured with an appropriate delegate converter (used for outbound conversion and for converting non-projection interfaces).
You must also add `spring-data:spring-data-commons` and `com.jayway.jsonpath:json-path` to the classpath.
When used as the parameter to a `@KafkaListener` method, the interface type is automatically passed to the converter as normal.
Using ErrorHandlingDeserializer
When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before the `poll()` returns.
To solve this problem, the `ErrorHandlingDeserializer` has been introduced.
This deserializer delegates to a real deserializer (key or value).
If the delegate fails to deserialize the record content, the `ErrorHandlingDeserializer` returns a `null` value and a `DeserializationException` in a header that contains the cause and the raw bytes.
When you use a record-level `MessageListener`, if the `ConsumerRecord` contains a `DeserializationException` header for either the key or the value, the container's `ErrorHandler` is called with the failed `ConsumerRecord`.
The record is not passed to the listener.
Alternatively, you can configure the `ErrorHandlingDeserializer` to create a custom value by providing a `failedDeserializationFunction`, which is a `Function<FailedDeserializationInfo, T>`.
This function is invoked to create an instance of `T`, which is passed to the listener in the usual fashion.
An object of type `FailedDeserializationInfo`, which contains all the contextual information, is provided to the function.
You can find the `DeserializationException` (as a serialized Java object) in the headers.
See the Javadoc for the `ErrorHandlingDeserializer` for more information.
You can use the `DefaultKafkaConsumerFactory` constructor that takes key and value `Deserializer` objects and wire in appropriate `ErrorHandlingDeserializer` instances that you have configured with the proper delegates.
Alternatively, you can use consumer configuration properties (which are used by the `ErrorHandlingDeserializer`) to instantiate the delegates.
The property names are `ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS` and `ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS`.
The property value can be a class or class name.
The following example shows how to set these properties:
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey");
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example");
return new DefaultKafkaConsumerFactory<>(props);
The following example uses a `failedDeserializationFunction`:
public class BadThing extends Thing {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}
}
public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {
@Override
public Thing apply(FailedDeserializationInfo info) {
return new BadThing(info);
}
}
The preceding example uses the following configuration:
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
...
If the consumer is configured with an `ErrorHandlingDeserializer`, it is important to configure the `KafkaTemplate` and its producer with a serializer that can handle normal objects as well as raw `byte[]` values, which result from deserialization exceptions.
The generic value type of the template should be `Object`.
One technique is to use the `DelegatingByTypeSerializer`; an example follows:
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
When using an `ErrorHandlingDeserializer` with a batch listener, you must check for the deserialization exceptions in the message headers.
When used with a `DefaultBatchErrorHandler`, you can use that header to determine which record the exception failed on, and communicate it to the error handler via a `BatchListenerFailedException`.
@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
for (int i = 0; i < in.size(); i++) {
Thing thing = in.get(i);
if (thing == null
&& headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
try {
DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
if (deserEx != null) {
logger.error(deserEx, "Record at index " + i + " could not be deserialized");
}
}
catch (Exception ex) {
logger.error(ex, "Record at index " + i + " could not be deserialized");
}
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
process(thing);
}
}
`SerializationUtils.byteArrayToDeserializationException()` can be used to convert the header to a `DeserializationException`.
When consuming `List<ConsumerRecord<?, ?>>`, `SerializationUtils.getExceptionFromHeader()` is used instead:
@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
for (int i = 0; i < in.size(); i++) {
ConsumerRecord<String, Thing> rec = in.get(i);
if (rec.value() == null) {
DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
if (deserEx != null) {
logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
}
process(rec.value());
}
}
If you are also using a `DeadLetterPublishingRecoverer`, the record published for a `DeserializationException` will have a `record.value()` of type `byte[]`; this should not be serialized.
Consider using a `DelegatingByTypeSerializer` configured to use a `ByteArraySerializer` for `byte[]` and the normal serializer (Json, Avro, etc.) for all other types.
Starting with version 3.1, you can add a `Validator` to the `ErrorHandlingDeserializer`.
If the delegate `Deserializer` successfully deserializes the object, but that object fails validation, an exception is thrown similar to a deserialization exception occurring.
This allows the original raw data to be passed to the error handler.
When creating the deserializer yourself, simply call `setValidator`; if you configure the deserializer using properties, set the consumer configuration property `ErrorHandlingDeserializer.VALIDATOR_CLASS` to the class or fully qualified class name of your `Validator`.
When using Spring Boot, this property name is `spring.kafka.consumer.properties.spring.deserializer.validator.class`.
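For example, a sketch of adding a validator programmatically (`MyThingValidator` is a hypothetical `org.springframework.validation.Validator` implementation):

ErrorHandlingDeserializer<Thing> deserializer =
        new ErrorHandlingDeserializer<>(new JsonDeserializer<>(Thing.class));
deserializer.setValidator(new MyThingValidator());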
Payload Conversion with Batch Listeners
You can also use a `JsonMessageConverter` within a `BatchMessagingMessageConverter` to convert batch messages when you use a batch listener container factory.
See Serialization, Deserialization, and Message Conversion and Spring Messaging Message Conversion for more information.
By default, the type for the conversion is inferred from the listener argument.
If you configure the `JsonMessageConverter` with a `DefaultJackson2TypeMapper` that has its `TypePrecedence` set to `TYPE_ID` (instead of the default `INFERRED`), the converter uses the type information in headers (if present) instead.
This allows, for example, listener methods to be declared with interfaces instead of concrete classes.
Also, the type converter supports mapping, so the deserialization can be to a different type than the source (as long as the data is compatible).
This is also useful when you use class-level `@KafkaListener` instances, where the payload must have already been converted to determine which method to invoke.
The following example creates beans that use this method:
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
return factory;
}
@Bean
public JsonMessageConverter converter() {
return new JsonMessageConverter();
}
Note that, for this to work, the method signature for the conversion target must be a container object with a single generic parameter type, such as the following:
@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
Note that you can still access the batch headers.
If the batch converter has a record converter that supports it, you can also receive a list of messages where the payloads are converted according to the generic type. The following example shows how to do so:
@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen(List<Message<Foo>> fooMessages) {
...
}
ConversionService Customization
Starting with version 2.1.1, the `org.springframework.core.convert.ConversionService` used by the default `org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory` to resolve parameters for the invocation of a listener method is supplied with all beans that implement any of the following interfaces:
- `org.springframework.core.convert.converter.Converter`
- `org.springframework.core.convert.converter.GenericConverter`
- `org.springframework.format.Formatter`
This lets you further customize listener deserialization without changing the default configuration for `ConsumerFactory` and `KafkaListenerContainerFactory`.
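For example, a sketch of a `Converter` bean that converts a `String` payload to a hypothetical `Thing` (an anonymous class rather than a lambda, so that the generic types remain resolvable):

@Bean
public Converter<String, Thing> thingConverter() {
    return new Converter<String, Thing>() {

        @Override
        public Thing convert(String source) {
            return Thing.parse(source); // assumes a static parse method
        }

    };
}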
Setting a custom `MessageHandlerMethodFactory` on the `KafkaListenerEndpointRegistrar` through a `KafkaListenerConfigurer` bean disables this feature.
Adding Custom HandlerMethodArgumentResolver to @KafkaListener
Starting with version 2.4.2, you are able to add your own `HandlerMethodArgumentResolver` and resolve custom method parameters.
All you need to do is implement `KafkaListenerConfigurer` and use the `setCustomMethodArgumentResolvers()` method from the `KafkaListenerEndpointRegistrar` class.
@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setCustomMethodArgumentResolvers(
new HandlerMethodArgumentResolver() {
@Override
public boolean supportsParameter(MethodParameter parameter) {
return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
}
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) {
return new CustomMethodArgument(
message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
);
}
}
);
}
}
You can also completely replace the framework's argument resolution by adding a custom `MessageHandlerMethodFactory` to the `KafkaListenerEndpointRegistrar` bean.
If you do this, and your application needs to handle tombstone records with a `null` `value()` (e.g. from a compacted topic), you should add a `KafkaNullAwarePayloadArgumentResolver` to the factory; it must be the last resolver because it supports all types and can match arguments without a `@Payload` annotation.
If you are using a `DefaultMessageHandlerMethodFactory`, set this resolver as the last custom resolver; the factory will ensure that this resolver is used before the standard `PayloadMethodArgumentResolver`, which has no knowledge of `KafkaNull` payloads.