Reactive Support

该框架为几乎所有受支持的功能提供了响应式对应部分。

如果您在提供的命令式组件前面加上单词 Reactive,可能会发现它对应的响应式组件。

  • PulsarTemplate → ReactivePulsarTemplate

  • PulsarListener → ReactivePulsarListener

  • PulsarConsumerFactory → ReactivePulsarConsumerFactory

  • etc..

但是,以下是尚未支持的:

  • 非共享订阅中的错误处理

  • 在流模式中通过 @Header 访问 Pulsar 标头

  • Observations

Preface

我们建议为基于 Apache Pulsar 的 Spring 应用程序采用 Spring Boot First 方式,因为这极大地简化了流程。为此,您可以将 spring-pulsar-reactive-spring-boot-starter 模块添加为依赖项。

本参考的大部分内容都假定读者使用该启动程序,并提供了针对该特定内容的大部分配置说明。但是,有意识强调针对 Spring Boot 启动程序使用的特定说明。

@8

Design

以下是一些需要牢记的关键设计要点。

Apache Pulsar Reactive

反应式支持最终由 Apache Pulsar Reactive client 提供,其当前实现是常规 Pulsar 客户端异步 API 周围的完全非阻塞适配器。这意味着反应式客户端需要常规客户端。

Additive Auto-Configuration

由于依赖于常规(命令式)客户端,因此框架提供的响应式自动配置是对命令式自动配置的补充。换句话说,命令式启动器仅包括命令式组件,而响应式启动器既包括命令式组件,也包括响应式组件。

Reactive Pulsar Client

使用响应式 Pulsar Spring Boot 启动器后,将自动配置 ReactivePulsarClient

默认情况下,该应用程序会尝试连接到 pulsar://localhost:6650 处的本地 Pulsar 实例。可以通过将 spring.pulsar.client.service-url 属性设为不同的值来调整此功能。

该值必须是一个有效的 {apache-pulsar-docs}/client-libraries-java/#connection-urls[Pulsar 协议] URL

还可以配置许多其他应用程序属性(从已改编的命令式客户端继承而来)。请参阅 {spring-boot-pulsar-config-props}[spring.pulsar.client.*] 应用程序属性。

Authentication

若要连接到需要身份验证的 Pulsar 集群,请按照 the same steps 作为命令客户端。同样,这是因为反应式客户端调整命令客户端,该客户端负责处理所有安全配置。

Message Production

ReactivePulsarTemplate

在 Pulsar 生产者端,Spring Boot 自动配置为发布记录提供 ReactivePulsarTemplate。该模板实现了名为 ReactivePulsarOperations 的接口,并提供了通过其合约发布记录的方法。

该模板提供接受单个消息并返回 Mono<MessageId> 的发送方法。它还提供接受多个消息(采用 ReactiveStreams Publisher 类型)并返回 Flux<MessageId> 的发送方法。

对于不包含主题参数的 API 变体,topic resolution process 用于确定目标主题。

Fluent API

该模板提供了一个 {javadocs}/org/springframework/pulsar/reactive/core/ReactivePulsarOperations.html#newMessage(T)[fluent builder] 来处理更复杂的发送请求。

Message customization

您可以指定 MessageSpecBuilderCustomizer 来配置传出消息。例如,以下代码演示了如何发送键控消息:

template.newMessage(msg)
    .withMessageCustomizer((mc) -> mc.key("foo-msg-key"))
    .send();

Sender customization

您可以指定 ReactiveMessageSenderBuilderCustomizer 来配置 Pulsar 发送器构建器,它最终构建用来发送传出消息的发送器。

谨慎使用,因为这可以完全访问发送器构建器,并且调用其某些方法(例如 create)可能会产生意想不到的副作用。

例如,以下代码显示如何禁用批处理并启用块处理:

template.newMessage(msg)
    .withSenderCustomizer((sc) -> sc.enableChunking(true).enableBatching(false))
    .send();

此其他示例演示了在向分区主题发布记录时如何使用自定义路由。在发送器构建器上指定您的自定义 MessageRouter 实现,例如:

template.newMessage(msg)
    .withSenderCustomizer((sc) -> sc.messageRouter(messageRouter))
    .send();

请注意,在使用 MessageRouter 时,spring.pulsar.producer.message-routing-mode 唯一的有效设置是 custom

Specifying Schema Information

@6

Specifying Schema Information

