Using the RabbitMQ Stream Plugin

Version 2.4 introduces initial support for the RabbitMQ Stream Plugin Java Client for the RabbitMQ Stream Plugin.

  • RabbitStreamTemplate

  • StreamListenerContainer

Add the spring-rabbit-stream dependency to your project:

maven
<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit-stream</artifactId>
  <version>{project-version}</version>
</dependency>
gradle
implementation 'org.springframework.amqp:spring-rabbit-stream:{project-version}'

You can provision the queues as normal with a RabbitAdmin bean, using the QueueBuilder.stream() method to designate the queue type. For example:

@Bean
Queue stream() {
    return QueueBuilder.durable("stream.queue1")
            .stream()
            .build();
}

However, this will only work if you are also using non-stream components (such as the SimpleMessageListenerContainer or DirectMessageListenerContainer) because the admin is triggered to declare the defined beans when an AMQP connection is opened. If your application only uses stream components, or you wish to use advanced stream configuration features, you should configure a StreamAdmin instead:

@Bean
StreamAdmin streamAdmin(Environment env) {
    return new StreamAdmin(env, sc -> {
        sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
        sc.stream("stream.queue2").create();
    });
}

Refer to the RabbitMQ documentation for more information about the StreamCreator.

Sending Messages

The RabbitStreamTemplate provides a subset of the RabbitTemplate (AMQP) functionality.

RabbitStreamOperations
public interface RabbitStreamOperations extends AutoCloseable {

	CompletableFuture<Boolean> send(Message message);

	CompletableFuture<Boolean> convertAndSend(Object message);

	CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);

	CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);

	MessageBuilder messageBuilder();

	MessageConverter messageConverter();

	StreamMessageConverter streamMessageConverter();

	@Override
	void close() throws AmqpException;

}

The RabbitStreamTemplate implementation has the following constructor and properties:

RabbitStreamTemplate
public RabbitStreamTemplate(Environment environment, String streamName) {
}

public void setMessageConverter(MessageConverter messageConverter) {
}

public void setStreamConverter(StreamMessageConverter streamConverter) {
}

public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}

The MessageConverter is used in the convertAndSend methods to convert the object to a Spring AMQP Message.

The StreamMessageConverter is used to convert from a Spring AMQP Message to a native stream Message.

You can also send native stream Messages directly; the messageBuilder() method provides access to the Producer's message builder.

The ProducerCustomizer provides a mechanism to customize the producer before it is built.

Refer to the Java Client Documentation for information about customizing the Environment and Producer.

Starting with version 3.0, the method return types are CompletableFuture instead of ListenableFuture.
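
Because the send methods return a CompletableFuture<Boolean>, the outcome can be handled with the usual JDK combinators. The following is a minimal sketch that uses only the JDK: fakeSend is a hypothetical stand-in for a template send (it is not part of Spring AMQP), and the sketch assumes that the future completes with true when the publish is confirmed and completes exceptionally otherwise.

```java
import java.util.concurrent.CompletableFuture;

class SendResultSketch {

    // Hypothetical stand-in for a template send; not part of Spring AMQP.
    // Assumption: a confirmed publish completes the future with true,
    // an unconfirmed one completes it exceptionally.
    static CompletableFuture<Boolean> fakeSend(String payload, boolean confirmed) {
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        if (confirmed) {
            future.complete(true);
        }
        else {
            future.completeExceptionally(
                    new IllegalStateException("not confirmed: " + payload));
        }
        return future;
    }

    // Maps the outcome of a send to a short status string.
    // join() blocks and is used here only to keep the sketch synchronous.
    static String describe(CompletableFuture<Boolean> future) {
        return future
                .handle((ok, ex) -> ex == null && Boolean.TRUE.equals(ok) ? "confirmed" : "failed")
                .join();
    }
}
```

In application code you would typically attach a callback such as whenComplete to the returned future rather than block on it.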

Receiving Messages

Asynchronous message reception is provided by the StreamListenerContainer (and the StreamRabbitListenerContainerFactory when using @RabbitListener).

The listener container requires an Environment as well as a single stream name.

You can either receive Spring AMQP Messages using the classic MessageListener, or you can receive native stream Messages using a new interface:

public interface StreamMessageListener extends MessageListener {

	void onStreamMessage(Message message, Context context);

}

See Message Listener Container Configuration for information about supported properties.

Similar to the template, the container has a ConsumerCustomizer property.

Refer to the Java Client Documentation for information about customizing the Environment and Consumer.

When using @RabbitListener, configure a StreamRabbitListenerContainerFactory; at this time, most @RabbitListener properties (concurrency, etc.) are ignored. Only id, queues, autoStartup and containerFactory are supported. In addition, queues can only contain one stream name.

Examples

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
    template.setProducerCustomizer((name, builder) -> builder.name("test"));
    return template;
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
    return new StreamRabbitListenerContainerFactory(env);
}

@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
    ...
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
    StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
    factory.setNativeListener(true);
    factory.setConsumerCustomizer((id, builder) -> {
        builder.name("myConsumer")
                .offset(OffsetSpecification.first())
                .manualTrackingStrategy();
    });
    return factory;
}

@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
    ...
    context.storeOffset();
}

@Bean
Queue stream1() {
    return QueueBuilder.durable("test.stream.queue1")
            .stream()
            .build();
}

@Bean
Queue stream2() {
    return QueueBuilder.durable("test.stream.queue2")
            .stream()
            .build();
}

Version 2.4.5 added the adviceChain property to the StreamListenerContainer (and its factory). A new factory bean is also provided to create a stateless retry interceptor with an optional StreamMessageRecoverer for use when consuming raw stream messages.

@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
    StreamRetryOperationsInterceptorFactoryBean rfb =
            new StreamRetryOperationsInterceptorFactoryBean();
    rfb.setRetryOperations(retryTemplate);
    rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
        ...
    });
    return rfb;
}

Stateful retry is not supported with this container.

Super Streams

A Super Stream is an abstract concept for a partitioned stream, implemented by binding a number of stream queues to an exchange having an argument x-super-stream: true.

Provisioning

For convenience, a super stream can be provisioned by defining a single bean of type SuperStream.

@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3);
}

The RabbitAdmin detects this bean and declares the exchange (my.super.stream) and 3 queues (partitions) named my.super.stream-n, where n is 0, 1, 2, bound with routing keys equal to n.

If you also wish to publish over AMQP to the exchange, you can provide custom routing keys:

@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
            .mapToObj(j -> "rk-" + j)
            .collect(Collectors.toList()));
}

The number of keys must equal the number of partitions.
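
To make the key-count rule concrete, the sketch below evaluates the same lambda used in the bean above with plain JDK types. It assumes that the third constructor argument has the shape BiFunction<String, Integer, List<String>> (as the lambda implies); the RoutingKeySketch class and its keysFor helper are hypothetical and exist only for this illustration.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class RoutingKeySketch {

    // Same lambda as in the bean definition: (super stream name, partition count) -> keys.
    static final BiFunction<String, Integer, List<String>> KEYS =
            (q, i) -> IntStream.range(0, i)
                    .mapToObj(j -> "rk-" + j)
                    .collect(Collectors.toList());

    // Hypothetical helper: fails fast when the number of keys
    // does not match the number of partitions.
    static List<String> keysFor(String name, int partitions) {
        List<String> keys = KEYS.apply(name, partitions);
        if (keys.size() != partitions) {
            throw new IllegalArgumentException(
                    "expected " + partitions + " keys, got " + keys.size());
        }
        return keys;
    }
}
```

For 3 partitions, the function yields one key per partition: rk-0, rk-1 and rk-2.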

Producing to a SuperStream

You must add a super stream routing function to the RabbitStreamTemplate, using setSuperStreamRouting:

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
    template.setSuperStreamRouting(message -> {
        // some logic to return a String for the client's hashing algorithm
    });
    return template;
}
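
The routing function only has to return a String; choosing the partition from that String is left to the client. The following JDK-only sketch illustrates the idea under loudly stated assumptions: a Map of headers stands in for the message, customerId is a hypothetical header name, and partitionFor uses String.hashCode() as a stand-in and is not the client's actual hashing algorithm.

```java
import java.util.Map;
import java.util.function.Function;

class SuperStreamRoutingSketch {

    // Routing function: extracts the String that will be hashed.
    // "customerId" is a hypothetical header name chosen for this sketch.
    static final Function<Map<String, String>, String> ROUTING =
            headers -> headers.get("customerId");

    // Stand-in for the client's hashing step (NOT the client's actual algorithm):
    // maps a routing value onto one of the partitions, 0 to partitions - 1.
    static int partitionFor(String routingValue, int partitions) {
        return Math.floorMod(routingValue.hashCode(), partitions);
    }
}
```

The point of the sketch is that a given routing value always maps to the same partition, so messages that share a value stay together in one partition.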

You can also publish over AMQP, using the RabbitTemplate.

Consuming Super Streams with Single Active Consumers

Invoke the superStream method on the listener container to enable a single active consumer on a super stream.

@Bean
StreamListenerContainer container(Environment env, String name) {
    StreamListenerContainer container = new StreamListenerContainer(env);
    container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
    container.setupMessageListener(msg -> {
        ...
    });
    container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
    return container;
}

At this time, when the concurrency is greater than 1, the actual concurrency is further controlled by the Environment; to achieve full concurrency, set the environment’s maxConsumersByConnection to 1. See Configuring the Environment.
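
As a configuration fragment, the environment setting might look like the sketch below. It assumes the Environment is defined as a Spring bean through the Java client's Environment.builder(); the class name, host, and port are placeholder assumptions for a local broker and should be adapted to your setup.

```java
import com.rabbitmq.stream.Environment;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
class StreamEnvironmentConfig {

    // One consumer per connection, so each concurrent consumer
    // gets its own connection.
    // Host and port are placeholder assumptions for a local broker.
    @Bean
    Environment environment() {
        return Environment.builder()
                .host("localhost")
                .port(5552)
                .maxConsumersByConnection(1)
                .build();
    }
}
```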

Micrometer Observation

Since version 3.0.5, Micrometer observation is supported for the RabbitStreamTemplate and the stream listener container. The container also supports Micrometer timers (when observation is not enabled).

Set observationEnabled on each component to enable observation; this will disable Micrometer Timers because the timers will now be managed with each observation. When using annotated listeners, set observationEnabled on the container factory.

Refer to Micrometer Tracing for more information.

To add tags to timers/traces, configure a custom RabbitStreamTemplateObservationConvention or RabbitStreamListenerObservationConvention on the template or listener container, respectively.

The default implementations add the name tag for template observations and listener.id tag for containers.

You can either subclass DefaultRabbitStreamTemplateObservationConvention or DefaultStreamRabbitListenerObservationConvention or provide completely new implementations.

See Micrometer Observation Documentation for more details.