Using Spring for Apache Pulsar

Preface

我们建议将 Spring-Boot-First 方法用于基于 Apache Pulsar 的 Spring 应用,因为它极大地简化了操作。为此,您可以将 spring-pulsar-spring-boot-starter 模块添加为依赖项。

本参考的大部分内容都假定读者使用该启动程序,并提供了针对该特定内容的大部分配置说明。但是,有意识强调针对 Spring Boot 启动程序使用的特定说明。

Quick Tour

我们将会通过展示一个可以进行生产和使用的 Spring Boot 应用示例,来快速的了解一下 Apache Pulsar 的 Spring 相关特性。这是一个完整的应用程序,并且不需要任何额外的配置,只要你在默认位置 - localhost:6650 上运行着一个 Pulsar 集群。

Dependencies

Spring Boot 应用程序只需要 spring-pulsar-spring-boot-starter 依赖。以下清单分别展示了如何为 Maven 和 Gradle 定义依赖项:

  • Maven

  • Gradle

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-pulsar</artifactId>
        <version>{spring-boot-version}</version>
    </dependency>
</dependencies>
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-pulsar:{spring-boot-version}'
}

当使用 0.2.x 版本 时,以上坐标将发生如下更改:

Maven
<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-spring-boot-starter</artifactId>
        <version>0.2.0</version>
    </dependency>
</dependencies>
Gradle
dependencies {
    implementation 'org.springframework.pulsar:spring-pulsar-spring-boot-starter:0.2.0'
}

Application Code

以下清单展示了示例的 Spring Boot 应用程序案例:

@SpringBootApplication
public class PulsarBootHelloWorld {

    public static void main(String[] args) {
        SpringApplication.run(PulsarBootHelloWorld.class, args);
    }

    @Bean
    ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
        return (args) -> pulsarTemplate.send("hello-pulsar-topic", "Hello Pulsar World!");
    }

    @PulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
    void listen(String message) {
        System.out.println("Message Received: " + message);
    }
}

让我们快速了解一下该应用程序的高级细节。在文档后面,我们将会看到这些更详细的组件。

在前一个示例中,我们严重依赖 Spring Boot 的自动配置。Spring Boot 为我们的应用程序自动配置了几个组件。它为应用程序自动提供了一个 PulsarClient,该客户端被生产者和使用者同时使用。

Spring Boot 还自动配置了 PulsarTemplate,将其注入到应用程序中并开始向 Pulsar 主题发送记录。该应用程序会将消息发送到名为 hello-pulsar 的主题。请注意,该应用程序没有指定任何 schema 信息,因为 Apache Pulsar 的 Spring 库会自动从发送数据的类型推断 schema 类型。

我们使用 PulsarListener 注释来获取我们发布了数据的 hello-pulsar 主题。PulsarListener 是一个便利注释,它将 Apache Pulsar 中的消息监听器容器基础设施包装在 Spring 中。它在幕后创建了一个消息监听器容器来创建和管理 Pulsar 消费者。和常规的 Pulsar 消费者一样,在使用 PulsarListener 时,订阅的默认类型是 Exclusive 模式。当记录发布到 hello-pulsar 主题时,PulsarListener 便会使用它们并在控制台上打印出来。框架还会从 PulsarListener 方法用作有效负载的数据类型中推断出所使用的 schema 类型 - 在本例中为 String

Pulsar Client

当你使用 Pulsar Spring Boot Starter 时,你会得到自动配置的 PulsarClient

默认情况下,该应用程序会尝试连接到 pulsar://localhost:6650 处的本地 Pulsar 实例。可以通过将 spring.pulsar.client.service-url 属性设为不同的值来调整此功能。

该值必须是一个有效的 {apache-pulsar-docs}/client-libraries-java/#connection-urls[Pulsar 协议] URL

您可以通过指定 {spring-boot-pulsar-config-props}[spring.pulsar.client.*] 应用程序属性来进一步配置客户端。

如果不使用该启动程序,则需要配置并自己注册 PulsarClient。有一个 DefaultPulsarClientFactory 可接受构建器定制器,可以使用它来帮助完成此操作。

TLS Encryption (SSL)

默认情况下,Pulsar 客户端以纯文本形式与 Pulsar 服务进行通信。以下部分介绍如何配置 Pulsar 客户端以使用 TLS 加密(SSL)。一个先决条件是,代理也已经配置为使用 TLS 加密。

Spring Boot 自动配置目前不支持任何 TLS/SSL 配置属性。相反,你可以提供一个 PulsarClientBuilderCustomizer,它将在 Pulsar 客户端构建器上设置必要的属性。Pulsar 同时支持隐私增强邮件(PEM)和 Java 密钥库(JKS)证书格式。

请按照以下步骤配置 TLS:

  1. 调整 Pulsar 客户端服务 URL 以使用 pulsar+ssl:// 方案和 TLS 端口(通常为 6651)。

  2. 调整管理客户端服务 URL 以使用 https:// 方案和 TLS Web 端口(通常为 8443)。

  3. 提供客户端构建器定制器,在构建器上设置相关属性。

    • {github}/blob/02730275e8d0291525eed9db5babe880c555a7bd/integration-tests/src/intTest/java/org/springframework/pulsar/inttest/app/SamplePemBasedSslConfig.java#L30-L49[PEM based sample]

    • {github}/blob/02730275e8d0291525eed9db5babe880c555a7bd/integration-tests/src/intTest/java/org/springframework/pulsar/inttest/app/SampleJksBasedSslConfig.java#L30-L57[JKS based sample]

您可以在官方 {apache-pulsar-docs}/security-tls-transport/[Pulsar TLS 加密] 文档中找到关于上述内容的更多信息。

Authentication

为了连接到需要认证的 Pulsar 集群,你需要指定要使用的认证插件和指定插件所需的任何参数。在 使用 Spring Boot 自动配置时,你可以通过配置属性设定插件和插件参数(大多数情况下)。

你需要确保 spring.pulsar.client.authentication.param.* 下定义的名称与认证插件预期完全一致(通常是大写开头的驼峰式)。Spring Boot 不会尝试放松这些条目的绑定。 例如,如果你想为 AuthenticationOAuth2 认证插件配置颁发者 URL,则必须使用 spring.pulsar.client.authentication.param.issuerUrl。如果你使用其他形式,例如 issuerurlissuer-url,则设置不会应用于插件。

通过环境变量使用认证参数通常会有问题,因为在转换过程中会丢失大小写敏感性。例如,考虑通过环境变量设置的以下 issuerUrl 认证参数:

SPRING_PULSAR_CLIENT_AUTHENTICATION_PARAM_ISSUERURL=https://some.server.com

当 Spring Boot 加载此属性时,它将使用 issuerurl(小写)而不是预期的 issuerUrl(大写开头的驼峰式)。通过在 application.yml 中使用环境变量的值作为相关认证属性的值,你可以解决此限制。继续上述示例:

spring:
  pulsar:
    client:
      authentication:
        param:
          issuerUrl: ${SPRING_PULSAR_CLIENT_AUTHENTICATION_PARAM_ISSUERURL}

不使用 Spring Boot 自动配置时,你可以使用 org.apache.pulsar.client.api.AuthenticationFactory 创建认证,然后在提供给客户端工厂的客户端自定义程序中直接将其设置在 Pulsar 客户端构建器上。

下列清单演示了如何配置每个受支持的认证机制。

Example 1. [role="underline"][.underline]Click here for Athenz
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationAthenz
        param:
          tenantDomain: ...
          tenantService: ...
          providerDomain: ...
          privateKey: ...
          keyId: ...

这也需要 TLS encryption

