Transactional Binder

通过将 spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix 设置为非空值来启用事务,例如 tx-。当在处理器应用程序中使用时,消费者将启动事务;消费者线程上发送的任何记录都参与同一个事务。当侦听器正常退出时,侦听器容器将把偏移发送到事务并提交。针对所有使用 spring.cloud.stream.kafka.binder.transaction.producer.* 属性配置的生产者绑定使用一个通用的生产者工厂;将忽略单个绑定 Kafka 生产者属性。

Enable transactions by setting spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix to a non-empty value, e.g. tx-. When used in a processor application, the consumer starts the transaction; any records sent on the consumer thread participate in the same transaction. When the listener exits normally, the listener container will send the offset to the transaction and commit it. A common producer factory is used for all producer bindings configured using spring.cloud.stream.kafka.binder.transaction.producer.* properties; individual binding Kafka producer properties are ignored.

正常的粘合剂重试(和死信)不受事务支持,因为重试将在原始事务中运行,而原始事务可能会回滚并且任何已发布的记录也将回滚。当启用重试(公共属性 maxAttempts 大于零)时,重试属性用于配置一个 DefaultAfterRollbackProcessor,以便在容器级别启用重试。类似地,此功能不像在事务中发布死信记录,而是通过 DefaultAfterRollbackProcessor 转移到侦听器容器中,后者在主事务回滚后运行。

Normal binder retries (and dead lettering) are not supported with transactions because the retries will run in the original transaction, which may be rolled back and any published records will be rolled back too. When retries are enabled (the common property maxAttempts is greater than zero) the retry properties are used to configure a DefaultAfterRollbackProcessor to enable retries at the container level. Similarly, instead of publishing dead-letter records within the transaction, this functionality is moved to the listener container, again via the DefaultAfterRollbackProcessor which runs after the main transaction has rolled back.

如果你希望在源应用程序中或从某些任意线程针对仅生产者的事务(例如,@Scheduled 方法)使用事务,你必须获取事务生产者工厂引用,并使用它定义 KafkaTransactionManager bean。

If you wish to use transactions in a source application, or from some arbitrary thread for producer-only transaction (e.g. @Scheduled method), you must get a reference to the transactional producer factory and define a KafkaTransactionManager bean using it.

@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
        @Value("${unique.tx.id.per.instance}") String txId) {

    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
    KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
    tm.setTransactionId(txId)
    return tm;
}

请注意,我们使用 BinderFactory 获取了 Binder 的引用;当仅配置了一个 Binder 时,在第一个参数中使用 null。如果配置了多个 Binder,请使用 Binder 名称获取引用。一旦我们有 Binder 的引用,即可获取 ProducerFactory 的引用并创建事务管理器。

Notice that we get a reference to the binder using the BinderFactory; use null in the first argument when there is only one binder configured. If more than one binder is configured, use the binder name to get the reference. Once we have a reference to the binder, we can obtain a reference to the ProducerFactory and create a transaction manager.

然后,你可以使用常规 Spring 事务支持(例如,TransactionTemplate@Transactional),例如:

Then you would use normal Spring transaction support, e.g. TransactionTemplate or @Transactional, for example:

public static class Sender {

    @Transactional
    public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
        stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
    }

}

如果你希望将仅生产者事务与那些来自其他事务管理器的同步,请使用 ChainedTransactionManager

If you wish to synchronize producer-only transactions with those from some other transaction manager, use a ChainedTransactionManager.

如果您部署应用程序的多个实例,则每个实例都需要唯一的 transactionIdPrefix

If you deploy multiple instances of your application, each instance needs a unique transactionIdPrefix.