如果你使用 Java 原语类型,框架会为你自动检测架构,且你不需要为发布数据指定任何架构类型。对于非原语类型,如果在 PulsarTemplate 上调用发送操作时未明确指定架构,Spring for Apache Pulsar 框架将尝试根据类型构建 Schema.JSON

当前支持的复杂模式类型包括 JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES 和 KEY_VALUE 以及 INLINE 编码。

Custom Schema Mapping

作为在 PulsarTemplate 上调用发送操作时为复杂类型指定架构的替代方案,可以将架构解析器配置为具有类型的映射。这消除了指定架构的需要,因为框架使用传出消息类型通过解析器进行查询。

@3

@4

有了此配置,无需设置发送操作时的架构。

Producing with AUTO_SCHEMA

如果没有办法提前了解 Pulsar 主题的架构类型,您可以使用 {apache-pulsar-docs}/schema-get-started/#auto_produce[AUTO_PRODUCE] 架构来安全地发布原始 JSON 或 Avro 负载作为 byte[]

在这种情况下,生成器会验证出站字节是否与目标主题的架构兼容。

只需在模板发送操作中指定 Schema.AUTO_PRODUCE_BYTES() 架构,如下例所示:

@7

这仅受 Avro 和 JSON 模式类型支持。

ReactivePulsarSenderFactory

ReactivePulsarTemplate 依赖于 ReactivePulsarSenderFactory 来实际创建底层发送器。

Spring Boot 提供此发件人工厂,可以通过 {spring-boot-pulsar-config-props}[spring.pulsar.producer.*] 应用程序属性对其进行配置。

如果不直接通过发送器工厂 API 指定主题信息,则 ReactivePulsarTemplate 使用的 topic resolution process 与一个例外相同,即“消息类型默认值”步骤为 omitted

Producer Caching

每个底层 Pulsar 生产者都会消耗资源。为了提高性能并避免持续创建生产者,底层 Apache Pulsar 响应式客户端中的 ReactiveMessageSenderCache 会缓存它创建的生产者。它们以 LRU 方式进行缓存,并且在配置的时间段内未被使用时会将它们逐出。

可以通过指定 {spring-boot-pulsar-config-props}[spring.pulsar.producer.cache.*] 应用程序属性来配置缓存设置。

Message Consumption

@ReactivePulsarListener

说到 Pulsar 消费者,我们建议终端用户应用程序使用 ReactivePulsarListener 注释。若要使用 ReactivePulsarListener,则需要使用 @EnableReactivePulsar 注释。在使用 Spring Boot 支持时,它会自动启用此注释并配置所有必要的组件,例如消息侦听器基础架构(负责创建底层 Pulsar 消费者)。

让我们重新审视我们在快速浏览部分中看到的 ReactivePulsarListener 代码段:

@ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}

侦听器方法返回一个 Mono<Void> 以指示消息是否已成功处理。Mono.empty() 表示成功(确认),Mono.error() 表示失败(否定确认)。

您还可以进一步简化此方法:

@ReactivePulsarListener
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}

在此最基本的表单中,当没有直接提供 topics 时,将使用 [主题解析流程,主题解析流程 - 响应式] 来确定目标主题。同样,当未在 @ReactivePulsarListener 注释上提供 subscriptionName 时,将使用自动生成的订阅名称。

在前面所示的 ReactivePulsarListener 方法中,我们以 String 的形式接收数据,但是我们没有指定任何架构类型。框架内部依赖于 Pulsar 的架构机制将数据转换为所需类型。

框架检测到您期望 String 类型,然后根据该信息推断架构类型,并将该架构提供给消费者。框架对所有基本类型执行此推断。对于所有非基本类型,默认架构假定为 JSON。如果复杂类型使用 JSON 以外的任何内容(例如 AVRO 或 KEY_VALUE),则必须使用 schemaType 属性在注释中提供架构类型。

此示例说明了如何从主题消费复杂类型:

@ReactivePulsarListener(topics = "my-topic-2", schemaType = SchemaType.JSON)
Mono<Void> listen(Foo message) {
    System.out.println(message);
    return Mono.empty();
}

让我们看看我们可以消费的更多方法。

此示例直接消耗 Pulsar 消息:

@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.apache.pulsar.client.api.Message<String> message) {
    System.out.println(message.getValue());
    return Mono.empty();
}

此示例使用 Spring 消息封套消耗记录:

@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.springframework.messaging.Message<String> message) {
    System.out.println(message.getPayload());
    return Mono.empty();
}

Streaming

以上所有示例都是逐个消耗单个记录的示例。然而,使用 Reactive 的一个令人信服的原因是具有反压支持的流功能。

