Transactional Binder
通过将 spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
设置为非空值来启用事务,例如 tx-
。当在处理器应用程序中使用时,消费者将启动事务;消费者线程上发送的任何记录都参与同一个事务。当侦听器正常退出时,侦听器容器将把偏移发送到事务并提交。针对所有使用 spring.cloud.stream.kafka.binder.transaction.producer.*
属性配置的生产者绑定使用一个通用的生产者工厂;将忽略单个绑定 Kafka 生产者属性。
正常的粘合剂重试(和死信)不受事务支持,因为重试将在原始事务中运行,而原始事务可能会回滚并且任何已发布的记录也将回滚。当启用重试(公共属性 maxAttempts
大于零)时,重试属性用于配置一个 DefaultAfterRollbackProcessor
,以便在容器级别启用重试。类似地,此功能不像在事务中发布死信记录,而是通过 DefaultAfterRollbackProcessor
转移到侦听器容器中,后者在主事务回滚后运行。
如果你希望在源应用程序中或从某些任意线程针对仅生产者的事务(例如,@Scheduled
方法)使用事务,你必须获取事务生产者工厂引用,并使用它定义 KafkaTransactionManager
bean。
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
@Value("${unique.tx.id.per.instance}") String txId) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
tm.setTransactionId(txId)
return tm;
}
请注意,我们使用 BinderFactory
获取了 Binder 的引用;当仅配置了一个 Binder 时,在第一个参数中使用 null
。如果配置了多个 Binder,请使用 Binder 名称获取引用。一旦我们有 Binder 的引用,即可获取 ProducerFactory
的引用并创建事务管理器。
然后,你可以使用常规 Spring 事务支持(例如,TransactionTemplate
或 @Transactional
),例如:
public static class Sender {
@Transactional
public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
}
}
如果你希望将仅生产者事务与那些来自其他事务管理器的同步,请使用 ChainedTransactionManager
。
如果您部署应用程序的多个实例,则每个实例都需要唯一的 transactionIdPrefix
。