Mixing high level DSL and low level Processor API

Kafka Streams 提供两个 API 变体。它有一个高级 DSL 类似的 API,你可以在其中链接许多操作,这些操作可能对许多函数式程序员很熟悉。Kafka Streams 还可以访问低级处理器 API。处理器 API 功能非常强大,可以让你以更低级别的方式控制事物,但本质上是命令式的。适用于 Spring Cloud Stream 的 Kafka Streams 绑定器允许你使用高级 DSL 或同时混合使用 DSL 和处理器 API。同时混合使用这些变体,可以提供许多选项来控制应用程序中的各种用例。应用程序可以使用 transformprocess 方法的 API 调用来访问处理器 API。

以下是使用 process API 在 Spring Cloud Stream 应用程序中组合 DSL 和处理器 API 的示例。

@Bean
public Consumer<KStream<Object, String>> process() {
    return input ->
        input.process(() -> new Processor<Object, String>() {
            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
               this.context = context;
            }

            @Override
            public void process(Object key, String value) {
                //business logic
            }

            @Override
            public void close() {

        });
}

以下是使用 transform API 的示例。

@Bean
public Consumer<KStream<Object, String>> process() {
    return (input, a) ->
        input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
            @Override
            public void init(ProcessorContext context) {

            }

            @Override
            public void close() {

            }

            @Override
            public KeyValue<Object, String> transform(Object key, String value) {
                // business logic - return transformed KStream;
            }
        });
}

process API 方法调用是一个终止操作,而 transform API 是非终止的,并使用你可以使用 DSL 或处理器 API 继续进一步处理的潜在转换的 KStream