Transactions

本部分介绍了 Spring for Apache Kafka 如何支持事务。

Overview

0.11.0.0 客户端库增加了对事务的支持。Spring for Apache Kafka 通过以下方式添加支持:

  • KafkaTransactionManager: 与常规 Spring 事务支持 (@Transactional, `TransactionTemplate`等)一起使用

  • Transactional KafkaMessageListenerContainer

  • Local transactions with KafkaTemplate

  • 事务与其他事务管理器的同步

通过向 DefaultKafkaProducerFactory 提供 transactionIdPrefix 来启用事务。在这种情况下,工厂维护一个事务生产者缓存,而不是管理一个共享的 Producer。当用户在 Producer 上调用 close() 时,它将返回到缓存中以供重用,而不是实际关闭。每个生产者的 transactional.id 属性是 transactionIdPrefix + n,其中 n0 开始,并且为每个新生产者递增。在以前版本的 Spring for Apache Kafka 中,transactional.id 是为具有基于记录的监听器的监听器容器启动的事务而生成的,以支持隔离僵尸,现在不再需要,因为 EOSMode.V2 从 3.0 开始是唯一选项。对于以多个实例运行的应用程序,transactionIdPrefix 必须对每个实例都是唯一的。

另请参阅“@ [29]”。

另请参阅 transactionIdPrefix

使用 Spring Boot,只需设置 spring.kafka.producer.transaction-id-prefix 属性——Spring Boot 将自动配置一个 KafkaTransactionManager bean 并将其连接到监听器容器。

从版本 2.5.8 开始,您现在可以在生成器工厂上配置 maxAge 属性。在使用事务生成器(可能对代理的 transactional.id.expiration.ms 处于空闲状态)时,这非常有用。使用当前 kafka-clients,这会导致 ProducerFencedException,而无需重新平衡。将 maxAge 设置为小于 transactional.id.expiration.ms 时,如果工厂超过其最长生命周期,工厂将会刷新生成器。

Using KafkaTransactionManager

KafkaTransactionManager 是 Spring 框架的 PlatformTransactionManager 的实现。在构造函数中为其提供对生产者工厂的引用。如果提供自定义生产者工厂,则它必须支持事务。参见 ProducerFactory.transactionCapable()

可以将 KafkaTransactionManager 与常规 Spring 事务支持(@TransactionalTransactionTemplate 等)配合使用。如果事务处于活动状态,则在事务范围内执行的任何 KafkaTemplate 操作都将使用事务的 Producer。管理器根据成功或失败提交或回滚事务。必须将 KafkaTemplate 配置为使用与事务管理器相同的 ProducerFactory

Transaction Synchronization

此部分指仅生产者的事务(不是由监听器容器启动的事务);关于当容器启动事务时链接事务的信息,请参阅 Using Consumer-Initiated Transactions

如果你希望将记录发送到 Kafka 并执行一些数据库更新,可以使用常规的 Spring 事务管理与 DataSourceTransactionManager 配合使用。

@Transactional
public void process(List<Thing> things) {
    things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
    updateDb(things);
}

@Transactional 注释的拦截器将启动事务,并且 KafkaTemplate 将与此事务管理器同步事务;每次发送都将参与该事务。当方法退出时,数据库事务将提交,其次是 Kafka 事务。如果你希望以相反的顺序进行提交(首先是 Kafka),请使用嵌套 @Transactional 方法,其中外部方法配置为使用 DataSourceTransactionManager,而内部方法配置为使用 KafkaTransactionManager

关于 Kafka 优先或数据库优先配置中同步 JDBC 和 Kafka 事务的应用程序的示例,请参阅 Examples of Kafka Transactions with Other Transaction Managers

从版本 2.5.17、2.6.12、2.7.9 和 2.8.0 开始,如果在同步事务(主要事务提交后)中提交失败,则会将异常抛给调用者。过去,此异常会被忽略(记录为调试级别)。如果有必要,应用程序应采取补救措施以弥补已提交的主要事务。

Using Consumer-Initiated Transactions

自版本 2.7 起,ChainedKafkaTransactionManager 已弃用;有关更多信息,请参阅其父类 ChainedTransactionManager 的 JavaDoc。相反,在容器中使用 KafkaTransactionManager 来启动 Kafka 事务,并使用 @Transactional 对侦听器方法进行注释以启动另一个事务。

关于链接 JDBC 和 Kafka 事务的示例应用程序,请参阅 Examples of Kafka Transactions with Other Transaction Managers

Non-Blocking Retries 不能与 Container Transactions 结合使用。当侦听器代码抛出异常时,容器事务提交成功,并且记录发送到可重试的主题。

