Binding visualization and control in Kafka Streams binder
开发者可以通过添加特定依赖项和设置属性来启用此功能,允许他们控制 Kafka Streams 函数的绑定行为。该功能对于调试和控制数据流非常有用,因为它允许暂停和恢复个别绑定。
从版本 3.1.2 开始,Kafka Streams binder 支持绑定可视化和控制。唯一支持的两个生命周期阶段是 STOPPED
和 STARTED
。Kafka Streams binder 中不提供生命周期阶段 PAUSED
和 RESUMED
。
为了激活绑定可视化和控制,应用程序需要包含以下两个依赖项。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
如果您更喜欢使用 webflux,则可以包含 spring-boot-starter-webflux
,而不是标准 web 依赖项。
此外,您还需要设置以下属性:
management.endpoints.web.exposure.include=bindings
为了进一步说明这个特性,让我们使用以下应用程序作为指南:
@SpringBootApplication
public class KafkaStreamsApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaStreamsApplication.class, args);
}
@Bean
public Consumer<KStream<String, String>> consumer() {
return s -> s.foreach((key, value) -> System.out.println(value));
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> function() {
return ks -> ks;
}
}
正如我们所看到的,该应用程序有两个 Kafka Streams 函数——一个消费者和另一个函数。消费者的默认绑定名称为 consumer-in-0
。类似地,对于函数,输入绑定是 function-in-0
,输出绑定是 function-out-0
。
一旦应用程序启动,我们就可以使用以下绑定端点查找有关绑定的详细信息。
curl http://localhost:8080/actuator/bindings | jq .
[
{
"bindingName": "consumer-in-0",
"name": "consumer-in-0",
"group": "consumer-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": true,
"extendedInfo": {}
},
{
"bindingName": "function-in-0",
"name": "function-in-0",
"group": "function-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": true,
"extendedInfo": {}
},
{
"bindingName": "function-out-0",
"name": "function-out-0",
"group": "function-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": false,
"extendedInfo": {}
}
]
可以在上面找到关于所有三个绑定的详细信息。
现在让我们停止 consumer-in-0 绑定。
curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
此时,不会通过此绑定接收任何记录。
再次启动绑定。
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
当单个函数上存在多个绑定时,对其中任何绑定执行这些操作都有效。这是因为单个函数上的所有绑定都由同一个 StreamsBuilderFactoryBean
支持。因此,对于上述函数,function-in-0
或 function-out-0
都可以工作。