@KafkaListener on a Class

当您在类级别使用 @KafkaListener 时,必须在方法级别指定 @KafkaHandler。当传递消息时,转换的消息有效负载类型用于确定要调用的方法。以下示例演示如何操作:

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String foo) {
        ...
    }

    @KafkaHandler
    public void listen(Integer bar) {
        ...
    }

    @KafkaHandler(isDefault = true)
    public void listenDefault(Object object) {
        ...
    }

}

从第 2.1.3 版开始,你可以指定一个 @KafkaHandler 方法作为默认方法,该方法如果在其他方法中没有匹配项,则会被调用。最多可以指定一个方法。在使用 @KafkaHandler 方法时,有效负载必须已转换为域对象(因此可以执行匹配)。使用自定义反序列化程序、 JsonDeserializerJsonMessageConverter,其中其 TypePrecedence 设置为 TYPE_ID。请参阅 Serialization, Deserialization, and Message Conversion 了解更多信息。

由于 Spring 解析方法参数的方式存在一些限制,所以默认的 @KafkaHandler 不能接收离散的标头;它必须使用 Consumer Record Metadata 中讨论的 ConsumerRecordMetadata

例如:

@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    ...
}

如果对象是 String,那么这将不起作用;topic 参数也会获取对 object 的引用。

如果您需要有关默认方法中记录的元数据,请使用以下内容:

@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
    String topic = meta.topic();
    ...
}