@KafkaListener Annotation

@KafkaListener 注释用于将 Bean 方法指定为侦听器容器的侦听器。此 Bean 会封装在一个 MessagingMessageListenerAdapter 中,并带有各种特性进行配置,例如转换器(可根据需要将 data 转换成与方法参数匹配的 data)。 您可以使用 #{…​} 或属性占位符 (${…​}) 通过 SpEL 配置注释中的大多数属性。有关更多信息,请参阅 Javadoc

Record Listeners

@KafkaListener 注释提供了一种机制,用于实现简单的 POJO 侦听器。以下示例演示了如何使用它:

public class Listener {

    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }

}

此机制要求在某个 @Configuration 类上使用 @EnableKafka 注释以及侦听器容器工厂,该工厂用于配置基础 ConcurrentMessageListenerContainer。默认情况下,需要一个名称为 kafkaListenerContainerFactory 的 Bean。以下示例演示了如何使用 ConcurrentMessageListenerContainer

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}

请注意,要设置容器属性,您必须在工厂上使用 getContainerProperties() 方法。它用作注入到容器中的实际属性的模板。

从 2.1.1 版开始,您现在可以为由注释创建的消费者设置 client.id 属性。clientIdPrefix 会附加 -n,其中 n 是一个整数,表示使用并发时的容器号。

从 2.2 版开始,您现在可以通过使用注释本身上面的属性来覆盖容器工厂的 concurrencyautoStartup 属性。这些属性可以是简单值、属性占位符或 SpEL 表达式。以下示例演示了如何这样做:

@KafkaListener(id = "myListener", topics = "myTopic",
        autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
    ...
}

Explicit Partition Assignment

您还可以配置具有显式主题和分区(以及它们的初始偏移量)的 POJO 侦听器。以下示例演示了如何这样做:

@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

您可以在 partitionspartitionOffsets 属性中指定每个分区,但不能同时指定。

和大多数注解属性一样,你可以使用 SpEL 表达式;有关如何生成大容量分区列表的示例,请参阅 Manually Assigning All Partitions

从 2.5.5 版开始,您可以对所有分配的分区应用初始偏移量:

@KafkaListener(id = "thing3", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" },
             partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

* 通配符代表 partitions 属性中的所有分区。在每个 @TopicPartition 中只能有一个带有通配符的 @PartitionOffset

此外,当监听器实现 ConsumerSeekAware 时,现在会调用 onPartitionsAssigned,即使使用手动分配也是如此。例如,这允许当时进行任意查找操作。

从 2.6.4 版开始,你可以指定一个由逗号分隔的列表,其中包括分区或分区范围:

@KafkaListener(id = "pp", autoStartup = "false",
        topicPartitions = @TopicPartition(topic = "topic1",
                partitions = "0-5, 7, 10-15"))
public void process(String in) {
    ...
}

该范围是包含性的;上述示例将分配分区 0, 1, 2, 3, 4, 5, 7, 10, 11, 12, 13, 14, 15

指定初始偏移量时可以使用相同的方法:

@KafkaListener(id = "thing3", topicPartitions =
        { @TopicPartition(topic = "topic1",
             partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

初始偏移量将应用到所有 6 个分区。

Manual Acknowledgment

使用手动 AckMode 时,也可以向监听器提供 Acknowledgment。以下示例还演示了如何使用不同的容器工厂。

@KafkaListener(id = "cat", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}

Consumer Record Metadata

最后,可以从消息头中获取有关记录的元数据。你可以使用以下标头名称来检索消息的标头:

  • KafkaHeaders.OFFSET

  • KafkaHeaders.RECEIVED_KEY

  • KafkaHeaders.RECEIVED_TOPIC

  • KafkaHeaders.RECEIVED_PARTITION

  • KafkaHeaders.RECEIVED_TIMESTAMP

  • KafkaHeaders.TIMESTAMP_TYPE

从 2.5 版开始,如果传入记录的键为 null,则 RECEIVED_KEY 不存在;先前该标头使用 null 值填充。此更改使框架与 spring-messaging 惯例保持一致,其中不存在 null 值标头。

以下示例演示了如何使用标头:

@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
        ) {
    ...
}

参数注释(@Payload@Header)必须指定在侦听器方法的具体实现中;如果在接口中定义,将不会检测到它们。

从 2.5 版开始,你可以接收 ConsumerRecordMetadata 参数中的记录元数据,而不使用离散标头。

@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
    ...
}

此参数包含 ConsumerRecord 的所有数据,但键和值除外。

Batch Listeners

从 1.1 版开始,你可以配置 @KafkaListener 方法以接收从使用者轮询中收到的所有消费者记录批处理。

Non-Blocking Retries 不支持批处理监听器。

若要配置监听器容器工厂以创建批处理侦听器,可以设置 batchListener 属性。以下示例演示了如何执行此操作:

@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);  // <<<<<<<<<<<<<<<<<<<<<<<<<
   return factory;
}

