Initial Producer Support for the RabbitMQ Stream Plugin

现在提供对 RabbitMQ Stream Plugin 的基本支持。若要启用此功能,您必须将 spring-rabbit-stream jar 添加到类路径——它必须与 spring-amqpspring-rabbit 使用相同版本。

当您将 producerType 属性设置为 STREAM_SYNCSTREAM_ASYNC 时,以上所述的生产者属性不受支持。

要将 Binder 配置为使用流 ProducerType,Spring Boot 将从应用程序属性中配置一个 Environment @Bean。您还可以选择性地添加一个定制器来定制消息处理程序。

@Bean
ProducerMessageHandlerCustomizer<MessageHandler> handlerCustomizer() {
    return (hand, dest) -> {
        RabbitStreamMessageHandler handler = (RabbitStreamMessageHandler) hand;
        handler.setConfirmTimeout(5000);
        ((RabbitStreamTemplate) handler.getStreamOperations()).setProducerCustomizer(
                (name, builder) -> {
                    ...
                });
    };
}

有关配置环境和生产者构建器的信息,请参阅 RabbitMQ Stream Java Client documentation

Producer Support for the RabbitMQ Super Streams

有关超级流的信息,请参见 Super Streams

使用超级流可以在每个超级流分区上自动执行带有单个活动消费者的纵向或横向扩展。通过使用 Spring Cloud Stream,您可以通过 AMQP 或使用流客户端发布到超级流。

超级流必须已经存在;生产者绑定不支持创建超级流。

通过 AMQP 发布到超级流:

spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false

使用流客户端发布到超级流中:

spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.producer-type=stream-async
spring.cloud.stream.rabbit.bindings.output.producer.super-stream=true
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false

使用流客户端时,如果设置了 confirmAckChannel,成功发送的消息的副本将被发送到该通道。