Initial Consumer Support for the RabbitMQ Stream Plugin

现在提供对 RabbitMQ Stream Plugin 的基本支持。若要启用此功能,您必须将 spring-rabbit-stream jar 添加到类路径——它必须与 spring-amqpspring-rabbit 使用相同版本。

如果您将 containerType 属性设置为 stream 时,以上所述的使用者属性不会受到支持;concurrency 仅支持超级流。每个绑定仅可以消耗一个流队列。

要将 Binder 配置为使用 containerType=stream,Spring Boot 会自动根据应用程序属性配置 Environment @Bean。可以选择性地添加一个自定义器来自定义侦听器容器。

@Bean
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
    return (cont, dest, group) -> {
        StreamListenerContainer container = (StreamListenerContainer) cont;
        container.setConsumerCustomizer((name, builder) -> {
            builder.offset(OffsetSpecification.first());
        });
        // ...
    };
}

传递给自定义器的 name 参数是 destination + '.' + group + '.container'。 用于偏移跟踪的流 name() 被设置为绑定 destination + '.' + group。可以使用上面显示的 ConsumerCustomizer 来更改它。如果你决定使用手动偏移跟踪,Context 可以作为消息头提供:

int count;

@Bean
public Consumer<Message<?>> input() {
    return msg -> {
        System.out.println(msg);
        if (++count % 1000 == 0) {
            Context context = msg.getHeaders().get("rabbitmq_streamContext", Context.class);
            context.consumer().store(context.offset());
        }
    };
}

有关配置环境和消费者构建器的信息,请参阅 RabbitMQ Stream Java Client documentation

Consumer Support for the RabbitMQ Super Streams

有关超级流的信息,请参见 Super Streams

使用超级流允许每个超级流分区上有一个活动消费者,来自动增大和缩小规模。

配置示例:

@Bean
public Consumer<Thing> input() {
    ...
}
spring.cloud.stream.bindings.input-in-0.destination=super
spring.cloud.stream.bindings.input-in-0.group=test
spring.cloud.stream.bindings.input-in-0.consumer.instance-count=3
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=3
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.container-type=STREAM
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.super-stream=true

框架将创建一个名为 super 的超级流,其中包含 9 个分区。最多可以部署 3 个这个应用程序的实例。