Null Payloads and Log Compaction of 'Tombstone' Records
在使用日志压缩时,您可以发送和接收具有 null
有效负载的消息,以识别密钥的删除。您还可以出于其他原因接收 null
值,例如反序列化程序在无法反序列化值时可能会返回 null
。
Producing Null Payloads
要使用 PulsarTemplate
发送 null
有效负载,可以使用 fluent API,并将 null 传入 newMessage()
方法的值参数,例如:
pulsarTemplate
.newMessage(null)
.withTopic("my-topic")
.withSchema(Schema.STRING)
.withMessageCustomizer((mb) -> mb.key("key:1234"))
.send();
发送空值时,您必须指定模式类型,因为系统无法从 |
Consuming Null Payloads
对于 @PulsarListener
和 @PulsarReader
,将基于其消息参数的类型按如下方式将 null
有效负载传递到侦听器方法中:
Parameter type | Passed-in value |
---|---|
primitive |
|
user-defined |
|
|
非 null Pulsar 消息,其 |
|
非 null Spring 消息,其 |
|
非空列表,其条目 ( |
|
非空容器,其中包含非空 Pulsar 消息,它的 |
当传入参数为 null
(例如,带有原始类型或用户自定义类型的单记录侦听器)时,您必须使用 @Payload
带有 required = false
的参数注释。
当为您的侦听器负载类型使用 Spring org.springframework.messaging.Message
时,其泛型类型信息必须足够广泛,以接受 Message<PulsarNull>
(例如 Message
、Message<?>
或 Message<Object>
)。这是由于 Spring 消息不允许其负载为空值,而是使用 PulsarNull
占位符。
如果它是压缩日志的墓碑消息,则您通常还需要密钥,以便您的应用程序可以确定哪个键是被 "`删除` 的"。以下示例显示了此类配置:
@PulsarListener(
topics = "my-topic",
subscriptionName = "my-topic-sub",
schemaType = SchemaType.STRING)
void myListener(
@Payload(required = false) String msg,
@Header(PulsarHeaders.KEY) String key) {
...
}
|