Null Payloads and Log Compaction of 'Tombstone' Records

在使用日志压缩时,您可以发送和接收具有 null 有效负载的消息,以识别密钥的删除。您还可以出于其他原因接收 null 值,例如反序列化程序在无法反序列化值时可能会返回 null

When using log compaction, you can send and receive messages with null payloads to identify the deletion of a key. You can also receive null values for other reasons, such as a deserializer that might return null when it cannot deserialize a value.

Producing Null Payloads

您可以通过将 null 消息参数值传递到 send 方法之一,以带 ReactivePulsarTemplate 发送一个 null 值,例如:

You can send a null value with the ReactivePulsarTemplate by passing a null message parameter value to one of the send methods, for example:

reactiveTemplate
        .send(null, Schema.STRING)
        .subscribe();

发送空值时,您必须指定模式类型,因为系统无法从 null 有效负载确定消息的类型。

When sending null values you must specify the schema type as the system can not determine the type of the message from a null payload.

Consuming Null Payloads

对于 @ReactivePularListener,根据其消息参数类型将 null 载荷传递到侦听器方法,如下所示:

For @ReactivePularListener, the null payload is passed into the listener method based on the type of its message parameter as follows:

Parameter type Passed-in value

primitive

null

user-defined

null

org.apache.pulsar.client.api.Message<T>

non-null Pulsar message whose getValue() returns null

org.springframework.messaging.Message<T>

non-null Spring message whose getPayload() returns PulsarNull

Flux<org.apache.pulsar.client.api.Message<T>>

non-null flux whose entries are non-null Pulsar messages whose getValue() returns null

Flux<org.apache.pulsar.client.api.Messages<T>>

non-null flux whose entries are non-null Spring messages whose getPayload() returns PulsarNull

当传入参数为 null(例如,带有原始类型或用户自定义类型的单记录侦听器)时,您必须使用 @Payload 带有 required = false 的参数注释。

When the passed-in value is null (ie. single record listeners with primitive or user-defined types) you must use the @Payload parameter annotation with required = false.

当为您的侦听器负载类型使用 Spring org.springframework.messaging.Message 时,其泛型类型信息必须足够广泛,以接受 Message<PulsarNull>(例如 MessageMessage<?>Message<Object>)。这是由于 Spring 消息不允许其负载为空值,而是使用 PulsarNull 占位符。

When using the Spring org.springframework.messaging.Message for your listener payload type, its generic type information must be wide enough to accept Message<PulsarNull> (eg. Message, Message<?>, or Message<Object>). This is due to the fact that the Spring Message does not allow null values for its payload and instead uses the PulsarNull placeholder.

如果它是压缩日志的墓碑消息,则您通常还需要密钥,以便您的应用程序可以确定哪个键是被 "`删除` 的"。以下示例显示了此类配置:

If it is a tombstone message for a compacted log, you usually also need the key so that your application can determine which key was “deleted”. The following example shows such a configuration:

@ReactivePulsarListener(
        topics = "my-topic",
        subscriptionName = "my-topic-sub",
        schemaType = SchemaType.STRING)
Mono<Void> myListener(
        @Payload(required = false) String msg,
        @Header(PulsarHeaders.KEY) String key) {
    ...
}

当使用流消息侦听器( Flux )时,header support is limited,因此在日志压实场景中不太有用。

When using a streaming message listener (Flux) the header support is limited, so it less useful in the log compaction scenario.