Partitioning with the RabbitMQ Binder
RabbitMQ 本身不支持分区。
RabbitMQ does not support partitioning natively.
有时,向特定分区发送数据非常有利,例如,当你希望对消息处理进行严格的排序时,特定客户的所有消息都应该发送到相同的分区。
Sometimes, it is advantageous to send data to specific partitions — for example, when you want to strictly order message processing, all messages for a particular customer should go to the same partition.
RabbitMessageChannelBinder
通过为每个分区绑定一个队列到目标交换区来提供分区。
The RabbitMessageChannelBinder
provides partitioning by binding a queue for each partition to the destination exchange.
以下 Java 和 YAML 示例展示如何配置生产者:
The following Java and YAML examples show how to configure the producer:
@SpringBootApplication
public class RabbitPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"abc1", "def1", "qux1",
"abc2", "def2", "qux2",
"abc3", "def3", "qux3",
"abc4", "def4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
.web(false)
.run(args);
}
@Bean
public Supplier<Message<?>> generate() {
return () -> {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
};
}
}
spring:
cloud:
stream:
bindings:
generate-out-0:
destination: partitioned.destination
producer:
partitioned: true
partition-key-expression: headers['partitionKey']
partition-count: 2
required-groups:
- myGroup
前面示例中的配置使用默认分区( The configuration in the prececing example uses the default partitioning ( 只有当生产者部署时需要调配消费者队列时,才需要 The |
下面的配置调配了一个主题交换区:
The following configuration provisions a topic exchange:
以下队列绑定到该交换区:
The following queues are bound to that exchange:
以下绑定将队列与交换区关联:
The following bindings associate the queues to the exchange:
以下 Java 和 YAML 示例延续了前面的示例,并展示了如何配置消费者:
The following Java and YAML examples continue the previous examples and show how to configure the consumer:
@SpringBootApplication
public class RabbitPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
.web(false)
.run(args);
}
@Bean
public Consumer<Message<String>> listen() {
return message -> {
String queue =- message.getHeaders().get(AmqpHeaders.CONSUMER_QUEUE);
System.out.println(in + " received from queue " + queue);
};
}
}
spring:
cloud:
stream:
bindings:
listen-in-0:
destination: partitioned.destination
group: myGroup
consumer:
partitioned: true
instance-index: 0
The RabbitMessageChannelBinder
不支持动态扩展。每个分区都必须至少有一个使用者。使用者的 instanceIndex
用于指示消耗了哪个分区。诸如 Cloud Foundry 之类的平台只能使用 instanceIndex
拥有一个实例。
The RabbitMessageChannelBinder
does not support dynamic scaling.
There must be at least one consumer per partition.
The consumer’s instanceIndex
is used to indicate which partition is consumed.
Platforms such as Cloud Foundry can have only one instance with an instanceIndex
.