Initial Consumer Support for the RabbitMQ Stream Plugin

现在提供对 RabbitMQ Stream Plugin 的基本支持。若要启用此功能,您必须将 spring-rabbit-stream jar 添加到类路径——它必须与 spring-amqpspring-rabbit 使用相同版本。

Basic support for the RabbitMQ Stream Plugin is now provided. To enable this feature, you must add the spring-rabbit-stream jar to the class path - it must be the same version as spring-amqp and spring-rabbit.

如果您将 containerType 属性设置为 stream 时,以上所述的使用者属性不会受到支持;concurrency 仅支持超级流。每个绑定仅可以消耗一个流队列。

The consumer properties described above are not supported when you set the containerType property to stream; concurrency is supported for super streams only. Only a single stream queue can be consumed by each binding.

要将 Binder 配置为使用 containerType=stream,Spring Boot 会自动根据应用程序属性配置 Environment @Bean。可以选择性地添加一个自定义器来自定义侦听器容器。

To configure the binder to use containerType=stream, Spring Boot will automatically configure an Environment @Bean from the application properties. You can, optionally, add a customizer to customize the listener container.

@Bean
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
    return (cont, dest, group) -> {
        StreamListenerContainer container = (StreamListenerContainer) cont;
        container.setConsumerCustomizer((name, builder) -> {
            builder.offset(OffsetSpecification.first());
        });
        // ...
    };
}

传递给自定义器的 name 参数是 destination + '.' + group + '.container'

The name argument passed to the customizer is destination + '.' + group + '.container'.

用于偏移跟踪的流 name() 被设置为绑定 destination + '.' + group。可以使用上面显示的 ConsumerCustomizer 来更改它。如果你决定使用手动偏移跟踪,Context 可以作为消息头提供:

The stream name() (for the purpose of offset tracking) is set to the binding destination + '.' + group. It can be changed using a ConsumerCustomizer shown above. If you decide to use manual offset tracking, the Context is available as a message header:

int count;

@Bean
public Consumer<Message<?>> input() {
    return msg -> {
        System.out.println(msg);
        if (++count % 1000 == 0) {
            Context context = msg.getHeaders().get("rabbitmq_streamContext", Context.class);
            context.consumer().store(context.offset());
        }
    };
}

有关配置环境和消费者构建器的信息,请参阅 RabbitMQ Stream Java Client documentation

Refer to the RabbitMQ Stream Java Client documentation for information about configuring the environment and consumer builder.

Consumer Support for the RabbitMQ Super Streams

有关超级流的信息,请参见 Super Streams

See Super Streams for information about super streams.

使用超级流允许每个超级流分区上有一个活动消费者,来自动增大和缩小规模。

Use of super streams allows for automatic scale-up scale-down with a single active consumer on each partition of a super stream.

配置示例:

Configuration example:

@Bean
public Consumer<Thing> input() {
    ...
}
spring.cloud.stream.bindings.input-in-0.destination=super
spring.cloud.stream.bindings.input-in-0.group=test
spring.cloud.stream.bindings.input-in-0.consumer.instance-count=3
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=3
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.container-type=STREAM
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.super-stream=true

框架将创建一个名为 super 的超级流,其中包含 9 个分区。最多可以部署 3 个这个应用程序的实例。

The framework will create a super stream named super, with 9 partitions. Up to 3 instances of this application can be deployed.