Initial Producer Support for the RabbitMQ Stream Plugin
现在提供对 RabbitMQ Stream Plugin 的基本支持。若要启用此功能,您必须将 spring-rabbit-stream
jar 添加到类路径——它必须与 spring-amqp
和 spring-rabbit
使用相同版本。
Basic support for the RabbitMQ Stream Plugin is now provided.
To enable this feature, you must add the spring-rabbit-stream
jar to the class path - it must be the same version as spring-amqp
and spring-rabbit
.
当您将 producerType
属性设置为 STREAM_SYNC
或 STREAM_ASYNC
时,以上所述的生产者属性不受支持。
The producer properties described above are not supported when you set the producerType
property to STREAM_SYNC
or STREAM_ASYNC
.
要将 Binder 配置为使用流 ProducerType
,Spring Boot 将从应用程序属性中配置一个 Environment
@Bean
。您还可以选择性地添加一个定制器来定制消息处理程序。
To configure the binder to use a stream ProducerType
, Spring Boot will configure an Environment
@Bean
from the applicaation properties.
You can, optionally, add a customizer to customize the message handler.
@Bean
ProducerMessageHandlerCustomizer<MessageHandler> handlerCustomizer() {
return (hand, dest) -> {
RabbitStreamMessageHandler handler = (RabbitStreamMessageHandler) hand;
handler.setConfirmTimeout(5000);
((RabbitStreamTemplate) handler.getStreamOperations()).setProducerCustomizer(
(name, builder) -> {
...
});
};
}
有关配置环境和生产者构建器的信息,请参阅 RabbitMQ Stream Java Client documentation。
Refer to the RabbitMQ Stream Java Client documentation for information about configuring the environment and producer builder.
Producer Support for the RabbitMQ Super Streams
有关超级流的信息,请参见 Super Streams。
See Super Streams for information about super streams.
使用超级流可以在每个超级流分区上自动执行带有单个活动消费者的纵向或横向扩展。通过使用 Spring Cloud Stream,您可以通过 AMQP 或使用流客户端发布到超级流。
Use of super streams allows for automatic scale-up scale-down with a single active consumer on each partition of a super stream. Using Spring Cloud Stream, you can publish to a super stream either over AMQP, or using the stream client.
超级流必须已经存在;生产者绑定不支持创建超级流。
The super stream must already exist; creating a super stream is not supported by producer bindings.
通过 AMQP 发布到超级流:
Publishing to a super stream over AMQP:
spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false
使用流客户端发布到超级流中:
Publishing to a super stream using the stream client:
spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.producer-type=stream-async
spring.cloud.stream.rabbit.bindings.output.producer.super-stream=true
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false
使用流客户端时,如果设置了 confirmAckChannel
,成功发送的消息的副本将被发送到该通道。
When using the stream client, if you set a confirmAckChannel
, a copy of a successfully sent message will be sent to that channel.