Multi binders with Kafka Streams based binders and regular Kafka Binder

A single application can contain both a function, consumer, or supplier based on the regular Kafka binder and a Kafka Streams based processor. However, you cannot mix the two within a single function or consumer.

Here is an example in which components based on both binders live in the same application.

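// Regular Kafka binder: a pass-through function on message payloads.
@Bean
public Function<String, String> process() {
    return s -> s;
}

// Kafka Streams binder: a pass-through processor on a KStream.
// The output type matches the input type so that returning the input stream compiles.
@Bean
public Function<KStream<Object, String>, KStream<Object, String>> kstreamProcess() {
    return input -> input;
}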

These are the relevant parts of the configuration:

spring.cloud.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar

Things become a bit more complex if the same application deals with two different Kafka clusters. For example, the regular process function acts upon both Kafka cluster 1 and cluster 2 (receiving data from cluster 1 and sending it to cluster 2), while the Kafka Streams processor acts upon Kafka cluster 2 only. In that case, you have to use the multi binder facilities provided by Spring Cloud Stream.

Here is how the configuration may change in that scenario:

# multi binder configuration
# Replace kafkaCluster-1 and kafkaCluster-2 with the broker addresses of the respective clusters.
spring.cloud.stream.binders.kafka1.type=kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.brokers=${kafkaCluster-1}
spring.cloud.stream.binders.kafka2.type=kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type=kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}

spring.cloud.function.definition=process;kstreamProcess

# From cluster 1 to cluster 2 with the regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
# consume from cluster 1
spring.cloud.stream.bindings.process-in-0.binder=kafka1
spring.cloud.stream.bindings.process-out-0.destination=bar
# send to cluster 2
spring.cloud.stream.bindings.process-out-0.binder=kafka2

# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3

Pay attention to the above configuration. There are two kinds of binders but three binders in total: a regular Kafka binder for cluster 1 (kafka1), another regular Kafka binder for cluster 2 (kafka2), and a kstream binder (kafka3). The first processor in the application receives data from kafka1 and publishes to kafka2; both are regular Kafka binders, but they point to different clusters. The second processor, which is a Kafka Streams processor, consumes data from kafka3, which points to the same cluster as kafka2 but is a different binder type.
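Because every binding names its binder explicitly in this setup, a mistyped or undeclared binder name is easy to introduce. The following is a minimal sketch of a sanity check, assuming the configuration is available as properties text. `BinderReferenceCheck` and its methods are hypothetical names chosen for illustration and are not part of Spring Cloud Stream; the sketch uses only `java.util.Properties`, whose parsing may differ in details from the loader Spring Boot uses.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration; not part of Spring Cloud Stream.
class BinderReferenceCheck {

    private static final String BINDERS = "spring.cloud.stream.binders.";
    private static final String BINDINGS = "spring.cloud.stream.bindings.";

    // Collects <name> from every spring.cloud.stream.binders.<name>.type key.
    static Set<String> declaredBinders(Properties props) {
        Set<String> names = new TreeSet<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(BINDERS)) {
                String rest = key.substring(BINDERS.length());
                int dot = rest.indexOf('.');
                if (dot > 0 && rest.substring(dot).equals(".type")) {
                    names.add(rest.substring(0, dot));
                }
            }
        }
        return names;
    }

    // Returns one entry per binding whose binder value is not a declared binder name.
    static List<String> undeclaredReferences(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        Set<String> declared = declaredBinders(props);
        List<String> problems = new ArrayList<>();
        // Sorted iteration keeps the report order stable.
        for (String key : new TreeSet<>(props.stringPropertyNames())) {
            if (key.startsWith(BINDINGS) && key.endsWith(".binder")) {
                String binder = props.getProperty(key);
                if (!declared.contains(binder)) {
                    problems.add(key + " -> '" + binder + "'");
                }
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        String config = String.join("\n",
                "spring.cloud.stream.binders.kafka1.type=kafka",
                "spring.cloud.stream.binders.kafka3.type=kstream",
                "spring.cloud.stream.bindings.process-in-0.binder=kafka1",
                // kafka2 is referenced here but never declared above.
                "spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka2");
        undeclaredReferences(config).forEach(System.out::println);
    }
}
```

A check along these lines also flags a binder value that carries trailing text such as `kafka1 # note`, because `java.util.Properties` treats `#` as a comment marker only at the start of a line, so the trailing text becomes part of the value.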

The Kafka Streams family of binders offers three binder types: kstream, ktable, and globalktable. If your application has multiple bindings based on any of these binders, the binder type must be provided explicitly.

For example, if you have a processor such as the following,

@Bean
public Function<KStream<Long, Order>,
        Function<KTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

    ...
}

then it has to be configured as follows in a multi binder scenario. Note that this is only needed in a true multi-binder scenario, where multiple processors deal with multiple clusters within a single application. In that case, the binder must be provided explicitly on each binding to distinguish it from the binder types and clusters of the other processors.

spring.cloud.stream.binders.kafka1.type=kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type=ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type=globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}

# enrichOrder-in-0 and enrichOrder-out-0 use the kstream binder (kafka1),
# enrichOrder-in-1 uses the ktable binder (kafka2),
# enrichOrder-in-2 uses the globalktable binder (kafka3).
spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1

# rest of the configuration is omitted.
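
The nested signature of `enrichOrder` can be easier to read with plain types. The following sketch is only an illustration of the curried shape: `String` stands in for `KStream`, `KTable`, and `GlobalKTable`, and `CurriedShape`, `enrich`, and `inputBinding` are hypothetical names, not Kafka Streams or Spring Cloud Stream API. It shows one `Function` per input, outermost first, with the position of each input matching the index in its binding name (`enrichOrder-in-0`, `enrichOrder-in-1`, `enrichOrder-in-2`).

```java
import java.util.function.Function;

// Plain-Java illustration of the curried shape; String stands in for the Kafka Streams types.
class CurriedShape {

    // Same nesting as enrichOrder(): one Function per input, outermost first.
    static Function<String, Function<String, Function<String, String>>> enrich() {
        return order -> customer -> product ->
                order + " enriched with " + customer + " and " + product;
    }

    // Binding name of the i-th curried input, following the <function>-in-<index> pattern
    // seen in the configuration.
    static String inputBinding(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    public static void main(String[] args) {
        // Each apply() supplies the next input, in binding-index order.
        String result = enrich().apply("order-1").apply("customer-7").apply("product-3");
        System.out.println(result);
        for (int i = 0; i < 3; i++) {
            System.out.println(inputBinding("enrichOrder", i));
        }
    }
}
```

In the real processor the three arguments are the stream and table abstractions bound to the three input bindings, and the innermost function returns the output `KStream`.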