Initial Consumer Support for the RabbitMQ Stream Plugin
现在提供对 RabbitMQ Stream Plugin 的基本支持。若要启用此功能,您必须将 spring-rabbit-stream
jar 添加到类路径——它必须与 spring-amqp
和 spring-rabbit
使用相同版本。
Basic support for the RabbitMQ Stream Plugin is now provided.
To enable this feature, you must add the spring-rabbit-stream
jar to the class path - it must be the same version as spring-amqp
and spring-rabbit
.
如果您将 containerType
属性设置为 stream
时,以上所述的使用者属性不会受到支持;concurrency
仅支持超级流。每个绑定仅可以消耗一个流队列。
The consumer properties described above are not supported when you set the containerType
property to stream
; concurrency
is supported for super streams only.
Only a single stream queue can be consumed by each binding.
要将 Binder 配置为使用 containerType=stream
,Spring Boot 会自动根据应用程序属性配置 Environment
@Bean
。可以选择性地添加一个自定义器来自定义侦听器容器。
To configure the binder to use containerType=stream
, Spring Boot will automatically configure an Environment
@Bean
from the application properties.
You can, optionally, add a customizer to customize the listener container.
@Bean
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
return (cont, dest, group) -> {
StreamListenerContainer container = (StreamListenerContainer) cont;
container.setConsumerCustomizer((name, builder) -> {
builder.offset(OffsetSpecification.first());
});
// ...
};
}
传递给自定义器的 name
参数是 destination + '.' + group + '.container'
。
The name
argument passed to the customizer is destination + '.' + group + '.container'
.
用于偏移跟踪的流 name()
被设置为绑定 destination + '.' + group
。可以使用上面显示的 ConsumerCustomizer
来更改它。如果你决定使用手动偏移跟踪,Context
可以作为消息头提供:
The stream name()
(for the purpose of offset tracking) is set to the binding destination + '.' + group
.
It can be changed using a ConsumerCustomizer
shown above.
If you decide to use manual offset tracking, the Context
is available as a message header:
int count;
@Bean
public Consumer<Message<?>> input() {
return msg -> {
System.out.println(msg);
if (++count % 1000 == 0) {
Context context = msg.getHeaders().get("rabbitmq_streamContext", Context.class);
context.consumer().store(context.offset());
}
};
}
有关配置环境和消费者构建器的信息,请参阅 RabbitMQ Stream Java Client documentation。
Refer to the RabbitMQ Stream Java Client documentation for information about configuring the environment and consumer builder.
Consumer Support for the RabbitMQ Super Streams
有关超级流的信息,请参见 Super Streams。
See Super Streams for information about super streams.
使用超级流允许每个超级流分区上有一个活动消费者,来自动增大和缩小规模。
Use of super streams allows for automatic scale-up scale-down with a single active consumer on each partition of a super stream.
配置示例:
Configuration example:
@Bean
public Consumer<Thing> input() {
...
}
spring.cloud.stream.bindings.input-in-0.destination=super
spring.cloud.stream.bindings.input-in-0.group=test
spring.cloud.stream.bindings.input-in-0.consumer.instance-count=3
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=3
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.container-type=STREAM
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.super-stream=true
框架将创建一个名为 super
的超级流,其中包含 9 个分区。最多可以部署 3 个这个应用程序的实例。
The framework will create a super stream named super
, with 9 partitions.
Up to 3 instances of this application can be deployed.