以下示例使用 ReactivePulsarListener 消耗 POJO 流:

@ReactivePulsarListener(topics = "streaming-1", stream = true)
Flux<MessageResult<Void>> listen(Flux<org.apache.pulsar.client.api.Message<String>> messages) {
    return messages
        .doOnNext((msg) -> System.out.println("Received: " + msg.getValue()))
        .map(MessageResult::acknowledge);

在这里,我们以 Pulsar 消息的 Flux 形式接收记录。此外,要在 ReactivePulsarListener 级别启用流消耗,您需要将注释中的 stream 属性设置为 true

监听程序方法返回一个 Flux<MessageResult<Void>>,其中每个元素都表示一个已处理消息并保存有消息 ID、值和是否已确认。MessageResult 有一组静态工厂方法,可用于创建适当的 MessageResult 实例。

根据 Flux 中消息的实际类型,框架尝试推断要使用的架构。如果它包含复杂类型,您仍然需要在 ReactivePulsarListener 上提供 schemaType

以下侦听器使用带有复杂类型的 Spring 消息 Message 封套:

@ReactivePulsarListener(topics = "streaming-2", stream = true, schemaType = SchemaType.JSON)
Flux<MessageResult<Void>> listen2(Flux<org.springframework.messaging.Message<Foo>> messages) {
    return messages
        .doOnNext((msg) -> System.out.println("Received: " + msg.getPayload()))
        .map(MessageUtils::acknowledge);
}

监听程序方法返回一个 Flux<MessageResult<Void>>,其中每个元素都表示一个已处理消息并保存有消息 ID、值和是否已确认。Spring MessageUtils 有一组静态工厂方法,可用于根据 Spring 消息创建适当的 MessageResult 实例。

Configuration - Application Properties

侦听器依赖于 ReactivePulsarConsumerFactory`创建和管理它用于消费消息的底层 Pulsar 消费者。Spring Boot 提供此消费者工厂,您可以通过指定 `spring.pulsar.consumer.] application properties. *Most of the configured properties on the factory will be respected in the listener with the following exceptions

spring.pulsar.consumer.subscription.name 属性被忽略,如果在注解上未指定,则生成。

spring.pulsar.consumer.subscription-type 属性被忽略,相反从注解上的值获取。但是,您可以设置注解上的 subscriptionType = {} 以将属性值作为默认值。

Generic records with AUTO_CONSUME

如果无法提前知晓 Pulsar 主题的模式类型,则可以使用 AUTO_CONSUME 模式类型来使用常规记录。在这种情况下,主题会使用与主题关联的模式信息将消息反序列化为 GenericRecord 对象。

要消耗通用记录,请在 @ReactivePulsarListener 上设置 schemaType = SchemaType.AUTO_CONSUME,并使用类型为 GenericRecord 的 Pulsar 消息作为消息参数,如下所示。

@ReactivePulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
Mono<Void> listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
    GenericRecord record = message.getValue();
    record.getFields().forEach((f) ->
            System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
	return Mono.empty();
}

GenericRecord API 允许访问字段及其关联的值

Consumer Customization

您可以指定一个 ReactivePulsarListenerMessageConsumerBuilderCustomizer 来配置底层 Pulsar 消费者构建器,该构建器最终构建侦听器用于接收消息的消费者。

请谨慎使用,因为它会完全访问使用者生成器,并且调用其中某些方法(如 create)可能会有意想不到的副作用。

例如,以下代码说明如何将订阅的初始位置设置为主题上的最早消息。

@ReactivePulsarListener(topics = "hello-pulsar-topic", consumerCustomizer = "myConsumerCustomizer")
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}

@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myConsumerCustomizer() {
    return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}

如果你的应用程序只注册了单个 @ReactivePulsarListener 和单个 ReactivePulsarListenerMessageConsumerBuilderCustomizer Bean,那么自定义项将自动应用。

您还可以使用定制器向消费者构建器提供直接的 Pulsar 消费者属性。如果您不想使用前面提到的 Boot 配置属性或具有配置不同的多个 ReactivePulsarListener 方法,这会很方便。

以下定制器示例使用直接 Pulsar 消费者属性:

@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> directConsumerPropsCustomizer() {
    return b -> b.property("subscriptionName", "subscription-1").property("topicNames", "foo-1");
}

使用的属性是直接 Pulsar 消费者属性,不是 spring.pulsar.consumer Spring Boot 配置属性

@1

Message Listener Container Infrastructure

