Null Payloads and Log Compaction of 'Tombstone' Records

在使用日志压缩时,您可以发送和接收具有 null 有效负载的消息,以识别密钥的删除。您还可以出于其他原因接收 null 值,例如反序列化程序在无法反序列化值时可能会返回 null

Producing Null Payloads

要使用 PulsarTemplate 发送 null 有效负载,可以使用 fluent API,并将 null 传入 newMessage() 方法的值参数,例如:

pulsarTemplate
        .newMessage(null)
        .withTopic("my-topic")
        .withSchema(Schema.STRING)
        .withMessageCustomizer((mb) -> mb.key("key:1234"))
        .send();

发送空值时,您必须指定模式类型,因为系统无法从 null 有效负载确定消息的类型。

Consuming Null Payloads

对于 @PulsarListener@PulsarReader,将基于其消息参数的类型按如下方式将 null 有效负载传递到侦听器方法中:

Parameter type Passed-in value

primitive

null

user-defined

null

org.apache.pulsar.client.api.Message<T>

非 null Pulsar 消息,其 getValue() 返回 null

org.springframework.messaging.Message<T>

非 null Spring 消息,其 getPayload() 返回 PulsarNull

List<X>

非空列表,其条目 (X) 为上述类型之一,并做相应操作(如:原始条目为 `null`等)。

org.apache.pulsar.client.api.Messages<T>

非空容器,其中包含非空 Pulsar 消息,它的 getValue() 返回 null

当传入参数为 null(例如,带有原始类型或用户自定义类型的单记录侦听器)时,您必须使用 @Payload 带有 required = false 的参数注释。

当为您的侦听器负载类型使用 Spring org.springframework.messaging.Message 时,其泛型类型信息必须足够广泛,以接受 Message<PulsarNull>(例如 MessageMessage<?>Message<Object>)。这是由于 Spring 消息不允许其负载为空值,而是使用 PulsarNull 占位符。

如果它是压缩日志的墓碑消息,则您通常还需要密钥,以便您的应用程序可以确定哪个键是被 "`删除` 的"。以下示例显示了此类配置:

@PulsarListener(
        topics = "my-topic",
        subscriptionName = "my-topic-sub",
        schemaType = SchemaType.STRING)
void myListener(
        @Payload(required = false) String msg,
        @Header(PulsarHeaders.KEY) String key) {
    ...
}

@PulsarReader 还不支持 @Header 参数,因此在日志压缩方案中不太有用。