Example 2. [role="underline"][.underline]Click here for Token
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
        param:
          token: some-token-goes-here
Example 3. [role="underline"][.underline]Click here for Basic
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationBasic
        param:
          userId: ...
          password: ...
Example 4. [role="underline"][.underline]Click here for OAuth2
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
        param:
          issuerUrl: ...
          privateKey: ...
          audience: ...
          scope: ...
Example 5. [role="underline"][.underline]Click here for Sasl
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationSasl
        param:
          saslJaasClientSectionName: ...
          serverType: ...
Example 6. [role="underline"][.underline]Click here for mTLS (PEM)

由于此选项需要使用 TLS 加密,这已要求您 provide a client builder customizer,因此建议直接在您提供的 TLS 自定程序中对客户端构建器添加认证。您可以使用 org.apache.pulsar.client.api.AuthenticationFactory 来帮助创建身份验证对象,如下所示:

Authentication auth = AuthenticationFactory.TLS("/path/to/my-role.cert.pem", "/path/to/my-role.key-pk8.pem");

有关 {apache-pulsar-docs}/security-tls-authentication/#configure-mtls-authentication-in-pulsar-clients[mTLS (PEM)],请参见 Pulsar 官方文档。

Example 7. [role="underline"][.underline]Click here for mTLS (JKS)

由于此选项需要使用 TLS 加密,这已要求您 provide a client builder customizer,因此建议直接在您提供的 TLS 自定程序中对客户端构建器添加认证。您可以使用 org.apache.pulsar.client.api.AuthenticationFactory 来帮助创建身份验证对象,如下所示:

Authentication auth = AuthenticationFactory.create(
        "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls",
        Map.of("keyStoreType", "JKS", "keyStorePath", "/path/to/my/keystore.jks", "keyStorePassword", "clientpw"));

有关 {apache-pulsar-docs}/security-tls-authentication/#configure-clients[mTLS (JKS)],请参见 Pulsar 官方文档。

您可以在官方{apache-pulsar-docs}/security-overview#authentication-providers[Pulsar 安全]文档中找到更多有关每个支持插件的详细信息以及必要的属性。

Message Production

Pulsar Template

在 Pulsar 产品方,Spring Boot 自动配置提供了一个 PulsarTemplate 来发布记录。该模板实现了名为 PulsarOperations 的一个接口,并提供了通过其契约发布记录的方法。

这些发送 API 方法有两种类型:sendsendAsyncsend 方法使用 Pulsar 生产者的同步发送功能来禁止调用。在一个消息在代理上持久化之后,它们会返回已发布的消息的 MessageIdsendAsync 方法调用是非阻塞的异步调用。它们返回一个 CompletableFuture,你可以在消息发布之后使用该 CompletableFuture 来异步接收消息 ID。

对于不包含主题参数的 API 变量,使用 topic resolution process 确定目标主题。

Simple API

该模板提供了一些方法(以 'send' 为前缀的 {javadocs}/org/springframework/pulsar/core/PulsarOperations.html)用于简单的发送请求。对于更复杂的发送请求,一个流利的 API 可以让您配置更多选项。

Fluent API

该模板提供一个{javadocs}/org/springframework/pulsar/core/PulsarOperations.html#newMessage(T)[流畅构建器]来处理更复杂的发送请求。

Message customization

您可以指定 TypedMessageBuilderCustomizer 来配置出站消息。例如,以下代码显示如何发送键入消息:

template.newMessage(msg)
    .withMessageCustomizer((mb) -> mb.key("foo-msg-key"))
    .send();

Producer customization

您可以指定 ProducerBuilderCustomizer 来配置最终构建用于发送出站消息的生成器的底层 Pulsar 生成器。

谨慎使用,因为它赋予对生产者构建器的完全访问权限,调用其中某些方法(例如 create)可能产生预料之外的副作用。

例如,以下代码显示如何禁用批处理并启用块处理:

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.enableChunking(true).enableBatching(false))
    .send();

此其他示例显示在向分区主题发布记录时如何使用自定义路由。在 Producer 生成器上指定您的自定义 MessageRouter 实现,例如:

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.messageRouter(messageRouter))
    .send();

请注意,在使用 MessageRouter 时,spring.pulsar.producer.message-routing-mode 唯一的有效设置是 custom

此其他示例显示如何在向代理发布之前拦截和修改生成器接收到的消息时添加 ProducerInterceptor

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.intercept(interceptor))
    .send();

自定义器仅适用于用于发送操作的生成器。如果您想将自定义器应用于所有生成器,则必须按照 Global producer customization 中所述将它们提供给生成器工厂。

使用 Lambda 定制器时,必须遵循 “Caution on Lambda customizers” 中描述的规则。

Specifying Schema Information

@6

Specifying Schema Information

如果你使用 Java 原语类型,框架会为你自动检测架构,且你不需要为发布数据指定任何架构类型。对于非原语类型,如果在 PulsarTemplate 上调用发送操作时未明确指定架构,Spring for Apache Pulsar 框架将尝试根据类型构建 Schema.JSON

当前支持的复杂模式类型包括 JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES 和 KEY_VALUE 以及 INLINE 编码。

Custom Schema Mapping

作为在 PulsarTemplate 上调用发送操作时为复杂类型指定架构的替代方案,可以将架构解析器配置为具有类型的映射。这消除了指定架构的需要,因为框架使用传出消息类型通过解析器进行查询。

@3

@4

有了此配置,无需设置发送操作时的架构。

Producing with AUTO_SCHEMA

如果没有办法提前了解 Pulsar 主题的架构类型,您可以使用 {apache-pulsar-docs}/schema-get-started/#auto_produce[AUTO_PRODUCE] 架构来安全地发布原始 JSON 或 Avro 负载作为 byte[]

在这种情况下,生成器会验证出站字节是否与目标主题的架构兼容。

只需在模板发送操作中指定 Schema.AUTO_PRODUCE_BYTES() 架构,如下例所示:

@7

这仅受 Avro 和 JSON 模式类型支持。

Pulsar Producer Factory

PulsarTemplate`依靠 `PulsarProducerFactory`来实际创建底层生成器。Spring Boot 自动配置还提供了此生成器工厂,您可以通过指定 {spring-boot-pulsar-config-props}[`spring.pulsar.producer.*] 应用程序属性来进一步配置该工厂。

如果在直接使用生产者工厂 API 时未指定主题信息,则与 PulsarTemplate 相同的 topic resolution process 会用于其中一个例外,即“消息类型默认值”步骤为 omitted

Global producer customization

该框架提供了 ProducerBuilderCustomizer 合约,允许您配置用于构建每个生成器的底层生成器。要自定义所有生成器,您可以将自定义器列表传递给`PulsarProducerFactory` 构造函数。使用多个自定义器时,它们将按照在列表中出现的顺序应用。

如果您使用 Spring Boot 自动配置,则可以将定制器指定为 Bean,并将根据其 @Order 注释自动将其传递给 PulsarProducerFactory,并按其 @Order 注释进行排序。

如果您只想将自定义器应用于单个生成器,则可以使用 Fluent API 和 [在发送时指定自定义器,single-producer-customize]

Pulsar Producer Caching

每个底层 Pulsar 生成器都会使用资源。为了提高性能并避免持续创建生成器,生成器工厂会缓存其创建的生成器。它们以 LRU 方式进行缓存,并在配置的时间段内未使用它们时将其逐出。 cache key仅包含足够的信息来确保在后续创建请求中向调用者返回相同的生成器。

此外,您可以通过指定 {spring-boot-pulsar-config-props}[spring.pulsar.producer.cache.*] 应用程序属性来配置缓存设置。