在大多数情况下,我们建议直接使用 ReactivePulsarListener 注释从 Pulsar 主题消费,因为该模型涵盖了广泛的应用程序用例。但是,了解 ReactivePulsarListener 在内部如何工作非常重要。

当您将 Spring 用于 Apache Pulsar 时,消息侦听器容器是消息消费的核心。ReactivePulsarListener 在幕后使用消息侦听器容器基础架构创建和管理底层 Pulsar 消费者。

ReactivePulsarMessageListenerContainer

此消息侦听器容器的契约通过 ReactivePulsarMessageListenerContainer 提供,其默认实现创建一个反应式 Pulsar 消费者,并通过一个使用创建的消费者的反应式消息管道进行连接。

ReactiveMessagePipeline

该管道是 Apache Pulsar 反应式客户端的基础功能,它以反应式方式接收数据,然后将其传递给提供的消息处理器。反应式消息侦听器容器实现相当简单,因为该管道处理了大部分工作。

ReactivePulsarMessageHandler

“侦听器”方面由 ReactivePulsarMessageHandler 提供,ReactivePulsarMessageHandler 有两种提供的实现:

  • ReactivePulsarOneByOneMessageHandler - 单个处理一条消息

  • ReactivePulsarStreamingHandler - 通过 Flux 处理多条消息

如果在直接使用侦听器容器时未指定主题信息,则会将 ReactivePulsarListener 使用的相同 topic resolution process 与“消息类型默认”步骤的唯一例外 omitted 一起使用。

Concurrency

在流模式下使用记录时(stream = true),并发性自然地通过客户端实现中的底层反应式支持而来。

但是,当逐个处理消息时,可以指定并发性以增加处理吞吐量。只需在 @ReactivePulsarListener 上设置 concurrency 属性。此外,当 concurrency > 1 时,可以通过在注释上设置 useKeyOrderedProcessing = "true" 来确保消息按键排序,因此发送到同一处理器。

同样,ReactiveMessagePipeline 也会做大量的工作,我们只需在它上面设置属性即可。

[role="small"][.small]Reactive vs Imperative

反应式容器中的并发性不同于它的命令式对应项。后者创建多个线程(每个线程都有一个 Pulsar 消费者),而前者同时将消息分派到反应式并行调度器上的多个处理器实例。 反应式并发性模型的一个优点是,它可以与“独占”订阅一起使用,而命令式并发性模型则不行。

Pulsar Headers

Pulsar 消息元数据可以作为 Spring 消息头被使用。可用的标头列表可以在{github}/blob/main/spring-pulsar/src/main/java/org/springframework/pulsar/support/PulsarHeaders.java[PulsarHeaders.java]中找到。

Accessing In OneByOne Listener

以下示例显示如何在使用逐个消息侦听器时访问 Pulsar 头信息:

@ReactivePulsarListener(topics = "some-topic")
Mono<Void> listen(String data,
        @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
        @Header("foo") String foo) {
    System.out.println("Received " + data + " w/ id=" + messageId + " w/ foo=" + foo);
    return Mono.empty();
}

在前面的示例中,我们访问 messageId 消息元数据的值以及名为 foo 的自定义消息属性。Spring @Header 注释用于每个标头字段。

您还可以将 Pulsar 的 Message 用作承载负载的信封。在此过程中,用户可以直接调用 Pulsar 消息上的对应方法来检索元数据。但是,为了方便起见,您也可以使用 Header 注释来检索它。请注意,您还可以使用 Spring 消息 Message 信封来承载负载,然后通过 @Header 检索 Pulsar 头。

Accessing In Streaming Listener

在使用流消息侦听器时,标头支持有限。仅当 Flux 包含 Spring org.springframework.messaging.Message 元素时,才会填充标头。此外,Spring @Header 注释不能用于检索数据。您必须直接调用 Spring 消息上的相应方法来检索数据。

Message Acknowledgment

该框架会自动处理消息确认。但是,侦听器方法必须发送一个信号来指示消息是否已成功处理。然后,容器实现使用该信号来执行确认或否定操作。这与其命令式对应项略有不同,命令式对应项中的信号被暗示为肯定,除非该方法引发异常。

OneByOne Listener

单个消息(又名 OneByOne)消息侦听器方法返回一个 Mono<Void> 来表示该消息是否已成功处理。 Mono.empty() 表示成功(确认),而 Mono.error() 表示失败(否定确认)。

Streaming Listener

