Message Headers

0.11.0.0 客户端引入了对消息中标头的支持。从 2.0 版开始,Spring for Apache Kafka 现在支持将这些标头映射到和映射出 spring-messaging MessageHeaders

以前的版本将 ConsumerRecordProducerRecord 映射到 spring-messaging Message<?>,其中 value 属性映射到 payload 及其他属性(topicpartition 等等)映射到标头。这仍然是这种情况,但现在可以映射其他(任意的)标头。

Apache Kafka 标头有一个简单的 API,如下面的接口定义所示:

public interface Header {

    String key();

    byte[] value();

}

KafkaHeaderMapper 策略用于在 Kafka HeadersMessageHeaders 之间映射标头条目。接口定义如下:

public interface KafkaHeaderMapper {

    void fromHeaders(MessageHeaders headers, Headers target);

    void toHeaders(Headers source, Map<String, Object> target);

}

“@ [10]”将原始标头映射为“@ [11]”,并提供用于转换为“@ [12]”值的配置选项。

DefaultKafkaHeaderMapper 将键映射到 MessageHeaders 标头名称,并且为了支持出站消息的丰富标头类型,执行 JSON 转换。“special”标头(键为 spring_json_header_types)包含一个 <key>:<type> JSON 映射。在入站侧,这个标头用于适当地将每个标头的值转换为其原始类型。

在入站侧,所有 Kafka Header 实例都映射到 MessageHeaders。在出站侧,默认情况下,所有 MessageHeaders 都被映射,除了 idtimestamp 以及映射到 ConsumerRecord 属性的标头。

你可以通过向映射器提供模式来指定哪些标头要用于出站消息的映射。以下清单显示了一些示例映射:

public DefaultKafkaHeaderMapper() { 1
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { 2
    ...
}

public DefaultKafkaHeaderMapper(String... patterns) { 3
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { 4
    ...
}
1 使用默认的 Jackson `ObjectMapper`并映射大多数标头, 如示例使用前所述。
2 使用提供的 Jackson `ObjectMapper`并映射大多数标头, 如示例使用前所述。
3 使用默认的 Jackson `ObjectMapper`并根据提供的模式映射标头。
4 使用提供的 Jackson `ObjectMapper`并根据提供的模式映射标头。

模式相当简单,可以包含前导通配符()、后跟通配符或两者(例如 .cat.*)。你可以使用前导 ! 来否定模式。第一个匹配标头名称的模式(无论是正向还是否定向)都会获胜。

如果您提供自己的模式,我们建议包含 !id!timestamp,因为这些标题在入站方向上是只读的。

默认情况下,映射器仅反序列化 java.langjava.util 中的类。您可以通过使用 addTrustedPackages 方法添加受信任的包来信任其他(或所有)包。如果您收到来自不受信任来源的消息,您可能希望仅添加您信任的包。要信任所有包,您可以使用 mapper.addTrustedPackages("*")

在与不知道映射器 JSON 格式的系统通信时,以原始形式映射 String 标头值非常有用。

从版本 2.2.5 开始,你可以指定某些字符串值标头不应使用 JSON 映射,而应使用原始“@ [13]”映射。 “@ [14]”具有新属性;当“@ [15]”设置为 true 时,所有字符串值标头都将使用“@ [17]”属性(默认“@ [18]”)转换为“@ [16]”。此外,还有一个属性“@ [19]”,它是“@ [20]”的映射;如果映射包含标头名称且该标头包含“@ [21]”值,则将使用字符集将它映射为原始“@ [22]”。此映射还用于在映射值中的布尔值是“@ [25]”时和仅当时,使用字符集将原始传入“@ [23]”标头映射到“@ [24]”。如果布尔值为“@ [26]”,或标头名称不在具有“@ [27]”值的映射中,则传入标头只是映射为原始未映射标头。

以下测试用例说明了此机制。

@Test
public void testSpecificStringConvert() {
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    Map<String, Boolean> rawMappedHeaders = new HashMap<>();
    rawMappedHeaders.put("thisOnesAString", true);
    rawMappedHeaders.put("thisOnesBytes", false);
    mapper.setRawMappedHeaders(rawMappedHeaders);
    Map<String, Object> headersMap = new HashMap<>();
    headersMap.put("thisOnesAString", "thing1");
    headersMap.put("thisOnesBytes", "thing2");
    headersMap.put("alwaysRaw", "thing3".getBytes());
    MessageHeaders headers = new MessageHeaders(headersMap);
    Headers target = new RecordHeaders();
    mapper.fromHeaders(headers, target);
    assertThat(target).containsExactlyInAnyOrder(
            new RecordHeader("thisOnesAString", "thing1".getBytes()),
            new RecordHeader("thisOnesBytes", "thing2".getBytes()),
            new RecordHeader("alwaysRaw", "thing3".getBytes()));
    headersMap.clear();
    mapper.toHeaders(target, headersMap);
    assertThat(headersMap).contains(
            entry("thisOnesAString", "thing1"),
            entry("thisOnesBytes", "thing2".getBytes()),
            entry("alwaysRaw", "thing3".getBytes()));
}

默认情况下,两个标题映射器都会映射所有入站标题。从 2.8.8 版本开始,模式也可以应用于入站映射。若要创建用于入站映射的映射器,请使用相应映射器上的静态方法之一:

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}

public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}

例如:

DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");

这将排除所有以 abc 开头的标题,并包括所有其他标题。

默认情况下,只要类路径上存在 Jackson,DefaultKafkaHeaderMapper 就用于 MessagingMessageConverterBatchMessagingMessageConverter 中。

对于批处理转换器,转换后的标题作为 List<Map<String, Object>>KafkaHeaders.BATCH_CONVERTED_HEADERS 中可用,其中列表中的映射对应有效负载中的数据位置。

如果没有转换器(因为 Jackson 不存在或明确设置为 null),则来自使用者记录的标题以未转换的形式在 KafkaHeaders.NATIVE_HEADERS 标题中提供。此标题是一个 Headers 对象(或在批处理转换器的情况下为 List<Headers>),其中列表中的位置对应有效负载中的数据位置。

某些类型不适合 JSON 序列化,对于这些类型,可能更适合使用简单的 toString() 序列化。DefaultKafkaHeaderMapper 有一个称为 addToStringClasses() 的方法,该方法允许您提供应该以这种方式处理的类名,以进行出带宽射。在入带宽射期间,它们以 String 进行映射。默认情况下,只有 org.springframework.util.MimeTypeorg.springframework.http.MediaType 以这种方式进行映射。

从 2.3 版本开始,处理字符串值头得到简化。默认情况下,此类头不再进行 JSON 编码(即没有添加 "…​" 来进行封装)。类型仍添加到 JSON_TYPES 头,以便接收系统能够转换回字符串(从 byte[])。映射器能够处理由旧版本产生的头(它检查是否以 " 开头);这样,使用 2.3 的应用程序能够使用旧版本中的记录。

为了与较低版本兼容,如果使用 2.3 版本创建的记录可能由较低版本应用程序使用,请将 encodeStrings 设置为 true。当所有应用程序都使用 2.3 或更高版本时,您可以保留该属性的默认值 false

@Bean
MessagingMessageConverter converter() {
    MessagingMessageConverter converter = new MessagingMessageConverter();
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    mapper.setEncodeStrings(true);
    converter.setHeaderMapper(mapper);
    return converter;
}

如果使用 Spring Boot,它会将此转换器 bean 自动配置到自动配置的 KafkaTemplate 中;否则,您应该将此转换器添加到模板中。