Caution on Lambda customizers

任何用户提供的生产者自定义项也包含在高速缓存键中。由于高速缓存键依赖于 equals/hashCode 的有效实现,所以在使用 Lambda 自定义项时必须谨慎。

RULE: 作为 Lambda 实现的两个定制器将匹配在 equals/hashCode if and only if 上,它们使用相同的 Lambda 实例,且不需要在封闭之外定义任何变量。

为了阐明上述规则,我们将看几个示例。在以下示例中,自定义项被定义为内联 Lambda,这意味着每次对 sendUser 的调用都使用相同的 Lambda 实例。此外,它不需要闭包之外的变量。因此,它*将*匹配为高速缓存键。

void sendUser() {
    var user = randomUser();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> b.producerName("user"))
        .send();
}

在此下一个案例中,自定义项被定义为内联 Lambda,这意味着每次对 sendUser 的调用都使用相同的 Lambda 实例。但是,它需要闭包之外的变量。因此,它*不会*匹配为高速缓存键。

void sendUser() {
    var user = randomUser();
    var name = randomName();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> b.producerName(name))
        .send();
}

在最后这个示例中,自定义项被定义为内联 Lambda,意味着每次对 sendUser 的调用都使用相同的 Lambda 实例。虽然它确实使用变量名,但是它不是从其闭包外部产生的,因此*将*匹配为高速缓存键。这说明变量可以在 Lambda 闭包*内*使用,甚至可以对静态方法进行调用。

void sendUser() {
    var user = randomUser();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> {
           var name = SomeHelper.someStaticMethod();
           b.producerName(name);
        })
        .send();
}

RULE: 如果没有 once and only once 定义您的 Lambda 定制器(在后续调用中会使用相同的实例) OR,则需要使用有效 equals/hashCode 实现提供定制器实现,且该实现必须在封闭之外定义变量。

如果未遵循这些规则,则生产者缓存将始终错过,并且您的应用程序性能将受到负面影响。

Intercept Messages on the Producer

添加 ProducerInterceptor 允许您拦截和更改在发布到代理之前由生产者接收到的消息。要这样做,您可以将拦截器列表传递到 PulsarTemplate 构造函数中。在使用多个拦截器时,将其应用的顺序是它们在列表中的出现顺序。

如果您使用 Spring Boot 自动配置,则可以将拦截器指定为 Bean。它们会自动传递到 PulsarTemplate 中。使用 @Order 注释按如下方式对拦截器排序:

@Bean
@Order(100)
ProducerInterceptor firstInterceptor() {
  ...
}

@Bean
@Order(200)
ProducerInterceptor secondInterceptor() {
  ...
}

如果您未使用该启动器,您将需要自己配置并注册上述组件。

Message Consumption

Pulsar Listener

当涉及到 Pulsar 消费者时,我们建议终端用户应用程序使用 PulsarListener 注释。要使用 PulsarListener,您需要使用 @EnablePulsar 注释。当您使用 Spring Boot 支持时,它会自动启用此注释并配置 PulsarListener 所需的所有组件,例如消息侦听器基础设施(负责创建 Pulsar 消费者)。PulsarMessageListenerContainer 使用 PulsarConsumerFactory 创建和管理底层 Pulsar 消费者,该底层 Pulsar 消费者用来消费消息。

Spring Boot 提供此消费者工厂,您可以通过指定 {spring-boot-pulsar-config-props}[已配置属性的`spring.pulsar.consumer.`] application properties. *Most] 来进一步配置该工厂,在使用以下 *exceptions*的侦听器中尊重工厂中的属性:

spring.pulsar.consumer.subscription.name 属性被忽略,如果在注解上未指定,则生成。

spring.pulsar.consumer.subscription-type 属性被忽略,相反从注解上的值获取。但是,您可以设置注解上的 subscriptionType = {} 以将属性值作为默认值。

让我们重新审视我们在快速浏览部分中看到的 PulsarListener 代码片段:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

您可以进一步简化此方法:

@PulsarListener
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

在此最基本的形式中,当 subscriptionName 未在 @PulsarListener 注释中提供时,将使用自动生成的订阅名称。同样,当未直接提供 topics 时,将使用 [主题解析过程,主题解析过程-命令式] 来确定目标主题。

在前面显示的 PulsarListener 方法中,我们以 String 的形式接收数据,但是我们未指定任何模式类型。在内部,该框架依赖于 Pulsar 的模式机制将数据转换为所需类型。该框架会检测到您期望 String 类型,然后根据该信息推断模式类型并将该模式提供给消费者。该框架对所有基本类型执行此推断。对于所有非基本类型,默认模式假定为 JSON。如果复杂类型使用除 JSON 以外的任何内容(例如 AVRO 或 KEY_VALUE),则必须在使用 schemaType 属性的注释中提供模式类型。

以下示例显示了另一个 PulsarListener 方法,它使用 Integer

@PulsarListener(subscriptionName = "my-subscription-1", topics = "my-topic-1")
public void listen(Integer message) {
   System.out.println(message);
}

以下 PulsarListener 方法显示了我们如何从主题消费复杂类型:

@PulsarListener(subscriptionName = "my-subscription-2", topics = "my-topic-2", schemaType = SchemaType.JSON)
public void listen(Foo message) {
    System.out.println(message);
}

让我们再看一些方法。

您可以直接消费 Pulsar 消息:

@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.apache.pulsar.client.api.Message<String> message) {
    System.out.println(message.getValue());
}

以下示例使用 Spring 消息信封来消费记录:

@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.springframework.messaging.Message<String> message) {
    System.out.println(message.getPayload());
}

现在让我们看看如何批量消费记录。以下示例使用 PulsarListener 以 POJO 的形式批量消费记录:

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Foo> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message));
}

请注意,在此例中,我们接收记录作为对象的集合 (List)。此外,为了在 PulsarListener 级别启用批量使用,需要将注解上的 batch 属性设置为 true

框架会根据 List 所保存的实际类型来尝试推断要使用的模式。如果 List 中包含 JSON 之外的复杂类型,则仍需要在 PulsarListener 上提供 schemaType

以下内容使用 Pulsar Java 客户端提供的 消息 封装:

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Message<Foo>> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}

以下示例使用 Spring 消息 Message 类型的封装来批量使用记录:

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<org.springframework.messaging.Message<Foo>> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getPayload()));
}

最后,还可以针对批量监听器使用来自 Pulsar 的 Messages 持有者对象:

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(org.apache.pulsar.client.api.Messages<Foo>> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}

在使用 PulsarListener 时,可以在注解本身上直接提供 Pulsar 消费者属性。在不想使用前面提到的引导配置属性或有多个 PulsarListener 方法时,这是非常方便的。

以下示例直接在 PulsarListener 上使用 Pulsar 消费者属性:

@PulsarListener(properties = { "subscriptionName=subscription-1", "topicNames=foo-1", "receiverQueueSize=5000" })
void listen(String message) {
}

所使用的属性是直接 Pulsar 消费者属性,而不是 spring.pulsar.consumer 应用程序配置属性

Generic records with AUTO_CONSUME

如果无法提前知晓 Pulsar 主题的模式类型,则可以使用 AUTO_CONSUME 模式类型来使用常规记录。在这种情况下,主题会使用与主题关联的模式信息将消息反序列化为 GenericRecord 对象。

若要使用常规记录,请在 @PulsarListener 上将 schemaType = SchemaType.AUTO_CONSUME 设置为以下所示内容,并且将类型为 GenericRecord 的 Pulsar 信息用作信息参数。

@PulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
void listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
    GenericRecord record = message.getValue();
    record.getFields().forEach((f) ->
            System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
}

GenericRecord API 允许访问字段及其关联的值

Customizing the ConsumerBuilder

可以使用 PulsarListenerConsumerBuilderCustomizer 自定义任何可通过 ConsumerBuilder 获得的字段,方法是向其提供 PulsarListenerConsumerBuilderCustomizer 类型的 @Bean, 然后将其提供给 PulsarListener,如下所示。

@PulsarListener(topics = "hello-topic", consumerCustomizer = "myCustomizer")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

@Bean
PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
    return (builder) -> builder.consumerName("myConsumer");
}

如果您的应用程序只有一个 @PulsarListener 和一个 PulsarListenerConsumerBuilderCustomizer bean 注册,那么定制器将被自动应用。

@1

Accessing the Pulsar Consumer Object

有时,您需要直接访问 Pulsar Consumer 对象。以下示例展示了如何获取它:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message, org.apache.pulsar.client.api.Consumer<String> consumer) {
    System.out.println("Message Received: " + message);
    ConsumerStats stats = consumer.getStats();
    ...
}

当以这种方式访问 Consumer 对象时,不要调用任何会通过调用任何接收方法来更改消费者游标位置的操作。所有此类操作都必须由容器完成。

Pulsar Message Listener Container

现在,我们通过 PulsarListener 查看了消费者端的基本交互。现在,让我们深入了解 PulsarListener 如何与底层 Pulsar 消费者交互。请记住,对于最终用户应用程序,大多数情况下,我们建议直接使用 PulsarListener 注释从 Pulsar 主题使用 Spring for Apache Pulsar 进行使用,因为该模型涵盖了广泛的应用程序用例。但是,了解 PulsarListener 在内部如何工作非常重要。本部分将介绍这些详细信息。

正如前面简要提到的,当您使用 Spring for Apache Pulsar 时,消息监听器容器是消息使用核心。PulsarListener 在后台使用消息监听器容器来创建和管理 Pulsar 消费者。Spring for Apache Pulsar 通过 PulsarMessageListenerContainer 提供此消息监听器容器的合同。此消息监听器容器的默认实现通过 DefaultPulsarMessageListenerContainer 提供。正如它的名称所示,PulsarMessageListenerContainer 包含消息监听器。容器创建 Pulsar 消费者,然后运行一个单独的线程来接收和处理数据。数据由所提供的消息监听器实现进行处理。

消息监听器容器使用消费者的 batchReceive 方法批量使用数据。接收到数据后,会将其传递给选定的消息监听器实现。

在使用 Spring for Apache Pulsar 时,可以使用以下消息监听器类型。

我们将在以下部分中了解有关这些各种消息监听器的详细信息。

但是,在这样做之前,让我们仔细了解容器本身。

DefaultPulsarMessageListenerContainer

这是一个基于单个消费者的消息侦听器容器。以下清单显示了其构造函数:

public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
			PulsarContainerProperties pulsarContainerProperties)
}

它接收 PulsarConsumerFactory(用于创建消费者)和 PulsarContainerProperties 对象(其中包含有关容器属性的信息)。PulsarContainerProperties 具有以下构造函数:

public PulsarContainerProperties(String... topics)

public PulsarContainerProperties(Pattern topicPattern)

您可以通过 PulsarContainerProperties 提供主题信息,或作为传输到消费者工厂的消费者属性。以下示例使用 DefaultPulsarMessageListenerContainer

Map<String, Object> config = new HashMap<>();
config.put("topics", "my-topic");
PulsarConsumerFactory<String> pulsarConsumerFactorY = DefaultPulsarConsumerFactory<>(pulsarClient, config);

PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();

pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> {
		});

DefaultPulsarMessageListenerContainer<String> pulsarListenerContainer = new DefaultPulsarMessageListenerContainer(pulsarConsumerFacotyr,
        pulsarContainerProperties);

return pulsarListenerContainer;

当使用监听器容器直接指定主题信息时,同一个 topic resolution processPulsarListener 使用,唯一例外是“默认消息类型”步骤为 omitted

DefaultPulsarMessageListenerContainer 仅创建单个消费者。如果您希望有多个消费者通过多个线程进行管理,则需要使用 ConcurrentPulsarMessageListenerContainer

ConcurrentPulsarMessageListenerContainer

ConcurrentPulsarMessageListenerContainer 具有以下构造函数:

public ConcurrentPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
    PulsarContainerProperties pulsarContainerProperties)

ConcurrentPulsarMessageListenerContainer 允许您通过 setter 指定 concurrency 属性。仅允许非独占订阅(failoversharedkey-shared)中的并发大于 1。当您具有独占订阅模式时,只能将并发设置为默认的 1

以下示例通过 PulsarListener 注释为 failover 订阅启用了 concurrency

@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
				subscriptionType = SubscriptionType.Failover, concurrency = "3")
void listen(String message, Consumer<String> consumer) {
    ...
    System.out.println("Current Thread: " + Thread.currentThread().getName());
    System.out.println("Current Consumer: " + consumer.getConsumerName());
}

在前述侦听器中,假设主题 my-topic 有三个分区。如果它是一个非分区主题,则将并发设置为 3 无效。除了主要的活动消费者之外,您还会有两个空闲消费者。如果主题有三个以上的分区,则消息将在容器创建的消费者之间进行负载均衡。如果您运行此 PulsarListener,您会看到来自不同分区的消息是通过不同的消费者使用前述示例中的线程名称和消费者名称输出消耗的。

当您以这种方式在分区主题上使用 Failover 订阅时,Pulsar 保证消息排序。

以下清单展示了 PulsarListener 的另一个示例,但启用了 Shared 订阅和 concurrency

@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
				subscriptionType = SubscriptionType.Shared, concurrency = "5")
void listen(String message) {
    ...
}

在前述示例中,PulsarListener 创建五个不同的消费者(这次我们假设主题有五个分区)。

在这个版本中,没有消息排序,因为 Shared 订阅不能保证在 Pulsar 中有任何消息排序。

如果您需要消息排序并且仍然需要共享订阅类型,则需要使用 Key_Shared 订阅类型。

Message Consumption

让我们看看消息侦听器容器如何同时启用单记录和基于批处理的消息消耗。

Single Record Consumption

让我们重新审视一下我们为此次讨论而提供的基本 PulsarListener

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

使用此 PulsarListener 方法,我们实际上要求 Spring for Apache Pulsar 每一次使用单条记录来调用侦听器方法。我们提到消息侦听器容器使用消费者的 batchReceive 方法以批处理方式使用数据。在此情形下,框架检测到 PulsarListener 接收单条记录。这意味着在每次调用此方法时,它需要一条记录。虽然消息侦听器容器以批处理方式使用记录,但它会遍历已收到的批处理并通过 PulsarRecordMessageListener 适配器调用侦听器方法。如您在前一节所见,PulsarRecordMessageListener 扩展自 Pulsar Java 客户端提供的 MessageListener,并且支持基本 received 方法。

Batch Consumption

以下示例显示了 PulsarListener 以批处理方式使用记录:

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen4(List<Foo> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message));
}

当您使用此类型的 PulsarListener 时,框架会检测到您处于批处理模式。因为它已经使用消费者的 batchReceive 方法以批处理方式接收数据,所以它将整个批处理通过 PulsarBatchMessageListener 适配器传递到侦听器方法。

Pulsar Headers

Pulsar 消息元数据可以作为 Spring 消息头被使用。可用的标头列表可以在{github}/blob/main/spring-pulsar/src/main/java/org/springframework/pulsar/support/PulsarHeaders.java[PulsarHeaders.java]中找到。

