Partitioning with the Kafka Binder

Apache Kafka supports topic partitioning natively.

Sometimes it is advantageous to send data to specific partitions — for example, when you want to strictly order message processing (all messages for a particular customer should go to the same partition).

The following example shows how to configure the producer and consumer side:

import java.util.Random;
import java.util.function.Supplier;

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
public class KafkaPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "foo1", "bar1", "qux1",
            "foo2", "bar2", "qux2",
            "foo3", "bar3", "qux3",
            "foo4", "bar4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Bean
    public Supplier<Message<?>> generate() {
        return () -> {
            String value = data[RANDOM.nextInt(data.length)];
            System.out.println("Sending: " + value);
            // The partitionKey header is evaluated by the partition-key-expression configured below
            return MessageBuilder.withPayload(value)
                    .setHeader("partitionKey", value)
                    .build();
        };
    }

}
application.yml
spring:
  cloud:
    stream:
      bindings:
        generate-out-0:
          destination: partitioned.topic
          producer:
            partition-key-expression: headers['partitionKey']
            partition-count: 12

It is important to keep in mind that, since Apache Kafka supports partitioning natively, there is no need to rely on the binder partitioning described above unless you are using a custom partition key (as in the example) or an expression that involves the payload itself. Otherwise, the binder-provided partition selection is intended for middleware technologies that do not support native partitioning. Note that the example above uses a custom header called partitionKey, which becomes the determining factor for the partition, so in this case binder partitioning is appropriate. When you use native Kafka partitioning (that is, when you do not provide a partition-key-expression), Apache Kafka selects the partition, by default by hashing the record key modulo the number of available partitions. To add a key to an outbound record, set the KafkaHeaders.KEY header to the desired key value in the spring-messaging Message<?>. By default, when no record key is provided, Apache Kafka chooses a partition based on the logic described in the Apache Kafka Documentation.
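If you prefer to rely on native Kafka partitioning, a variant of the generate() bean above could set a record key instead of a binder partition key. The following is only a sketch: using the payload bytes as the key is an arbitrary choice for illustration, the key type must match the configured key serializer (byte[] works with the binder's default byte-array serializer), and it additionally requires the org.springframework.kafka.support.KafkaHeaders and java.nio.charset.StandardCharsets imports.

@Bean
public Supplier<Message<?>> generate() {
    return () -> {
        String value = data[RANDOM.nextInt(data.length)];
        // Kafka itself hashes this record key to select the partition,
        // so no partition-key-expression is configured on the binding.
        return MessageBuilder.withPayload(value)
                .setHeader(KafkaHeaders.KEY, value.getBytes(StandardCharsets.UTF_8))
                .build();
    };
}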

The topic must be provisioned to have enough partitions to achieve the desired concurrency for all consumer groups. The above configuration supports up to 12 consumer instances (6 if their concurrency is 2, 4 if their concurrency is 3, and so on). It is generally best to “over-provision” the partitions to allow for future increases in consumers or concurrency.
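For example, with the 12 partitions above, a consumer binding running with a concurrency of 2 can be spread across up to 6 instances. A minimal sketch of such a configuration follows; the concurrency value is only illustrative.

application.yml
spring:
  cloud:
    stream:
      bindings:
        listen-in-0:
          destination: partitioned.topic
          group: myGroup
          consumer:
            concurrency: 2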

The preceding configuration uses the default partitioning (key.hashCode() % partitionCount). This may or may not provide a suitably balanced algorithm, depending on the key values. In particular, note that this partitioning strategy differs from the default used by a standalone Kafka producer, such as the one used by Kafka Streams, meaning that the same key value may balance differently across partitions when produced by those clients. You can override this default by using the partitionSelectorExpression or partitionSelectorClass properties.
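For example, a custom selector expression could be supplied as shown in the following sketch. It assumes the expression is evaluated against the computed partition key; the hashCode() expression shown here roughly mirrors the default behavior and is only a placeholder for your own logic (partitionSelectorClass would instead reference a custom selector implementation).

application.yml
spring:
  cloud:
    stream:
      bindings:
        generate-out-0:
          destination: partitioned.topic
          producer:
            partition-key-expression: headers['partitionKey']
            partition-count: 12
            partition-selector-expression: hashCode()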

Since partitions are natively handled by Kafka, no special configuration is needed on the consumer side. Kafka allocates partitions across the instances.

The partitionCount for a Kafka topic may change at runtime (for example, due to an administration task). The calculated partitions will be different after that (for example, new partitions will then be used). Since Spring Cloud Stream 4.0.3, runtime changes of the partition count are supported. See also the 'spring.kafka.producer.properties.metadata.max.age.ms' property to configure the update interval. Due to some limitations, it is not possible to use a 'partition-key-expression' that references the 'payload' of a message; the mechanism is disabled in that case. The overall behavior is disabled by default and can be enabled with the configuration parameter 'producer.dynamicPartitionUpdatesEnabled=true'.
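A configuration sketch might look like the following. The metadata refresh interval is only an example value, and placing dynamic-partition-updates-enabled under the Kafka binding's producer section is an assumption based on the property name above; check where the property is documented for your binder version.

application.yml
spring:
  kafka:
    producer:
      properties:
        # How often the producer refreshes topic metadata and therefore notices new partitions (example value)
        metadata.max.age.ms: 60000
  cloud:
    stream:
      kafka:
        bindings:
          generate-out-0:
            producer:
              # Assumed location for producer.dynamicPartitionUpdatesEnabled
              dynamic-partition-updates-enabled: true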

The following Spring Boot application listens to a Kafka stream and prints (to the console) the partition ID to which each message goes:

import java.util.function.Consumer;

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;

@SpringBootApplication
public class KafkaPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Bean
    public Consumer<Message<String>> listen() {
        return message -> {
            // The partition from which the record was received is carried in this header
            int partition = (int) message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION);
            System.out.println(message + " received from partition " + partition);
        };
    }

}
application.yml
spring:
  cloud:
    stream:
      bindings:
        listen-in-0:
          destination: partitioned.topic
          group: myGroup

You can add instances as needed. Kafka rebalances the partition allocations. If the instance count (or instance count * concurrency) exceeds the number of partitions, some consumers are idle.