Ancillaries to the programming model
Multiple Kafka Streams processors within a single application
Binder 允许在单个 Spring Cloud Stream 应用程序中存在多个 Kafka Streams 处理器。你可以拥有如下应用程序。
@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
...
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
...
}
在这种情况下,binder 将使用不同的应用程序 ID 创建 3 个单独的 Kafka Streams 对象(如下所述)。但是,如果你应用程序中有多于一个处理器,你必须告诉 Spring Cloud Stream 需要激活哪些功能。下面是如何激活这些功能。
spring.cloud.function.definition: process;anotherProcess;yetAnotherProcess
如果你希望某些函数不会立即被激活,你可以从这个列表中移除该函数。
这也适用于你的应用程序中具有单个 Kafka Streams 处理器和其他类型 Function
bean 的情况,这些 bean 通过不同的 binder 处理(例如,基于常规 Kafka 消息通道 binder 的函数 bean)
Kafka Streams Application ID
应用程序 ID 是 Kafka Streams 应用程序需要提供的强制性属性。Spring Cloud Stream Kafka Streams binder 允许你通过多种方式配置此应用程序 ID。
如果你应用程序中只有一个处理器,那么可以使用以下属性在 binder 级别设置这个 ID:
spring.cloud.stream.kafka.streams.binder.applicationId
.
为了简便,如果只有一个处理器,也可以将 spring.application.name
用作委托应用程序 ID 的属性。
如果应用程序中有多个 Kafka Streams 处理器,那么需要为每个处理器设置应用程序 ID。在函数模型的情况下,可以将其作为属性附加到每个函数。
例如,想象一下有以下函数。
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
...
}
和
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
那么可以使用以下绑定级别属性为每个函数设置应用程序 ID。
spring.cloud.stream.kafka.streams.binder.functions.process.applicationId
和
spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId
对于基于函数的模型,这种在绑定级别设置应用程序 ID 的方法也适用。然而,如果你使用的是函数模型,那么如上所示在绑定级别为每个函数进行设置要容易得多。
对于生产部署,强烈建议通过配置显式指定应用程序 ID。如果你正在自动扩展应用程序,这一点尤其重要,在这种情况下,你需要确保使用相同的应用程序 ID 部署每个实例。
如果应用程序未提供应用程序 ID,那么在这种情况下,绑定器将自动为你生成一个静态应用程序 ID。这在开发场景中很方便,因为它避免了显式提供应用程序 ID 的需要。以这种方式生成的应用程序 ID 在应用程序重新启动后将保持静态。在函数模型的情况下,生成的应用程序 ID 将是函数 bean 名称后跟文字 applicationID
,例如,process
如果 process
是函数 bean 名称,则为 process-applicationID
。
Summary of setting Application ID
-
默认情况下,Binder 将根据每个函数方法自动生成应用程序 ID。
-
如果您只有一个处理器,则可以使用
spring.kafka.streams.applicationId
,spring.application.name
或spring.cloud.stream.kafka.streams.binder.applicationId
。 -
如果您有多个处理器,则可以使用属性 -
spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId
为每种函数设置应用程序 ID。
Overriding the default binding names generated by the binder with the functional style
默认情况下,绑定器使用上述讨论的策略在使用函数式样式时生成绑定名称,即 <function-bean-name>-<in>|<out>-[0..n],例如 process-in-0、process-out-0 等。如果您想覆盖这些绑定名称,可以通过指定以下属性来实现。
spring.cloud.stream.function.bindings.<default binding name>
。默认绑定名称是绑定器生成的原始绑定名称。
例如,假设有这个函数。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
绑定器将使用名称生成绑定,process-in-0
、process-in-1
和 process-out-0
。现在,如果你想将它们完全更改为其他内容,可能是更特定于域的绑定名称,则可以按照如下所示进行操作。
spring.cloud.stream.function.bindings.process-in-0=users
spring.cloud.stream.function.bindings.process-in-0=regions
和
spring.cloud.stream.function.bindings.process-out-0=clicks
之后,必须对这些新的绑定名称设置所有绑定级别属性。
请记住,对于上面描述的函数式编程模型,在大多数情况下,坚持使用默认绑定名称是有意义的。你仍然可能想要进行这种覆盖的唯一原因是你有大量的配置属性,并且你希望将绑定映射到更友好的域。
Setting up bootstrap server configuration
在运行 Kafka Streams 应用程序时,必须提供 Kafka 代理服务器信息。如果你不提供此信息,绑定器会认为你在默认 localhost:9092
处运行代理。如果情况并非如此,那么你需要覆盖它。有几种方法可以做到这一点。
-
使用引导属性 -
spring.kafka.bootstrapServers
-
Binder 级别属性 -
spring.cloud.stream.kafka.streams.binder.brokers
当涉及到绑定级别属性时,不管使用通过常规 Kafka 绑定器提供的代理属性 - spring.cloud.stream.kafka.binder.brokers
- 关系不大。Kafka Streams 绑定器首先检查 Kafka Streams 绑定器特定的代理属性是否已设置(spring.cloud.stream.kafka.streams.binder.brokers
),如果未找到,它会查找 spring.cloud.stream.kafka.binder.brokers