Tips, Tricks and Examples

Manually Assigning All Partitions

When you always want to read all records from all partitions (such as when using a compacted topic to load a distributed cache), it can be useful to manually assign the partitions and not use Kafka’s group management. Doing so can be unwieldy when there are many partitions, because you have to list them all. It is also a problem if the number of partitions changes over time, because you would have to recompile your application each time the partition count changes.

The following is an example of how to use the power of a SpEL expression to create the partition list dynamically when the application starts:

@KafkaListener(topicPartitions = @TopicPartition(topic = "compacted",
            partitions = "#{@finder.partitions('compacted')}",
            partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")))
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
    ...
}

@Bean
public PartitionFinder finder(ConsumerFactory<String, String> consumerFactory) {
    return new PartitionFinder(consumerFactory);
}

public static class PartitionFinder {

    private final ConsumerFactory<String, String> consumerFactory;

    public PartitionFinder(ConsumerFactory<String, String> consumerFactory) {
        this.consumerFactory = consumerFactory;
    }

    public String[] partitions(String topic) {
        // query the broker for the topic's partitions and return their ids as strings
        try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
            return consumer.partitionsFor(topic).stream()
                .map(pi -> "" + pi.partition())
                .toArray(String[]::new);
        }
    }

}

Using this in conjunction with ConsumerConfig.AUTO_OFFSET_RESET_CONFIG=earliest will load all records each time the application is started. You should also set the container’s AckMode to MANUAL to prevent the container from committing offsets for a null consumer group. Starting with version 3.1, the container will automatically coerce the AckMode to MANUAL when manual topic assignment is used with no consumer group.id. However, starting with version 2.5.5, as shown above, you can apply an initial offset to all partitions; see Explicit Partition Assignment for more information.
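
If you are not relying on Spring Boot auto-configuration, the same settings can be applied to the factories directly. The following is a minimal sketch of such a configuration; the bean definitions, the bootstrap server address, and the String/String generic types are illustrative assumptions, not part of the example above:

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // no group.id - the listener above uses manual partition assignment
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // prevent the container from committing offsets for the null consumer group
    // (versions 3.1 and later do this automatically)
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}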

Examples of Kafka Transactions with Other Transaction Managers

The following Spring Boot application is an example of chaining database and Kafka transactions. The listener container starts the Kafka transaction and the @Transactional annotation starts the DB transaction. The DB transaction is committed first; if the Kafka transaction fails to commit, the record will be redelivered so the DB update should be idempotent.

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.executeInTransaction(t -> t.send("topic1", "test"));
    }

    @Bean
    public DataSourceTransactionManager dstm(DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    @Component
    public static class Listener {

        private final JdbcTemplate jdbcTemplate;

        private final KafkaTemplate<String, String> kafkaTemplate;

        public Listener(JdbcTemplate jdbcTemplate, KafkaTemplate<String, String> kafkaTemplate) {
            this.jdbcTemplate = jdbcTemplate;
            this.kafkaTemplate = kafkaTemplate;
        }

        @KafkaListener(id = "group1", topics = "topic1")
        @Transactional("dstm")
        public void listen1(String in) {
            this.kafkaTemplate.send("topic2", in.toUpperCase());
            this.jdbcTemplate.execute("insert into mytable (data) values ('" + in + "')");
        }

        @KafkaListener(id = "group2", topics = "topic2")
        public void listen2(String in) {
            System.out.println(in);
        }

    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("topic1").build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("topic2").build();
    }

}

spring.datasource.url=jdbc:mysql://localhost/integration?serverTimezone=UTC
spring.datasource.username=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.properties.isolation.level=read_committed

spring.kafka.producer.transaction-id-prefix=tx-

#logging.level.org.springframework.transaction=trace
#logging.level.org.springframework.kafka.transaction=debug
#logging.level.org.springframework.jdbc=debug

create table mytable (data varchar(20));

For producer-only transactions, transaction synchronization works:

@Transactional("dstm")
public void someMethod(String in) {
    this.kafkaTemplate.send("topic2", in.toUpperCase());
    this.jdbcTemplate.execute("insert into mytable (data) values ('" + in + "')");
}

The KafkaTemplate will synchronize its transaction with the DB transaction and the commit/rollback occurs after the database.

If you wish to commit the Kafka transaction first, and only commit the DB transaction if the Kafka transaction is successful, use nested @Transactional methods:

@Transactional("dstm")
public void someMethod(String in) {
    this.jdbcTemplate.execute("insert into mytable (data) values ('" + in + "')");
    sendToKafka(in);
}

@Transactional("kafkaTransactionManager")
public void sendToKafka(String in) {
    this.kafkaTemplate.send("topic2", in.toUpperCase());
}
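
The "kafkaTransactionManager" qualifier in the nested method refers to a KafkaTransactionManager bean; Spring Boot auto-configures one when spring.kafka.producer.transaction-id-prefix is set. If you declare it yourself, a minimal sketch (assuming a String/String producer factory) might look like this:

@Bean
public KafkaTransactionManager<String, String> kafkaTransactionManager(
        ProducerFactory<String, String> producerFactory) {
    // the producer factory must have a transactionIdPrefix configured
    return new KafkaTransactionManager<>(producerFactory);
}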

Customizing the JsonSerializer and JsonDeserializer

The serializer and deserializer support a number of customizations using properties; see JSON for more information. The kafka-clients code, not Spring, instantiates these objects, unless you inject them directly into the consumer and producer factories. If you wish to configure the (de)serializer using properties, but also wish to use, say, a custom ObjectMapper, simply create a subclass and pass the custom mapper into the super constructor. For example:

public class CustomJsonSerializer extends JsonSerializer<Object> {

    public CustomJsonSerializer() {
        super(customizedObjectMapper());
    }

    private static ObjectMapper customizedObjectMapper() {
        ObjectMapper mapper = JacksonUtils.enhancedObjectMapper();
        mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        return mapper;
    }

}
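
Alternatively, as noted above, you can inject a customized serializer directly into the producer factory so that Spring, rather than the kafka-clients code, instantiates it. The following is a minimal sketch; the bean definition and the bootstrap server address are illustrative assumptions:

@Bean
public ProducerFactory<String, Object> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
    // Spring instantiates the serializers here, so the customized ObjectMapper is used
    return new DefaultKafkaProducerFactory<>(props, new StringSerializer(), new CustomJsonSerializer());
}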