Programming Model
When using the programming model provided by Kafka Streams binder, both the high-level Streams DSL and a mix of the higher-level DSL and the lower-level Processor API can be used as options. When mixing both higher and lower level APIs, this is usually achieved by invoking the `transform` or `process` API methods on `KStream`.
Functional Style
Starting with Spring Cloud Stream `3.0.0`, Kafka Streams binder allows applications to be designed and developed using the functional programming style available in Java 8. This means that applications can be concisely represented as a lambda expression of type `java.util.function.Function` or `java.util.function.Consumer`.
Let's take a very basic example.
@SpringBootApplication
public class SimpleConsumerApplication {

    @Bean
    public java.util.function.Consumer<KStream<Object, String>> process() {
        return input ->
                input.foreach((key, value) -> {
                    System.out.println("Key: " + key + " Value: " + value);
                });
    }
}
Albeit simple, this is a complete standalone Spring Boot application that leverages Kafka Streams for stream processing. This is a consumer application with no outbound binding and only a single inbound binding. The application consumes data and simply logs the information from the `KStream` key and value on the standard output. The application contains the `SpringBootApplication` annotation and a method that is marked as a `Bean`. The bean method is of type `java.util.function.Consumer`, parameterized with `KStream`. In the implementation, we return a `Consumer` object that is essentially a lambda expression. Inside the lambda expression, the code for processing the data is provided.
In this application, there is a single input binding of type `KStream`. The binder creates this binding for the application with the name `process-in-0`, i.e. the function bean name followed by a dash character (`-`), the literal `in`, another dash, and then the ordinal position of the parameter. You use this binding name to set other properties such as the destination. For example, `spring.cloud.stream.bindings.process-in-0.destination=my-topic`.
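The derivation of these binding names can be sketched with a tiny helper. This is a hypothetical illustration only: the binder computes these names internally and exposes no such API; the helper and class names here are made up.

```java
// Sketch of the binder's binding-name convention:
// <function bean name> + "-in-"/"-out-" + ordinal position of the argument.
public class BindingNames {

    static String inputBinding(String functionBeanName, int ordinal) {
        return functionBeanName + "-in-" + ordinal;
    }

    static String outputBinding(String functionBeanName, int ordinal) {
        return functionBeanName + "-out-" + ordinal;
    }

    public static void main(String[] args) {
        // For the process() consumer above, the single input binding is:
        System.out.println(inputBinding("process", 0)); // prints "process-in-0"
    }
}
```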
If the destination property is not set on the binding, a topic is created with the same name as the binding (if the application has sufficient privileges), or that topic is expected to be already available.
Once built as an uber-jar (e.g., `kstream-consumer-app.jar`), you can run the above example as follows.

java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic

If the application chooses to define the functional bean using Spring's `Component` annotation, the binder also supports that model. The above functional bean could be rewritten as below.

@Component("process")
public class SimpleConsumer implements java.util.function.Consumer<KStream<Object, String>> {

    @Override
    public void accept(KStream<Object, String> input) {
        input.foreach((key, value) -> {
            System.out.println("Key: " + key + " Value: " + value);
        });
    }
}
Here is another example, this time a full processor with both input and output bindings. This is the classic word-count example, in which the application receives data from a topic and the number of occurrences of each word is then computed in a tumbling time window.
@SpringBootApplication
public class WordCountProcessorApplication {

    @Bean
    public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
        return input -> input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, value) -> new KeyValue<>(value, value))
                .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("word-counts-state-store"))
                .toStream()
                .map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))));
    }

    public static void main(String[] args) {
        SpringApplication.run(WordCountProcessorApplication.class, args);
    }
}
Here again, this is a complete Spring Boot application. The difference from the first application is that the bean method is of type `java.util.function.Function`. The first parameterized type of the `Function` is for the input `KStream` and the second one is for the output. In the method body, a lambda expression of type `Function` is provided, and the actual business logic is given as the implementation. Similar to the previously discussed Consumer-based application, the input binding here is named `process-in-0` by default. For the output, the binding name is automatically set to `process-out-0`.
Once built as an uber-jar (e.g., `wordcount-processor.jar`), you can run the above example as shown below.
java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts
This application consumes messages from the Kafka topic `words`, and the computed results are published to the output topic `counts`.
Spring Cloud Stream ensures that the messages from both the incoming and outgoing topics are automatically bound as `KStream` objects. As a developer, you can exclusively focus on the business aspects of the code, i.e. writing the logic required in the processor. Setting up the Kafka Streams specific configuration required by the Kafka Streams infrastructure is automatically handled by the framework.
The two examples we saw above have a single `KStream` input binding. In both cases, the bindings received records from a single topic. If you want to multiplex multiple topics into a single `KStream` binding, you can provide comma-separated Kafka topics as destinations, as shown below.
spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3
In addition, you can also provide topic patterns as destinations if you want to match topics against a regular expression.
spring.cloud.stream.bindings.process-in-0.destination=input.*
Multiple Input Bindings
Many non-trivial Kafka Streams applications often consume data from more than one topic through multiple bindings. For instance, one topic is consumed as a `KStream` and another as a `KTable` or `GlobalKTable`. There are many reasons why an application might want to receive data as a table type. Think of a use case where the underlying topic is populated through a change data capture (CDC) mechanism from a database, or one where the application only cares about the latest updates for downstream processing. If the application specifies that the data needs to be bound as a `KTable` or `GlobalKTable`, then the Kafka Streams binder will properly bind the destination to a `KTable` or `GlobalKTable` and make it available for the application to operate upon. We will look at a few different scenarios in which multiple input bindings are handled in the Kafka Streams binder.
BiFunction in Kafka Streams Binder
Here is an example where we have two inputs and one output. In this case, the application can leverage `java.util.function.BiFunction`.
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
    return (userClicksStream, userRegionsTable) -> userClicksStream
            .leftJoin(userRegionsTable,
                    (clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
            .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
                    regionWithClicks.getClicks()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .reduce(Long::sum)
            .toStream();
}
Here again, the basic theme is the same as in the previous examples, but here we have two inputs. Java's `BiFunction` support is used to bind the inputs to the desired destinations. The default binding names generated by the binder for the inputs are `process-in-0` and `process-in-1` respectively. The default output binding is `process-out-0`. In this example, the first parameter of the `BiFunction` is bound as a `KStream` for the first input, and the second parameter is bound as a `KTable` for the second input.
BiConsumer in Kafka Streams Binder
If there are two inputs but no outputs, we can use `java.util.function.BiConsumer` as shown below.
@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
    return (userClicksStream, userRegionsTable) -> {
        // Consume both inputs here; a BiConsumer produces no outbound data.
    };
}
Beyond two inputs
What if you have more than two inputs? There are situations in which you need more than two inputs. In that case, the binder allows you to chain partial functions. In functional programming jargon, this technique is generally known as currying. With the functional programming support added in Java 8, Java now lets you write curried functions. The Spring Cloud Stream Kafka Streams binder can make use of this feature to enable multiple input bindings.
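Stripped of the Kafka Streams types, a curried function is just a chain of single-argument `java.util.function.Function`s, each level returning the next function until the last level produces the result. A minimal plain-Java sketch (integers stand in for the bound inputs):

```java
import java.util.function.Function;

public class CurryingSketch {

    // Three "inputs" arrive one at a time; each level returns another Function
    // until the innermost level computes the final result.
    static final Function<Integer, Function<Integer, Function<Integer, Integer>>> sum3 =
            a -> b -> c -> a + b + c;

    public static void main(String[] args) {
        System.out.println(sum3.apply(1).apply(2).apply(3)); // prints 6
    }
}
```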
Let's see an example.
@Bean
public Function<KStream<Long, Order>,
        Function<GlobalKTable<Long, Customer>,
        Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

    return orders -> (
            customers -> (
                    products -> (
                            orders.join(customers,
                                    (orderId, order) -> order.getCustomerId(),
                                    (order, customer) -> new CustomerOrder(customer, order))
                                  .join(products,
                                        (orderId, customerOrder) -> customerOrder.productId(),
                                        (customerOrder, product) -> {
                                            EnrichedOrder enrichedOrder = new EnrichedOrder();
                                            enrichedOrder.setProduct(product);
                                            enrichedOrder.setCustomer(customerOrder.customer);
                                            enrichedOrder.setOrder(customerOrder.order);
                                            return enrichedOrder;
                                        })
                    )
            )
    );
}
Let's look at the details of the binding model presented above. In this model, we have three partially applied functions on the inbound. Let's call them `f(x)`, `f(y)` and `f(z)`. If we expand these functions in the sense of true mathematical functions, it looks like this: `f(x) → f(y) → f(z) → KStream<Long, EnrichedOrder>`. The `x` variable stands for `KStream<Long, Order>`, the `y` variable stands for `GlobalKTable<Long, Customer>`, and the `z` variable stands for `GlobalKTable<Long, Product>`. The first function `f(x)` has the first input binding of the application (`KStream<Long, Order>`) and its output is the function `f(y)`. The function `f(y)` has the second input binding of the application (`GlobalKTable<Long, Customer>`) and its output is yet another function, `f(z)`. The input of the function `f(z)` is the third input of the application (`GlobalKTable<Long, Product>`) and its output is `KStream<Long, EnrichedOrder>`, which is the final output binding of the application. The inputs from the three partial functions, which are `KStream`, `GlobalKTable` and `GlobalKTable` respectively, are available in the method body for implementing the business logic as part of the lambda expression.
The input bindings are named `enrichOrder-in-0`, `enrichOrder-in-1` and `enrichOrder-in-2` respectively. The output binding is named `enrichOrder-out-0`.
With curried functions, you can have virtually any number of inputs. However, keep in mind that anything more than a small number of inputs, with partially applied functions for them as above, is likely to lead to unreadable code in Java. Therefore, if your Kafka Streams application requires more than a reasonably small number of input bindings and you want to use this functional model, you may want to rethink your design and decompose the application appropriately.
Output Bindings
Kafka Streams binder allows either `KStream` or `KTable` types as output bindings. Behind the scenes, the binder uses the `to` method on `KStream` to send the resultant records to the output topic. If the application provides a `KTable` as output in the function, the binder still uses this technique by delegating to the `to` method of `KStream`.
For example, both functions below will work:
@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
}
@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}
Multiple Output Bindings
Kafka Streams allows writing outbound data to multiple topics. This feature is known as branching in Kafka Streams. When using multiple output bindings, you need to provide an array of `KStream` (`KStream[]`) as the outbound return type.
Here is an example:
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {

    Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
    Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
    Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

    return input -> {
        final Map<String, KStream<Object, WordCount>> stringKStreamMap = input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                .count(Materialized.as("WordCounts-branch"))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))))
                .split()
                .branch(isEnglish)
                .branch(isFrench)
                .branch(isSpanish)
                .noDefaultBranch();

        return stringKStreamMap.values().toArray(new KStream[0]);
    };
}
The programming model remains the same; however, the outbound parameterized type is `KStream[]`. The default output binding names for the function above are `process-out-0`, `process-out-1` and `process-out-2` respectively. The reason the binder generates three output bindings is that it detects the length of the returned `KStream` array as three. Note that in this example we use `noDefaultBranch()`; if we had used `defaultBranch()` instead, that would have required an extra output binding, essentially returning a `KStream` array of length four.
Summary of Function based Programming Styles for Kafka Streams
In summary, the following table shows the various options that can be used in the functional paradigm.
Number of Inputs | Number of Outputs | Component to use
---|---|---
1 | 0 | java.util.function.Consumer
2 | 0 | java.util.function.BiConsumer
1 | 1..n | java.util.function.Function
2 | 1..n | java.util.function.BiFunction
>= 3 | 0..n | Use curried functions

In the case of more than one output in this table, the type simply becomes `KStream[]`.
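As a plain-Java reminder of the four component shapes in the table (with simple Java types standing in for the Kafka Streams types):

```java
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

public class FunctionalShapes {

    // 1 input, 0 outputs
    static final Consumer<String> oneInNoOut = s -> { };
    // 2 inputs, 0 outputs
    static final BiConsumer<String, String> twoInNoOut = (a, b) -> { };
    // 1 input, 1 output
    static final Function<String, Integer> oneInOneOut = String::length;
    // 2 inputs, 1 output
    static final BiFunction<Integer, Integer, Integer> twoInOneOut = Integer::sum;

    public static void main(String[] args) {
        System.out.println(oneInOneOut.apply("kafka")); // prints 5
        System.out.println(twoInOneOut.apply(2, 3));    // prints 5
    }
}
```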
Function composition in Kafka Streams binder
Kafka Streams binder supports a minimal form of functional composition for linear topologies. Using the Java functional API support, you can write multiple functions and then compose them on your own using the `andThen` method. For example, assume that you have the following two functions.
@Bean
public Function<KStream<String, String>, KStream<String, String>> foo() {
    return input -> input.peek((s, s2) -> {});
}

@Bean
public Function<KStream<String, String>, KStream<String, Long>> bar() {
    return input -> input.mapValues(value -> (long) value.length());
}
Even without the functional composition support in the binder, you can compose these two functions as shown below.
@Bean
public Function<KStream<String, String>, KStream<String, Long>> composed() {
    return foo().andThen(bar());
}
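The `andThen` mechanics here are plain `java.util.function` behavior and can be seen without any Kafka types. A Kafka-free sketch (the function bodies are made up for illustration):

```java
import java.util.function.Function;

public class ComposeSketch {

    // foo transforms a value, bar changes its type; composing them with
    // andThen yields a single Function, analogous to an explicit composed() bean.
    static final Function<String, String> foo = String::toUpperCase;
    static final Function<String, Integer> bar = String::length;
    static final Function<String, Integer> composed = foo.andThen(bar);

    public static void main(String[] args) {
        System.out.println(composed.apply("streams")); // prints 7
    }
}
```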
Then you can provide a definition of the form `spring.cloud.function.definition=foo;bar;composed`. With the functional composition support in the binder, however, you do not need to write this third function to do the explicit function composition.
You can simply do this instead:
spring.cloud.function.definition=foo|bar
You can even do this:
spring.cloud.function.definition=foo|bar;foo;bar
In this example, the composed function's default binding names become `foobar-in-0` and `foobar-out-0`.
Limitations of functional composition in Kafka Streams binder
When you have a `java.util.function.Function` bean, it can be composed with another function or with multiple functions. The same function bean can also be composed with a `java.util.function.Consumer`; in this case, the consumer is the last component composed. A function can be composed with multiple functions and then end with a `java.util.function.Consumer` bean as well.
When composing beans of type `java.util.function.BiFunction`, the `BiFunction` must be the first function in the definition. The composed entities must be of type `java.util.function.Function` or `java.util.function.Consumer`. In other words, you cannot take a `BiFunction` bean and then compose it with another `BiFunction`.
You cannot compose with types of `BiConsumer`, or with definitions where a `Consumer` is the first component. You also cannot compose with functions whose output is an array (`KStream[]`, used for branching), unless that function is the last component in the definition.
The very first `Function` or `BiFunction` in the function definition may also use the curried form. For example, the following is possible.
@Bean
public Function<KStream<String, String>, Function<KTable<String, String>, KStream<String, String>>> curriedFoo() {
    return a -> b ->
            a.join(b, (value1, value2) -> value1 + value2);
}

@Bean
public Function<KStream<String, String>, KStream<String, String>> bar() {
    return input -> input.mapValues(value -> value + "From-anotherFooFunc");
}
The function definition could then be `curriedFoo|bar`. Behind the scenes, the binder creates two input bindings for the curried function, and an output binding based on the final function in the definition. The default input bindings in this case are `curriedFoobar-in-0` and `curriedFoobar-in-1`. The default output binding for this example becomes `curriedFoobar-out-0`.
Special note on using `KTable` as output in function composition
Let's say you have the following two functions.
@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}
You can compose them as `foo|bar`, but keep in mind that the second function (`bar` in this case) must have a `KTable` as input, since the first function (`foo`) has a `KTable` as output.