Accessing in Single Record based Consumer

以下示例展示了如何在一使用单条记录使用模式的应用程序中访问各种 Pulsar 头:

@PulsarListener(topics = "simpleListenerWithHeaders")
void simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
                @Header(PulsarHeaders.RAW_DATA) byte[] rawData,
                @Header("foo") String foo) {

}

在前述示例中,我们访问 messageIdrawData 消息元数据的值,以及一个名为 foo 的自定义消息属性。Spring @Header 注释用于每个头字段。

您还可以将 Pulsar 的 Message 用作承载负载的信封。在此过程中,用户可以直接调用 Pulsar 消息上的对应方法来检索元数据。但是,为了方便起见,您也可以使用 Header 注释来检索它。请注意,您还可以使用 Spring 消息 Message 信封来承载负载,然后通过 @Header 检索 Pulsar 头。

Accessing in Batch Record based Consumer

在本节中,我们了解如何在使用批处理消费者的应用程序中访问各种 Pulsar 头部:

@PulsarListener(topics = "simpleBatchListenerWithHeaders", batch = true)
void simpleBatchListenerWithHeaders(List<String> data,
					@Header(PulsarHeaders.MESSAGE_ID) List<MessageId> messageIds,
					@Header(PulsarHeaders.TOPIC_NAME) List<String> topicNames, @Header("foo") List<String> fooValues) {

}

在前一个示例中,我们以 List<String> 的形式消费数据。在提取各种头部时,我们也以 List<> 的形式进行提取。针对 Apache Pulsar 的 Spring 确保头部列表与数据列表相对应。

当使用批处理侦听器并将有效负载接收为 List<org.apache.pulsar.client.api.Message<?>, org.apache.pulsar.client.api.Messages<?>, 或 org.springframework.messaging.Messsge<?> 时,你也可以以相同的方式提取头部。

Message Acknowledgment

当使用针对 Apache Pulsar 的 Spring 时,除非应用程序选择退出,否则消息应答由框架处理。在本节中,我们详细了解框架如何处理消息应答。

Message ACK modes

针对 Apache Pulsar 的 Spring 提供以下消息应答模式:

  • BATCH

  • RECORD

  • MANUAL

BATCH 应答模式为默认模式,但你可以在消息侦听器容器上对其进行更改。在以下部分中,我们将了解在同时使用 PulsarListener 的单版本和批处理版本时应答如何运作,以及它们如何转换为支持的消息侦听器容器(最终转换为 Pulsar 消费者)。

Automatic Message Ack in Single Record Mode

我们回顾一下基于单个基本消息的 PulsarListener

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

如果你熟悉直接使用 Pulsar 消费者,那么自然会好奇在使用 PulsarListener 时应答如何运作。答案归结为消息侦听器容器,因为它是针对 Apache Pulsar 的 Spring 中协调所有消费者相关活动的核心位置。

假设你没有覆盖默认行为,则在使用上述 PulsarListener 时幕后将发生以下情况:

  1. 首先,监听器容器从 Pulsar 消费者接收批处理消息。

  2. 接收的消息一次一条地交付给 PulsarListener

  3. 当所有记录都已交付给监听器方法并已成功处理时,容器会确认原始批处理中的所有消息。

这是正常流程。如果原始批处理中的任何记录抛出异常,针对 Apache Pulsar 的 Spring 将分别跟踪这些记录。在批处理中的所有记录都经过处理后,针对 Apache Pulsar 的 Spring 将应答所有成功消息并对所有失败消息进行否定应答 (nack)。换句话说,在使用 PulsarRecordMessageListener 消费单个记录且使用 BATCH 的默认应答模式时,该框架将等待从 batchReceive 调用接收的所有记录都成功处理,然后在 Pulsar 消费者上调用 acknowledge 方法。如果在调用处理程序方法时任何特定记录抛出异常,针对 Apache Pulsar 的 Spring 将跟踪这些记录并在处理整个批处理后对这些记录分别调用 negativeAcknowledge

如果应用程序希望应答或否定应答按记录发生,则可以启用 RECORD 应答模式。在这种情况下,在处理每条记录后,如果没有错误则应答消息,如果有错误则否定应答。以下示例在 Pulsar 侦听器上启用 RECORD 应答模式:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.RECORD)
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

Manual Message Ack in Single Record Mode

你可能并不总是希望框架发送应答,而希望直接从应用程序本身发送应答。针对 Apache Pulsar 的 Spring 提供了几种启用手动消息应答的方法。以下示例展示了其中一种方法:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Acknowledgment acknowledgment) {
    System.out.println("Message Received: " + message.getValue());
	acknowledgment.acknowledge();
}

这里有一些事情值得解释。首先,我们通过在 PulsarListener 上设置 ackMode 来启用手动应答模式。启用手动应答模式时,针对 Apache Pulsar 的 Spring 允许应用程序注入一个 Acknowledgment 对象。该框架通过选择兼容的消息侦听器容器来实现这一目标:基于单个记录的消费的 PulsarAcknowledgingMessageListener,它允许你访问 Acknowledgment 对象。

Acknowledgment 对象提供以下 API 方法:

void acknowledge();

void acknowledge(MessageId messageId);

void acknowledge(List<MessageId> messageIds);

void nack();

void nack(MessageId messageId);

在使用 MANUAL 应答模式的同时,你可以将此 Acknowledgment 对象注入到你的 PulsarListener 中,然后调用相应的方法之一。

在上述 PulsarListener 示例中,我们调用了一个无参数的 acknowledge 方法。这是因为框架知道它当前在哪个 Message 下操作。在调用 acknowledge() 时,你不必通过 Message 信封接收有效负载,而应使用目标类型——在本例中为 String。你还可以通过提供消息 ID 来调用 acknowledge 的不同变体:acknowledge.acknowledge(message.getMessageId());`在使用 `acknowledge(messageId) 时,你必须使用 Message<?> 信封接收有效负载。

与应答类似,Acknowledgment API 也提供否定应答选项。请参阅前面显示的 nack 方法。

你也可以直接在 Pulsar 消费者上调用 acknowledge

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Consumer<String> consumer) {
    System.out.println("Message Received: " + message.getValue());
	try {
		consumer.acknowledge(message);
	}
	catch (Exception e) {
		....
	}
}

在直接在基础消费者上调用 acknowledge 时,你需要自己进行错误处理。使用 Acknowledgment 不需要这样做,因为框架可以为你完成此操作。因此,在使用手动应答时,你应该使用 Acknowledgment 对象方法。

当使用手动确认时,理解框架完全保留所有确认非常重要。因此,在设计应用程序时,仔细考虑正确的确认策略非常重要。

Automatic Message Ack in Batch Consumption

当以批处理形式消费记录(请参阅“Message ACK modes”)且使用 BATCH 的默认应答模式时,在成功处理整个批处理后,将对整个批处理进行应答。如果任何记录抛出异常,将对整个批处理进行否定应答。请注意,这可能与在生产者端批处理的批处理不同。相反,这是从在消费者上调用 batchReceive 返回的批处理

考虑以下批处理侦听器:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", batch = true)
public void batchListen(List<Foo> messages) {
    for (Foo foo : messages) {
		...
    }
}

当传入集合(在本例中为消息)中的所有消息均处理完毕后,框架将确认所有消息。

在批处理模式中使用时,“RECORD”不是允许的确认模式。这可能会引起问题,因为应用程序可能不希望整个批处理再次重新传达。在这种情况下,您需要使用`MANUAL`确认模式。

Manual Message Ack in Batch Consumption