KafkaTemplate Local Transactions

可以使用 KafkaTemplate 在本地事务内执行一系列操作。以下示例演示了如何进行操作:

boolean result = template.executeInTransaction(t -> {
    t.sendDefault("thing1", "thing2");
    t.sendDefault("cat", "hat");
    return true;
});

回调中的参数是模板自身(this)。如果回调正常退出,事务将进行提交。如果抛出异常,事务将回滚。

如果有 KafkaTransactionManager(或同步)事务正在进行,则不会使用该事务。相反,会使用新的“嵌套”事务。

TransactionIdPrefix

对于唯一的受支持模式 EOSMode.V2(也称作 BETA),即使对于消费者发起的的交易,也不再需要使用相同的 transactional.id;实际上,它在每个实例上的值必须与生产者发起的交易相同,即唯一值。此属性在每个应用程序实例上必须具有不同的值。

TransactionIdSuffix Fixed

自 3.2 起,引入了一个新的 TransactionIdSuffixStrategy 接口来管理 transactional.id 的后缀。当 maxCache 设置为大于 0 时,默认实现 DefaultTransactionIdSuffixStrategy 可以在特定范围内重用 transactional.id,否则将在运行时通过递增计数器生成后缀。当请求事务生产者且所有 transactional.id 都在使用中时,会抛出 NoProducerAvailableException。然后,用户可以使用配置为重试此异常的 RetryTemplate,并配置适当的后退。

public static class Config {

    @Bean
    public ProducerFactory<String, String> myProducerFactory() {
        Map<String, Object> configs = producerConfigs();
        configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
        ...
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
        ...
        TransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(5);
        pf.setTransactionIdSuffixStrategy(ss);
        return pf;
    }

}

maxCache 设置为 5 时,transactional.idmy.txid.+{0-4}

在将 KafkaTransactionManagerConcurrentMessageListenerContainer 一起使用并启用 maxCache 时,有必要将 maxCache 设置为大于或等于 concurrency 的值。如果 MessageListenerContainer 无法获取 transactional.id 后缀,它将抛出 NoProducerAvailableException。在 ConcurrentMessageListenerContainer 中使用嵌套事务时,有必要调整 maxCache 设置以处理增加的嵌套事务数量。

KafkaTemplate Transactional and non-Transactional Publishing

通常,当 KafkaTemplate 是事务性的(使用具有事务功能的生产者工厂配置)时,需要事务。事务可以通过 TransactionTemplate@Transactional 方法(调用 executeInTransaction)、或通过使用 KafkaTransactionManager 配置的监听器容器来启动。在事务范围之外使用模板的任何尝试都会导致模板抛出 IllegalStateException。从版本 2.4.3 开始,你可以将模板的 allowNonTransactional 属性设置为 true。在这种情况下,模板将允许操作在没有事务的情况下运行,通过调用 ProducerFactory’s `createNonTransactionalProducer() 方法;生产者将被缓存或线程绑定,正常用于重用。请参阅 Using DefaultKafkaProducerFactory

Transactions with Batch Listeners

在使用事务时,如果监听器失败,AfterRollbackProcessor 将被调用,以便在回滚发生后执行一些操作。对记录监听器使用默认 AfterRollbackProcessor 时,将执行查询以重新传送失败的记录。但是,使用批量监听器时,将重新传送整个批量,因为框架不知道该批量中的哪条记录失败了。有关更多信息,请参阅 After-rollback Processor

在使用批量监听器时,版本 2.4.2 引入了一种替代机制来处理批处理过程中的故障:BatchToRecordAdapter。当将设置 batchListener 的容器工厂设置为 true 以使用 BatchToRecordAdapter 时,一次会调用监听器一条记录。这样可以在批处理中进行错误处理,同时仍有可能根据异常类型停止处理整个批处理。提供了一个默认 BatchToRecordAdapter,可以通过标准 ConsumerRecordRecoverer(例如 DeadLetterPublishingRecoverer)进行配置。以下测试用例配置代码段演示了如何使用此功能:

public static class TestListener {

    final List<String> values = new ArrayList<>();

    @KafkaListener(id = "batchRecordAdapter", topics = "test")
    public void listen(String data) {
        values.add(data);
        if ("bar".equals(data)) {
            throw new RuntimeException("reject partial");
        }
    }

}

@Configuration
@EnableKafka
public static class Config {

    ConsumerRecord<?, ?> failed;

    @Bean
    public TestListener test() {
        return new TestListener();
    }

    @Bean
    public ConsumerFactory<?, ?> consumerFactory() {
        return mock(ConsumerFactory.class);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) ->  {
            this.failed = record;
        }));
        return factory;
    }

}