Apache Pulsar Support

通过提供 {url-spring-pulsar-site}[Spring for Apache Pulsar] 项目的自动配置来支持 Apache Pulsar。 当 org.springframework.pulsar:spring-pulsar 在类路径上时,Spring Boot 将自动配置和注册传统的(命令式)Spring for Apache Pulsar 组件。当 org.springframework.pulsar:spring-pulsar-reactive 在类路径上时,它将对响应式组件执行相同的操作。 有 spring-boot-starter-pulsarspring-boot-starter-pulsar-reactive “Starters”,分别用于方便地收集命令式和响应式使用的依赖项。

Connecting to Pulsar

当你使用 Pulsar starter 时,Spring Boot 将自动配置和注册 PulsarClient Bean。

默认情况下,应用程序尝试连接到 pulsar://localhost:6650 的本地 Pulsar 实例。这可以通过将 configprop:spring.pulsar.client.service-url[] 属性设置为其他值来调整。

值必须是有效的 Pulsar Protocol URL

你可以通过指定任何以 spring.pulsar.client.* 为前缀的应用程序属性来配置客户端。

如果需要更多地控制配置,请考虑注册一个或多个 PulsarClientBuilderCustomizer Bean。

Authentication

若要连接到需要身份验证的 Pulsar 集群,你需要通过设置 pluginClassName 和插件所需的任何参数来指定要使用的身份验证插件。你可以将参数设置为参数名称到参数值的映射。以下示例显示如何配置 AuthenticationOAuth2 插件。

spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
        param:
          issuerUrl: https://auth.server.cloud/
          privateKey: file:///Users/some-key.json
          audience: urn:sn:acme:dev:my-instance

你需要确保 spring.pulsar.client.authentication.param.* 下定义的名称与认证插件预期完全一致(通常是大写开头的驼峰式)。Spring Boot 不会尝试放松这些条目的绑定。 例如,如果你想为 AuthenticationOAuth2 认证插件配置颁发者 URL,则必须使用 spring.pulsar.client.authentication.param.issuerUrl。如果你使用其他形式,例如 issuerurlissuer-url,则设置不会应用于插件。 这种缺乏宽松绑定的情况也会使将环境变量用于身份验证参数变得困难,因为在翻译过程中会丢失大小写敏感性。如果你对参数使用环境变量,则需要遵循 Spring for Apache Pulsar 参考文档中的 {url-spring-pulsar-docs}/reference/pulsar.html#client-authentication-env-vars[这些步骤],让其正常工作。

SSL

默认情况下,Pulsar 客户端以纯文本与 Pulsar 服务通信。你可以遵循 Spring for Apache Pulsar 参考文档中的 {url-spring-pulsar-docs}reference/pulsar.html#tls-encryption[这些步骤] 来启用 TLS 加密。

有关客户端和身份验证的完整详细信息,请参阅 Spring for Apache Pulsar {url-spring-pulsar-docs}reference/pulsar.html#pulsar-client[参考文档]。

Connecting to Pulsar Reactively

当激活了 Reactive 自动配置后,Spring Boot 将自动配置并注册一个 ReactivePulsarClient bean。

ReactivePulsarClient 适配之前描述的 PulsarClient 的一个实例。因此,请按照上一节配置 ReactivePulsarClient 所使用的 PulsarClient

Connecting to Pulsar Administration

Spring for Apache Pulsar 的 PulsarAdministration 客户端也已自动配置。

默认情况下,该应用程序尝试连接到 http://localhost:8080 的本地 Pulsar 实例。可以通过在 (http|https)://<host>:<port> 中将 configprop:spring.pulsar.admin.service-url[] 属性设置为其他值来调整这一配置。

如果您需要进一步控制配置,请考虑注册一个或多个 PulsarAdminBuilderCustomizer bean。

Authentication

当访问需要身份验证的 Pulsar 集群时,管理客户端需要与常规 Pulsar 客户端相同的安全配置。您可以使用前文提到的 authentication configuration ,通过用 spring.pulsar.admin.authentication 替换 spring.pulsar.client.authentication

要创建启动时的主题,请添加一个 PulsarTopic 类型 bean。如果主题已存在,则忽略此 bean。

Sending a Message

Spring 的 PulsarTemplate 已自动配置,您可以使用它发送消息,如下例所示:

PulsarTemplate 依赖 PulsarProducerFactory 来创建基础 Pulsar 制造者。Spring Boot 自动配置也提供此生产程序工厂,默认情况下,它缓存其创建的生产程序。您可以在指定任何以 spring.pulsar.producer. and spring.pulsar.producer.cache. 为前缀的应用程序属性时配置生产程序工厂和缓存设置。

如果您需要进一步控制生产程序工厂配置,请考虑注册一个或多个 ProducerBuilderCustomizer bean。这些自定义器应用于所有已创建的生产程序。您还可以在发送消息时传递一个 ProducerBuilderCustomizer ,以仅影响当前生产程序。

如果您需要进一步控制正在发送的消息,则可以在发送消息时传递一个 TypedMessageBuilderCustomizer

Sending a Message Reactively

当激活了 Reactive 自动配置后,Spring 的 ReactivePulsarTemplate 已自动配置,您可以使用它发送消息,如下例所示:

ReactivePulsarTemplate 依赖 ReactivePulsarSenderFactory 来实际创建基础发送器。Spring Boot 自动配置也提供此发送器工厂,默认情况下,它缓存其创建的生产程序。您可以在指定任何以 spring.pulsar.producer. and spring.pulsar.producer.cache. 为前缀的应用程序属性时配置发送器工厂和缓存设置。

如果您需要进一步控制发送器工厂配置,请考虑注册一个或多个 ReactiveMessageSenderBuilderCustomizer bean。这些自定义器应用于所有已创建的发送器。您还可以在发送消息时传递一个 ReactiveMessageSenderBuilderCustomizer ,以仅影响当前发送器。

如果您需要进一步控制正在发送的消息,则可以在发送消息时传递一个 MessageSpecBuilderCustomizer

Receiving a Message

当 Apache Pulsar 基础设施存在时,可以使用 @PulsarListener 注释任何 bean 以创建一个侦听器端点。以下组件在 someTopic 主题上创建了一个侦听器端点:

Spring Boot 自动配置为 PulsarListener 提供了所有必需的组件,例如 PulsarListenerContainerFactory 以及它用来构建基础 Pulsar 消费者的使用者工厂。您可以在指定任何以 spring.pulsar.listener. and spring.pulsar.consumer. 为前缀的应用程序属性时配置这些组件。

如果您需要进一步控制使用者工厂配置,请考虑注册一个或多个 ConsumerBuilderCustomizer bean。这些自定义器应用于工厂创建的所有使用者,因此也应用于所有 @PulsarListener 实例。您还可以通过设置 @PulsarListener 注释的 consumerCustomizer 属性来自定义单个侦听器。

Receiving a Message Reactively

当 Apache Pulsar 的基础结构存在且 Reactive 自动配置被激活后,可以使用 @ReactivePulsarListener 注释任何 bean 以创建一个响应性侦听器端点。以下组件在 someTopic 主题上创建了一个响应性侦听器端点:

Spring Boot 自动配置为 ReactivePulsarListener 提供所有必要的组件,例如 ReactivePulsarListenerContainerFactory 和用于构建底层反应式 Pulsar 消费者的消费器工厂。您可以通过指定带有 spring.pulsar.listener. and spring.pulsar.consumer. 前缀的任何应用程序属性来配置这些组件。

如果您需要对消费者工厂配置进行更多控制,请考虑注册一个或多个 ReactiveMessageConsumerBuilderCustomizer Bean。这些定制器应用于工厂创建的所有消费者,因此也应用于所有 @ReactivePulsarListener 实例。您还可以通过设置 @ReactivePulsarListener 注释的 consumerCustomizer 属性来定制单个侦听器。

Reading a Message

Pulsar reader 接口使应用程序能够手动管理游标。当您使用 reader 连接到主题时,您需要指定 reader 在连接到主题时开始读取哪条消息。

当 Apache Pulsar 基础设施存在时,任何 bean 都可以通过使用 reader 注释上 @PulsarReader 来使用 reader 来使用消息。以下组件会创建一个 reader 端点,从 someTopic 主题的开头开始读取消息:

@PulsarReader 依赖 PulsarReaderFactory 来创建底层 Pulsar reader。Spring Boot 自动配置提供此 reader 工厂,可以通过设置带有 spring.pulsar.reader.* 前缀的任何应用程序属性来对其进行定制。

如果您需要对 reader 工厂配置进行更多控制,请考虑注册一个或多个 ReaderBuilderCustomizer Bean。这些定制器应用于工厂创建的所有 reader,因此也应用于所有 @PulsarReader 实例。您还可以通过设置 @PulsarReader 注释的 readerCustomizer 属性来定制单个侦听器。

Reading a Message Reactively

当 Apache Pulsar 基础设施存在且反应式自动配置已启用时,Spring 的 ReactivePulsarReaderFactory 将得到提供,您可以使用它创建 reader 以反应式的方式读取消息。以下组件使用所提供的工厂创建一个 reader,并从 someTopic 主题中读取 5 分钟前的单条消息:

Spring Boot 自动配置提供此 reader 工厂,可以通过设置带有 spring.pulsar.reader.* 前缀的任何应用程序属性来对其进行定制。

如果您需要对 reader 工厂配置进行更多控制,请考虑在使用工厂创建 reader 时传递一个或多个 ReactiveMessageReaderBuilderCustomizer 实例。

如果您需要对 reader 工厂配置进行更多控制,请考虑注册一个或多个 ReactiveMessageReaderBuilderCustomizer Bean。这些定制器应用于所有已创建的 reader。您还可以创建一个或多个 `ReactiveMessageReaderBuilderCustomizer`来只对所创建的 reader 应用定制。

有关上述任何组件的更多详细信息以及了解其他可用功能,请参见 Spring for Apache Pulsar {url-spring-pulsar-docs}[参考资料文档]。

Additional Pulsar Properties

自动配置所支持的属性显示在附录的 “Integration Properties” 部分中。请注意,对于大部分属性(连字符或驼峰式)都直接映射到 Apache Pulsar 配置属性。有关详细信息,请参见 Apache Pulsar 文档。

仅 Apache Pulsar 支持的部分属性可通过 PulsarProperties 类直接使用。如果您希望使用未直接支持的附加属性微调自动配置的组件,您可使用前面提及的每个组件所支持的定制器。