Null Payloads and Log Compaction of 'Tombstone' Records
当你使用 Log Compaction 时,你可以通过 null
有效负载发送和接收消息以识别已删除的键。
你还可以出于其他原因接收 null
值,例如在无法反序列化值时可能返回 null
的 Deserializer
。
要使用 KafkaTemplate
发送 null
有效负载,可以将 null
传递到 send()
方法的值参数中。一个例外是 send(Message<?> message)
变量。由于 spring-messaging
Message<?>
不会有 null
有效负载,所以你可以使用一个称为 KafkaNull
的特殊有效负载类型,而框架会发送 null
。为了方便起见,提供了静态的 KafkaNull.INSTANCE
。
当你使用消息侦听器容器时,收到的 ConsumerRecord
有一个 null
value()
。
要配置 @KafkaListener
来处理 null
有效负载,你必须在 required = false
时使用 @Payload
注释。如果它是压缩日志的墓碑消息,你通常还需要该密钥,以便应用程序可以确定哪个密钥已被“删除”。以下示例显示了这样的配置:
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
// value == null represents key deletion
}
当你使用具有多个 @KafkaHandler
方法的类级别 @KafkaListener
时,需要一些额外的配置。具体来说,你需要一个带有 KafkaNull
有效负载的 @KafkaHandler
方法。以下示例展示了如何配置其中一个方法:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String cat) {
...
}
@KafkaHandler
public void listen(Integer hat) {
...
}
@KafkaHandler
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
请注意,参数是 null
而不是 KafkaNull
。
此功能需要使用 KafkaNullAwarePayloadArgumentResolver
,在使用默认 MessageHandlerMethodFactory
时,框架将对其进行配置。在使用自定义 MessageHandlerMethodFactory
时,参见 xref:kafka/serdes.adoc#custom-arg-resolve[Adding custom HandlerMethodArgumentResolver
至 @KafkaListener
。