Programming Model

在使用 Kafka Streams 绑定器提供的编程模型时,既可以使用高级 Streams DSL,也可以将高级和低级 Processor-API混合使用。在将高级和低级 API 都混合使用时,这通常是通过在 `KStream`上调用 `transform`或 `process`API 方法来实现的。

Functional Style

从 Spring Cloud Stream`3.0.0`开始,Kafka Streams 绑定程序允许应用程序使用 Java 8 中的功能性编程风格进行设计和开发。这意味着可以使用类型为`java.util.function.Function`或`java.util.function.Consumer`的 lambda 表达式简洁地表示应用程序。

我们举一个非常简单的示例。

@SpringBootApplication
public class SimpleConsumerApplication {

    @Bean
    public java.util.function.Consumer<KStream<Object, String>> process() {

        return input ->
                input.foreach((key, value) -> {
                    System.out.println("Key: " + key + " Value: " + value);
                });
    }
}

虽然简单,但这是一个完整的独立 Spring Boot 应用程序,它利用 Kafka Streams 进行流处理。这是一个没有出站绑定且只有一个入站绑定的消费者应用程序。该应用程序使用数据,它仅将日志记录到标准输出中的`KStream`键和值的信息。该应用程序包含`SpringBootApplication`注解和一个标记为`Bean`的方法。Bean 方法的类型为`java.util.function.Consumer`,该类型使用`KStream`进行了参数化。然后在实现中,我们将返回一个本质上是 lambda 表达式的 Consumer 对象。在 lambda 表达式内部,提供了用于处理数据代码。

在此应用程序中,有一个类型为`KStream`的单个输入绑定。绑定程序为应用程序创建了这个绑定,名称为`process-in-0`,即函数 Bean 名称后跟连字符(-)、文字`in`,再跟连字符,然后是参数的序数位置。你可以使用该绑定名称设置其他属性,例如目标。例如,spring.cloud.stream.bindings.process-in-0.destination=my-topic

如果在绑定上没有设置目标属性,则会使用与绑定名称相同名称创建主题(如果有针对该应用程序的足够权限),或者,这个主题应已经可用。

在构建为 uber-jar(例如`kstream-consumer-app.jar`)后,你可以像下面一样运行上述示例。

如果应用程序选择使用 Spring 的`Component`注解定义函数 Bean,则绑定程序也支持该模型。上述函数 Bean 可以改写如下。

@Component(name = "process")
public class SimpleConsumer implements java.util.function.Consumer<KStream<Object, String>> {

    @Override
    public void accept(KStream<Object, String> input) {
        input.foreach((key, value) -> {
            System.out.println("Key: " + key + " Value: " + value);
        });
    }
}
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic

这里有另一个示例,其中有一输入和输出绑定的完整处理器。这是一个经典的单词计数示例,其中应用程序从一个主题接收数据,然后计算每个单词在滚动时间窗口中的出现次数。

@SpringBootApplication
public class WordCountProcessorApplication {

  @Bean
  public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

    return input -> input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, value) -> new KeyValue<>(value, value))
                .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("word-counts-state-store"))
                .toStream()
                .map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))));
  }

	public static void main(String[] args) {
		SpringApplication.run(WordCountProcessorApplication.class, args);
	}
}

这里同样是一个完整的 Spring Boot 应用程序。此应用程序与第一个应用程序的区别在于,Bean 方法的类型为`java.util.function.Function`。Function`的第一个参数化类型用于输入`KStream,第二个参数化类型用于输出。在方法主体中,提供了一个类型为`Function`的 lambda 表达式,并且提供了实际业务逻辑作为实现。与前面讨论的基于 Consumer 的应用程序类似,此处的输入绑定默认命名为`process-in-0`。对于输出,绑定名称也自动设置为`process-out-0`。

在构建为 uber-jar(例如`wordcount-processor.jar`)后,你可以像下面一样运行上述示例。

java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts

此应用程序将使用 Kafka 主题`words`,计算结果会发布到`counts`输出主题。

Spring Cloud Stream 将确保来自输入和输出主题的消息自动绑定为 KStream 对象。作为开发者,你可以专注于代码的业务方面,即在处理器中编写所需的逻辑。由 Kafka Streams 基础架构所需的特定 Kafka Streams 配置会由框架自动处理。

我们上面看到两个示例只有一个`KStream`输入绑定。在两种情况下,这些绑定都收到了来自单个主题的记录。如果你想要将多个主题复用到一个`KStream`绑定中,则可以在下面提供逗号分隔的 Kafka 主题作为目标。

spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3

此外,如果你想对比正则表达式匹配主题,还可以提供主题模式作为目标。

spring.cloud.stream.bindings.process-in-0.destination=input.*

Multiple Input Bindings

