Custom Kafka Binder Health Indicator
Overriding Default Kafka Binder Health Indicator
在类路径上存在 Spring Boot actuator 时,Kafka binder 会激活默认运行状况指示器。此运行状况指示器会检查 binder 的运行状况以及与 Kafka 代理的任何通信问题。如果应用程序希望禁用此默认运行状况检查实现并包含自定义实现,那么它可以为 KafkaBinderHealth
接口提供实现。KafkaBinderHealth
是从 HealthIndicator
扩展的标记接口。在自定义实现中,它必须为 health()
方法提供实现。自定义实现必须作为 bean 存在于应用程序配置中。当 binder 发现自定义实现时,它将使用它而不是默认实现。以下是该应用程序中此类自定义实现 bean 的一个示例。
Kafka binder activates a default health indicator when Spring Boot actuator is on the classpath.
This health indicator checks the health of the binder and any communication issues with the Kafka broker.
If an application wants to disable this default health check implementation and include a custom implementation, then it can provide an implementation for KafkaBinderHealth
interface.
KafkaBinderHealth
is a marker interface that extends from HealthIndicator
.
In the custom implementation, it must provide an implementation for the health()
method.
The custom implementation must be present in the application configuration as a bean.
When the binder discovers the custom implementation, it will use that instead of the default implementation.
Here is an example of such a custom implementation bean in the application.
@Bean
public KafkaBinderHealth kafkaBinderHealthIndicator() {
return new KafkaBinderHealth() {
@Override
public Health health() {
// custom implementation details.
}
};
}
Custom kafka Binder Health Indicator Example
以下是可以编写自定义 Kafka binder HealthIndicator 的伪代码。在此示例中,我们尝试通过首先专门检查群集连接性,然后检查主题相关问题,来覆盖由 binder 提供的 Kafka HealthIndicator。
Here is the pseudo-code for writing a custom Kafka binder HealthIndicator. In this example, we try to override the binder provided Kafka HealthIndicator by specifically checking first for cluster connectivity and then followed by topic-related issues.
首先,我们需要创建 KafkaBinderHealth
接口的自定义实现。
First, we need create a custom implementation of the KafkaBinderHealth
interface.
public class KafkaBinderHealthImplementation implements KafkaBinderHealth {
@Value("${spring.cloud.bus.destination}")
private String topic;
private final AdminClient client;
public KafkaBinderHealthImplementation(final KafkaAdmin admin) {
// More about configuring Kafka
// https://docs.spring.io/spring-kafka/reference/html/#configuring-topics
this.client = AdminClient.create(admin.getConfigurationProperties());
}
@Override
public Health health() {
if (!checkBrokersConnection()) {
logger.error("Error when connect brokers");
return Health.down().withDetail("BrokersConnectionError", "Error message").build();
}
if (!checkTopicConnection()) {
logger.error("Error when trying to connect with specific topic");
return Health.down().withDetail("TopicError", "Error message with topic name").build();
}
return Health.up().build();
}
public boolean checkBrokersConnection() {
// Your implementation
}
public boolean checkTopicConnection() {
// Your implementation
}
}
然后,我们需要为自定义实现创建一个 bean。
Then we need to create a bean for the custom implementation.
@Configuration
public class KafkaBinderHealthIndicatorConfiguration {
@Bean
public KafkaBinderHealth kafkaBinderHealthIndicator(final KafkaAdmin admin) {
return new KafkaBinderHealthImplementation(admin);
}
}