如在上一节中所见,当“MANUAL”确认模式在消息侦听器容器上设置时,框架不会执行任何确认,无论为正还是负。它完全取决于应用程序来处理此类问题。当设置`MANUAL`确认模式时,Spring for Apache Pulsar 会选择一个兼容的消息侦听器容器:`PulsarBatchAcknowledgingMessageListener`用于批处理使用,它允许您访问“确认”对象。以下是“确认”API 中提供的方法:

void acknowledge();

void acknowledge(MessageId messageId);

void acknowledge(List<MessageId> messageIds);

void nack();

void nack(MessageId messageId);

您可以将此“确认”对象注入到`PulsarListener`中,同时使用`MANUAL`确认模式。以下清单显示了基于批处理的侦听器的基本示例:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(List<Message<String>> messgaes, Acknowlegement acknowledgment) {
    for (Message<String> message : messages) {
		try {
			...
	        acknowledgment.acknowledge(message.getMessageId());
		}
		catch (Exception e) {
			acknowledgment.nack(message.getMessageId());
		}
    }
}

当您使用批处理侦听器时,消息侦听器容器无法得知它当前正在对哪条记录执行操作。因此,若要手动确认,您需要使用接受`MessageId`或`List<MessageId>`的重载`acknowledge`方法。您还可以对批处理侦听器的`MessageId`进行否定确认。

Message Redelivery and Error Handling

现在我们已经了解了`PulsarListener`和消息侦听器容器基础架构及其各种功能,接下来让我们尝试理解消息重新投递和错误处理。Apache Pulsar 为消息重新投递和错误处理提供了各种本地策略。我们将仔细观察这些策略,并了解如何通过 Spring for Apache Pulsar 使用它们。

Specifying Acknowledgment Timeout for Message Redelivery

默认情况下,Pulsar 消费者不会重新投递消息,除非消费者崩溃,但您可以通过在 Pulsar 消费者上设置确认超时来更改此行为。如果确认超时属性的值大于零,并且 Pulsar 消费者在该超时时间内未确认消息,则会重新投递消息。

当您使用 Spring for Apache Pulsar 时,您可以通过 [消费者自定义程序,在 PulsarListener 中进行消费者自定义]或使用`@PulsarListener`的`properties`属性中的本地 Pulsar`ackTimeout`属性来设置此属性:

@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
                properties = {"ackTimeout=60s"})
public void listen(String s) {
    ...
}

如果您指定确认超时,如果消费者在 60 秒内未发送确认,Pulsar 会将消息重新投递给消费者。

如果您想为具有不同延迟的确认超时指定一些高级回退选项,可以执行以下操作:

@EnablePulsar
@Configuration
class AckTimeoutRedeliveryConfig {

    @PulsarListener(subscriptionName = "withAckTimeoutRedeliveryBackoffSubscription",
            topics = "withAckTimeoutRedeliveryBackoff-test-topic",
            ackTimeoutRedeliveryBackoff = "ackTimeoutRedeliveryBackoff",
            properties = { "ackTimeout=60s" })
    void listen(String msg) {
        // some long-running process that may cause an ack timeout
    }

    @Bean
    RedeliveryBackoff ackTimeoutRedeliveryBackoff() {
        return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
                .build();
    }

}

在前面的示例中,我们使用 Pulsar 的`RedeliveryBackoff`指定一个 Bean,该 Bean 具有 1 秒的最小延迟、10 秒的最大延迟和 2 的回退倍数。在初始确认超时发生后,消息重新投递将通过此回退 Bean 进行控制。我们通过将`ackTimeoutRedeliveryBackoff`属性设置为实际 Bean 名称(在本例中为`ackTimeoutRedeliveryBackoff`)将回退 Bean 提供给`PulsarListener`注释。

Specifying Negative Acknowledgment Redelivery

在否定确认时,Pulsar 消费者允许您指定应用程序希望如何重新投递消息。默认情况下,消息将在 1 分钟内重新投递,但您可以通过 [消费者自定义程序,在 PulsarListener 中进行消费者自定义]或使用`@PulsarListener`的`properties`属性中的本地 Pulsar `negativeAckRedeliveryDelay`属性来更改默认设置:

@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
                properties = {"negativeAckRedeliveryDelay=10ms"})
public void listen(String s) {
    ...
}

您还可以使用回退倍数指定不同的延迟和回退机制,方法是提供一个`RedeliveryBackoff` Bean,并像下面这样将 Bean 名称作为 PulsarProducer 上的`negativeAckRedeliveryBackoff`属性提供:

@EnablePulsar
@Configuration
class NegativeAckRedeliveryConfig {

    @PulsarListener(subscriptionName = "withNegRedeliveryBackoffSubscription",
            topics = "withNegRedeliveryBackoff-test-topic", negativeAckRedeliveryBackoff = "redeliveryBackoff",
            subscriptionType = SubscriptionType.Shared)
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @Bean
    RedeliveryBackoff redeliveryBackoff() {
        return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
                .build();
    }

}

Using Dead Letter Topic from Apache Pulsar for Message Redelivery and Error Handling

Apache Pulsar 允许应用程序在具有`Shared`订阅类型的消费者上使用死信主题。对于`Exclusive`和`Failover`订阅类型,此功能不可用。基本思想是,如果某条消息被重试了一定次数(可能是由于确认超时或 Nack 重新投递),一旦重试次数用尽,消息就可以发送到一个名为死信队列 (DLQ) 的特殊主题中。让我们通过检查一些代码段来了解该功能的一些细节:

@EnablePulsar
@Configuration
class DeadLetterPolicyConfig {

    @PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
            topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
            subscriptionType = SubscriptionType.Shared, properties = { "ackTimeout=1s" })
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @PulsarListener(id = "dlqListener", topics = "my-dlq-topic")
    void listenDlq(String msg) {
        System.out.println("From DLQ: " + msg);
    }

    @Bean
    DeadLetterPolicy deadLetterPolicy() {
        return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
    }

}

首先,我们有一个用于`DeadLetterPolicy`的特殊 Bean,它被命名为`deadLetterPolicy`(可以是您想要的任何名称)。此 Bean 指定许多内容,例如最大传递次数(在本例中为 10)和死信主题的名称(在本例中为`my-dlq-topic`)。如果您不指定 DLQ 主题名称,则在 Pulsar 中默认为 <topicname>-<subscriptionname>-DLQ。接下来,我们通过设置`deadLetterPolicy`属性将此 Bean 名称提供给`PulsarListener`。请注意,PulsarListener`的订阅类型为`Shared,因为 DLQ 功能仅适用于共享订阅。此代码主要用于演示目的,因此我们提供了 1 秒的`ackTimeout`值。这个想法是代码抛出异常,如果 Pulsar 在 1 秒内没有收到确认,就会重试。如果该循环持续十次(这是我们在`DeadLetterPolicy`中的最大重新投递次数),Pulsar 消费者会将消息发布到 DLQ 主题。我们有另一个`PulsarListener`侦听 DLQ 主题,以便在数据发布到 DLQ 主题时接收数据。

Special note on DLQ topics when using partitioned topics

