Producer Interceptor Managed in Spring

从 3.0.0 版本开始,对于生产者拦截器,你可以让 Spring 将其直接作为 bean 来管理,而不是将拦截器的类名提供给 Apache Kafka 生产者配置。如果你采用此方法,则需要在 KafkaTemplate 上设置此生产者拦截器。以下是一个使用上面相同的 MyProducerInterceptor 的示例,但已更改为不使用内部配置属性。

Starting with version 3.0.0, when it comes to a producer interceptor, you can let Spring manage it directly as a bean instead of providing the class name of the interceptor to the Apache Kafka producer configuration. If you go with this approach, then you need to set this producer interceptor on KafkaTemplate. Following is an example using the same MyProducerInterceptor from above, but changed to not use the internal config property.

public class MyProducerInterceptor implements ProducerInterceptor<String, String> {

    private final SomeBean bean;

    public MyProducerInterceptor(SomeBean bean) {
        this.bean = bean;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        this.bean.someMethod("producer interceptor");
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    public void close() {
    }

}
@Bean
public MyProducerInterceptor myProducerInterceptor(SomeBean someBean) {
  return new MyProducerInterceptor(someBean);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf, MyProducerInterceptor myProducerInterceptor) {
   KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(pf);
   kafkaTemplate.setProducerInterceptor(myProducerInterceptor);
}

在发送记录之前,会调用生产者拦截器的 onSend 方法。一旦服务器发送对发布数据的确认,就会调用 onAcknowledgement 方法。onAcknowledgement 在生产者调用任何用户回调之前被调用。

Right before the records are sent, the onSend method of the producer interceptor is invoked. Once the server sends an acknowledgement on publishing the data, then the onAcknowledgement method is invoked. The onAcknowledgement is called right before the producer invokes any user callbacks.

如果你有多个此类通过 Spring 管理的生产者拦截器需要在 KafkaTemplate 上应用,你需要使用 CompositeProducerInterceptorCompositeProducerInterceptor 允许按顺序添加各个生产者拦截器。从底层 ProducerInterceptor 实现调用的方法按添加到 CompositeProducerInterceptor 的顺序调用。

If you have multiple such producer interceptors managed through Spring that need to be applied on the KafkaTemplate, you need to use CompositeProducerInterceptor instead. CompositeProducerInterceptor allows individual producer interceptors to be added in order. The methods from the underlying ProducerInterceptor implementations are invoked in the order as they were added to the CompositeProducerInterceptor.