Using the RabbitMQ Stream Plugin

版本 2.4 引入了对 RabbitMQ Stream PluginRabbitMQ Stream Plugin Java Client 的初始支持。

  • RabbitStreamTemplate

  • StreamListenerContainer

spring-rabbit-stream 依赖项添加到你的项目:

maven
<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit-stream</artifactId>
  <version>{project-version}</version>
</dependency>
gradle
compile 'org.springframework.amqp:spring-rabbit-stream:{project-version}'

你可以使用 RabbitAdmin bean 来置备队列,使用 QueueBuilder.stream() 方法指定队列类型。例如:

@Bean
Queue stream() {
    return QueueBuilder.durable("stream.queue1")
            .stream()
            .build();
}

然而,如果你也在使用非流组件(比如 SimpleMessageListenerContainerDirectMessageListenerContainer),这部分代码才会有用,因为当一个 AMQP 连接被打开时,该管理员被触发以声明已定义的 bean。如果你的应用程序仅使用流组件,或者你希望使用高级流配置功能,你应该配置一个 StreamAdmin

@Bean
StreamAdmin streamAdmin(Environment env) {
    return new StreamAdmin(env, sc -> {
        sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
        sc.stream("stream.queue2").create();
    });
}

参考 RabbitMQ 文档以获取有关 StreamCreator 的更多信息。

Sending Messages

RabbitStreamTemplate 提供了一个 RabbitTemplate (AMQP) 功能的子集。

RabbitStreamOperations
public interface RabbitStreamOperations extends AutoCloseable {

	CompletableFuture<Boolean> send(Message message);

	CompletableFuture<Boolean> convertAndSend(Object message);

	CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);

	CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);

	MessageBuilder messageBuilder();

	MessageConverter messageConverter();

	StreamMessageConverter streamMessageConverter();

	@Override
	void close() throws AmqpException;

}

RabbitStreamTemplate 实现具有以下构造器和属性:

RabbitStreamTemplate
public RabbitStreamTemplate(Environment environment, String streamName) {
}

public void setMessageConverter(MessageConverter messageConverter) {
}

public void setStreamConverter(StreamMessageConverter streamConverter) {
}

public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}

MessageConverterconvertAndSend 方法中用于将对象转换为 Spring AMQP Message

StreamMessageConverter 用于从 Spring AMQP Message 转换为本机流 Message

你还可以直接发送本机流 Message;使用 messageBuilder() 方法来访问 Producer 的消息生成器。

ProducerCustomizer 提供了一种在生成之前定制生成器的机制。

请参阅 Java Client Documentation 以定制 EnvironmentProducer

从版本 3.0 开始,方法返回类型为 CompletableFuture,而不是 ListenableFuture

Receiving Messages

异步消息接收由 StreamListenerContainer(以及使用 @RabbitListener 时的 StreamRabbitListenerContainerFactory)提供。

侦听器容器需要一个 Environment 以及一个流名称。

你可以使用经典 MessageListener 接收 Spring AMQP Message,也可以使用新界面接收本机流 Message

public interface StreamMessageListener extends MessageListener {

	void onStreamMessage(Message message, Context context);

}

有关受支持属性的信息,请参见 Message Listener Container Configuration

类似于模板,容器具有一个 ConsumerCustomizer 属性。

请参阅 Java Client Documentation 以定制 EnvironmentConsumer

在使用 @RabbitListener 时,配置一个 StreamRabbitListenerContainerFactory;此时,大多数 @RabbitListener 属性(“concurrency”等)都会被忽略。只有 idqueuesautoStartupcontainerFactory 是受支持的。此外,queues 只包含一个流名称。

Examples

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
    template.setProducerCustomizer((name, builder) -> builder.name("test"));
    return template;
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
    return new StreamRabbitListenerContainerFactory(env);
}

@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
    ...
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
    StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
    factory.setNativeListener(true);
    factory.setConsumerCustomizer((id, builder) -> {
        builder.name("myConsumer")
                .offset(OffsetSpecification.first())
                .manualTrackingStrategy();
    });
    return factory;
}

@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
    ...
    context.storeOffset();
}

@Bean
Queue stream() {
    return QueueBuilder.durable("test.stream.queue1")
            .stream()
            .build();
}

@Bean
Queue stream() {
    return QueueBuilder.durable("test.stream.queue2")
            .stream()
            .build();
}

第 2.4.5 版本在 StreamListenerContainer(及其工厂)中添加了 adviceChain 属性。还提供了一个新的工厂 bean 以创建带有可选 StreamMessageRecoverer 无状态重试拦截器,以便在使用原始流消息时使用。

@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
    StreamRetryOperationsInterceptorFactoryBean rfb =
            new StreamRetryOperationsInterceptorFactoryBean();
    rfb.setRetryOperations(retryTemplate);
    rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
        ...
    });
    return rfb;
}

此容器不支持有状态重试。

Super Streams

超级流是一个抽象概念,表示一个分区流,通过将大量流队列绑定到具有参数 x-super-stream: true 的交换机来实现。

Provisioning

为了方便起见,可以通过定义一个 SuperStream 类型的 bean 来配置超级流。

@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3);
}

RabbitAdmin 检测到该 bean 并且将声明交换机 (my.super.stream) 和 3 个队列(分区)- my.super-stream-n,其中 n012,绑定到的路由键等于 n

如果你还希望通过 AMQP 发布到该交换,你可以提供自定义路由密钥:

@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
					.mapToObj(j -> "rk-" + j)
					.collect(Collectors.toList()));
}

密钥的数量必须等于分区数。

Producing to a SuperStream

你必须向 RabbitStreamTemplate 添加一个 superStreamRoutingFunction

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
    template.setSuperStreamRouting(message -> {
        // some logic to return a String for the client's hashing algorithm
    });
    return template;
}

你还可以使用 RabbitTemplate,通过 AMQP 发布。

Consuming Super Streams with Single Active Consumers

调用侦听器容器上的 superStream 方法,以便在超级流上启用单个活动使用者。

@Bean
StreamListenerContainer container(Environment env, String name) {
    StreamListenerContainer container = new StreamListenerContainer(env);
    container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
    container.setupMessageListener(msg -> {
        ...
    });
    container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
    return container;
}

此时,当并发性大于 1 时,实际并发性会进一步受到 Environment 的控制;要实现充分并发性,请将环境的 maxConsumersByConnection 设置为 1。请参阅 Configuring the Environment

Micrometer Observation

从 3.0.5 版本开始,现在支持使用 Micrometer 来观测 RabbitStreamTemplate 和流侦听器容器。容器现在还支持 Micrometer 定时器(当未启用观测时)。

在每个组件上设置`observationEnabled`以启用观察; 这会禁用Micrometer Timers,因为现在将使用每个观察管理计时器。 当使用带注释的侦听器时,在容器工厂上设置`observationEnabled`。

有关更多信息,请参阅 Micrometer Tracing

要将标记添加到定时器/跟踪中,请分别向模板或侦听器容器配置一个自定义 RabbitStreamTemplateObservationConventionRabbitStreamListenerObservationConvention

默认实现会为模板观察添加 name 标记,为容器添加 listener.id 标记。

你可以对 DefaultRabbitStreamTemplateObservationConventionDefaultStreamRabbitListenerObservationConvention 进行子类化,或提供全新的实现。

有关更多详细信息,请参阅 Micrometer Observation Documentation