Multiple Broker (or Cluster) Support
Version 2.3 made it more convenient for a single application to communicate with multiple brokers or broker clusters. The main benefit, on the consumer side, is that the infrastructure can automatically associate auto-declared queues with the appropriate broker.
This is best illustrated with an example:
@SpringBootApplication(exclude = RabbitAutoConfiguration.class)
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    CachingConnectionFactory cf1() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    CachingConnectionFactory cf2() {
        return new CachingConnectionFactory("otherHost");
    }

    @Bean
    CachingConnectionFactory cf3() {
        return new CachingConnectionFactory("thirdHost");
    }

    @Bean
    SimpleRoutingConnectionFactory rcf(CachingConnectionFactory cf1,
            CachingConnectionFactory cf2, CachingConnectionFactory cf3) {

        SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory();
        rcf.setDefaultTargetConnectionFactory(cf1);
        rcf.setTargetConnectionFactories(Map.of("one", cf1, "two", cf2, "three", cf3));
        return rcf;
    }

    @Bean("factory1-admin")
    RabbitAdmin admin1(CachingConnectionFactory cf1) {
        return new RabbitAdmin(cf1);
    }

    @Bean("factory2-admin")
    RabbitAdmin admin2(CachingConnectionFactory cf2) {
        return new RabbitAdmin(cf2);
    }

    @Bean("factory3-admin")
    RabbitAdmin admin3(CachingConnectionFactory cf3) {
        return new RabbitAdmin(cf3);
    }

    @Bean
    public RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry() {
        return new RabbitListenerEndpointRegistry();
    }

    @Bean
    public RabbitListenerAnnotationBeanPostProcessor postProcessor(RabbitListenerEndpointRegistry registry) {
        MultiRabbitListenerAnnotationBeanPostProcessor postProcessor
                = new MultiRabbitListenerAnnotationBeanPostProcessor();
        postProcessor.setEndpointRegistry(registry);
        postProcessor.setContainerFactoryBeanName("defaultContainerFactory");
        return postProcessor;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory factory1(CachingConnectionFactory cf1) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cf1);
        return factory;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory factory2(CachingConnectionFactory cf2) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cf2);
        return factory;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory factory3(CachingConnectionFactory cf3) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cf3);
        return factory;
    }

    @Bean
    RabbitTemplate template(SimpleRoutingConnectionFactory rcf) {
        return new RabbitTemplate(rcf);
    }

    @Bean
    ConnectionFactoryContextWrapper wrapper(SimpleRoutingConnectionFactory rcf) {
        return new ConnectionFactoryContextWrapper(rcf);
    }

}

@Component
class Listeners {

    @RabbitListener(queuesToDeclare = @Queue("q1"), containerFactory = "factory1")
    public void listen1(String in) {
    }

    @RabbitListener(queuesToDeclare = @Queue("q2"), containerFactory = "factory2")
    public void listen2(String in) {
    }

    @RabbitListener(queuesToDeclare = @Queue("q3"), containerFactory = "factory3")
    public void listen3(String in) {
    }

}
As you can see, we have declared three sets of infrastructure (connection factories, admins, container factories). As discussed earlier, @RabbitListener can define which container factory to use; in this case, the listeners also use queuesToDeclare, which causes each queue to be declared on the broker if it does not already exist. By naming the RabbitAdmin beans with the convention <container-factory-name>-admin, the infrastructure is able to determine which admin should declare the queue. This also works with bindings = @QueueBinding(…), in which case the exchange and binding are declared as well. It does NOT work with queues, since that attribute expects the queue(s) to already exist.
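To make the naming convention concrete, the following is a minimal plain-Java sketch of how an admin bean name could be derived from a listener's container factory name. It is only a model for illustration, not the framework's implementation: the class AdminNamingSketch, its methods, and the ADMINS map are hypothetical, and the map stands in for the three RabbitAdmin beans declared in the example above.

```java
import java.util.Map;
import java.util.Optional;

// Plain-Java model of the "<container-factory-name>-admin" naming convention.
// Hypothetical helper for illustration; not part of the framework.
public class AdminNamingSketch {

    // Suffix taken from the convention described in the text.
    public static final String ADMIN_SUFFIX = "-admin";

    // Hypothetical registry: admin bean name -> broker host that admin manages.
    public static final Map<String, String> ADMINS = Map.of(
            "factory1-admin", "localhost",
            "factory2-admin", "otherHost",
            "factory3-admin", "thirdHost");

    // Derives the admin bean name from the listener's containerFactory value.
    public static String adminBeanNameFor(String containerFactoryName) {
        return containerFactoryName + ADMIN_SUFFIX;
    }

    // Looks up which broker would receive the queue declaration, if any.
    public static Optional<String> declaringHostFor(String containerFactoryName) {
        return Optional.ofNullable(ADMINS.get(adminBeanNameFor(containerFactoryName)));
    }

    public static void main(String[] args) {
        System.out.println(adminBeanNameFor("factory2"));
        System.out.println(declaringHostFor("factory2").orElse("no matching admin"));
        System.out.println(declaringHostFor("factoryX").orElse("no matching admin"));
    }
}
```

In this model, a listener that names factory2 resolves to the factory2-admin entry, so its queue would be declared on otherHost; a container factory with no correspondingly named admin resolves to nothing.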
On the producer side, a convenient ConnectionFactoryContextWrapper class is provided to make using the RoutingConnectionFactory (see Routing Connection Factory) simpler.
As you can see above, a SimpleRoutingConnectionFactory bean has been added with the routing keys one, two, and three. There is also a RabbitTemplate that uses that factory. Here is an example of using that template with the wrapper to route to one of the broker clusters:
@Bean
public ApplicationRunner runner(RabbitTemplate template, ConnectionFactoryContextWrapper wrapper) {
    return args -> {
        wrapper.run("one", () -> template.convertAndSend("q1", "toCluster1"));
        wrapper.run("two", () -> template.convertAndSend("q2", "toCluster2"));
        wrapper.run("three", () -> template.convertAndSend("q3", "toCluster3"));
    };
}
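For intuition about what wrapping a send in wrapper.run(key, ...) achieves, here is a simplified plain-Java model of key-based routing. It rests on stated assumptions: a lookup key is bound to the current thread for the duration of the callback, the send resolves its target from that key, and the key is cleared afterward. The class RoutingSketch, the ThreadLocal, and the string "hosts" are hypothetical stand-ins, not the library's source, and no broker is contacted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified model of routing by a thread-bound lookup key.
// Hypothetical sketch for illustration; not the library's implementation.
public class RoutingSketch {

    // Holds the lookup key for the current thread while a callback runs.
    private static final ThreadLocal<String> CURRENT_KEY = new ThreadLocal<>();

    // Hypothetical targets: lookup key -> broker host (mirrors the example beans).
    private static final Map<String, String> TARGETS =
            Map.of("one", "localhost", "two", "otherHost", "three", "thirdHost");

    // Assumption: sends with no usable key go to the default target.
    private static final String DEFAULT_TARGET = "localhost";

    // Records "host:queue:payload" instead of talking to a real broker.
    public static final List<String> SENT = new ArrayList<>();

    // Models wrapper.run(key, action): bind the key, run, always unbind.
    public static void run(String key, Runnable action) {
        CURRENT_KEY.set(key);
        try {
            action.run();
        } finally {
            CURRENT_KEY.remove();
        }
    }

    // Models a template send that resolves its target at send time.
    public static void send(String queue, String payload) {
        String key = CURRENT_KEY.get();
        String host = key == null ? DEFAULT_TARGET : TARGETS.getOrDefault(key, DEFAULT_TARGET);
        SENT.add(host + ":" + queue + ":" + payload);
    }

    public static void main(String[] args) {
        run("two", () -> send("q2", "toCluster2"));
        send("q1", "noKeyBound"); // outside run(): falls back to the default
        System.out.println(SENT); // [otherHost:q2:toCluster2, localhost:q1:noKeyBound]
    }
}
```

The try/finally in run is the point of the sketch: the key is removed even if the callback throws, so a later send on the same thread does not inherit a stale routing decision.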