从 2.8 版本开始,你可以使用 @KafkaListener 注释中的 batch 属性覆盖工厂的 batchListener 属性。此操作与对 Container Error Handlers 的更改一起允许将同一工厂同时用于记录和批处理侦听器。

从版本 2.9.6 开始,容器工厂对 recordMessageConverterbatchMessageConverter 属性有单独的设置器。以前,只有一个同时适用于记录和批处理侦听器的属性 messageConverter

以下示例演示了如何接收有效负载列表:

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    ...
}

主题、分区、偏移量等信息存在于与有效负载相似的标头中。以下示例演示了如何使用标头:

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
        @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

另外,你可以接收带有每个消息中每个偏移量和其他详细信息的 Message<?> 对象的 List,但它必须是方法上定义的唯一参数(除了可选的 Acknowledgment,在使用手动提交时和/或 Consumer<?, ?> 参数)。以下示例演示了如何执行此操作:

@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen1(List<Message<?>> list) {
    ...
}

@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen2(List<Message<?>> list, Acknowledgment ack) {
    ...
}

@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen3(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
    ...
}

在这种情况下,不会对有效负载进行转换。

如果 BatchMessagingMessageConverter 配置有 RecordMessageConverter,还可以向 Message 参数添加泛型类型,并且有效负载已转换。有关更多信息,请参阅 Payload Conversion with Batch Listeners

你还可以接收一个 ConsumerRecord<?, ?> 对象的列表,但它必须是方法上定义的唯一参数(除了可选的 Acknowledgment,在使用手动提交和 Consumer<?, ?> 参数时)。以下示例演示了如何执行此操作:

@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
    ...
}

@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
    ...
}

从 2.2 版开始,监听器可以接收 poll() 方法返回的完整 ConsumerRecords<?, ?> 对象,从而使监听器能够访问其他方法,例如 partitions()(返回列表中的 TopicPartition 实例)和 records(TopicPartition)(获取选择性记录)。同样,这必须是方法上唯一的参数(除了可选的 Acknowledgment,在使用手动提交或 Consumer<?, ?> 参数时)。以下示例演示了如何执行此操作:

@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
    ...
}

如果容器工厂配置了 RecordFilterStrategy,则会忽略 ConsumerRecords<?, ?> 侦听器,并会发出 WARN 日志消息。只有在使用 <List<?>> 形式的侦听器时,才可以使用批处理侦听器对记录进行筛选。默认情况下,一次筛选一条记录;从版本 2.8 开始,您可以覆盖 filterBatch,以一次调用来筛选整个批处理。

Annotation Properties

从 2.0 版本开始,如果存在 id 属性,将使用它作为 Kafka 消费者 group.id 属性,如果存在,将覆盖消费者工厂中的已配置属性。您还可以显式设置 groupId 或将 idIsGroup 设置为 false,以恢复使用消费者工厂 group.id 的先前行为。

您可以在大多数注释属性中使用属性占位符或 SpEL 表达式,如下例所示:

@KafkaListener(topics = "${some.property}")

@KafkaListener(topics = "#{someBean.someProperty}",
    groupId = "#{someBean.someProperty}.group")

从 2.1.2 版本开始,SpEL 表达式支持一个特殊令牌:__listener。它是一个伪 Bean 名称,表示此注释所在的当前 Bean 实例。

请考虑以下示例:

@Bean
public Listener listener1() {
    return new Listener("topic1");
}

@Bean
public Listener listener2() {
    return new Listener("topic2");
}

给定上例中的 Bean,我们接下来可以使用以下内容:

public class Listener {

    private final String topic;

    public Listener(String topic) {
        this.topic = topic;
    }

    @KafkaListener(topics = "#{__listener.topic}",
        groupId = "#{__listener.topic}.group")
    public void listen(...) {
        ...
    }

    public String getTopic() {
        return this.topic;
    }

}

如果在不太可能的情况下,您有一个实际名为 __listener 的 Bean,则可以通过使用 beanRef 属性更改表达式令牌。以下示例演示如何操作:

@KafkaListener(beanRef = "__x", topics = "#{__x.topic}", groupId = "#{__x.topic}.group")

从 2.2.4 版本开始,您可以在注释上直接指定 Kafka 消费者属性,这样会覆盖在消费者工厂中使用相同名称配置的任何属性。您*不能*通过这种方式指定 group.idclient.id 属性;它们将被忽略;对它们使用 groupIdclientIdPrefix 注释属性。

属性指定为带有正常 Java Properties 文件格式的单个字符串:foo:barfoo=barfoo bar,如下例所示:

@KafkaListener(topics = "myTopic", groupId = "group", properties = {
    "max.poll.interval.ms:60000",
    ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})

以下是在 xref:kafka/sending-messages.adoc#routing-template[Using RoutingKafkaTemplate 中示例的对应侦听器。

@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
    System.out.println("1: " + in);
}

@KafkaListener(id = "two", topics = "two",
        properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
    System.out.println("2: " + new String(in));
}