流侦听器方法返回一个 Flux<MessageResult<Void>>,其中每个 MessageResult 元素表示已处理的消息并保存消息 ID、值以及是否确认。MessageResult 有一组 acknowledgenegativeAcknowledge 静态工厂方法,可用于创建适当的 MessageResult 实例。

Message Redelivery and Error Handling

Apache Pulsar 为消息重新传递和错误处理提供了多种原生策略。我们将了解它们,并了解如何通过 Spring for Apache Pulsar 使用它们。

Acknowledgment Timeout

默认情况下,Pulsar 消费者不会重新投递消息,除非消费者崩溃,但您可以通过在 Pulsar 消费者上设置确认超时来更改此行为。如果确认超时属性的值大于零,并且 Pulsar 消费者在该超时时间内未确认消息,则会重新投递消息。

您可以通过 reactive-consumer-customizer 直接指定此属性作为 Pulsar 消费者属性,例如:

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
    return b -> b.property("ackTimeout", "60s");
}

Negative Acknowledgment Redelivery Delay

否定确认时,Pulsar 消费者可让您指定应用程序希望如何重新传递消息。默认情况下,在一分钟内重新传递消息,但您可以通过 reactive-consumer-customizer 改变它,例如:

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
    return b -> b.property("negativeAckRedeliveryDelay", "10ms");
}

Dead Letter Topic

Apache Pulsar 允许应用程序在具有`Shared`订阅类型的消费者上使用死信主题。对于`Exclusive`和`Failover`订阅类型,此功能不可用。基本思想是,如果某条消息被重试了一定次数(可能是由于确认超时或 Nack 重新投递),一旦重试次数用尽,消息就可以发送到一个名为死信队列 (DLQ) 的特殊主题中。让我们通过检查一些代码段来了解该功能的一些细节:

@Configuration(proxyBeanMethods = false)
class DeadLetterPolicyConfig {

    @ReactivePulsarListener(
            topics = "topic-with-dlp",
            subscriptionType = SubscriptionType.Shared,
            deadLetterPolicy = "myDeadLetterPolicy",
            consumerCustomizer = "ackTimeoutCustomizer" )
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @ReactivePulsarListener(topics = "my-dlq-topic")
    void listenDlq(String msg) {
        System.out.println("From DLQ: " + msg);
    }

    @Bean
    DeadLetterPolicy myDeadLetterPolicy() {
        return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
    }

    @Bean
    ReactiveMessageConsumerBuilderCustomizer<String> ackTimeoutCustomizer() {
        return b -> b.property("ackTimeout", "1s");
    }
}

首先,我们有一个 DeadLetterPolicy 的特殊 Bean,它命名为 deadLetterPolicy(可以根据需要取任何名称)。此 Bean 指定了多项内容,例如最大传递次数(本例中为 10)以及死信主题名称 — 本例中为 my-dlq-topic。如果您未指定 DLQ 主题名称,它在 Pulsar 中默认为 <topicname>-<subscriptionname>-DLQ。接下来,我们将此 Bean 名称提供给 ReactivePulsarListener,方法是设置 deadLetterPolicy 属性。请注意,ReactivePulsarListener 的订阅类型为 “共享”,因为 DLQ 特性仅适用于共享订阅。此代码主要用于演示目的,因此我们提供了 1 秒的 ackTimeout 值。这个想法是,代码引发异常,如果 Pulsar 在 1 秒内没有收到确认,它就会重试。如果该循环持续十次(因为这是 DeadLetterPolicy 中的最大重新传递次数),Pulsar 消费者就会将消息发布到 DLQ 主题。我们还有另一个 ReactivePulsarListener 监听 DLQ 主题,以便在数据发布到 DLQ 主题时接收数据。

Special note on DLQ topics when using partitioned topics

如果主主题被分区,在后台,Pulsar 将每个分区视为单独主题。Pulsar 会将`partition-<n>`附加到主主题名称,其中`n`代表分区号。问题在于,如果您没有指定 DLQ 主题(与我们上面所做相反),Pulsar 会发布到默认主题名称,其中包含此 `partition-<n> 信息——例如:topic-with-dlp-partition-0-deadLetterPolicySubscription-DLQ。解决此问题的简单方法是始终提供 DLQ 主题名称。

Pulsar Reader Support

该框架支持以响应方式使用 {apache-pulsar-docs}/concepts-clients/#reader-interface[Pulsar Reader],方法是通过 ReactivePulsarReaderFactory

Spring Boot 提供此阅读器工厂,可以通过 {spring-boot-pulsar-config-props}[spring.pulsar.reader.*] 应用程序属性对其进行配置。

Topic Resolution

@2