Transactions
This section describes how Spring for Apache Kafka supports transactions.
Overview
The 0.11.0.0 client library added support for transactions. Spring for Apache Kafka adds support in the following ways:
- KafkaTransactionManager: used with normal Spring transaction support (@Transactional, TransactionTemplate, and so on)
- Transactional KafkaMessageListenerContainer
- Local transactions with KafkaTemplate
- Transaction synchronization with other transaction managers
Transactions are enabled by providing the DefaultKafkaProducerFactory with a transactionIdPrefix.
In that case, instead of managing a single shared Producer, the factory maintains a cache of transactional producers.
When the user calls close() on a producer, it is returned to the cache for reuse instead of actually being closed.
The transactional.id property of each producer is transactionIdPrefix + n, where n starts with 0 and is incremented for each new producer.
In previous versions of Spring for Apache Kafka, the transactional.id was generated differently for transactions started by a listener container with a record-based listener, to support fencing zombies; this is no longer necessary because, starting with 3.0, EOSMode.V2 is the only option.
For applications running with multiple instances, the transactionIdPrefix must be unique per instance.
Also see Exactly Once Semantics.
Also see transactionIdPrefix.
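As a minimal sketch, enabling transactions on the factory could look like the following; the bootstrap address, serializers, and the tx- prefix value are illustrative assumptions:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;

public class TransactionalFactoryExample {

    public static DefaultKafkaProducerFactory<String, String> producerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
        // enables transactions; producers get transactional.id tx-0, tx-1, ...
        pf.setTransactionIdPrefix("tx-");
        return pf;
    }
}
```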
With Spring Boot, it is only necessary to set the spring.kafka.producer.transaction-id-prefix property; Spring Boot automatically configures a KafkaTransactionManager bean and wires it into the listener container.
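For example, in application.properties (the tx- value is illustrative and, as noted above, must be unique per application instance):

```properties
# illustrative prefix; must be unique per application instance
spring.kafka.producer.transaction-id-prefix=tx-
```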
Starting with version 2.5.8, you can configure the maxAge property on the producer factory.
This is useful when using transactional producers that might lie idle longer than the broker's transactional.id.expiration.ms.
With current kafka-clients, this can cause a ProducerFencedException without a rebalance.
By setting maxAge to less than transactional.id.expiration.ms, the factory refreshes the producer if it is past its max age.
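A configuration sketch: the 6-day value below is an illustrative assumption; it simply needs to be less than the broker's transactional.id.expiration.ms (7 days by default):

```java
import java.time.Duration;
import java.util.Map;

import org.springframework.kafka.core.DefaultKafkaProducerFactory;

public class MaxAgeExample {

    public static DefaultKafkaProducerFactory<String, String> producerFactory(Map<String, Object> configs) {
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
        pf.setTransactionIdPrefix("tx-"); // illustrative prefix
        // Refresh producers before the broker expires their transactional.id;
        // 6 days is illustrative, chosen to be below the broker default of 7 days.
        pf.setMaxAge(Duration.ofDays(6));
        return pf;
    }
}
```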
Using KafkaTransactionManager
The KafkaTransactionManager is an implementation of Spring Framework's PlatformTransactionManager.
It is provided with a reference to the producer factory in its constructor.
If you provide a custom producer factory, it must support transactions.
See ProducerFactory.transactionCapable().
You can use the KafkaTransactionManager with normal Spring transaction support (@Transactional, TransactionTemplate, and others).
If a transaction is active, any KafkaTemplate operations performed within the scope of the transaction use the transaction's Producer.
The manager commits or rolls back the transaction, depending on success or failure.
You must configure the KafkaTemplate to use the same ProducerFactory as the transaction manager.
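A minimal configuration sketch, assuming a transaction-capable producer factory bean is already defined; note that the template and the transaction manager share the same factory:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;

@Configuration
public class KafkaTxConfig {

    @Bean
    public KafkaTransactionManager<String, String> kafkaTransactionManager(
            ProducerFactory<String, String> producerFactory) {
        // the factory must be transaction-capable (transactionIdPrefix set)
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(
            ProducerFactory<String, String> producerFactory) {
        // same ProducerFactory as the transaction manager
        return new KafkaTemplate<>(producerFactory);
    }
}
```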
Transaction Synchronization
This section refers to producer-only transactions (transactions not started by a listener container); see Using Consumer-Initiated Transactions for information about chaining transactions when the container starts the transaction.
If you want to send records to Kafka and perform some database updates, you can use normal Spring transaction management with, say, a DataSourceTransactionManager.
@Transactional
public void process(List<Thing> things) {
things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
updateDb(things);
}
The interceptor for the @Transactional annotation starts the transaction, and the KafkaTemplate synchronizes a transaction with that transaction manager; each send participates in that transaction.
When the method exits, the database transaction commits, followed by the Kafka transaction.
If you wish the commits to be performed in the reverse order (Kafka first), use nested @Transactional methods, with the outer method configured to use the DataSourceTransactionManager and the inner method configured to use the KafkaTransactionManager.
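A sketch of that nested arrangement; the transaction manager bean names are assumptions, and the inner method must live on a separate bean so that the @Transactional proxy is applied:

```java
import java.util.List;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
class DbThenKafkaService {

    private final KafkaSender kafkaSender;

    DbThenKafkaService(KafkaSender kafkaSender) {
        this.kafkaSender = kafkaSender;
    }

    @Transactional(transactionManager = "dataSourceTransactionManager") // outer: DB transaction
    public void process(List<Thing> things) {
        updateDb(things); // JDBC work joins the outer transaction
        // the inner Kafka transaction commits when send() returns,
        // before the outer database transaction commits (Kafka first)
        this.kafkaSender.send(things);
    }

    private void updateDb(List<Thing> things) {
        // JDBC updates
    }
}

@Component
class KafkaSender {

    private final KafkaTemplate<String, Thing> kafkaTemplate;

    KafkaSender(KafkaTemplate<String, Thing> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Transactional(transactionManager = "kafkaTransactionManager") // inner: Kafka transaction
    public void send(List<Thing> things) {
        things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
    }
}
```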
See Examples of Kafka Transactions with Other Transaction Managers for examples of an application that synchronizes JDBC and Kafka transactions in Kafka-first or DB-first configurations.
Starting with versions 2.5.17, 2.6.12, 2.7.9 and 2.8.0, if the commit fails on the synchronized transaction (after the primary transaction has committed), the exception is thrown to the caller.
Previously, this was silently ignored (logged at debug level).
Applications should take remedial action, if necessary, to compensate for the committed primary transaction.
Using Consumer-Initiated Transactions
The ChainedKafkaTransactionManager is deprecated since version 2.7; see the JavaDocs for its super class ChainedTransactionManager for more information.
Instead, use a KafkaTransactionManager in the container to start the Kafka transaction, and annotate the listener method with @Transactional to start the other transaction.
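A sketch of that arrangement; the bean names and the listener topic are assumptions:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

public class ConsumerInitiatedTxConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory,
            KafkaTransactionManager<String, String> ktm) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // the container starts (and commits) the Kafka transaction
        factory.getContainerProperties().setKafkaAwareTransactionManager(ktm);
        return factory;
    }
}

@Component
class TxListener {

    // @Transactional starts the other (e.g. JDBC) transaction,
    // which commits before the container commits the Kafka transaction
    @KafkaListener(id = "txGroup", topics = "input")
    @Transactional(transactionManager = "dataSourceTransactionManager")
    public void listen(String in) {
        // DB work here
    }
}
```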
See Examples of Kafka Transactions with Other Transaction Managers for an example application that chains JDBC and Kafka transactions.
Non-Blocking Retries cannot be combined with Container Transactions.
When the listener code throws an exception, the container transaction commit succeeds and the record is sent to the retryable topic.
KafkaTemplate Local Transactions
You can use the KafkaTemplate to execute a series of operations within a local transaction.
The following example shows how to do so:
boolean result = template.executeInTransaction(t -> {
t.sendDefault("thing1", "thing2");
t.sendDefault("cat", "hat");
return true;
});
The argument in the callback is the template itself (this).
If the callback exits normally, the transaction is committed.
If an exception is thrown, the transaction is rolled back.
TransactionIdPrefix
With EOSMode.V2 (aka BETA), the only supported mode, it is no longer necessary to use the same transactional.id, even for consumer-initiated transactions; in fact, it must be unique on each instance, the same as for producer-initiated transactions.
This property must have a different value on each application instance.
TransactionIdSuffix Fixed
Since 3.2, a new TransactionIdSuffixStrategy interface has been introduced to manage the transactional.id suffix.
The default implementation, DefaultTransactionIdSuffixStrategy, can reuse transactional.id suffixes within a specific range when maxCache is set to a value greater than zero; otherwise, suffixes are generated on the fly by incrementing a counter.
When a transactional producer is requested and all transactional.id values are in use, a NoProducerAvailableException is thrown.
Users can then use a RetryTemplate configured to retry that exception, with a suitably configured back off.
public static class Config {
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
TransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(5);
pf.setTransactionIdSuffixStrategy(ss);
return pf;
}
}
When maxCache is set to 5, the transactional.id is my.txid.{0-4}.
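As noted above, a NoProducerAvailableException can be retried with spring-retry; a sketch, in which the attempt count and back off values are illustrative assumptions:

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.NoProducerAvailableException;
import org.springframework.retry.support.RetryTemplate;

public class RetrySendExample {

    public static void sendWithRetry(KafkaTemplate<String, String> template) {
        RetryTemplate retryTemplate = RetryTemplate.builder()
                .maxAttempts(5)    // illustrative
                .fixedBackoff(50L) // milliseconds, illustrative
                .retryOn(NoProducerAvailableException.class)
                .build();

        // retries the transactional send while all suffixes are in use
        retryTemplate.execute(context -> template.executeInTransaction(t -> {
            t.send("topic", "value");
            return null;
        }));
    }
}
```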
When using KafkaTransactionManager with the ConcurrentMessageListenerContainer and enabling maxCache, it is necessary to set maxCache to a value greater than or equal to concurrency.
If a MessageListenerContainer is unable to acquire a transactional.id suffix, it throws a NoProducerAvailableException.
When using nested transactions in the ConcurrentMessageListenerContainer, it is necessary to adjust the maxCache setting to handle the increased number of nested transactions.
KafkaTemplate Transactional and non-Transactional Publishing
Normally, when a KafkaTemplate is transactional (configured with a transaction-capable producer factory), transactions are required.
The transaction can be started by a TransactionTemplate, a @Transactional method, calling executeInTransaction, or by a listener container, when configured with a KafkaTransactionManager.
Any attempt to use the template outside the scope of a transaction results in the template throwing an IllegalStateException.
Starting with version 2.4.3, you can set the template's allowNonTransactional property to true.
In that case, the template allows the operation to run without a transaction, by calling the ProducerFactory's createNonTransactionalProducer() method; the producer is cached, or thread-bound, as normal for reuse.
See Using DefaultKafkaProducerFactory.
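A sketch of the allowNonTransactional option; the factory is assumed to be transaction-capable:

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

public class LenientTemplateExample {

    public static KafkaTemplate<String, String> lenientTemplate(
            ProducerFactory<String, String> transactionalPf) {
        KafkaTemplate<String, String> template = new KafkaTemplate<>(transactionalPf);
        // sends outside a transaction use a non-transactional producer
        // instead of throwing IllegalStateException
        template.setAllowNonTransactional(true);
        return template;
    }
}
```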
Transactions with Batch Listeners
When a listener fails while transactions are being used, the AfterRollbackProcessor is invoked to take some action after the rollback occurs.
When using the default AfterRollbackProcessor with a record listener, seeks are performed so that the failed record is redelivered.
With a batch listener, however, the whole batch is redelivered, because the framework does not know which record in the batch failed.
See After-rollback Processor for more information.
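For example, the container factory could be given a DefaultAfterRollbackProcessor with a back off limiting redeliveries; the back off values are illustrative assumptions:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.util.backoff.FixedBackOff;

public class RollbackConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // after a rollback, seek and redeliver; give up after the initial attempt
        // plus 2 retries, waiting 1 second between attempts (illustrative values)
        factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(
                new FixedBackOff(1000L, 2L)));
        return factory;
    }
}
```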
When using a batch listener, version 2.4.2 introduced an alternative mechanism to deal with failures while processing a batch: BatchToRecordAdapter.
When a container factory with batchListener set to true is configured with a BatchToRecordAdapter, the listener is invoked with one record at a time.
This enables error handling within the batch, while still making it possible to stop processing the entire batch, depending on the exception type.
A default BatchToRecordAdapter is provided, which can be configured with a standard ConsumerRecordRecoverer, such as the DeadLetterPublishingRecoverer.
The following test case configuration snippet illustrates how to use this feature:
public static class TestListener {
final List<String> values = new ArrayList<>();
@KafkaListener(id = "batchRecordAdapter", topics = "test")
public void listen(String data) {
values.add(data);
if ("bar".equals(data)) {
throw new RuntimeException("reject partial");
}
}
}
@Configuration
@EnableKafka
public static class Config {
ConsumerRecord<?, ?> failed;
@Bean
public TestListener test() {
return new TestListener();
}
@Bean
public ConsumerFactory<?, ?> consumerFactory() {
return mock(ConsumerFactory.class);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) -> {
this.failed = record;
}));
return factory;
}
}