如果主主题被分区,在后台,Pulsar 将每个分区视为单独主题。Pulsar 会将`partition-<n>`附加到主主题名称,其中`n`代表分区号。问题在于,如果您没有指定 DLQ 主题(与我们上面所做相反),Pulsar 会发布到默认主题名称,其中包含此 `partition-<n> 信息——例如:topic-with-dlp-partition-0-deadLetterPolicySubscription-DLQ。解决此问题的简单方法是始终提供 DLQ 主题名称。

Native Error Handling in Spring for Apache Pulsar

正如我们前面提到的,Apache Pulsar 中的 DLQ 功能仅适用于共享订阅。如果应用程序需要对非共享订阅使用一些类似功能,该怎么办?Pulsar 不支持在独占和故障转移订阅上使用 DLQ 的主要原因是这些订阅类型有顺序保证。允许重新投递、DLQ 等实际上是以无序的方式接收消息。但是,如果应用程序对此没有意见,但更重要的是,是否需要非共享订阅的此 DLQ 功能?为此,Spring for Apache Pulsar 提供了`PulsarConsumerErrorHandler`,您可以在 Pulsar 中的任何订阅类型(ExclusiveFailoverShared`或`Key_Shared)中使用它:

当您从 Spring for Apache Pulsar 使用`PulsarConsumerErrorHandler`时,请确保不要在侦听器上设置确认超时属性。

让我们看看通过检查一些代码段来了解一些详细信息:

@EnablePulsar
@Configuration
class PulsarConsumerErrorHandlerConfig {

    @Bean
    PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
            PulsarTemplate<String> pulsarTemplate) {
        return new DefaultPulsarConsumerErrorHandler<>(
                new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
    }

    @PulsarListener(id = "pulsarConsumerErrorHandler-id", subscriptionName = "pulsatConsumerErrorHandler-subscription",
                    topics = "pulsarConsumerErrorHandler-topic",
					pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler")
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @PulsarListener(id = "pceh-dltListener", topics = "my-foo-dlt")
    void listenDlt(String msg) {
        System.out.println("From DLT: " + msg);
    }

}

考虑 pulsarConsumerErrorHandler bean。这创建了一个 PulsarConsumerErrorHandler 类型的 bean,并使用 Spring 为 Apache Pulsar 提供的开箱即用的默认实现:DefaultPulsarConsumerErrorHandlerDefaultPulsarConsumerErrorHandler 有一个构造函数,它获取一个 PulsarMessageRecovererFactory 和一个 org.springframework.util.backoff.BackoffPulsarMessageRecovererFactory 是一个具有以下 API 的函数式接口:

@FunctionalInterface
public interface PulsarMessageRecovererFactory<T> {

	/**
	 * Provides a message recoverer {@link PulsarMessageRecoverer}.
	 * @param consumer Pulsar consumer
	 * @return {@link PulsarMessageRecoverer}.
	 */
	PulsarMessageRecoverer<T> recovererForConsumer(Consumer<T> consumer);

}

recovererForConsumer 方法获取一个 Pulsar 消费者并返回一个 PulsarMessageRecoverer,这是一个另外的函数式接口。以下是 PulsarMessageRecoverer 的 API:

public interface PulsarMessageRecoverer<T> {

	/**
	 * Recover a failed message, for e.g. send the message to a DLT.
	 * @param message Pulsar message
	 * @param exception exception from failed message
	 */
	void recoverMessage(Message<T> message, Exception exception);

}

Spring for Apache Pulsar 为 PulsarMessageRecovererFactory 提供了一个实现,称为 PulsarDeadLetterPublishingRecoverer,该实现提供了一个默认实现,可以通过将其发送到死信主题 (DLT) 来恢复消息。我们将此实现提供给前面 DefaultPulsarConsumerErrorHandler 的构造函数。作为第二个参数,我们提供一个 FixedBackOff。你还可以提供 Spring 提供的 ExponentialBackoff 以获取高级后退功能。然后,我们将此 bean 名称提供给 PulsarConsumerErrorHandler,作为 PulsarListener 的属性。该属性称为 pulsarConsumerErrorHandler。每次 PulsarListener 方法因消息而失败时,它都会被重试。重试次数由提供的 Backoff 实现值控制。在我们的示例中,我们执行了 10 次重试(总共 11 次尝试——第一次和随后 10 次重试)。一旦耗尽所有重试,消息将被发送到 DLT 主题。

我们提供的 PulsarDeadLetterPublishingRecoverer 实现使用 PulsarTemplate,该模板用于将消息发布到 DLT。在大多数情况下,来自 Spring Boot 的相同自动配置 PulsarTemplate 就足够了,对于分区主题有一个警告。当使用分区主题以及对主主题使用自定义消息路由时,你必须使用不采用自动配置的 PulsarTemplate,其使用带有 message-routing-modecustompartitionPulsarProducerFactory。你可以使用带有以下蓝图的 PulsarConsumerErrorHandler

@Bean
PulsarConsumerErrorHandler<Integer> pulsarConsumerErrorHandler(PulsarClient pulsarClient) {
    PulsarProducerFactory<Integer> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Map.of());
        PulsarTemplate<Integer> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);

        BiFunction<Consumer<?>, Message<?>, String> destinationResolver =
                (c, m) -> "my-foo-dlt";

        PulsarDeadLetterPublishingRecoverer<Integer> pulsarDeadLetterPublishingRecoverer =
                new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, destinationResolver);

        return new DefaultPulsarConsumerErrorHandler<>(pulsarDeadLetterPublishingRecoverer,
                new FixedBackOff(100, 5));
}

请注意,我们要将目标解析程序作为第二个构造函数参数提供给 PulsarDeadLetterPublishingRecoverer。如果没有提供,PulsarDeadLetterPublishingRecoverer<subscription-name>-<topic-name>-DLT> 用作 DLT 主题名称。使用此功能时,你应当通过设置目标解析程序来使用一个合适的目标名称,而不是使用默认值。

当使用单记录消息侦听器时,就像我们在 PulsarConsumerErrorHnadler 中所做的那样,并且如果你使用手动确认,请务必确保在引发异常时不否定确认消息。相反,将异常重新抛回容器。否则,容器认为该消息被单独处理,并且不会触发错误处理。

最后,我们有一个第二个 PulsarListener,它从 DLT 主题接收消息。

在本节中提供的示例中,我们只看到了如何将 PulsarConsumerErrorHandler 与一个单记录消息侦听器配合使用。接下来,我们看看如何在批处理侦听器上使用它。

Batch listener with PulsarConsumerErrorHandler

首先,让我们看看一个批处理 PulsarListener 方法:

@PulsarListener(subscriptionName = "batch-demo-5-sub", topics = "batch-demo-4", batch = true, concurrency = "3",
            subscriptionType = SubscriptionType.Failover,
            pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler", ackMode = AckMode.MANUAL)
void listen(List<Message<Integer>> data, Consumer<Integer> consumer, Acknowledgment acknowledgment) {
    for (Message<Integer> datum : data) {
        if (datum.getValue() == 5) {
            throw new PulsarBatchListenerFailedException("failed", datum);
        }
        acknowledgement.acknowledge(datum.getMessageId());
    }
}

@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
        PulsarTemplate<String> pulsarTemplate) {
    return new DefaultPulsarConsumerErrorHandler<>(
            new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}

@PulsarListener(subscriptionName = "my-dlt-subscription", topics = "my-foo-dlt")
void dltReceiver(Message<Integer> message) {
    System.out.println("DLT - RECEIVED: " + message.getValue());
}

我们再次用 PulsarConsumerErrorHandler bean 名称提供 pulsarConsumerErrorHandler 属性。当你使用批处理侦听器(如前面示例中所示)并希望使用 Spring for Apache Pulsar 的 PulsarConsumerErrorHandler 时,你需要使用手动确认。通过这种方式,你可以确认所有成功的个别消息。对于失败的消息,你必须使用失败的消息抛出 PulsarBatchListenerFailedException。没有此异常,框架不知道如何处理失败。在重试时,容器会发送一批新的消息,从失败的消息开始发送给侦听器。如果再次失败,它将被重试,直到重试用尽为止,此时消息将被发送到 DLT。那时,该消息将被容器确认,侦听器将移交给原始批处理中的后续消息。

Consumer Customization on PulsarListener

Spring for Apache Pulsar 提供了一种便捷的方法来定制 PulsarListener 使用的容器创建的消费者。应用程序可以为 PulsarListenerConsumerBuilderCustomizer 提供一个 bean。这里是一个示例。

@Bean
public PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
    return cb -> {
        cb.subscriptionName("modified-subscription-name");
    };
}

然后,此自定义器 bean 名称可以像下面所示作为 PuslarListener 注解上的属性提供。

@PulsarListener(subscriptionName = "my-subscription",
        topics = "my-topic", consumerCustomizer = "myCustomizer")
void listen(String message) {

}

框架通过 PulsarListener 检测提供的 bean,并在创建 Pulsar 消费者之前对 Consumer 构建器应用此自定义器。

如果你有多个 PulsarListener 方法,并且它们各自具有不同的自定义规则,你应该创建多个自定义器 bean,并将适当的自定义器附加到每个 PulsarListener

Pausing and Resuming Message Listener Containers

在某些情况下,应用程序可能希望暂时暂停消息消费,然后稍后再恢复。Spring for Apache Pulsar 提供了暂停和恢复基础消息侦听器容器的能力。当 Pulsar 消息侦听器容器被暂停时,容器执行的任何轮询以从 Pulsar 消费者接收数据都将被暂停。类似地,当容器恢复时,如果暂停期间主题有任何新的记录添加,则下一个轮询开始返回数据。

要暂停或恢复侦听器容器,首先通过 PulsarListenerEndpointRegistry bean 获取容器实例,然后在容器实例上调用暂停/恢复 API - 如下面的代码段所示:

@Autowired
private PulsarListenerEndpointRegistry registry;

void someMethod() {
  PulsarMessageListenerContainer container = registry.getListenerContainer("my-listener-id");
  container.pause();
}

传递给 getListenerContainer 的 id 参数是容器 id - 当暂停/继续 @PulsarListener 时,它将是 @PulsarListener id 属性的值。

Pulsar Reader Support

此框架提供支持,以通过 `PulsarReaderFactory`使用 {apache-pulsar-docs}/concepts-clients/#reader-interface[Pulsar Reader]。

Spring Boot 提供此读者工厂,您可以通过指定 {spring-boot-pulsar-config-props}[spring.pulsar.reader.*] 应用程序属性来进一步配置该工厂。

PulsarReader Annotation

虽然可以直接使用 PulsarReaderFactory,但 Spring for Apache Pulsar 提供了 PulsarReader 注解,你可以使用它来从一个主题快速读取,而无需自己设置任何读取器工厂。这与 PulsarListener 背后的想法类似。这是一个快速示例。

@PulsarReader(id = "reader-demo-id", topics = "reader-demo-topic", startMessageId = "earliest")
void read(String message) {
    //...
}

id 属性是可选的,但最好为你的应用程序提供一个有意义的值。如果未指定,将使用自动生成的 ID。另一方面,topicsstartMessageId 属性是必需的。topics 属性可以是一个单独的主题或一个以逗号分隔的主题列表。startMessageId 属性指示读取器从主题中的特定消息开始。startMessageId 的有效值为 earliestlatest。假设你希望读取器从主题中的任意消息开始,而不是最早或最新的可用消息。在这种情况下,你需要使用 ReaderBuilderCustomizer 来定制 ReaderBuilder,以便它知道要从哪个正确的 MessageId 开始。

Customizing the ReaderBuilder

你可以在 Spring for Apache Pulsar 中使用 PulsarReaderReaderBuilderCustomizer 来自定义通过 ReaderBuilder 可用的任何字段。你可以提供一个 PulsarReaderReaderBuilderCustomizer 类型的 @Bean,然后如下使其对 PulsarReader 可用。

@PulsarReader(id = "reader-customizer-demo-id", topics = "reader-customizer-demo-topic",
    readerCustomizer = "myCustomizer")
void read(String message) {
    //...
}

@Bean
public PulsarReaderReaderBuilderCustomizer<String> myCustomizer() {
    return readerBuilder -> {
        readerBuilder.startMessageId(messageId); // the first message read is after this message id.
        // Any other customizations on the readerBuilder
    };
}

如果您的应用程序中仅注册有一个 @PulsarReader 和一个 PulsarReaderReaderBuilderCustomizer bean,那么该定制器会自动应用。

Topic Resolution

@2

Publishing and Consuming Partitioned Topics

在下面的示例中,我们发布到名为 hello-pulsar-partitioned 的主题。这是一个已分区的主题,并且在这个示例中,我们假设该主题已使用三个分区创建。

@SpringBootApplication
public class PulsarBootPartitioned {

	public static void main(String[] args) {
		SpringApplication.run(PulsarBootPartitioned.class, "--spring.pulsar.producer.message-routing-mode=CustomPartition");
	}

	@Bean
	public ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
		pulsarTemplate.setDefaultTopicName("hello-pulsar-partitioned");
		return args -> {
			for (int i = 0; i < 10; i++) {
				pulsarTemplate.sendAsync("hello john doe 0 ", new FooRouter());
				pulsarTemplate.sendAsync("hello alice doe 1", new BarRouter());
				pulsarTemplate.sendAsync("hello buzz doe 2", new BuzzRouter());
			}
		};
	}

	@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned")
	public void listen(String message) {
		System.out.println("Message Received: " + message);
	}

    static class FooRouter implements MessageRouter {

		@Override
		public int choosePartition(Message<?> msg, TopicMetadata metadata) {
			return 0;
		}
	}

	static class BarRouter implements MessageRouter {

		@Override
		public int choosePartition(Message<?> msg, TopicMetadata metadata) {
			return 1;
		}
	}

	static class BuzzRouter implements MessageRouter {

		@Override
		public int choosePartition(Message<?> msg, TopicMetadata metadata) {
			return 2;
		}
	}

}

在前面的示例中,我们发布到分区主题,并且我们希望将一些数据段发布到特定分区。如果你将它留给 Pulsar 的默认设置,它将遵循分区分配的循环模式,并且我们希望覆盖它。为此,我们提供了一个带有 send 方法的消息路由器对象。考虑实现的三种消息路由器。FooRouter 总发送数据到分区 0BarRouter 发送到分区 1BuzzRouter 发送到分区 2。还要注意,我们现在使用 PulsarTemplatesendAsync 方法,该方法会返回 CompletableFuture。在运行应用程序时,我们还需要将生产者的 messageRoutingMode 设置为 CustomPartition (spring.pulsar.producer.message-routing-mode)。

在消费者端,我们使用了一个具有独占订阅类型的 PulsarListener。这意味着来自所有分区的将最终出现在同一个消费者中,且没有排序的保证。

如果我们希望每个分区都被一个单独的消费者消费,我们能做些什么?我们可以切换到“故障转移”订阅模式,并添加三个单独的消费者:

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen1(String foo) {
    System.out.println("Message Received 1: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen2(String foo) {
    System.out.println("Message Received 2: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription",  topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen3(String foo) {
    System.out.println("Message Received 3: " + foo);
}

当你遵循此方法时,一个分区总会被一个专用的消费者消费。

类似地,如果你想使用 Pulsar 的共享消费者类型,可以使用 shared 订阅类型。但是,当你使用 shared 模式时,你会失去任何排序保证,因为一个消费者在另一个消费者有机会之前从所有分区接收消息。

请考虑以下示例:

@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen1(String foo) {
    System.out.println("Message Received 1: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen2(String foo) {
    System.out.println("Message Received 2: " + foo);
}