Multi binders with Kafka Streams based binders and regular Kafka Binder
您可以通过一个应用程序,其中包含基于常规 Kafka Binder 的函数/使用者/供应商,以及基于 Kafka Streams 的处理器。但是,您不能在单个函数或使用者中混合两者。
这里是一个示例,其中您可以在同一应用程序中拥有基于 Binder 的组件。
@Bean
public Function<String, String> process() {
return s -> s;
}
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {
return input -> input;
}
以下是配置的相关文章:
spring.cloud.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
如果您有与上述相同的应用,但处理的是两个不同的 Kafka 集群,那么情况会变得有点复杂,例如正常的 process
适用于 Kafka 集群 1 和集群 2(从集群 1 接收数据并发送到集群 2),而 Kafka Streams 处理器适用于 Kafka 集群 2。那么您必须使用 Spring Cloud Stream 提供的 multi binder 设施。
以下是配置在此场景中的更改方式。
# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.function.definition=process;kstreamProcess
# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2
# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3
注意上述配置。我们有两种 Binder,但总共有 3 个 Binder,第一个是基于集群 1 的常规 Kafka Binder (kafka1
),然后是基于集群 2 的另一个 Kafka Binder (kafka2
),最后是 kstream
Binder (kafka3
)。应用程序中的第一个处理器从 kafka1
接收数据并将数据发布到 kafka2
,其中两个 Binder 都基于常规 Kafka Binder,但集群不同。第二个处理器,它是 Kafka Streams 处理器,从 kafka3
消耗数据,它与 kafka2
是同一个集群,但 Binder 类型不同。
由于 Kafka Streams Binder 系列中有三种不同的 Binder 类型 - kstream
、ktable
和 globalktable
- 如果您的应用程序有多个基于其中任何一个 Binder 的 Binding,则需要明确地将该 Binder 类型提供出来。
例如,如果您有一个处理器如下所示,
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
...
}
那么,这必须在多 Binder 场景中按如下方式进行配置。请注意,这只有在您有真正的多 Binder 场景(其中多个处理器在单个应用程序中处理多个集群)时才需要。在这种情况下,需要使用 Binding 明确提供 Binder,以区分其他处理器的 Binder 类型和集群。
spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1 #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2 #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3 #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream
# rest of the configuration is omitted.