Custom Kafka Binder Health Indicator

Overriding Default Kafka Binder Health Indicator

在类路径上存在 Spring Boot actuator 时,Kafka binder 会激活默认运行状况指示器。此运行状况指示器会检查 binder 的运行状况以及与 Kafka 代理的任何通信问题。如果应用程序希望禁用此默认运行状况检查实现并包含自定义实现,那么它可以为 KafkaBinderHealth 接口提供实现。KafkaBinderHealth 是从 HealthIndicator 扩展的标记接口。在自定义实现中,它必须为 health() 方法提供实现。自定义实现必须作为 bean 存在于应用程序配置中。当 binder 发现自定义实现时,它将使用它而不是默认实现。以下是该应用程序中此类自定义实现 bean 的一个示例。

@Bean
public KafkaBinderHealth kafkaBinderHealthIndicator() {
    return new KafkaBinderHealth() {
        @Override
        public Health health() {
            // custom implementation details.
        }
    };
}

Custom kafka Binder Health Indicator Example

以下是可以编写自定义 Kafka binder HealthIndicator 的伪代码。在此示例中,我们尝试通过首先专门检查群集连接性,然后检查主题相关问题,来覆盖由 binder 提供的 Kafka HealthIndicator。

首先,我们需要创建 KafkaBinderHealth 接口的自定义实现。

public class KafkaBinderHealthImplementation implements KafkaBinderHealth {
    @Value("${spring.cloud.bus.destination}")
    private String topic;
    private final AdminClient client;

    public KafkaBinderHealthImplementation(final KafkaAdmin admin) {
		// More about configuring Kafka
		// https://docs.spring.io/spring-kafka/reference/html/#configuring-topics
        this.client = AdminClient.create(admin.getConfigurationProperties());
    }

    @Override
    public Health health() {
        if (!checkBrokersConnection()) {
            logger.error("Error when connect brokers");
			return Health.down().withDetail("BrokersConnectionError", "Error message").build();
        }
		if (!checkTopicConnection()) {
			logger.error("Error when trying to connect with specific topic");
			return Health.down().withDetail("TopicError", "Error message with topic name").build();
		}
        return Health.up().build();
    }

    public boolean checkBrokersConnection() {
        // Your implementation
    }

    public boolean checkTopicConnection() {
		// Your implementation
    }
}

然后,我们需要为自定义实现创建一个 bean。

@Configuration
public class KafkaBinderHealthIndicatorConfiguration {
	@Bean
	public KafkaBinderHealth kafkaBinderHealthIndicator(final KafkaAdmin admin) {
		return new KafkaBinderHealthImplementation(admin);
	}
}