许多非平凡的 Kafka 流应用程序通常通过多个绑定从多个主题消耗数据。例如,一个主题被用作 Kstream 消费,另一个主题被用作 KTableGlobalKTable 消费。一个应用程序可能希望接收数据作为表类型的原因有很多。考虑一个用例,其中基础主题通过数据库中的变更数据捕获 (CDC) 机制填充,或者应用程序可能只关心最新的更新以供下游处理。如果应用程序指定需要将数据绑定为 KTableGlobalKTable,那么 Kafka 流绑定器将把目标正确绑定到 KTableGlobalKTable,并使其可供应用程序操作。我们将看看 Kafka 流绑定器中处理多个输入绑定的几种不同情况。

BiFunction in Kafka Streams Binder

这里有一个示例,其中我们有两个输入和一个输出。在这种情况下,应用程序可以利用 java.util.function.BiFunction

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
    return (userClicksStream, userRegionsTable) -> (userClicksStream
            .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
                            "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
            .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
                    regionWithClicks.getClicks()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .reduce(Long::sum)
            .toStream());
}

这里,基本主题与前面的示例相同,但这里我们有两个输入。Java’s BiFunction 支持用于将输入绑定到所需的目的地。绑定器为输入生成的默认绑定名称分别为 process-in-0process-in-1。默认的输出绑定是 process-out-0。在本示例中,BiFunction 的第一个参数被绑定为第一个输入的 KStream,第二个参数被绑定为第二个输入的 KTable

BiConsumer in Kafka Streams Binder

如果有两个输入,但没有输出,在这种情况下,我们可以按照下面所示使用 java.util.function.BiConsumer

@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
    return (userClicksStream, userRegionsTable) -> {}
}

Beyond two inputs

如果你有两个以上的输入呢?在某些情况下,你需要两个以上的输入。在这种情况下,绑定器允许你链接部分函数。在函数式编程术语中,这种技术通常称为柯里化。Java 8 中新增了函数式编程支持,现在 Java 允许编写柯里化函数。Spring Cloud Stream Kafka Streams 绑定器可以利用此功能启用多个输入绑定。

我们来看一个示例。

@Bean
public Function<KStream<Long, Order>,
        Function<GlobalKTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

    return orders -> (
              customers -> (
                    products -> (
                        orders.join(customers,
                            (orderId, order) -> order.getCustomerId(),
                                (order, customer) -> new CustomerOrder(customer, order))
                                .join(products,
                                        (orderId, customerOrder) -> customerOrder
                                                .productId(),
                                        (customerOrder, product) -> {
                                            EnrichedOrder enrichedOrder = new EnrichedOrder();
                                            enrichedOrder.setProduct(product);
                                            enrichedOrder.setCustomer(customerOrder.customer);
                                            enrichedOrder.setOrder(customerOrder.order);
                                            return enrichedOrder;
                                        })
                        )
                )
    );
}

我们来看看上面展示的绑定模型的详细信息。在这个模型中,我们有两个部分应用于入站的函数。我们称它们为 f(x)f(y)f(z)。如果我们以真正数学函数的意义扩展这些函数,它看起来会像这样:f(x) → (fy) → f(z) → KStream<Long, EnrichedOrder>x 变量代表 KStream<Long, Order>y 变量代表 GlobalKTable<Long, Customer>z 变量代表 GlobalKTable<Long, Product>。第一个函数 f(x) 有应用程序的第一个输入绑定 (KStream<Long, Order>),其输出是函数 f(y)。函数 f(y) 有应用程序的第二个输入绑定 (GlobalKTable<Long, Customer>),其输出是另一个函数 f(z)。函数 f(z) 的输入是应用程序的第三个输入 (GlobalKTable<Long, Product>),其输出是 KStream<Long, EnrichedOrder>,这是应用程序的最终输出绑定。来自三个部分函数的输入分别是 KStreamGlobalKTableGlobalKTable,你可以在方法体中使用它们来实现 lambda 表达式的一部分业务逻辑。

输入绑定分别命名为 enrichOrder-in-0enrichOrder-in-1enrichOrder-in-2。输出绑定命名为 enrichOrder-out-0

使用柯里化函数,你几乎可以有任意数量的输入。但是请记住,超过少量的输入以及部分应用的函数(如上面在 Java 中)可能会导致代码不可读。因此,如果你的 Kafka 流应用程序需要超过相当少量的输入绑定,并且你想使用此函数模型,那么你可能需要重新考虑你的设计并适当地分解应用程序。

Output Bindings

Kafka 流绑定器允许将 KStreamKTable 类型用作输出绑定。在后台,绑定器使用 KStream 上的 to 方法将结果记录发送到输出主题。如果应用程序在函数中提供 KTable 作为输出,那么绑定器仍然使用此技术,通过将 KStreamto 方法委托出去。

