Producer Interceptor Managed in Spring
Starting with version 3.0.0, you can let Spring manage a producer interceptor directly as a bean instead of providing the interceptor's class name in the Apache Kafka producer configuration.
If you take this approach, you must set the producer interceptor on the KafkaTemplate.
The following example uses the same MyProducerInterceptor as above, changed so that it no longer uses the internal config property.
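import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MyProducerInterceptor implements ProducerInterceptor<String, String> {

    // Collaborator injected by Spring through the constructor.
    private final SomeBean bean;

    public MyProducerInterceptor(SomeBean bean) {
        this.bean = bean;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Nothing to read from the producer configs; the bean is injected directly.
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        this.bean.someMethod("producer interceptor");
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    public void close() {
    }

}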
@Bean
public MyProducerInterceptor myProducerInterceptor(SomeBean someBean) {
    return new MyProducerInterceptor(someBean);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf, MyProducerInterceptor myProducerInterceptor) {
    KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(pf);
    // Register the Spring-managed interceptor on the template rather than in the producer configs.
    kafkaTemplate.setProducerInterceptor(myProducerInterceptor);
    return kafkaTemplate;
}
Right before the records are sent, the onSend method of the producer interceptor is invoked.
Once the server sends an acknowledgement for the published data, the onAcknowledgement method is invoked.
The onAcknowledgement method is called right before the producer invokes any user callbacks.
If you have multiple Spring-managed producer interceptors that need to be applied to the KafkaTemplate, use CompositeProducerInterceptor instead.
CompositeProducerInterceptor allows individual producer interceptors to be added in order.
The methods of the underlying ProducerInterceptor implementations are invoked in the order in which they were added to the CompositeProducerInterceptor.
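The ordering rule above can be illustrated with a small, dependency-free sketch. It does not use the real Spring or Kafka classes: Interceptor and Composite are hypothetical stand-ins, the "record" is reduced to a String, and the sketch assumes that each delegate receives the record returned by the previous one.

```java
import java.util.ArrayList;
import java.util.List;

public class CompositeOrderDemo {

    // Hypothetical stand-in for a producer interceptor, reduced to onSend on a String "record".
    interface Interceptor {
        String onSend(String record);
    }

    // Simplified model of a composite interceptor (not the Spring class):
    // delegates are invoked in the order they were added, and each one
    // receives the record returned by the previous delegate (an assumption).
    static final class Composite implements Interceptor {

        private final List<Interceptor> delegates = new ArrayList<>();

        Composite(Interceptor... interceptors) {
            for (Interceptor interceptor : interceptors) {
                this.delegates.add(interceptor);
            }
        }

        @Override
        public String onSend(String record) {
            String current = record;
            for (Interceptor delegate : this.delegates) {
                current = delegate.onSend(current);
            }
            return current;
        }
    }

    // Runs a record through two delegates so the invocation order shows up in the result.
    static String run(String record) {
        Interceptor first = r -> r + "|first";
        Interceptor second = r -> r + "|second";
        return new Composite(first, second).onSend(record);
    }

    public static void main(String[] args) {
        System.out.println(run("payload")); // prints "payload|first|second"
    }
}
```

With the real classes, the composite would presumably be built from the individual interceptor beans and then passed to setProducerInterceptor on the KafkaTemplate in place of the single interceptor; check the CompositeProducerInterceptor Javadoc for the exact constructor before relying on this.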