@KafkaListener
on a Class
当您在类级别使用 @KafkaListener
时,必须在方法级别指定 @KafkaHandler
。当传递消息时,转换的消息有效负载类型用于确定要调用的方法。以下示例演示如何操作:
When you use @KafkaListener
at the class-level, you must specify @KafkaHandler
at the method level.
When messages are delivered, the converted message payload type is used to determine which method to call.
The following example shows how to do so:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String foo) {
...
}
@KafkaHandler
public void listen(Integer bar) {
...
}
@KafkaHandler(isDefault = true)
public void listenDefault(Object object) {
...
}
}
从第 2.1.3 版开始,你可以指定一个 @KafkaHandler
方法作为默认方法,该方法如果在其他方法中没有匹配项,则会被调用。最多可以指定一个方法。在使用 @KafkaHandler
方法时,有效负载必须已转换为域对象(因此可以执行匹配)。使用自定义反序列化程序、 JsonDeserializer
或 JsonMessageConverter
,其中其 TypePrecedence
设置为 TYPE_ID
。请参阅 Serialization, Deserialization, and Message Conversion 了解更多信息。
Starting with version 2.1.3, you can designate a @KafkaHandler
method as the default method that is invoked if there is no match on other methods.
At most, one method can be so designated.
When using @KafkaHandler
methods, the payload must have already been converted to the domain object (so the match can be performed).
Use a custom deserializer, the JsonDeserializer
, or the JsonMessageConverter
with its TypePrecedence
set to TYPE_ID
.
See Serialization, Deserialization, and Message Conversion for more information.
由于 Spring 解析方法参数的方式存在一些限制,所以默认的 @KafkaHandler
不能接收离散的标头;它必须使用 Consumer Record Metadata 中讨论的 ConsumerRecordMetadata
。
Due to some limitations in the way Spring resolves method arguments, a default @KafkaHandler
cannot receive discrete headers; it must use the ConsumerRecordMetadata
as discussed in Consumer Record Metadata.
例如:
For example:
@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
...
}
如果对象是 String
,那么这将不起作用;topic
参数也会获取对 object
的引用。
This won’t work if the object is a String
; the topic
parameter will also get a reference to object
.
如果您需要有关默认方法中记录的元数据,请使用以下内容:
If you need metadata about the record in a default method, use this:
@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
String topic = meta.topic();
...
}