Using the RabbitMQ Stream Plugin
Version 2.4 introduces initial support for the RabbitMQ Stream Plugin through the RabbitMQ Stream Plugin Java Client. It provides two components:
- RabbitStreamTemplate
- StreamListenerContainer
Add the spring-rabbit-stream dependency to your project.
Maven:
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-stream</artifactId>
    <version>{project-version}</version>
</dependency>
Gradle:
implementation 'org.springframework.amqp:spring-rabbit-stream:{project-version}'
You can provision the queues as normal with a RabbitAdmin bean, using the QueueBuilder.stream() method to designate the queue type. For example:
@Bean
Queue stream() {
    return QueueBuilder.durable("stream.queue1")
            .stream()
            .build();
}
However, this only works if you are also using non-stream components (such as the SimpleMessageListenerContainer or DirectMessageListenerContainer), because the admin is triggered to declare the defined beans when an AMQP connection is opened. If your application uses only stream components, or you wish to use advanced stream configuration features, configure a StreamAdmin instead:
@Bean
StreamAdmin streamAdmin(Environment env) {
    return new StreamAdmin(env, sc -> {
        sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
        sc.stream("stream.queue2").create();
    });
}
Refer to the RabbitMQ documentation for more information about the StreamCreator.
Sending Messages
The RabbitStreamTemplate provides a subset of the RabbitTemplate (AMQP) functionality.
public interface RabbitStreamOperations extends AutoCloseable {

    CompletableFuture<Boolean> send(Message message);

    CompletableFuture<Boolean> convertAndSend(Object message);

    CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);

    CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);

    MessageBuilder messageBuilder();

    MessageConverter messageConverter();

    StreamMessageConverter streamMessageConverter();

    @Override
    void close() throws AmqpException;

}
The RabbitStreamTemplate implementation has the following constructor and properties:
public RabbitStreamTemplate(Environment environment, String streamName) {
}
public void setMessageConverter(MessageConverter messageConverter) {
}
public void setStreamConverter(StreamMessageConverter streamConverter) {
}
public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}
The MessageConverter is used in the convertAndSend methods to convert the object to a Spring AMQP Message.
The StreamMessageConverter is used to convert from a Spring AMQP Message to a native stream Message.
You can also send native stream Message objects directly; the messageBuilder() method provides access to the Producer's message builder.
The ProducerCustomizer provides a mechanism to customize the producer before it is built.
Refer to the Java Client Documentation about customizing the Environment and Producer.
Starting with version 3.0, the method return types are CompletableFuture instead of ListenableFuture.
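Because the send methods return a CompletableFuture<Boolean>, the caller decides how to react to the outcome. The following is a minimal, library-free sketch of one way to do that. The class SendResultHandling and its describe method are hypothetical names introduced for illustration, the sender function stands in for a template method such as convertAndSend, and treating true as "confirmed" is an assumption rather than something stated in this section.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

// Hypothetical helper for illustration; it is not part of Spring AMQP.
final class SendResultHandling {

    // 'sender' stands in for a template method such as convertAndSend(Object).
    static String describe(Function<Object, CompletableFuture<Boolean>> sender, Object payload) {
        return sender.apply(payload)
                .handle((confirmed, ex) -> {
                    if (ex != null) {
                        // Dependent stages wrap failures in CompletionException; unwrap for a readable message.
                        Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                                ? ex.getCause() : ex;
                        return "failed: " + cause.getMessage();
                    }
                    // Assumption: a true result means the send was confirmed.
                    return Boolean.TRUE.equals(confirmed) ? "confirmed" : "not confirmed";
                })
                .join(); // blocks until the future completes; fine for a demo, avoid on hot paths
    }

    public static void main(String[] args) {
        // Stub sender that completes immediately; a real send completes asynchronously.
        System.out.println(describe(p -> CompletableFuture.completedFuture(true), "hello"));
    }
}
```

In application code you would more likely attach a non-blocking callback (for example with whenComplete) instead of calling join().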
Receiving Messages
Asynchronous message reception is provided by the StreamListenerContainer (and the StreamRabbitListenerContainerFactory when using @RabbitListener).
The listener container requires an Environment as well as a single stream name.
You can either receive Spring AMQP Message objects using the classic MessageListener, or receive native stream Message objects using a new interface:
public interface StreamMessageListener extends MessageListener {

    void onStreamMessage(Message message, Context context);

}
See Message Listener Container Configuration for information about supported properties.
Similar to the template, the container has a ConsumerCustomizer property.
Refer to the Java Client Documentation about customizing the Environment and Consumer.
When using @RabbitListener, configure a StreamRabbitListenerContainerFactory; at this time, most @RabbitListener properties (concurrency, etc.) are ignored. Only id, queues, autoStartup and containerFactory are supported. In addition, queues can only contain one stream name.
Examples
@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
    template.setProducerCustomizer((name, builder) -> builder.name("test"));
    return template;
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
    return new StreamRabbitListenerContainerFactory(env);
}

@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
    ...
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
    StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
    factory.setNativeListener(true);
    factory.setConsumerCustomizer((id, builder) -> {
        builder.name("myConsumer")
                .offset(OffsetSpecification.first())
                .manualTrackingStrategy();
    });
    return factory;
}

@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
    ...
    context.storeOffset();
}

// The two queue beans need distinct method names to coexist in one configuration class.
@Bean
Queue stream1() {
    return QueueBuilder.durable("test.stream.queue1")
            .stream()
            .build();
}

@Bean
Queue stream2() {
    return QueueBuilder.durable("test.stream.queue2")
            .stream()
            .build();
}
Version 2.4.5 added the adviceChain property to the StreamListenerContainer (and its factory). A new factory bean is also provided to create a stateless retry interceptor with an optional StreamMessageRecoverer for use when consuming raw stream messages.
@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
    StreamRetryOperationsInterceptorFactoryBean rfb =
            new StreamRetryOperationsInterceptorFactoryBean();
    rfb.setRetryOperations(retryTemplate);
    rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
        ...
    });
    return rfb;
}
Stateful retry is not supported with this container.
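To make the idea of stateless retry with a recoverer concrete, here is a minimal, library-free sketch: the listener is retried in the calling thread, and only when every attempt fails is the message handed to a recoverer. StatelessRetrySketch and deliver are hypothetical names; this illustrates the general pattern and is not how Spring Retry or the interceptor factory bean is implemented.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical illustration of stateless retry; not Spring Retry's implementation.
final class StatelessRetrySketch {

    // Invokes the listener up to maxAttempts times in the calling thread.
    // If all attempts throw, passes the message and the last failure to the recoverer.
    // Returns the number of attempts that were made.
    static <T> int deliver(T message, int maxAttempts,
            Consumer<T> listener, BiConsumer<T, Throwable> recoverer) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                listener.accept(message);
                return attempt; // success, no recovery needed
            } catch (RuntimeException ex) {
                last = ex; // remember the failure and try again
            }
        }
        recoverer.accept(message, last);
        return maxAttempts;
    }

    public static void main(String[] args) {
        int attempts = deliver("payload", 3,
                m -> { throw new IllegalStateException("listener failed"); },
                (m, t) -> System.out.println("recovering " + m + " after: " + t.getMessage()));
        System.out.println("attempts made: " + attempts);
    }
}
```

No retry state is kept between deliveries: each call to deliver starts counting from the first attempt, which is what distinguishes this from stateful retry.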
Super Streams
A Super Stream is an abstract concept for a partitioned stream, implemented by binding a number of stream queues to an exchange having an argument x-super-stream: true.
Provisioning
For convenience, a super stream can be provisioned by defining a single bean of type SuperStream.
@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3);
}
The RabbitAdmin detects this bean and declares the exchange (my.super.stream) and 3 queues (partitions) named my.super.stream-n, where n is 0, 1, 2, bound with routing keys equal to n.
If you also wish to publish over AMQP to the exchange, you can provide custom routing keys:
@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
            .mapToObj(j -> "rk-" + j)
            .collect(Collectors.toList()));
}
The number of keys must equal the number of partitions.
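The relationship between partitions and routing keys can be checked with plain Java. The sketch below is library-free; SuperStreamNaming and its methods are hypothetical names, and the partition naming (the super stream name followed by a hyphen and the partition index) is an assumption used for illustration. The routingKeys method has the same shape as the key generator in the bean definition above.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper for illustration; it does not declare anything on a broker.
final class SuperStreamNaming {

    // Assumed naming pattern: "<super stream name>-<partition index>".
    static List<String> partitionNames(String superStream, int partitions) {
        return IntStream.range(0, partitions)
                .mapToObj(i -> superStream + "-" + i)
                .collect(Collectors.toList());
    }

    // One custom routing key per partition, generated the same way as in the bean definition.
    static List<String> routingKeys(int partitions) {
        return IntStream.range(0, partitions)
                .mapToObj(j -> "rk-" + j)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = partitionNames("my.super.stream", 3);
        List<String> keys = routingKeys(3);
        // Pair each partition with the routing key at the same index.
        for (int i = 0; i < names.size(); i++) {
            System.out.println(keys.get(i) + " -> " + names.get(i));
        }
    }
}
```

Generating the keys from the partition count, as here, keeps the two sizes equal by construction.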
Producing to a SuperStream
You must add a superStreamRoutingFunction to the RabbitStreamTemplate:
@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
    template.setSuperStreamRouting(message -> {
        // some logic to return a String for the client's hashing algorithm
    });
    return template;
}
You can also publish over AMQP, using the RabbitTemplate.
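The routing function returns a String that the client hashes to choose a partition. The sketch below illustrates that general idea only: it uses String.hashCode() as a stand-in hash, so the partition it picks will generally differ from what the stream client picks with its own algorithm. HashRoutingSketch and partitionFor are hypothetical names.

```java
// Hypothetical illustration of hash-based partition selection; not the stream client's algorithm.
final class HashRoutingSketch {

    // Maps a routing value to a partition index in the range [0, partitions).
    // Math.floorMod keeps the result non-negative even when hashCode() is negative.
    static int partitionFor(String routingValue, int partitions) {
        if (partitions < 1) {
            throw new IllegalArgumentException("partitions must be at least 1");
        }
        return Math.floorMod(routingValue.hashCode(), partitions);
    }

    public static void main(String[] args) {
        // The same routing value always lands on the same partition.
        for (String customerId : new String[] {"customer-1", "customer-2", "customer-1"}) {
            System.out.println(customerId + " -> partition " + partitionFor(customerId, 3));
        }
    }
}
```

The practical consequence is that the routing function should return a value, such as a customer or order identifier, that is stable for all messages that must stay in order relative to each other.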
Consuming Super Streams with Single Active Consumers
Invoke the superStream method on the listener container to enable a single active consumer on a super stream.
@Bean
StreamListenerContainer container(Environment env, String name) {
    StreamListenerContainer container = new StreamListenerContainer(env);
    container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
    container.setupMessageListener(msg -> {
        ...
    });
    container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
    return container;
}
At this time, when the concurrency is greater than 1, the actual concurrency is further controlled by the Environment; to achieve full concurrency, set the environment’s maxConsumersByConnection to 1. See Configuring the Environment.
Micrometer Observation
Since version 3.0.5, Micrometer observation is supported for the RabbitStreamTemplate and the stream listener container. The container also supports Micrometer timers (when observation is not enabled).
Set observationEnabled on each component to enable observation; this disables Micrometer timers because the timers are then managed with each observation. When using annotated listeners, set observationEnabled on the container factory.
Refer to Micrometer Tracing for more information.
To add tags to timers/traces, configure a custom RabbitStreamTemplateObservationConvention or RabbitStreamListenerObservationConvention on the template or listener container, respectively.
The default implementations add the name tag for template observations and the listener.id tag for containers.
You can either subclass DefaultRabbitStreamTemplateObservationConvention or DefaultStreamRabbitListenerObservationConvention, or provide completely new implementations.
See Micrometer Observation Documentation for more details.