Multiple Broker (or Cluster) Support

Gemini 2.3 版本在单个应用程序与多个 Broker 或 Broker 群集之间通信时更加方便。在使用者端,主要优势在于,基础设施可将自动声明队列与相应的 Broker 自动关联。

Version 2.3 added more convenience when communicating between a single application and multiple brokers or broker clusters. The main benefit, on the consumer side, is that the infrastructure can automatically associate auto-declared queues with the appropriate broker.

通过以下示例可以更好地说明这一点:

This is best illustrated with an example:

@SpringBootApplication(exclude = RabbitAutoConfiguration.class)
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    CachingConnectionFactory cf1() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    CachingConnectionFactory cf2() {
        return new CachingConnectionFactory("otherHost");
    }

    @Bean
    CachingConnectionFactory cf3() {
        return new CachingConnectionFactory("thirdHost");
    }

    @Bean
    SimpleRoutingConnectionFactory rcf(CachingConnectionFactory cf1,
            CachingConnectionFactory cf2, CachingConnectionFactory cf3) {

        SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory();
        rcf.setDefaultTargetConnectionFactory(cf1);
        rcf.setTargetConnectionFactories(Map.of("one", cf1, "two", cf2, "three", cf3));
        return rcf;
    }

    @Bean("factory1-admin")
    RabbitAdmin admin1(CachingConnectionFactory cf1) {
        return new RabbitAdmin(cf1);
    }

    @Bean("factory2-admin")
    RabbitAdmin admin2(CachingConnectionFactory cf2) {
        return new RabbitAdmin(cf2);
    }

    @Bean("factory3-admin")
    RabbitAdmin admin3(CachingConnectionFactory cf3) {
        return new RabbitAdmin(cf3);
    }

    @Bean
    public RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry() {
        return new RabbitListenerEndpointRegistry();
    }

    @Bean
    public RabbitListenerAnnotationBeanPostProcessor postProcessor(RabbitListenerEndpointRegistry registry) {
        MultiRabbitListenerAnnotationBeanPostProcessor postProcessor
                = new MultiRabbitListenerAnnotationBeanPostProcessor();
        postProcessor.setEndpointRegistry(registry);
        postProcessor.setContainerFactoryBeanName("defaultContainerFactory");
        return postProcessor;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory factory1(CachingConnectionFactory cf1) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cf1);
        return factory;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory factory2(CachingConnectionFactory cf2) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cf2);
        return factory;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory factory3(CachingConnectionFactory cf3) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cf3);
        return factory;
    }

    @Bean
    RabbitTemplate template(SimpleRoutingConnectionFactory rcf) {
        return new RabbitTemplate(rcf);
    }

    @Bean
    ConnectionFactoryContextWrapper wrapper(SimpleRoutingConnectionFactory rcf) {
        return new ConnectionFactoryContextWrapper(rcf);
    }

}

@Component
class Listeners {

    @RabbitListener(queuesToDeclare = @Queue("q1"), containerFactory = "factory1")
    public void listen1(String in) {

    }

    @RabbitListener(queuesToDeclare = @Queue("q2"), containerFactory = "factory2")
    public void listen2(String in) {

    }

    @RabbitListener(queuesToDeclare = @Queue("q3"), containerFactory = "factory3")
    public void listen3(String in) {

    }

}

正如您所见,我们声明了 3 组基础设施(连接工厂、管理员、容器工厂)。如前文所讨论的,@RabbitListener 可以定义要使用的容器工厂;在此情况下,它们还使用 queuesToDeclare,当队列在 Broker 上不存在时,它会让队列在 Broker 上声明。通过使用约定 <container-factory-name>-admin 命名 RabbitAdmin bean,基础设施能够确定声明队列时应使用哪个管理员。这同样适用于 bindings = @QueueBinding(…​),此时还会声明交换和绑定。它不适用于 queues,因为它要求队列已经存在。

As you can see, we have declared 3 sets of infrastructure (connection factories, admins, container factories). As discussed earlier, @RabbitListener can define which container factory to use; in this case, they also use queuesToDeclare which causes the queue(s) to be declared on the broker, if it doesn’t exist. By naming the RabbitAdmin beans with the convention <container-factory-name>-admin, the infrastructure is able to determine which admin should declare the queue. This will also work with bindings = @QueueBinding(…​) whereby the exchange and binding will also be declared. It will NOT work with queues, since that expects the queue(s) to already exist.

在生产者方面,提供了一个方便的 ConnectionFactoryContextWrapper 类,可以让使用 RoutingConnectionFactory(请参阅 Routing Connection Factory)变得更简单。

On the producer side, a convenient ConnectionFactoryContextWrapper class is provided, to make using the RoutingConnectionFactory (see Routing Connection Factory) simpler.

如您在上方所见,SimpleRoutingConnectionFactory bean 已经使用路由键 onetwothree 添加了。还有一个使用该工厂的 RabbitTemplate。以下是使用该模板与包装器将消息路由到其中一个 Broker 群集的示例:

As you can see above, a SimpleRoutingConnectionFactory bean has been added with routing keys one, two and three. There is also a RabbitTemplate that uses that factory. Here is an example of using that template with the wrapper to route to one of the broker clusters.

@Bean
public ApplicationRunner runner(RabbitTemplate template, ConnectionFactoryContextWrapper wrapper) {
    return args -> {
        wrapper.run("one", () -> template.convertAndSend("q1", "toCluster1"));
        wrapper.run("two", () -> template.convertAndSend("q2", "toCluster2"));
        wrapper.run("three", () -> template.convertAndSend("q3", "toCluster3"));
    };
}