Ancillaries to the programming model

Multiple Kafka Streams processors within a single application

The binder allows you to have multiple Kafka Streams processors within a single Spring Cloud Stream application. For example, you can have an application such as the following:

@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
    ...
}

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
    ...
}

@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
    ...
}

In this case, the binder creates three separate Kafka Streams objects with different application IDs (more on this below). However, if you have more than one processor in the application, you have to tell Spring Cloud Stream which functions need to be activated. Here is how you activate the functions:

spring.cloud.function.definition: process;anotherProcess;yetAnotherProcess

If you do not want certain functions to be activated right away, remove them from this list.
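
To make the activation rule concrete, here is a minimal Java sketch that models how a semicolon-separated spring.cloud.function.definition value could be turned into the set of active function beans. The class FunctionDefinitionSketch and its methods are hypothetical; this is not Spring Cloud Function's actual parser, and it only models the ';' separator used above.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: models how a semicolon-separated
 * spring.cloud.function.definition value selects the functions to activate.
 * This is NOT Spring Cloud Function's actual implementation.
 */
final class FunctionDefinitionSketch {

    // Split the definition on ';', trim each name, and skip empty entries.
    static List<String> activeFunctions(String definition) {
        List<String> names = new ArrayList<>();
        if (definition == null) {
            return names;
        }
        for (String part : definition.split(";")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    // In this model, a function bean is activated only if its name appears in the definition.
    static boolean isActive(String definition, String beanName) {
        return activeFunctions(definition).contains(beanName);
    }

    public static void main(String[] args) {
        String all = "process;anotherProcess;yetAnotherProcess";
        String withoutLast = "process;anotherProcess";
        System.out.println(activeFunctions(all));                       // [process, anotherProcess, yetAnotherProcess]
        System.out.println(isActive(withoutLast, "yetAnotherProcess")); // false
    }
}
```

Dropping yetAnotherProcess from the definition leaves that bean inactive in this model, which mirrors the removal described in the paragraph above.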

This also applies when you have a single Kafka Streams processor alongside other types of Function beans in the same application that are handled through a different binder (for example, a function bean that is based on the regular Kafka Message Channel binder).

Kafka Streams Application ID

The application ID is a mandatory property that you need to provide for a Kafka Streams application. The Spring Cloud Stream Kafka Streams binder allows you to configure this application ID in multiple ways.

If you have only a single processor in the application, you can set the application ID at the binder level using the following property:

spring.cloud.stream.kafka.streams.binder.applicationId

As a convenience, if you have only a single processor, you can also use spring.application.name as the property from which the binder takes the application ID.

If you have multiple Kafka Streams processors in the application, you need to set the application ID per processor. In the case of the functional model, you can attach it to each function as a property.

For example, imagine that you have the following functions:

@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
    ...
}

and

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
    ...
}

Then you can set the application ID for each function using the following binder-level properties:

spring.cloud.stream.kafka.streams.binder.functions.process.applicationId

and

spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId
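
The following sketch shows, under stated assumptions, how the per-function property keys are formed from the function bean names, so that each processor ends up with its own application ID. PerFunctionApplicationIds is a hypothetical helper, and the ID values process-app-id and another-process-app-id are made up; in a real application you would simply set these properties in your configuration.

```java
import java.util.Properties;

/**
 * Hypothetical sketch: reads per-function application IDs from properties keyed as
 * spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId.
 * Not the binder's actual code; the ID values in main() are made up.
 */
final class PerFunctionApplicationIds {

    static final String PREFIX = "spring.cloud.stream.kafka.streams.binder.functions.";

    // Build the property key for one function bean name.
    static String keyFor(String functionName) {
        return PREFIX + functionName + ".applicationId";
    }

    // Look up the application ID configured for the given function, or null if absent.
    static String applicationIdFor(Properties props, String functionName) {
        return props.getProperty(keyFor(functionName));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Hypothetical values: each processor gets its own, distinct application ID.
        props.setProperty(keyFor("process"), "process-app-id");
        props.setProperty(keyFor("anotherProcess"), "another-process-app-id");

        System.out.println(applicationIdFor(props, "process"));        // process-app-id
        System.out.println(applicationIdFor(props, "anotherProcess")); // another-process-app-id
    }
}
```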

Setting the application ID at the binding level also works for the function-based model. However, if you are using the functional model, setting it per function at the binder level, as shown above, is much easier.

For production deployments, it is highly recommended to specify the application ID explicitly through configuration. This is especially critical if you are auto-scaling your application, in which case you need to make sure that each instance is deployed with the same application ID.

If the application does not provide an application ID, the binder auto-generates a static application ID for you. This is convenient in development scenarios, as it avoids the need to provide the application ID explicitly. An application ID generated this way stays the same across application restarts. In the case of the functional model, the generated application ID is the function bean name followed by the literal -applicationID; for example, process-applicationID if process is the function bean name.

Summary of setting Application ID

  • By default, the binder auto-generates an application ID per function method.

  • If you have a single processor, then you can use spring.kafka.streams.applicationId, spring.application.name or spring.cloud.stream.kafka.streams.binder.applicationId.

  • If you have multiple processors, then the application ID can be set per function using the property spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId.
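
The options in this summary can be pictured as a fallback chain. The sketch below is a simplified model, not the binder's code: ApplicationIdResolver is a hypothetical class, the assumption that the per-function property takes precedence over the binder-wide options is ours, and the relative order of the three single-processor properties is likewise an assumption. The final fallback to the generated <function-bean-name>-applicationID follows the behavior described above.

```java
import java.util.Map;

/**
 * Hypothetical sketch of application ID resolution for one function bean.
 * ASSUMED precedence (not taken from the binder source):
 *   1. spring.cloud.stream.kafka.streams.binder.functions.<name>.applicationId
 *   2. spring.cloud.stream.kafka.streams.binder.applicationId
 *   3. spring.kafka.streams.applicationId
 *   4. spring.application.name
 *   5. generated "<name>-applicationID"
 */
final class ApplicationIdResolver {

    static String resolve(Map<String, String> props, String functionName) {
        String[] candidates = {
            "spring.cloud.stream.kafka.streams.binder.functions." + functionName + ".applicationId",
            "spring.cloud.stream.kafka.streams.binder.applicationId",
            "spring.kafka.streams.applicationId",
            "spring.application.name"
        };
        // Return the first non-empty value in the assumed precedence order.
        for (String key : candidates) {
            String value = props.get(key);
            if (value != null && !value.isEmpty()) {
                return value;
            }
        }
        // Nothing configured: fall back to the generated, restart-stable ID.
        return functionName + "-applicationID";
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of(), "process")); // process-applicationID
        // "my-app" and "process-id" are hypothetical values.
        System.out.println(resolve(Map.of("spring.application.name", "my-app"), "process")); // my-app
        System.out.println(resolve(Map.of(
                "spring.application.name", "my-app",
                "spring.cloud.stream.kafka.streams.binder.functions.process.applicationId", "process-id"),
                "process")); // process-id
    }
}
```

Because the generated fallback depends only on the bean name, it stays stable across restarts; for production, setting one of the explicit properties remains the recommended route.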

Overriding the default binding names generated by the binder with the functional style

By default, when you use the functional style, the binder generates binding names using the strategy discussed earlier, that is, <function-bean-name>-<in>|<out>-[0..n], for example, process-in-0, process-out-0, and so on. If you want to override those binding names, you can do so by specifying the following property:
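
A small Java sketch of the default naming pattern, assuming a function with a known number of inputs and outputs (for instance, a BiFunction has two inputs and one output). DefaultBindingNames is a hypothetical helper that only reproduces the <function-bean-name>-in|out-<index> pattern; it is not how the binder computes the names internally.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the default functional binding-name pattern
 * <function-bean-name>-in-<index> and <function-bean-name>-out-<index>.
 */
final class DefaultBindingNames {

    // Produce input binding names first, then output binding names, each indexed from 0.
    static List<String> bindingNames(String functionName, int inputs, int outputs) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < inputs; i++) {
            names.add(functionName + "-in-" + i);
        }
        for (int i = 0; i < outputs; i++) {
            names.add(functionName + "-out-" + i);
        }
        return names;
    }

    public static void main(String[] args) {
        // A BiFunction<KStream, KTable, KStream> named "process" has two inputs and one output.
        System.out.println(bindingNames("process", 2, 1)); // [process-in-0, process-in-1, process-out-0]
        // A Consumer<KStream> has one input and no outputs.
        System.out.println(bindingNames("anotherProcess", 1, 0)); // [anotherProcess-in-0]
    }
}
```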

spring.cloud.stream.function.bindings.<default binding name>, where the default binding name is the original binding name generated by the binder.

For example, let's say you have this function:

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
    ...
}

The binder generates bindings named process-in-0, process-in-1, and process-out-0. If you want to change them to something else entirely, perhaps more domain-specific binding names, you can do so as follows:

spring.cloud.stream.function.bindings.process-in-0=users

spring.cloud.stream.function.bindings.process-in-1=regions

and

spring.cloud.stream.function.bindings.process-out-0=clicks
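
To illustrate the effect of these overrides, this hypothetical sketch looks up spring.cloud.stream.function.bindings.<default binding name> for each default name and falls back to the default when no override is configured. BindingNameOverrides is an illustrative helper, not a Spring Cloud Stream API.

```java
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: maps default binding names to overridden names using
 * properties of the form spring.cloud.stream.function.bindings.<default binding name>.
 */
final class BindingNameOverrides {

    static final String PREFIX = "spring.cloud.stream.function.bindings.";

    // Return the effective binding name: the override if one is configured, else the default.
    static String effectiveName(Map<String, String> props, String defaultName) {
        return props.getOrDefault(PREFIX + defaultName, defaultName);
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of(
                PREFIX + "process-in-0", "users",
                PREFIX + "process-in-1", "regions",
                PREFIX + "process-out-0", "clicks");

        for (String name : List.of("process-in-0", "process-in-1", "process-out-0")) {
            System.out.println(name + " -> " + effectiveName(props, name));
        }
        // process-in-0 -> users
        // process-in-1 -> regions
        // process-out-0 -> clicks
    }
}
```

Once the mapping is in place, binding-level properties are keyed by the new names (users, regions, clicks) rather than the defaults.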

After that, you must set all binding-level properties using these new binding names.

Please keep in mind that with the functional programming model described above, adhering to the default binding names makes sense in most situations. The only reason you may still want to override them is when you have a larger number of configuration properties and you want to map the bindings to something more domain-friendly.

Setting up bootstrap server configuration

When running Kafka Streams applications, you must provide the Kafka broker server information. If you don’t provide it, the binder assumes that the broker is running at the default localhost:9092. If that is not the case, you need to override it. There are a couple of ways to do that:

  • Spring Boot property: spring.kafka.bootstrapServers

  • Binder-level property: spring.cloud.stream.kafka.streams.binder.brokers

For the binder-level property, you can also use the broker property provided by the regular Kafka binder, spring.cloud.stream.kafka.binder.brokers. The Kafka Streams binder first checks whether the Kafka Streams binder-specific broker property (spring.cloud.stream.kafka.streams.binder.brokers) is set and, if it is not, falls back to spring.cloud.stream.kafka.binder.brokers.
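
The lookup order for the binder-level broker property can be sketched as follows. BrokerResolver is a hypothetical helper, and the host names in it are made up. The sketch models only what is stated above: the Kafka Streams binder property first, then the regular Kafka binder property, then the localhost:9092 default. It deliberately leaves out spring.kafka.bootstrapServers, since its precedence relative to the binder properties is not described here.

```java
import java.util.Map;

/**
 * Hypothetical sketch of the binder-level broker lookup:
 * Kafka Streams binder property, then regular Kafka binder property,
 * then the localhost:9092 default. spring.kafka.bootstrapServers is not modeled.
 */
final class BrokerResolver {

    static final String STREAMS_BINDER_BROKERS = "spring.cloud.stream.kafka.streams.binder.brokers";
    static final String KAFKA_BINDER_BROKERS = "spring.cloud.stream.kafka.binder.brokers";
    static final String DEFAULT_BROKERS = "localhost:9092";

    static String resolveBrokers(Map<String, String> props) {
        // 1. Kafka Streams binder-specific property wins if set.
        String streams = props.get(STREAMS_BINDER_BROKERS);
        if (streams != null && !streams.isEmpty()) {
            return streams;
        }
        // 2. Otherwise fall back to the regular Kafka binder property.
        String kafka = props.get(KAFKA_BINDER_BROKERS);
        if (kafka != null && !kafka.isEmpty()) {
            return kafka;
        }
        // 3. Nothing configured: assume a local broker.
        return DEFAULT_BROKERS;
    }

    public static void main(String[] args) {
        System.out.println(resolveBrokers(Map.of())); // localhost:9092
        System.out.println(resolveBrokers(Map.of(KAFKA_BINDER_BROKERS, "broker-a:9092"))); // broker-a:9092
        System.out.println(resolveBrokers(Map.of(
                STREAMS_BINDER_BROKERS, "streams-broker:9092",
                KAFKA_BINDER_BROKERS, "broker-a:9092"))); // streams-broker:9092
    }
}
```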