Partitioning with the RabbitMQ Binder

RabbitMQ 本身不支持分区。

RabbitMQ does not support partitioning natively.

有时,向特定分区发送数据非常有利,例如,当你希望对消息处理进行严格的排序时,特定客户的所有消息都应该发送到相同的分区。

Sometimes, it is advantageous to send data to specific partitions — for example, when you want to strictly order message processing, all messages for a particular customer should go to the same partition.

RabbitMessageChannelBinder 通过为每个分区绑定一个队列到目标交换区来提供分区。

The RabbitMessageChannelBinder provides partitioning by binding a queue for each partition to the destination exchange.

以下 Java 和 YAML 示例展示如何配置生产者:

The following Java and YAML examples show how to configure the producer:

Producer
@SpringBootApplication
public class RabbitPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "abc1", "def1", "qux1",
            "abc2", "def2", "qux2",
            "abc3", "def3", "qux3",
            "abc4", "def4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public Supplier<Message<?>> generate() {
        return () -> {
            String value = data[RANDOM.nextInt(data.length)];
            System.out.println("Sending: " + value);
            return MessageBuilder.withPayload(value)
                    .setHeader("partitionKey", value)
                    .build();
        };
    }

}
application.yml
    spring:
      cloud:
        stream:
          bindings:
            generate-out-0:
              destination: partitioned.destination
              producer:
                partitioned: true
                partition-key-expression: headers['partitionKey']
                partition-count: 2
                required-groups:
                - myGroup

前面示例中的配置使用默认分区(key.hashCode() % partitionCount)。这可能提供一个平衡的算法,也可能不能提供,具体取决于键值。你可以使用 partitionSelectorExpressionpartitionSelectorClass 属性覆盖此默认值。

The configuration in the prececing example uses the default partitioning (key.hashCode() % partitionCount). This may or may not provide a suitably balanced algorithm, depending on the key values. You can override this default by using the partitionSelectorExpression or partitionSelectorClass properties.

只有当生产者部署时需要调配消费者队列时,才需要 required-groups 属性。否则,发送到分区的任何消息都会丢失,直到部署相应的消费者。

The required-groups property is required only if you need the consumer queues to be provisioned when the producer is deployed. Otherwise, any messages sent to a partition are lost until the corresponding consumer is deployed.

下面的配置调配了一个主题交换区:

The following configuration provisions a topic exchange:

part exchange

以下队列绑定到该交换区:

The following queues are bound to that exchange:

part queues

以下绑定将队列与交换区关联:

The following bindings associate the queues to the exchange:

part bindings

以下 Java 和 YAML 示例延续了前面的示例,并展示了如何配置消费者:

The following Java and YAML examples continue the previous examples and show how to configure the consumer:

Consumer
@SpringBootApplication
public class RabbitPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public Consumer<Message<String>> listen() {
        return message -> {
            String queue =- message.getHeaders().get(AmqpHeaders.CONSUMER_QUEUE);
            System.out.println(in + " received from queue " + queue);
        };
    }

}
application.yml
    spring:
      cloud:
        stream:
          bindings:
            listen-in-0:
              destination: partitioned.destination
              group: myGroup
              consumer:
                partitioned: true
                instance-index: 0

The RabbitMessageChannelBinder 不支持动态扩展。每个分区都必须至少有一个使用者。使用者的 instanceIndex 用于指示消耗了哪个分区。诸如 Cloud Foundry 之类的平台只能使用 instanceIndex 拥有一个实例。

The RabbitMessageChannelBinder does not support dynamic scaling. There must be at least one consumer per partition. The consumer’s instanceIndex is used to indicate which partition is consumed. Platforms such as Cloud Foundry can have only one instance with an instanceIndex.