Message Headers

The 0.11.0.0 client introduced support for headers in messages. As of version 2.0, Spring for Apache Kafka now supports mapping these headers to and from spring-messaging MessageHeaders.

Previous versions mapped ConsumerRecord and ProducerRecord to spring-messaging Message<?>, where the value property was mapped to and from the payload and other properties (topic, partition, and so on) were mapped to headers. This is still the case, but additional (arbitrary) headers can now be mapped.
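
For example, a @KafkaListener method can receive the record properties and arbitrary headers as @Header parameters. The following is a minimal sketch (the topic name and the customHeader header are hypothetical):

@KafkaListener(id = "headerDemo", topics = "someTopic")
public void listen(@Payload String payload,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(name = "customHeader", required = false) String customHeader) {
    ...
}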

Apache Kafka headers have a simple API, shown in the following interface definition:

public interface Header {

    String key();

    byte[] value();

}

The KafkaHeaderMapper strategy is provided to map header entries between Kafka Headers and MessageHeaders. Its interface definition is as follows:

public interface KafkaHeaderMapper {

    void fromHeaders(MessageHeaders headers, Headers target);

    void toHeaders(Headers source, Map<String, Object> target);

}

The SimpleKafkaHeaderMapper maps raw headers as byte[], with configuration options for conversion to String values.
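
The following is a minimal sketch of wiring this mapper into a message converter, assuming you want raw byte[] mapping instead of the JSON-based default (the bean name is arbitrary):

@Bean
MessagingMessageConverter rawHeaderConverter() {
    MessagingMessageConverter converter = new MessagingMessageConverter();
    converter.setHeaderMapper(new SimpleKafkaHeaderMapper());
    return converter;
}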

The DefaultKafkaHeaderMapper maps the key to the MessageHeaders header name and, in order to support rich header types for outbound messages, JSON conversion is performed. A “special” header (with a key of spring_json_header_types) contains a JSON map of <key>:<type>. This header is used on the inbound side to provide appropriate conversion of each header value to the original type.
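
As a hedged illustration (the header name and type are examples, not taken from the reference documentation), sending a message with a Long header causes the mapper to add the types header alongside it:

Message<String> message = MessageBuilder.withPayload("some payload")
        .setHeader("orderId", 42L) // JSON-converted on the outbound side
        .build();
// In addition to the "orderId" header, the mapper writes a spring_json_header_types
// header whose value is a JSON map such as {"orderId":"java.lang.Long"}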

On the inbound side, all Kafka Header instances are mapped to MessageHeaders. On the outbound side, by default, all MessageHeaders are mapped, except id, timestamp, and the headers that map to ConsumerRecord properties.

You can specify which headers are to be mapped for outbound messages by providing patterns to the mapper. The following listing shows the available constructors:

public DefaultKafkaHeaderMapper() { 1
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { 2
    ...
}

public DefaultKafkaHeaderMapper(String... patterns) { 3
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { 4
    ...
}
1 Uses a default Jackson ObjectMapper and maps most headers, as discussed before the example.
2 Uses the provided Jackson ObjectMapper and maps most headers, as discussed before the example.
3 Uses a default Jackson ObjectMapper and maps headers according to the provided patterns.
4 Uses the provided Jackson ObjectMapper and maps headers according to the provided patterns.

Patterns are rather simple and can contain a leading wildcard (*), a trailing wildcard, or both (for example, *.cat.*). You can negate patterns with a leading !. The first pattern that matches a header name (whether positive or negative) wins.

When you provide your own patterns, we recommend including !id and !timestamp, since these headers are read-only on the inbound side.
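
For example, the following minimal sketch maps all outbound headers except id and timestamp:

DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper("!id", "!timestamp", "*");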

By default, the mapper deserializes only classes in java.lang and java.util. You can trust other (or all) packages by adding trusted packages with the addTrustedPackages method. If you receive messages from untrusted sources, you may wish to add only those packages you trust. To trust all packages, you can use mapper.addTrustedPackages("*").
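
For example (the package name is hypothetical):

DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
mapper.addTrustedPackages("com.example.events");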

Mapping String header values in a raw form is useful when communicating with systems that are not aware of the mapper’s JSON format.

Starting with version 2.2.5, you can specify that certain string-valued headers should not be mapped with JSON but instead to and from a raw byte[]. The AbstractKafkaHeaderMapper has new properties: when mapAllStringsOut is set to true, all string-valued headers are converted to byte[] using the charset property (default UTF-8). In addition, there is a rawMappedHeaders property, which is a map of header name : boolean; if the map contains a header name and the header contains a String value, it is mapped to a raw byte[] using the charset. This map is also used to map raw incoming byte[] headers to String, using the charset, if, and only if, the boolean in the map value is true. If the boolean is false, or the header name is not in the map with a true value, the incoming header is simply mapped as the raw unmapped header.

The following test case illustrates this mechanism.

@Test
public void testSpecificStringConvert() {
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    Map<String, Boolean> rawMappedHeaders = new HashMap<>();
    rawMappedHeaders.put("thisOnesAString", true);
    rawMappedHeaders.put("thisOnesBytes", false);
    mapper.setRawMappedHeaders(rawMappedHeaders);
    Map<String, Object> headersMap = new HashMap<>();
    headersMap.put("thisOnesAString", "thing1");
    headersMap.put("thisOnesBytes", "thing2");
    headersMap.put("alwaysRaw", "thing3".getBytes());
    MessageHeaders headers = new MessageHeaders(headersMap);
    Headers target = new RecordHeaders();
    mapper.fromHeaders(headers, target);
    assertThat(target).containsExactlyInAnyOrder(
            new RecordHeader("thisOnesAString", "thing1".getBytes()),
            new RecordHeader("thisOnesBytes", "thing2".getBytes()),
            new RecordHeader("alwaysRaw", "thing3".getBytes()));
    headersMap.clear();
    mapper.toHeaders(target, headersMap);
    assertThat(headersMap).contains(
            entry("thisOnesAString", "thing1"),
            entry("thisOnesBytes", "thing2".getBytes()),
            entry("alwaysRaw", "thing3".getBytes()));
}

Both header mappers map all inbound headers by default. Starting with version 2.8.8, the patterns can also be applied to inbound mapping. To create a mapper for inbound mapping, use one of the static methods on the respective mapper:

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}

public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}

For example:

DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");

This will exclude all headers beginning with abc and include all others.

By default, the DefaultKafkaHeaderMapper is used in the MessagingMessageConverter and BatchMessagingMessageConverter, as long as Jackson is on the classpath.

With the batch converter, the converted headers are available in the KafkaHeaders.BATCH_CONVERTED_HEADERS header as a List<Map<String, Object>>, where the map at each position of the list corresponds to the data position in the payload.
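
The following is a minimal sketch of a batch listener that accesses those headers; it assumes the container factory is configured for batch mode and the topic name is arbitrary:

@KafkaListener(id = "batchDemo", topics = "someTopic")
public void listen(List<String> payloads,
        @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
    // headers.get(i) holds the converted headers for payloads.get(i)
    ...
}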

If there is no converter (either because Jackson is not present or it is explicitly set to null), the headers from the consumer record are provided unconverted in the KafkaHeaders.NATIVE_HEADERS header. This header is a Headers object (or a List<Headers> in the case of the batch converter), where the position in the list corresponds to the data position in the payload.

Certain types are not suitable for JSON serialization, and a simple toString() serialization might be preferred for these types. The DefaultKafkaHeaderMapper has a method called addToStringClasses() that lets you supply the names of classes that should be treated this way for outbound mapping. During inbound mapping, they are mapped as String. By default, only org.springframework.util.MimeType and org.springframework.http.MediaType are mapped this way.
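
For example (com.example.Money is a hypothetical class):

DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
mapper.addToStringClasses("com.example.Money");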

Starting with version 2.3, handling of String-valued headers is simplified. Such headers are no longer JSON encoded, by default (i.e., they do not have enclosing "..." added). The type is still added to the JSON_TYPES header so the receiving system can convert back to a String (from byte[]). The mapper can handle (decode) headers produced by older versions (it checks for a leading "); in this way, an application using 2.3 can consume records from older versions.

To be compatible with earlier versions, set encodeStrings to true if records produced by an application using 2.3 might be consumed by applications using earlier versions. When all applications are using 2.3 or higher, you can leave the property at its default value of false.

@Bean
MessagingMessageConverter converter() {
    MessagingMessageConverter converter = new MessagingMessageConverter();
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    mapper.setEncodeStrings(true);
    converter.setHeaderMapper(mapper);
    return converter;
}

If you use Spring Boot, it will auto-configure this converter bean into the auto-configured KafkaTemplate; otherwise, you should add this converter to the template.
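
The following is a minimal sketch of adding the converter to a template when Boot's auto-configuration is not used (the producer factory is assumed to be defined elsewhere):

@Bean
KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf,
        MessagingMessageConverter converter) {
    KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
    template.setMessageConverter(converter);
    return template;
}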