Partitioning with the Kafka Binder

Apache Kafka 本机支持主题分区。

有时,将数据发送到特定分区是有利的,例如,当你希望严格按顺序处理消息(特定客户的所有消息都应发送到同一个分区)时。

以下示例演示如何配置生产者和消费者端:

@SpringBootApplication
public class KafkaPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "foo1", "bar1", "qux1",
            "foo2", "bar2", "qux2",
            "foo3", "bar3", "qux3",
            "foo4", "bar4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public Supplier<Message<?>> generate() {
        return () -> {
            String value = data[RANDOM.nextInt(data.length)];
            System.out.println("Sending: " + value);
            return MessageBuilder.withPayload(value)
                    .setHeader("partitionKey", value)
                    .build();
        };
    }

}
application.yml
spring:
  cloud:
    stream:
      bindings:
        generate-out-0:
          destination: partitioned.topic
          producer:
            partition-key-expression: headers['partitionKey']
            partition-count: 12

需要注意的是,由于 Apache Kafka 本机支持分区,因此无需依靠上面描述的 Binder 分区,除非你正在使用自定义分区键(如示例中所示)或涉及有效负载本身的表达式。否则,由 Binder 提供的分区选择适用于不支持本机分区的中间件技术。注意,在上面的示例中,我们使用了名为 partitionKey 的自定义键,它将成为分区的确切因素,因此在这种情况下使用 Binder 分区就十分合适。在使用本机 Kafka 分区(即当你未提供 partition-key-expression 时),Apache Kafka 将选择分区,默认情况下,该分区将是记录键的哈希值除以可用分区数。要向出站记录添加键,请在 spring-messaging Message<?> 中将 KafkaHeaders.KEY 标头设置为所需的键值。默认情况下,当未提供记录键时,Apache Kafka 将根据 Apache Kafka Documentation 中描述的逻辑选择分区。

主题必须预置有足够的分区,以实现所有消费者组所需的并发度。以上配置支持最多 12 个消费者实例(如果它们的 concurrency 为 2,则为 6 个;如果它们的并发度为 3,则为 4 个,依此类推)。通常最好 “over-provision” 分区,以便将来增加消费者或并发度。

前面的配置使用默认分区 (key.hashCode() % partitionCount)。这可能会或可能不会提供适当的平衡算法,具体取决于键值。特别是,请注意,此分区策略与独立的 Kafka 生产者使用的默认值不同——例如 Kafka Streams 使用的默认值,这意味着当由这些客户端生成时,相同键值的跨分区平衡可能会有所不同。您可以通过使用 partitionSelectorExpressionpartitionSelectorClass 属性来覆盖此默认值。

由于分区是由 Kafka 本机处理的,因此在消费者端不需要特殊配置。Kafka 在实例之间分配分区。

Kafka 主题的 partitionCount 可能会在运行时发生改变(例如,由于管理任务)。之后,计算出的分区将有所不同(例如,之后将使用新分区)。从 Spring Cloud Stream 4.0.3 开始,将支持 partition count 的运行时更改。另请参阅参数 'spring.kafka.producer.properties.metadata.max.age.ms' 以配置更新间隔。由于一些限制,无法使用引用消息“有效负载”的“分区键表达式”,在这种情况下该机制将被禁用。总体行为默认禁用,可以使用配置参数 'producer.dynamicPartitionUpdatesEnabled=true' 来启用。

以下 Spring Boot 应用程序侦听 Kafka 流并打印(到控制台)每条消息发送到的分区 ID:

@SpringBootApplication
public class KafkaPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Bean
    public Consumer<Message<String>> listen() {
        return message -> {
            int partition = (int) message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION);
            System.out.println(message + " received from partition " + partition);
        };
    }

}
application.yml
spring:
  cloud:
    stream:
      bindings:
        listen-in-0:
          destination: partitioned.topic
          group: myGroup

你可以根据需要添加实例。Kafka 会重新平衡分区分配。如果实例计数(或 实例计数 * 并发)超过分区数,则某些消费者会处于空闲状态。