Partition support on the outbound

Kafka 流处理程序通常将已处理的输出发送到一个出站 Kafka 主题。如果出站主题已分区,并且处理程序需要将传出数据发送到特定分区,则应用程序需要提供一个 StreamPartitioner 类型的 bean。参见 StreamPartitioner 了解更多详情。不妨看看一些示例。

这也是我们多次看到的相同的处理器:

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

    ...
}

这是输出绑定目标:

spring.cloud.stream.bindings.process-out-0.destination: outputTopic

如果主题 outputTopic 有 4 个分区,如果你不提供分区策略,则 Kafka Streams 将使用可能根据具体用例不是你想要的结果的默认分区策略。假设你想将任何匹配到 spring 的键发送到第 0 个分区,将 cloud 发送到第 1 个分区,将 stream 发送到第 2 个分区,并将其他所有内容发送到第 3 个分区。这是你需要在应用程序中执行的操作。

@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
    return (t, k, v, n) -> {
        if (k.equals("spring")) {
            return 0;
        }
        else if (k.equals("cloud")) {
            return 1;
        }
        else if (k.equals("stream")) {
            return 2;
        }
        else {
            return 3;
        }
    };
}

这是一个基本的实现,但是,你可以访问记录的键/值、主题名称和分区总数。因此,你可以根据需要实现复杂的分区策略。

你还需要在此应用程序配置中提供此 bean 名称。

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner

应用程序中的每个输出主题都需要像这样分别进行配置。