Serialization, Deserialization, and Message Conversion

Overview

Apache Kafka provides a high-level API for serializing and deserializing record values as well as their keys. It comes with the org.apache.kafka.common.serialization.Serializer<T> and org.apache.kafka.common.serialization.Deserializer<T> abstractions and some built-in implementations. You can specify serializer and deserializer classes by using Producer or Consumer configuration properties. The following example shows how to do so:

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

For more complex or particular cases, the KafkaConsumer (and, therefore, KafkaProducer) provides overloaded constructors to accept Serializer and Deserializer instances for keys and values, respectively.

When you use this API, the DefaultKafkaProducerFactory and DefaultKafkaConsumerFactory also provide properties (through constructors or setter methods) to inject custom Serializer and Deserializer instances into the target Producer or Consumer. Also, you can pass in Supplier<Serializer> or Supplier<Deserializer> instances through constructors - these Suppliers are called on creation of each Producer or Consumer.
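
For example, a minimal sketch using the Supplier-based constructor (the producerProps map and the Thing type are assumptions for illustration):

DefaultKafkaProducerFactory<Integer, Thing> pf = new DefaultKafkaProducerFactory<>(producerProps,
        IntegerSerializer::new,              // a fresh key serializer for each Producer
        () -> new JsonSerializer<Thing>());  // a fresh value serializer for each Producer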

String serialization

Since version 2.5, Spring for Apache Kafka provides ToStringSerializer and ParseStringDeserializer classes that use the String representation of entities. They rely on the toString method and on a Function<String, T> or BiFunction<String, Headers, T> to parse the String and populate properties of an instance. Usually, this invokes some static method on the class, such as parse:

ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);

By default, the ToStringSerializer is configured to convey type information about the serialized entity in the record Headers. You can disable this by setting the addTypeInfo property to false. This information can be used by ParseStringDeserializer on the receiving side.

  • ToStringSerializer.ADD_TYPE_INFO_HEADERS (default true): You can set it to false to disable this feature on the ToStringSerializer (sets the addTypeInfo property).

ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
    byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
    String entityType = new String(header);

    if (entityType.contains("Thing")) {
        return Thing.parse(str);
    }
    else {
        // ...parsing logic
    }
});

You can configure the Charset used to convert the String to/from a byte[]; the default is UTF-8.
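
For example, a minimal sketch, assuming the setCharset(Charset) setter:

ToStringSerializer<Thing> serializer = new ToStringSerializer<>();
serializer.setCharset(StandardCharsets.UTF_16); // the default is StandardCharsets.UTF_8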

You can configure the deserializer with the name of the parser method using ConsumerConfig properties:

  • ParseStringDeserializer.KEY_PARSER

  • ParseStringDeserializer.VALUE_PARSER

The properties must contain the fully qualified name of the class followed by the method name, separated by a period (.). The method must be static and have a signature of either (String, Headers) or (String).
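
For example, a sketch pointing the value deserializer at the static parse method shown earlier (the com.example package is an assumption):

props.put(ParseStringDeserializer.VALUE_PARSER, "com.example.Thing.parse");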

A ToFromStringSerde is also provided, for use with Kafka Streams.

JSON

Spring for Apache Kafka also provides JsonSerializer and JsonDeserializer implementations that are based on the Jackson JSON object mapper. The JsonSerializer allows writing any Java object as a JSON byte[]. The JsonDeserializer requires an additional Class<?> targetType argument to allow the deserialization of a consumed byte[] to the proper target object. The following example shows how to create a JsonDeserializer:

JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);

You can customize both JsonSerializer and JsonDeserializer with an ObjectMapper. You can also extend them to implement some particular configuration logic in the configure(Map<String, ?> configs, boolean isKey) method.

Starting with version 2.3, all the JSON-aware components are configured by default with a JacksonUtils.enhancedObjectMapper() instance, which comes with the MapperFeature.DEFAULT_VIEW_INCLUSION and DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES features disabled. Also, such an instance is supplied with well-known modules for custom data types, such as Java time and Kotlin support. See the JacksonUtils.enhancedObjectMapper() Javadoc for more information. This method also registers an org.springframework.kafka.support.JacksonMimeTypeModule, which serializes org.springframework.util.MimeType objects into plain strings for inter-platform compatibility over the network. A JacksonMimeTypeModule can be registered as a bean in the application context, and it will be auto-configured into the Spring Boot ObjectMapper instance.

Also starting with version 2.3, the JsonDeserializer provides TypeReference-based constructors for better handling of target generic container types.
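
For example, a minimal sketch that deserializes into a generic container type (TypeReference is Jackson's com.fasterxml.jackson.core.type.TypeReference; Thing is an assumed entity):

JsonDeserializer<List<Thing>> listDeserializer =
        new JsonDeserializer<>(new TypeReference<List<Thing>>() { });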

Starting with version 2.1, you can convey type information in record Headers, allowing the handling of multiple types. In addition, you can configure the serializer and deserializer by using the following Kafka properties. They have no effect if you have provided Serializer and Deserializer instances for KafkaConsumer and KafkaProducer, respectively.

Configuration Properties

  • JsonSerializer.ADD_TYPE_INFO_HEADERS (default true): You can set it to false to disable this feature on the JsonSerializer (sets the addTypeInfo property).

  • JsonSerializer.TYPE_MAPPINGS (default empty): See Mapping Types.

  • JsonDeserializer.USE_TYPE_INFO_HEADERS (default true): You can set it to false to ignore headers set by the serializer.

  • JsonDeserializer.REMOVE_TYPE_INFO_HEADERS (default true): You can set it to false to retain headers set by the serializer.

  • JsonDeserializer.KEY_DEFAULT_TYPE: Fallback type for deserialization of keys if no header information is present.

  • JsonDeserializer.VALUE_DEFAULT_TYPE: Fallback type for deserialization of values if no header information is present.

  • JsonDeserializer.TRUSTED_PACKAGES (default java.util, java.lang): Comma-delimited list of package patterns allowed for deserialization. * means deserializing all.

  • JsonDeserializer.TYPE_MAPPINGS (default empty): See Mapping Types.

  • JsonDeserializer.KEY_TYPE_METHOD (default empty): See Using Methods to Determine Types.

  • JsonDeserializer.VALUE_TYPE_METHOD (default empty): See Using Methods to Determine Types.

Starting with version 2.2, the type information headers (if added by the serializer) are removed by the deserializer. You can revert to the previous behavior by setting the removeTypeHeaders property to false, either directly on the deserializer or with the configuration property described earlier.

Starting with version 2.8, if you construct the serializer or deserializer programmatically as shown in Programmatic Construction, the above properties will be applied by the factories, as long as you have not set any properties explicitly (using set*() methods or using the fluent API). Previously, when creating programmatically, the configuration properties were never applied; this is still the case if you explicitly set properties on the object directly.

Mapping Types

Starting with version 2.2, when using JSON, you can now provide type mappings by using the properties in the preceding list. Previously, you had to customize the type mapper within the serializer and deserializer. Mappings consist of a comma-delimited list of token:className pairs. On outbound, the payload’s class name is mapped to the corresponding token. On inbound, the token in the type header is mapped to the corresponding class name.

The following example creates a set of mappings:

senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.Hat");

The corresponding objects must be compatible.

If you use Spring Boot, you can provide these properties in the application.properties (or yaml) file. The following example shows how to do so:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat

You can perform only simple configuration with properties. For more advanced configuration (such as using a custom ObjectMapper in the serializer and deserializer), you should use the producer and consumer factory constructors that accept a pre-built serializer and deserializer. The following Spring Boot example overrides the default factories:

@Bean
public ConsumerFactory<String, Thing> kafkaConsumerFactory(JsonDeserializer customValueDeserializer) {
    Map<String, Object> properties = new HashMap<>();
    // properties.put(..., ...)
    // ...
    return new DefaultKafkaConsumerFactory<>(properties,
        new StringDeserializer(), customValueDeserializer);
}

@Bean
public ProducerFactory<String, Thing> kafkaProducerFactory(JsonSerializer customValueSerializer) {
    Map<String, Object> properties = new HashMap<>();
    // properties.put(..., ...)
    // ...
    return new DefaultKafkaProducerFactory<>(properties,
        new StringSerializer(), customValueSerializer);
}

Setters are also provided, as an alternative to using these constructors.
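
For example, a minimal sketch using the setters instead (props is assumed to hold the consumer configuration):

DefaultKafkaConsumerFactory<String, Thing> cf = new DefaultKafkaConsumerFactory<>(props);
cf.setKeyDeserializer(new StringDeserializer());
cf.setValueDeserializer(new JsonDeserializer<>(Thing.class));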

Starting with version 2.2, you can explicitly configure the deserializer to use the supplied target type and ignore type information in headers by using one of the overloaded constructors that have a boolean useHeadersIfPresent argument (which is true by default). The following example shows how to do so:

DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
        new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));

Using Methods to Determine Types

Starting with version 2.5, you can now configure the deserializer, via properties, to invoke a method to determine the target type. If present, this overrides any of the other techniques discussed above. This can be useful if the data is published by an application that does not use the Spring serializer and you need to deserialize to different types depending on the data, or other headers. Set these properties to the method name - a fully qualified class name followed by the method name, separated by a period (.). The method must be declared as public static, have one of three signatures - (String topic, byte[] data, Headers headers), (byte[] data, Headers headers), or (byte[] data) - and return a Jackson JavaType.

  • JsonDeserializer.KEY_TYPE_METHOD : spring.json.key.type.method

  • JsonDeserializer.VALUE_TYPE_METHOD : spring.json.value.type.method

You can use arbitrary headers or inspect the data to determine the type.

static JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);

static JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);

public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
    // {"thisIsAFieldInThing1":"value", ...
    if (data[21] == '1') {
        return thing1Type;
    }
    else {
        return thing2Type;
    }
}

For more sophisticated data inspection, consider using JsonPath or similar; the simpler the test to determine the type, the more efficient the process will be.

The following is an example of creating the deserializer programmatically (when providing the consumer factory with the deserializer in the constructor):

JsonDeserializer<Object> deser = new JsonDeserializer<>()
        .trustedPackages("*")
        .typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);

...

public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
    ...
}

Programmatic Construction

When constructing the serializer/deserializer programmatically for use in the producer/consumer factory, since version 2.3, you can use the fluent API, which simplifies configuration.

@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
    Map<String, Object> props = new HashMap<>();
    // props.put(..., ...)
    // ...
    DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
        new JsonSerializer<MyKeyType>()
            .forKeys()
            .noTypeInfo(),
        new JsonSerializer<MyValueType>()
            .noTypeInfo());
    return pf;
}

@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
    Map<String, Object> props = new HashMap<>();
    // props.put(..., ...)
    // ...
    DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
        new JsonDeserializer<>(MyKeyType.class)
            .forKeys()
            .ignoreTypeHeaders(),
        new JsonDeserializer<>(MyValueType.class)
            .ignoreTypeHeaders());
    return cf;
}

To provide type mapping programmatically, similar to Using Methods to Determine Types, use the typeFunction property.

JsonDeserializer<Object> deser = new JsonDeserializer<>()
        .trustedPackages("*")
        .typeFunction(MyUtils::thingOneOrThingTwo);

Alternatively, as long as you don’t use the fluent API to configure properties, or set them using set*() methods, the factories will configure the serializer/deserializer using the configuration properties; see Configuration Properties.

Delegating Serializer and Deserializer

Using Headers

Version 2.3 introduced the DelegatingSerializer and DelegatingDeserializer, which allow producing and consuming records with different key and/or value types. Producers must set a header DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR to a selector value that is used to select which serializer to use for the value and DelegatingSerializer.KEY_SERIALIZATION_SELECTOR for the key; if a match is not found, an IllegalStateException is thrown.

For incoming records, the deserializer uses the same headers to select the deserializer to use; if a match is not found or the header is not present, the raw byte[] is returned.

You can configure the map of selector to Serializer / Deserializer via a constructor, or you can configure it via Kafka producer/consumer properties with the keys DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG and DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG. For the serializer, the producer property can be a Map<String, Object> where the key is the selector and the value is a Serializer instance, a serializer Class or the class name. The property can also be a String of comma-delimited map entries, as shown below.

For the deserializer, the consumer property can be a Map<String, Object> where the key is the selector and the value is a Deserializer instance, a deserializer Class or the class name. The property can also be a String of comma-delimited map entries, as shown below.

To configure using properties, use the following syntax:

producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
    "thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer");

consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
    "thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer");

Producers would then set the DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR header to thing1 or thing2.
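
For example, a sketch of the producer side (template, myThing1, and the topic name are assumptions):

ProducerRecord<String, Object> record = new ProducerRecord<>("sameTopic", myThing1);
record.headers().add(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR,
        "thing1".getBytes(StandardCharsets.UTF_8));
template.send(record);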

This technique supports sending different types to the same topic (or different topics).

Starting with version 2.5.1, it is not necessary to set the selector header, if the type (key or value) is one of the standard types supported by Serdes (Long, Integer, etc). Instead, the serializer will set the header to the class name of the type. It is not necessary to configure serializers or deserializers for these types, they will be created (once) dynamically.

For another technique to send different types to different topics, see Using RoutingKafkaTemplate.

By Type

Version 2.8 introduced the DelegatingByTypeSerializer.

@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            null, new DelegatingByTypeSerializer(Map.of(
                    byte[].class, new ByteArraySerializer(),
                    Bytes.class, new BytesSerializer(),
                    String.class, new StringSerializer())));
}

Starting with version 2.8.3, you can configure the serializer to check if the map key is assignable from the target object, which is useful when a delegate serializer can serialize subclasses. In this case, if there are ambiguous matches, an ordered Map, such as a LinkedHashMap, should be provided.
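
For example, a sketch with an ordered map and assignability checking (MyThing, MySubThing, and their serializers are hypothetical):

Map<Class<?>, Serializer<?>> delegates = new LinkedHashMap<>();
delegates.put(MySubThing.class, new MySubThingSerializer()); // the more specific type first
delegates.put(MyThing.class, new MyThingSerializer());
DelegatingByTypeSerializer serializer = new DelegatingByTypeSerializer(delegates, true); // true enables assignability checking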

By Topic

Starting with version 2.8, the DelegatingByTopicSerializer and DelegatingByTopicDeserializer allow selection of a serializer/deserializer based on the topic name. Regex Patterns are used to look up the instance to use. The map can be configured using a constructor, or via properties (a comma-delimited list of pattern:serializer).

producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
            "topic[0-4]:" + ByteArraySerializer.class.getName()
        + ", topic[5-9]:" + StringSerializer.class.getName());
...
consumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
            "topic[0-4]:" + ByteArrayDeserializer.class.getName()
        + ", topic[5-9]:" + StringDeserializer.class.getName());

Use KEY_SERIALIZATION_TOPIC_CONFIG when using this for keys.

@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            new IntegerSerializer(),
            new DelegatingByTopicSerializer(Map.of(
                    Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
                    Pattern.compile("topic[5-9]"), new StringSerializer()),
                    new JsonSerializer<Object>()));  // default
}

You can specify a default serializer/deserializer to use when there is no pattern match using DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULT and DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT.

An additional property, DelegatingByTopicSerialization.CASE_SENSITIVE (default true), when set to false, makes the topic lookup case-insensitive.
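
For example, a sketch:

producerConfigs.put(DelegatingByTopicSerialization.CASE_SENSITIVE, "false");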

Retrying Deserializer

The RetryingDeserializer uses a delegate Deserializer and RetryTemplate to retry deserialization when the delegate might have transient errors, such as network issues, during deserialization.

ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
    new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
    new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));

Starting with version 3.1.2, a RecoveryCallback can optionally be set on the RetryingDeserializer.

Refer to the spring-retry project for configuration of the RetryTemplate with a retry policy, back off policy, etc.
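
For example, a minimal sketch with up to three attempts and a fixed one-second back off (standard spring-retry APIs):

RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3)); // up to 3 attempts
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000L); // 1 second between attempts
retryTemplate.setBackOffPolicy(backOffPolicy);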

Spring Messaging Message Conversion

Although the Serializer and Deserializer API is quite simple and flexible from the low-level Kafka Consumer and Producer perspective, you might need more flexibility at the Spring Messaging level, when using either @KafkaListener or Spring Integration’s Apache Kafka Support. To let you easily convert to and from org.springframework.messaging.Message, Spring for Apache Kafka provides a MessageConverter abstraction with the MessagingMessageConverter implementation and its JsonMessageConverter (and subclasses) customization. You can inject the MessageConverter into a KafkaTemplate instance directly or by using the AbstractKafkaListenerContainerFactory bean definition for the @KafkaListener.containerFactory() property. The following example shows how to do so:

@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordMessageConverter(new JsonMessageConverter());
    return factory;
}
...
@KafkaListener(topics = "jsonData",
                containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}

When using Spring Boot, simply define the converter as a @Bean and Spring Boot auto configuration will wire it into the auto-configured template and container factory.

When you use a @KafkaListener, the parameter type is provided to the message converter to assist with the conversion.

This type inference can be achieved only when the @KafkaListener annotation is declared at the method level. With a class-level @KafkaListener, the payload type is used to select which @KafkaHandler method to invoke, so it must already have been converted before the method can be chosen.

On the consumer side, you can configure a JsonMessageConverter; it can handle ConsumerRecord values of type byte[], Bytes, and String, so it should be used in conjunction with a ByteArrayDeserializer, BytesDeserializer, or StringDeserializer. (byte[] and Bytes are more efficient because they avoid an unnecessary byte[]-to-String conversion.) You can also configure the specific subclass of JsonMessageConverter corresponding to the deserializer, if you so wish.

On the producer side, when you use Spring Integration or the KafkaTemplate.send(Message<?> message) method (see Using KafkaTemplate), you must configure a message converter that is compatible with the configured Kafka Serializer.

  • StringJsonMessageConverter with StringSerializer

  • BytesJsonMessageConverter with BytesSerializer

  • ByteArrayJsonMessageConverter with ByteArraySerializer

Again, using byte[] or Bytes is more efficient because they avoid a String to byte[] conversion.

For convenience, starting with version 2.3, the framework also provides a StringOrBytesSerializer which can serialize all three value types so it can be used with any of the message converters.
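
For example, a sketch configuring it as the value serializer:

senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringOrBytesSerializer.class);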

Starting with version 2.7.1, message payload conversion can be delegated to a spring-messaging SmartMessageConverter; this enables conversion, for example, to be based on the MessageHeaders.CONTENT_TYPE header.

The MessagingMessageConverter.fromMessage() method is called for outbound conversion to a ProducerRecord, with the message payload in the ProducerRecord.value() property. The MessagingMessageConverter.toMessage() method is called for inbound conversion from a ConsumerRecord, with the payload being the ConsumerRecord.value() property. The SmartMessageConverter.toMessage() method is called to create a new outbound Message<?> from the Message passed to fromMessage() (usually by KafkaTemplate.send(Message<?> msg)). Similarly, in the MessagingMessageConverter.toMessage() method, after the converter has created a new Message<?> from the ConsumerRecord, the SmartMessageConverter.fromMessage() method is called, and the final inbound message is created with the newly converted payload. In either case, if the SmartMessageConverter returns null, the original message is used.

When the default converter is used in the KafkaTemplate and listener container factory, you configure the SmartMessageConverter by calling setMessagingConverter() on the template and via the contentTypeConverter property on @KafkaListener methods.

Examples:

template.setMessagingConverter(mySmartConverter);

@KafkaListener(id = "withSmartConverter", topics = "someTopic",
    contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
    ...
}

Using Spring Data Projection Interfaces

Starting with version 2.1.1, you can convert JSON to a Spring Data Projection interface instead of a concrete type. This allows very selective and low-coupled bindings to data, including the lookup of values from multiple places inside the JSON document. For example, the following interface can be defined as a message payload type:

interface SomeSample {

  @JsonPath({ "$.username", "$.user.name" })
  String getUsername();

}

@KafkaListener(id = "projection.listener", topics = "projection")
public void projection(SomeSample in) {
    String username = in.getUsername();
    ...
}

By default, accessor methods are used to look up the property name as a field in the received JSON document. The @JsonPath expression allows customization of the value lookup, and even lets you define multiple JSON Path expressions, to look up values from multiple places until an expression returns an actual value.

To enable this feature, use a ProjectingMessageConverter configured with an appropriate delegate converter (used for outbound conversion and converting non-projection interfaces). You must also add spring-data:spring-data-commons and com.jayway.jsonpath:json-path to the classpath.

When used as the parameter to a @KafkaListener method, the interface type is automatically passed to the converter as normal.
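
The following is a minimal configuration sketch, assuming the ObjectMapper-based constructor and a consumer factory that produces String values:

@Bean
public KafkaListenerContainerFactory<?> projectionListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setRecordMessageConverter(new ProjectingMessageConverter(JacksonUtils.enhancedObjectMapper()));
    return factory;
}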

Using ErrorHandlingDeserializer

When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before the poll() returns. To solve this problem, the ErrorHandlingDeserializer has been introduced. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer returns a null value and a DeserializationException in a header that contains the cause and the raw bytes. When you use a record-level MessageListener, if the ConsumerRecord contains a DeserializationException header for either the key or value, the container’s ErrorHandler is called with the failed ConsumerRecord. The record is not passed to the listener.

Alternatively, you can configure the ErrorHandlingDeserializer to create a custom value by providing a failedDeserializationFunction, which is a Function<FailedDeserializationInfo, T>. This function is invoked to create an instance of T, which is passed to the listener in the usual fashion. An object of type FailedDeserializationInfo, which contains all the contextual information, is provided to the function. You can find the DeserializationException (as a serialized Java object) in the headers. See the Javadoc for the ErrorHandlingDeserializer for more information.

You can use the DefaultKafkaConsumerFactory constructor that takes key and value Deserializer objects and wire in appropriate ErrorHandlingDeserializer instances that you have configured with the proper delegates. Alternatively, you can use consumer configuration properties (which are used by the ErrorHandlingDeserializer) to instantiate the delegates. The property names are ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS and ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS. The property value can be a class or class name. The following example shows how to set these properties:

... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey");
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example");
return new DefaultKafkaConsumerFactory<>(props);

The following example uses a failedDeserializationFunction.

public class BadThing extends Thing {

  private final FailedDeserializationInfo failedDeserializationInfo;

  public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
    this.failedDeserializationInfo = failedDeserializationInfo;
  }

  public FailedDeserializationInfo getFailedDeserializationInfo() {
    return this.failedDeserializationInfo;
  }

}

public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {

  @Override
  public Thing apply(FailedDeserializationInfo info) {
    return new BadThing(info);
  }

}

The preceding example uses the following configuration:

...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
...

If the consumer is configured with an ErrorHandlingDeserializer, it is important to configure the KafkaTemplate and its producer with a serializer that can handle normal objects as well as raw byte[] values, which result from deserialization exceptions. The generic value type of the template should be Object. One technique is to use the DelegatingByTypeSerializer; an example follows:

@Bean
public ProducerFactory<String, Object> producerFactory() {
  return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
    new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
          MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
  return new KafkaTemplate<>(producerFactory());
}

When using an ErrorHandlingDeserializer with a batch listener, you must check for the deserialization exceptions in message headers. When used with a DefaultBatchErrorHandler, you can use that header to determine which record the exception failed on and communicate it to the error handler via a BatchListenerFailedException.

@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
    for (int i = 0; i < in.size(); i++) {
        Thing thing = in.get(i);
        if (thing == null
                && headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
            DeserializationException deserEx = null; // declared here so it is in scope for the throw below
            try {
                deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
                        (byte[]) headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
                if (deserEx != null) {
                    logger.error(deserEx, "Record at index " + i + " could not be deserialized");
                }
            }
            catch (Exception ex) {
                logger.error(ex, "Record at index " + i + " could not be deserialized");
            }
            throw new BatchListenerFailedException("Deserialization", deserEx, i);
        }
        process(thing);
    }
}

SerializationUtils.byteArrayToDeserializationException() can be used to convert the header to a DeserializationException.

When consuming List<ConsumerRecord<?, ?>>, SerializationUtils.getExceptionFromHeader() is used instead:

@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
    for (int i = 0; i < in.size(); i++) {
        ConsumerRecord<String, Thing> rec = in.get(i);
        if (rec.value() == null) {
            DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
                    SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
            if (deserEx != null) {
                logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
                throw new BatchListenerFailedException("Deserialization", deserEx, i);
            }
        }
        process(rec.value());
    }
}

If you are also using a DeadLetterPublishingRecoverer, the record published for a DeserializationException will have a record.value() of type byte[]; this should not be serialized. Consider using a DelegatingByTypeSerializer configured to use a ByteArraySerializer for byte[] and the normal serializer (Json, Avro, etc) for all other types.

Starting with version 3.1, you can add a Validator to the ErrorHandlingDeserializer. If the delegate Deserializer successfully deserializes the object, but that object fails validation, an exception is thrown similar to a deserialization exception occurring. This allows the original raw data to be passed to the error handler. When creating the deserializer yourself, simply call setValidator; if you configure the deserializer using properties, set the consumer configuration property ErrorHandlingDeserializer.VALIDATOR_CLASS to the class or fully qualified class name for your Validator. When using Spring Boot, this property name is spring.kafka.consumer.properties.spring.deserializer.validator.class.
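
For example, a sketch for programmatic construction (MyThingValidator is a hypothetical org.springframework.validation.Validator implementation):

ErrorHandlingDeserializer<Thing> valueDeserializer =
        new ErrorHandlingDeserializer<>(new JsonDeserializer<>(Thing.class));
valueDeserializer.setValidator(new MyThingValidator()); // hypothetical Validator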

Payload Conversion with Batch Listeners

You can also use a JsonMessageConverter within a BatchMessagingMessageConverter to convert batch messages when you use a batch listener container factory. See Serialization, Deserialization, and Message Conversion and Spring Messaging Message Conversion for more information.

By default, the type for the conversion is inferred from the listener argument. If you configure the JsonMessageConverter with a DefaultJackson2TypeMapper that has its TypePrecedence set to TYPE_ID (instead of the default INFERRED), the converter uses the type information in headers (if present) instead. This allows, for example, listener methods to be declared with interfaces instead of concrete classes. Also, the type converter supports mapping, so the deserialization can be to a different type than the source (as long as the data is compatible). This is also useful when you use class-level @KafkaListener instances where the payload must have already been converted to determine which method to invoke. The following example creates beans that use this method:

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
    return factory;
}

@Bean
public JsonMessageConverter converter() {
    return new JsonMessageConverter();
}

Note that, for this to work, the method signature for the conversion target must be a container object with a single generic parameter type, such as the following:

@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

Note that you can still access the batch headers.

If the batch converter has a record converter that supports it, you can also receive a list of messages where the payloads are converted according to the generic type. The following example shows how to do so:

@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen(List<Message<Foo>> fooMessages) {
    ...
}

ConversionService Customization

Starting with version 2.1.1, the org.springframework.core.convert.ConversionService used by the default org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory to resolve parameters for the invocation of a listener method is supplied with all beans that implement any of the following interfaces:

  • org.springframework.core.convert.converter.Converter

  • org.springframework.core.convert.converter.GenericConverter

  • org.springframework.format.Formatter

This lets you further customize listener deserialization without changing the default configuration for ConsumerFactory and KafkaListenerContainerFactory.
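
For example, a sketch of a Converter bean that would be picked up automatically (Thing and its parse method are hypothetical):

@Bean
public Converter<String, Thing> thingConverter() {
    return new Converter<String, Thing>() {

        @Override
        public Thing convert(String source) {
            return Thing.parse(source); // hypothetical parsing logic
        }

    };
}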

Setting a custom MessageHandlerMethodFactory on the KafkaListenerEndpointRegistrar through a KafkaListenerConfigurer bean disables this feature.

Adding Custom HandlerMethodArgumentResolver to @KafkaListener

Starting with version 2.4.2, you can add your own HandlerMethodArgumentResolver and resolve custom method parameters. All you need to do is implement KafkaListenerConfigurer and use the setCustomMethodArgumentResolvers() method from the KafkaListenerEndpointRegistrar class.

@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setCustomMethodArgumentResolvers(
            new HandlerMethodArgumentResolver() {

                @Override
                public boolean supportsParameter(MethodParameter parameter) {
                    return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
                }

                @Override
                public Object resolveArgument(MethodParameter parameter, Message<?> message) {
                    return new CustomMethodArgument(
                        message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
                    );
                }
            }
        );
    }

}

You can also completely replace the framework’s argument resolution by adding a custom MessageHandlerMethodFactory to the KafkaListenerEndpointRegistrar bean. If you do this, and your application needs to handle tombstone records, with a null value() (e.g. from a compacted topic), you should add a KafkaNullAwarePayloadArgumentResolver to the factory; it must be the last resolver because it supports all types and can match arguments without a @Payload annotation. If you are using a DefaultMessageHandlerMethodFactory, set this resolver as the last custom resolver; the factory will ensure that this resolver will be used before the standard PayloadMethodArgumentResolver, which has no knowledge of KafkaNull payloads.