Binding visualization and control in Kafka Streams binder

开发者可以通过添加特定依赖项和设置属性来启用此功能,允许他们控制 Kafka Streams 函数的绑定行为。该功能对于调试和控制数据流非常有用,因为它允许暂停和恢复个别绑定。

从版本 3.1.2 开始,Kafka Streams binder 支持绑定可视化和控制。唯一支持的两个生命周期阶段是 STOPPEDSTARTED。Kafka Streams binder 中不提供生命周期阶段 PAUSEDRESUMED

Starting with version 3.1.2, Kafka Streams binder supports binding visualization and control. The only two lifecycle phases supported are STOPPED and STARTED. The lifecycle phases PAUSED and RESUMED are not available in Kafka Streams binder.

为了激活绑定可视化和控制,应用程序需要包含以下两个依赖项。

In order to activate binding visualization and control, the application needs to include the following two dependencies.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
</dependency>

如果您更喜欢使用 webflux,则可以包含 spring-boot-starter-webflux,而不是标准 web 依赖项。

If you prefer using webflux, you can then include spring-boot-starter-webflux instead of the standard web dependency.

此外,您还需要设置以下属性:

In addition, you also need to set the following property:

management.endpoints.web.exposure.include=bindings

为了进一步说明这个特性,让我们使用以下应用程序作为指南:

To illustrate this feature further, let us use the following application as a guide:

@SpringBootApplication
public class KafkaStreamsApplication {

	public static void main(String[] args) {
		SpringApplication.run(KafkaStreamsApplication.class, args);
	}

	@Bean
	public Consumer<KStream<String, String>> consumer() {
		return s -> s.foreach((key, value) -> System.out.println(value));
	}

	@Bean
	public Function<KStream<String, String>, KStream<String, String>> function() {
		return ks -> ks;
	}

}

正如我们所看到的,该应用程序有两个 Kafka Streams 函数——一个消费者和另一个函数。消费者的默认绑定名称为 consumer-in-0。类似地,对于函数,输入绑定是 function-in-0,输出绑定是 function-out-0

As we can see, the application has two Kafka Streams functions - one, a consumer and another a function. The consumer binding is named by default as consumer-in-0. Similarly, for the function, the input binding is function-in-0 and the output binding is function-out-0.

一旦应用程序启动,我们就可以使用以下绑定端点查找有关绑定的详细信息。

Once the application is started, we can find details about the bindings using the following bindings endpoint.

 curl http://localhost:8080/actuator/bindings | jq .
[
  {
    "bindingName": "consumer-in-0",
    "name": "consumer-in-0",
    "group": "consumer-applicationId",
    "pausable": false,
    "state": "running",
    "paused": false,
    "input": true,
    "extendedInfo": {}
  },
  {
    "bindingName": "function-in-0",
    "name": "function-in-0",
    "group": "function-applicationId",
    "pausable": false,
    "state": "running",
    "paused": false,
    "input": true,
    "extendedInfo": {}
  },
  {
    "bindingName": "function-out-0",
    "name": "function-out-0",
    "group": "function-applicationId",
    "pausable": false,
    "state": "running",
    "paused": false,
    "input": false,
    "extendedInfo": {}
  }
]

可以在上面找到关于所有三个绑定的详细信息。

The details about all three bindings can be found above.

现在让我们停止 consumer-in-0 绑定。

Let us now stop the consumer-in-0 binding.

curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0

此时,不会通过此绑定接收任何记录。

At this point, no records will be received through this binding.

再次启动绑定。

Start the binding again.

curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0

当单个函数上存在多个绑定时,对其中任何绑定执行这些操作都有效。这是因为单个函数上的所有绑定都由同一个 StreamsBuilderFactoryBean 支持。因此,对于上述函数,function-in-0function-out-0 都可以工作。

When there are multiple bindings present on a single function, invoking these operations on any of those bindings will work. This is because all the bindings on a single function are backed by the same StreamsBuilderFactoryBean. Therefore, for the function above, either function-in-0 or function-out-0 will work.