@KafkaListener Annotation

The @KafkaListener annotation is used to designate a bean method as a listener for a listener container. The bean is wrapped in a MessagingMessageListenerAdapter configured with various features, such as converters to convert the data, if necessary, to match the method parameters.
You can configure most attributes on the annotation with SpEL by using #{…} or property placeholders (${…}). See the Javadoc for more information.
Record Listeners

The @KafkaListener annotation provides a mechanism for simple POJO listeners. The following example shows how to use it:
public class Listener {

    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }

}
This mechanism requires an @EnableKafka annotation on one of your @Configuration classes and a listener container factory, which is used to configure the underlying ConcurrentMessageListenerContainer. By default, a bean with name kafkaListenerContainerFactory is expected. The following example shows how to use ConcurrentMessageListenerContainer:
@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }

}
Notice that, to set container properties, you must use the getContainerProperties() method on the factory. It is used as a template for the actual properties injected into the container.
Starting with version 2.1.1, you can set the client.id property for consumers created by the annotation. The clientIdPrefix is suffixed with -n, where n is an integer representing the container number when using concurrency.
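For illustration, the suffixing rule described above can be sketched in plain Java. ClientIds here is a hypothetical helper for demonstration, not a Spring for Apache Kafka class:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper illustrating the "-n" suffix rule; not part of Spring Kafka.
public class ClientIds {

    // Builds the client.id used by each container when concurrency is configured.
    static List<String> forPrefix(String prefix, int concurrency) {
        return IntStream.range(0, concurrency)
                .mapToObj(n -> prefix + "-" + n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [myClientId-0, myClientId-1, myClientId-2]
        System.out.println(forPrefix("myClientId", 3));
    }
}
```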
Starting with version 2.2, you can override the container factory's concurrency and autoStartup properties by using properties on the annotation itself. The properties can be simple values, property placeholders, or SpEL expressions. The following example shows how to do so:
@KafkaListener(id = "myListener", topics = "myTopic",
        autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
    ...
}
Explicit Partition Assignment
You can also configure POJO listeners with explicit topics and partitions (and, optionally, their initial offsets). The following example shows how to do so:
@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}
You can specify each partition in the partitions or partitionOffsets attribute, but not both.
As with most annotation properties, you can use SpEL expressions; for an example of how to generate a large list of partitions, see Manually Assigning All Partitions.
Starting with version 2.5.5, you can apply an initial offset to all assigned partitions:
@KafkaListener(id = "thing3", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" },
             partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}
The * wildcard represents all partitions in the partitions attribute. There must be only one @PartitionOffset with the wildcard in each @TopicPartition.
In addition, when the listener implements ConsumerSeekAware, onPartitionsAssigned is now called, even when using manual assignment. This allows, for example, arbitrary seek operations to be performed at that time.
Starting with version 2.6.4, you can specify a comma-delimited list of partitions, or partition ranges:
@KafkaListener(id = "pp", autoStartup = "false",
        topicPartitions = @TopicPartition(topic = "topic1",
                partitions = "0-5, 7, 10-15"))
public void process(String in) {
    ...
}
The range is inclusive; the example above assigns partitions 0, 1, 2, 3, 4, 5, 7, 10, 11, 12, 13, 14, 15.
The same technique can be used when specifying initial offsets:
@KafkaListener(id = "thing3", topicPartitions =
        { @TopicPartition(topic = "topic1",
             partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}
The initial offset will be applied to all 6 partitions.
Manual Acknowledgment
When using manual AckMode, you can also provide the listener with the Acknowledgment. The following example also shows how to use a different container factory.
@KafkaListener(id = "cat", topics = "myTopic",
        containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}
Consumer Record Metadata
Finally, metadata about the record is available from message headers. You can use the following header names to retrieve the headers of the message:
- KafkaHeaders.OFFSET
- KafkaHeaders.RECEIVED_KEY
- KafkaHeaders.RECEIVED_TOPIC
- KafkaHeaders.RECEIVED_PARTITION
- KafkaHeaders.RECEIVED_TIMESTAMP
- KafkaHeaders.TIMESTAMP_TYPE
Starting with version 2.5, the RECEIVED_KEY is not present if the incoming record has a null key; previously, the header was populated with a null value. This change makes the framework consistent with spring-messaging conventions, where null-valued headers are not present.
The following example shows how to use the headers:
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
        ) {
    ...
}
Parameter annotations (@Payload, @Header) must be specified on the concrete implementation of the listener method; they will not be detected if they are defined on an interface.
Starting with version 2.5, instead of using discrete headers, you can receive record metadata in a ConsumerRecordMetadata parameter:
@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
    ...
}
This contains all the data from the ConsumerRecord except the key and value.
Batch Listeners
Starting with version 1.1, you can configure @KafkaListener methods to receive the entire batch of consumer records received from the consumer poll.
Non-Blocking Retries are not supported with batch listeners.
To configure the listener container factory to create batch listeners, set the batchListener property. The following example shows how to do so:
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);  // <<<<<<<<<<<<<<<<<<<<<<<<<
    return factory;
}
Starting with version 2.8, you can override the factory's batchListener property using the batch property on the @KafkaListener annotation. This, together with the changes to container error handlers, allows the same factory to be used for both record and batch listeners.
Starting with version 2.9.6, the container factory has separate setters for the recordMessageConverter and batchMessageConverter properties. Previously, there was only one property, messageConverter, which applied to both record and batch listeners.
The following example shows how to receive a list of payloads:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    ...
}
The topic, partition, offset, and so on are available in headers that parallel the payloads. The following example shows how to use the headers:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
        @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}
Alternatively, you can receive a List of Message<?> objects with each offset and other details in each message, but it must be the only parameter (aside from optional Acknowledgment, when using manual commits, and/or Consumer<?, ?> parameters) defined on the method. The following example shows how to do so:
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen1(List<Message<?>> list) {
    ...
}

@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen2(List<Message<?>> list, Acknowledgment ack) {
    ...
}

@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen3(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
    ...
}
No conversion is performed on the payloads in this case.
If the BatchMessagingMessageConverter is configured with a RecordMessageConverter, you can also add a generic type to the Message parameter, and the payloads are converted. See Payload Conversion with Batch Listeners for more information.
You can also receive a list of ConsumerRecord<?, ?> objects, but it must be the only parameter (aside from optional Acknowledgment, when using manual commits, and Consumer<?, ?> parameters) defined on the method. The following example shows how to do so:
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
    ...
}

@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
    ...
}
Starting with version 2.2, the listener can receive the complete ConsumerRecords<?, ?> object returned by the poll() method, letting the listener access additional methods, such as partitions() (which returns the TopicPartition instances in the list) and records(TopicPartition) (which gets selective records). Again, this must be the only parameter (aside from optional Acknowledgment, when using manual commits, or Consumer<?, ?> parameters) on the method. The following example shows how to do so:
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
    ...
}
If the container factory has a RecordFilterStrategy configured, it is ignored for ConsumerRecords<?, ?> listeners, with a WARN log message emitted. Records can only be filtered with a batch listener if the <List<?>> form of listener is used. By default, records are filtered one-at-a-time; starting with version 2.8, you can override filterBatch to filter the entire batch in one call.
Annotation Properties
Starting with version 2.0, the id property (if present) is used as the Kafka consumer group.id property, overriding the configured property in the consumer factory, if present. You can also set groupId explicitly or set idIsGroup to false to restore the previous behavior of using the consumer factory group.id.
You can use property placeholders or SpEL expressions within most annotation properties, as the following example shows:
@KafkaListener(topics = "${some.property}")

@KafkaListener(topics = "#{someBean.someProperty}",
        groupId = "#{someBean.someProperty}.group")
Starting with version 2.1.2, the SpEL expressions support a special token: __listener. It is a pseudo bean name that represents the current bean instance within which this annotation exists.
Consider the following example:
@Bean
public Listener listener1() {
    return new Listener("topic1");
}

@Bean
public Listener listener2() {
    return new Listener("topic2");
}
Given the beans in the previous example, we can then use the following:
public class Listener {

    private final String topic;

    public Listener(String topic) {
        this.topic = topic;
    }

    @KafkaListener(topics = "#{__listener.topic}",
            groupId = "#{__listener.topic}.group")
    public void listen(...) {
        ...
    }

    public String getTopic() {
        return this.topic;
    }

}
In the unlikely event that you have an actual bean called __listener, you can change the expression token by using the beanRef attribute. The following example shows how to do so:
@KafkaListener(beanRef = "__x", topics = "#{__x.topic}", groupId = "#{__x.topic}.group")
Starting with version 2.2.4, you can specify Kafka consumer properties directly on the annotation; these override any properties with the same name configured in the consumer factory. You cannot specify the group.id and client.id properties this way; they are ignored. Use the groupId and clientIdPrefix annotation properties for those.
The properties are specified as individual strings with the normal Java Properties file format: foo:bar, foo=bar, or foo bar, as the following example shows:
@KafkaListener(topics = "myTopic", groupId = "group", properties = {
        "max.poll.interval.ms:60000",
        ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
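The three separator styles mentioned above (foo:bar, foo=bar, foo bar) are all standard java.util.Properties syntax, which can be verified directly. The property names below are real Kafka consumer properties used purely as sample keys:

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Demonstrates that ':', '=', and whitespace are all valid key/value
// separators in the java.util.Properties file format.
public class PropertyFormats {

    static Properties parse(String line) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(line));
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        System.out.println(parse("max.poll.interval.ms:60000").getProperty("max.poll.interval.ms"));
        System.out.println(parse("max.poll.records=100").getProperty("max.poll.records"));
        System.out.println(parse("fetch.min.bytes 1024").getProperty("fetch.min.bytes"));
    }
}
```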
The following is an example of the corresponding listeners for the example in Using RoutingKafkaTemplate.
@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
    System.out.println("1: " + in);
}

@KafkaListener(id = "two", topics = "two",
        properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
    System.out.println("2: " + new String(in));
}