Null Payloads and Log Compaction of 'Tombstone' Records

当你使用 Log Compaction 时,你可以通过 null 有效负载发送和接收消息以识别已删除的键。

When you use Log Compaction, you can send and receive messages with null payloads to identify the deletion of a key.

你还可以出于其他原因接收 null 值,例如在无法反序列化值时可能返回 nullDeserializer

You can also receive null values for other reasons, such as a Deserializer that might return null when it cannot deserialize a value.

要使用 KafkaTemplate 发送 null 有效负载,可以将 null 传递到 send() 方法的值参数中。一个例外是 send(Message<?> message) 变量。由于 spring-messaging Message<?> 不会有 null 有效负载,所以你可以使用一个称为 KafkaNull 的特殊有效负载类型,而框架会发送 null。为了方便起见,提供了静态的 KafkaNull.INSTANCE

To send a null payload by using the KafkaTemplate, you can pass null into the value argument of the send() methods. One exception to this is the send(Message<?> message) variant. Since spring-messaging Message<?> cannot have a null payload, you can use a special payload type called KafkaNull, and the framework sends null. For convenience, the static KafkaNull.INSTANCE is provided.

当你使用消息侦听器容器时,收到的 ConsumerRecord 有一个 null value()

When you use a message listener container, the received ConsumerRecord has a null value().

要配置 @KafkaListener 来处理 null 有效负载,你必须在 required = false 时使用 @Payload 注释。如果它是压缩日志的墓碑消息,你通常还需要该密钥,以便应用程序可以确定哪个密钥已被“删除”。以下示例显示了这样的配置:

To configure the @KafkaListener to handle null payloads, you must use the @Payload annotation with required = false. If it is a tombstone message for a compacted log, you usually also need the key so that your application can determine which key was “deleted”. The following example shows such a configuration:

@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
    // value == null represents key deletion
}

当你使用具有多个 @KafkaHandler 方法的类级别 @KafkaListener 时,需要一些额外的配置。具体来说,你需要一个带有 KafkaNull 有效负载的 @KafkaHandler 方法。以下示例展示了如何配置其中一个方法:

When you use a class-level @KafkaListener with multiple @KafkaHandler methods, some additional configuration is needed. Specifically, you need a @KafkaHandler method with a KafkaNull payload. The following example shows how to configure one:

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String cat) {
        ...
    }

    @KafkaHandler
    public void listen(Integer hat) {
        ...
    }

    @KafkaHandler
    public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
        ...
    }

}

请注意,参数是 null 而不是 KafkaNull

Note that the argument is null, not KafkaNull.

此功能需要使用 KafkaNullAwarePayloadArgumentResolver,在使用默认 MessageHandlerMethodFactory 时,框架将对其进行配置。在使用自定义 MessageHandlerMethodFactory 时,参见 xref:kafka/serdes.adoc#custom-arg-resolve[Adding custom HandlerMethodArgumentResolver@KafkaListener

This feature requires the use of a KafkaNullAwarePayloadArgumentResolver which the framework will configure when using the default MessageHandlerMethodFactory. When using a custom MessageHandlerMethodFactory, see Adding custom HandlerMethodArgumentResolver to @KafkaListener.