例如,下面的两个函数都可以使用:

@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
    };
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}

Multiple Output Bindings

Kafka Streams 允许将出站数据写入多个主题。此功能在 Kafka Streams 中称为分支。在使用多个输出绑定时,您需要提供一个 KStream (KStream[]) 数组作为出站返回类型。

这是一个示例:

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {

    Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
    Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
    Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

    return input -> {
        final Map<String, KStream<Object, WordCount>> stringKStreamMap = input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                .count(Materialized.as("WordCounts-branch"))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))))
                .split()
                .branch(isEnglish)
                .branch(isFrench)
                .branch(isSpanish)
                .noDefaultBranch();

        return stringKStreamMap.values().toArray(new KStream[0]);
    };
}

编程模型保持不变,但出站参数化类型为 KStream[]。对于以上函数,默认输出绑定名称分别为`process-out-0`、process-out-1process-out-2。绑定器生成三个输出绑定的原因是,它将返回的 KStream`数组的长度检测为三个。请注意,在本示例中,我们提供了一个 `noDefaultBranch();如果我们改为使用 defaultBranch(),则需要额外的输出绑定,本质上返回长度为 4 的 `KStream`数组。

Summary of Function based Programming Styles for Kafka Streams

简而言之,下表显示了函数范例中可以使用各种选项。

Number of Inputs Number of Outputs Component to use

1

0

java.util.function.Consumer

2

0

java.util.function.BiConsumer

1

1..n

java.util.function.Function

2

1..n

java.util.function.BiFunction

>= 3

0..n

Use curried functions

  • 如果此表格中有多个输出,类型将简单地变为`KStream[]`。

Function composition in Kafka Streams binder

Kafka 流绑定器支持线性拓扑的最小形式的功能组合。使用 Java 函数 API 支持,你可以编写多个函数,然后使用 andThen 方法自行组合它们。例如,假设你有以下两个函数。

@Bean
public Function<KStream<String, String>, KStream<String, String>> foo() {
    return input -> input.peek((s, s2) -> {});
}

@Bean
public Function<KStream<String, String>, KStream<String, Long>> bar() {
    return input -> input.peek((s, s2) -> {});
}

即使没有绑定器中的函数组合支持,你也可以如下组合这两个函数。

@Bean
public Function<KStream<String, String>, KStream<String, Long>> composed() {
    foo().andThen(bar());
}

然后你可以提供 spring.cloud.function.definition=foo;bar;composed 形式的定义。借助绑定器中的函数组合支持,你不需要编写实现显式函数组合的第三个函数。

以下是可以完成此操作的简单方法:

spring.cloud.function.definition=foo|bar

甚至可以这样:

spring.cloud.function.definition=foo|bar;foo;bar

此示例中,合成函数的默认绑定名称变为“foobar-in-0”和“foobar-out-0”。

Limitations of functional composition in Kafka Streams bincer

如果拥有 java.util.function.Function bean,则可以将其与另一个函数或多个函数合成。也可以将同一个函数 bean 与 java.util.function.Consumer 合成。在这种情况下,使用者是你合成的最后一个组件。一个函数可以与多个函数合成,然后也可以以 java.util.function.Consumer 为结束。

在合成 java.util.function.BiFunction 类型 bean 时,BiFunction 必须是定义中的第一个函数。合并项必须是 java.util.function.Functionjava.util.funciton.Consumer 类型。换句话说,你无法取出 BiFunction bean,然后与另一个 BiFunction 合成。

您无法使用 `BiConsumer`类型或 `Consumer`为第一个组件的定义进行组合。除非这是定义中的最后一个组件,否则您也无法使用输出为数组 (`KStream[]`用于分支) 的函数进行组合。

函数定义中 BiFunction 的第一个 Function 也可以使用柯里化形式。例如,以下情况是可能的。

@Bean
public Function<KStream<String, String>, Function<KTable<String, String>, KStream<String, String>>> curriedFoo() {
    return a -> b ->
            a.join(b, (value1, value2) -> value1 + value2);
}

@Bean
public Function<KStream<String, String>, KStream<String, String>> bar() {
    return input -> input.mapValues(value -> value + "From-anotherFooFunc");
}

函数定义可能是“curriedFoo|bar”。幕后,绑定器将为柯里化函数创建两个输入绑定,并基于定义中的最终函数创建输出绑定。此处的默认输入绑定将是“curriedFoobar-in-0”和“curriedFoobar-in-1”。此示例的默认输出绑定变为“curriedFoobar-out-0”。

Special note on using KTable as output in function composition

假设你有以下两个函数。

@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
    };
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}

你可以将它们合成“foo|bar”,但请记住,第二个函数(此处的“bar”)的输入必须是 KTable,因为第一个函数(“foo”)的输出是 KTable