Null Payloads and Log Compaction of 'Tombstone' Records
当你使用 Log Compaction 时,你可以通过 null
有效负载发送和接收消息以识别已删除的键。
When you use Log Compaction, you can send and receive messages with null
payloads to identify the deletion of a key.
你还可以出于其他原因接收 null
值,例如在无法反序列化值时可能返回 null
的 Deserializer
。
You can also receive null
values for other reasons, such as a Deserializer
that might return null
when it cannot deserialize a value.
要使用 KafkaTemplate
发送 null
有效负载,可以将 null
传递到 send()
方法的值参数中。一个例外是 send(Message<?> message)
变量。由于 spring-messaging
Message<?>
不会有 null
有效负载,所以你可以使用一个称为 KafkaNull
的特殊有效负载类型,而框架会发送 null
。为了方便起见,提供了静态的 KafkaNull.INSTANCE
。
To send a null
payload by using the KafkaTemplate
, you can pass null into the value argument of the send()
methods.
One exception to this is the send(Message<?> message)
variant.
Since spring-messaging
Message<?>
cannot have a null
payload, you can use a special payload type called KafkaNull
, and the framework sends null
.
For convenience, the static KafkaNull.INSTANCE
is provided.
当你使用消息侦听器容器时,收到的 ConsumerRecord
有一个 null
value()
。
When you use a message listener container, the received ConsumerRecord
has a null
value()
.
要配置 @KafkaListener
来处理 null
有效负载,你必须在 required = false
时使用 @Payload
注释。如果它是压缩日志的墓碑消息,你通常还需要该密钥,以便应用程序可以确定哪个密钥已被“删除”。以下示例显示了这样的配置:
To configure the @KafkaListener
to handle null
payloads, you must use the @Payload
annotation with required = false
.
If it is a tombstone message for a compacted log, you usually also need the key so that your application can determine which key was “deleted”.
The following example shows such a configuration:
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
// value == null represents key deletion
}
当你使用具有多个 @KafkaHandler
方法的类级别 @KafkaListener
时,需要一些额外的配置。具体来说,你需要一个带有 KafkaNull
有效负载的 @KafkaHandler
方法。以下示例展示了如何配置其中一个方法:
When you use a class-level @KafkaListener
with multiple @KafkaHandler
methods, some additional configuration is needed.
Specifically, you need a @KafkaHandler
method with a KafkaNull
payload.
The following example shows how to configure one:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String cat) {
...
}
@KafkaHandler
public void listen(Integer hat) {
...
}
@KafkaHandler
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
请注意,参数是 null
而不是 KafkaNull
。
Note that the argument is null
, not KafkaNull
.
此功能需要使用 KafkaNullAwarePayloadArgumentResolver
,在使用默认 MessageHandlerMethodFactory
时,框架将对其进行配置。在使用自定义 MessageHandlerMethodFactory
时,参见 xref:kafka/serdes.adoc#custom-arg-resolve[Adding custom HandlerMethodArgumentResolver
至 @KafkaListener
。
This feature requires the use of a KafkaNullAwarePayloadArgumentResolver
which the framework will configure when using the default MessageHandlerMethodFactory
.
When using a custom MessageHandlerMethodFactory
, see Adding custom HandlerMethodArgumentResolver
to @KafkaListener
.