Reactive Support

该框架为几乎所有受支持的功能提供了响应式对应部分。

The framework provides a Reactive counterpart for almost all supported features.

如果您在提供的命令式组件前面加上单词 Reactive,可能会发现它对应的响应式组件。

If you put the word Reactive in front of a provided imperative component, you will likely find its Reactive counterpart.

  • PulsarTemplate → ReactivePulsarTemplate

  • PulsarListener → ReactivePulsarListener

  • PulsarConsumerFactory → ReactivePulsarConsumerFactory

  • etc..

但是,以下是尚未支持的:

However, the following is not yet supported:

  • Error Handling in non-shared subscriptions

  • Accessing Pulsar headers via @Header in streaming mode

  • Observations

Preface

我们建议为基于 Apache Pulsar 的 Spring 应用程序采用 Spring Boot First 方式,因为这极大地简化了流程。为此,您可以将 spring-pulsar-reactive-spring-boot-starter 模块添加为依赖项。

We recommend using a Spring-Boot-First approach for Spring for Apache Pulsar-based applications, as that simplifies things tremendously. To do so, you can add the spring-pulsar-reactive-spring-boot-starter module as a dependency.

本参考的大部分内容都假定读者使用该启动程序,并提供了针对该特定内容的大部分配置说明。但是,有意识强调针对 Spring Boot 启动程序使用的特定说明。

The majority of this reference expects the reader to be using the starter and gives most directions for configuration with that in mind. However, an effort is made to call out when instructions are specific to the Spring Boot starter usage.

@8

Quick Tour

我们将快速浏览一下 Spring for Apache Pulsar 中的 Reactive 支持,方法是展示一个生成和以响应方式消费的 Spring Boot 样本应用程序。这是一个完整的应用程序,不需要任何其他配置,只要在默认位置运行 Pulsar 集群即可 - localhost:6650

We will take a quick tour of the Reactive support in Spring for Apache Pulsar by showing a sample Spring Boot application that produces and consumes in a Reactive fashion. This is a complete application and does not require any additional configuration, as long as you have a Pulsar cluster running on the default location - localhost:6650.

Dependencies

Spring Boot 应用程序只需要 spring-pulsar-reactive-spring-boot-starter 依赖项。以下列表分别演示了如何为 Maven 和 Gradle 定义依赖项:

Spring Boot applications need only the spring-pulsar-reactive-spring-boot-starter dependency. The following listings show how to define the dependency for Maven and Gradle, respectively:

  • Maven

  • Gradle

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-pulsar-reactive</artifactId>
        <version>{spring-boot-version}</version>
    </dependency>
</dependencies>
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-pulsar-reactive:{spring-boot-version}'
}

当使用 0.2.x 版本 时,以上坐标将发生如下更改:

When using Version 0.2.x the above coordinates change as follows:

Maven
<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-reactive-spring-boot-starter</artifactId>
        <version>0.2.0</version>
    </dependency>
</dependencies>
Gradle
dependencies {
    implementation 'org.springframework.pulsar:spring-pulsar-reactive-spring-boot-starter:0.2.0'
}

Application Code

以下是应用程序源代码:

Here is the application source code:

@SpringBootApplication
public class ReactiveSpringPulsarHelloWorld {

    public static void main(String[] args) {
        SpringApplication.run(ReactiveSpringPulsarHelloWorld.class, args);
    }

    @Bean
    ApplicationRunner runner(ReactivePulsarTemplate<String> pulsarTemplate) {
        return (args) -> pulsarTemplate.send("hello-pulsar-topic", "Hello Reactive Pulsar World!").subscribe();
    }

    @ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
    Mono<Void> listen(String message) {
        System.out.println("Reactive listener received: " + message);
        return Mono.empty();
    }
}

就是这样,使用几行代码,我们便能够得到一个可以在响应模式下生产和使用来自 Pulsar 主题消息的 Spring Boot 应用。

That is it, with just a few lines of code we have a working Spring Boot app that is producing and consuming messages from a Pulsar topic in a Reactive fashion.

一旦启动,该应用程序便会使用一个 ReactivePulsarTemplate 将信息发送给 hello-pulsar-topic。然后,它使用一个 @ReactivePulsarListener 来使用 hello-pulsar-topic

Once started, the application uses a ReactivePulsarTemplate to send messages to the hello-pulsar-topic. It then consumes from the hello-pulsar-topic using a @ReactivePulsarListener.

简洁性的关键成分之一是 Spring Boot 启动程序,它会自动配置并向应用程序提供所需的组件。

One of the key ingredients to the simplicity is the Spring Boot starter which auto-configures and provides the required components to the application

Design

以下是一些需要牢记的关键设计要点。

Here are a few key design points to keep in mind.

Apache Pulsar Reactive

反应式支持最终由 Apache Pulsar Reactive client 提供,其当前实现是常规 Pulsar 客户端异步 API 周围的完全非阻塞适配器。这意味着反应式客户端需要常规客户端。

The reactive support is ultimately provided by the Apache Pulsar Reactive client whose current implementation is a fully non-blocking adapter around the regular Pulsar client’s asynchronous API. This implies that the Reactive client requires the regular client.

Additive Auto-Configuration

由于依赖于常规(命令式)客户端,因此框架提供的响应式自动配置是对命令式自动配置的补充。换句话说,命令式启动器仅包括命令式组件,而响应式启动器既包括命令式组件,也包括响应式组件。

Due to the dependence on the regular (imperative) client, the Reactive auto-configuration provided by the framework is additive to the imperative auto-configuration. In other words, The imperative starter only includes the imperative components but the reactive starter includes both imperative and reactive components.

Reactive Pulsar Client

使用响应式 Pulsar Spring Boot 启动器后,将自动配置 ReactivePulsarClient

When you use the Reactive Pulsar Spring Boot Starter, you get the ReactivePulsarClient auto-configured.

默认情况下,该应用程序会尝试连接到 pulsar://localhost:6650 处的本地 Pulsar 实例。可以通过将 spring.pulsar.client.service-url 属性设为不同的值来调整此功能。

By default, the application tries to connect to a local Pulsar instance at pulsar://localhost:6650. This can be adjusted by setting the spring.pulsar.client.service-url property to a different value.

该值必须是一个有效的 {apache-pulsar-docs}/client-libraries-java/#connection-urls[Pulsar 协议] URL

The value must be a valid {apache-pulsar-docs}/client-libraries-java/#connection-urls[Pulsar Protocol] URL

还可以配置许多其他应用程序属性(从已改编的命令式客户端继承而来)。请参阅 {spring-boot-pulsar-config-props}[spring.pulsar.client.*] 应用程序属性。

There are many other application properties (inherited from the adapted imperative client) available to configure. See the {spring-boot-pulsar-config-props}[spring.pulsar.client.*] application properties.

Authentication

若要连接到需要身份验证的 Pulsar 集群,请按照 the same steps 作为命令客户端。同样,这是因为反应式客户端调整命令客户端,该客户端负责处理所有安全配置。

To connect to a Pulsar cluster that requires authentication, follow the same steps as the imperative client. Again, this is because the reactive client adapts the imperative client which handles all security configuration.

Message Production

ReactivePulsarTemplate

在 Pulsar 生产者端,Spring Boot 自动配置为发布记录提供 ReactivePulsarTemplate。该模板实现了名为 ReactivePulsarOperations 的接口,并提供了通过其合约发布记录的方法。

On the Pulsar producer side, Spring Boot auto-configuration provides a ReactivePulsarTemplate for publishing records. The template implements an interface called ReactivePulsarOperations and provides methods to publish records through its contract.

该模板提供接受单个消息并返回 Mono<MessageId> 的发送方法。它还提供接受多个消息(采用 ReactiveStreams Publisher 类型)并返回 Flux<MessageId> 的发送方法。

The template provides send methods that accept a single message and return a Mono<MessageId>. It also provides send methods that accept multiple messages (in the form of the ReactiveStreams Publisher type) and return a Flux<MessageId>.

对于不包含主题参数的 API 变体,topic resolution process 用于确定目标主题。

For the API variants that do not include a topic parameter, a topic-resolution-process-reactive is used to determine the destination topic.

Fluent API

该模板提供了一个 {javadocs}/org/springframework/pulsar/reactive/core/ReactivePulsarOperations.html#newMessage(T)[fluent builder] 来处理更复杂的发送请求。

The template provides a {javadocs}/org/springframework/pulsar/reactive/core/ReactivePulsarOperations.html#newMessage(T)[fluent builder] to handle more complicated send requests.

Message customization

您可以指定 MessageSpecBuilderCustomizer 来配置传出消息。例如,以下代码演示了如何发送键控消息:

You can specify a MessageSpecBuilderCustomizer to configure the outgoing message. For example, the following code shows how to send a keyed message:

template.newMessage(msg)
    .withMessageCustomizer((mc) -> mc.key("foo-msg-key"))
    .send();

Sender customization

您可以指定 ReactiveMessageSenderBuilderCustomizer 来配置 Pulsar 发送器构建器,它最终构建用来发送传出消息的发送器。

You can specify a ReactiveMessageSenderBuilderCustomizer to configure the underlying Pulsar sender builder that ultimately constructs the sender used to send the outgoing message.

谨慎使用,因为这可以完全访问发送器构建器,并且调用其某些方法(例如 create)可能会产生意想不到的副作用。

Use with caution as this gives full access to the sender builder and invoking some of its methods (such as create) may have unintended side effects.

例如,以下代码显示如何禁用批处理并启用块处理:

For example, the following code shows how to disable batching and enable chunking:

template.newMessage(msg)
    .withSenderCustomizer((sc) -> sc.enableChunking(true).enableBatching(false))
    .send();

此其他示例演示了在向分区主题发布记录时如何使用自定义路由。在发送器构建器上指定您的自定义 MessageRouter 实现,例如:

This other example shows how to use custom routing when publishing records to partitioned topics. Specify your custom MessageRouter implementation on the sender builder such as:

template.newMessage(msg)
    .withSenderCustomizer((sc) -> sc.messageRouter(messageRouter))
    .send();

请注意,在使用 MessageRouter 时,spring.pulsar.producer.message-routing-mode 唯一的有效设置是 custom

Note that, when using a MessageRouter, the only valid setting for spring.pulsar.producer.message-routing-mode is custom.

Specifying Schema Information

@6

Specifying Schema Information

如果你使用 Java 原语类型,框架会为你自动检测架构,且你不需要为发布数据指定任何架构类型。对于非原语类型,如果在 PulsarTemplate 上调用发送操作时未明确指定架构,Spring for Apache Pulsar 框架将尝试根据类型构建 Schema.JSON

If you use Java primitive types, the framework auto-detects the schema for you, and you need not specify any schema types for publishing the data. For non-primitive types, if the Schema is not explicitly specified when invoking send operations on the PulsarTemplate, the Spring for Apache Pulsar framework will try to build a Schema.JSON from the type.

当前支持的复杂模式类型包括 JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES 和 KEY_VALUE 以及 INLINE 编码。

Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, AUTO_PRODUCE_BYTES, and KEY_VALUE w/ INLINE encoding.

Custom Schema Mapping

作为在 PulsarTemplate 上调用发送操作时为复杂类型指定架构的替代方案,可以将架构解析器配置为具有类型的映射。这消除了指定架构的需要,因为框架使用传出消息类型通过解析器进行查询。

As an alternative to specifying the schema when invoking send operations on the PulsarTemplate for complex types, the schema resolver can be configured with mappings for the types. This removes the need to specify the schema as the framework consults the resolver using the outgoing message type.

@3

Configuration properties

Configuration properties

可以使用 spring.pulsar.defaults.type-mappings 属性配置架构映射。以下示例使用 application.ymlUserAddress 复杂对象添加映射,分别使用 AVROJSON 架构:

Schema mappings can be configured with the spring.pulsar.defaults.type-mappings property. The following example uses application.yml to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON

message-type 是消息类的限定名。

The message-type is the fully-qualified name of the message class.

Schema resolver customizer

添加映射的首选方法是通过上面提到的属性。但是,如果需要更多控制,则可以提供架构解析器自定义器来添加映射。

The preferred method of adding mappings is via the property mentioned above. However, if more control is needed you can provide a schema resolver customizer to add the mapping(s).

以下示例使用架构解析器自定义器分别为 UserAddress 复杂对象添加使用 AVROJSON 架构的映射:

The following example uses a schema resolver customizer to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

@4

Type mapping annotation

Type mapping annotation

为特定消息类型指定要使用的默认架构信息的另一种方法是使用 @PulsarMessage 注释标记消息类。可以将架构信息通过注释的 schemaType 属性指定。

Another option for specifying default schema information to use for a particular message type is to mark the message class with the @PulsarMessage annotation. The schema info can be specified via the schemaType attribute on the annotation.

以下示例将系统配置为使用 JSON 作为在生产或消费 Foo 类型消息时的默认架构:

The following example configures the system to use JSON as the default schema when producing or consuming messages of type Foo:

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}

有了此配置,无需设置发送操作时的架构。

With this configuration in place, there is no need to set specify the schema on send operations.

Producing with AUTO_SCHEMA

如果没有办法提前了解 Pulsar 主题的架构类型,您可以使用 {apache-pulsar-docs}/schema-get-started/#auto_produce[AUTO_PRODUCE] 架构来安全地发布原始 JSON 或 Avro 负载作为 byte[]

If there is no chance to know the type of schema of a Pulsar topic in advance, you can use an {apache-pulsar-docs}/schema-get-started/#auto_produce[AUTO_PRODUCE] schema to publish a raw JSON or Avro payload as a byte[] safely.

在这种情况下,生成器会验证出站字节是否与目标主题的架构兼容。

In this case, the producer validates whether the outbound bytes are compatible with the schema of the destination topic.

只需在模板发送操作中指定 Schema.AUTO_PRODUCE_BYTES() 架构,如下例所示:

Simply specify a schema of Schema.AUTO_PRODUCE_BYTES() on your template send operations as shown in the example below:

@7

使用 PulsarTemplate,可以将字节数组(userAsBytes)作为消息发送到 Pulsar。PulsarTemplate 会自动确定消息模式,如果未指定明确模式,则使用自动模式生成字节。通过此方法,可以高效地在 Pulsar 中发送字节数据。

void sendUserAsBytes(PulsarTemplate<byte[]> template, byte[] userAsBytes) {
	template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES());
}

这仅受 Avro 和 JSON 模式类型支持。

This is only supported with Avro and JSON schema types.

ReactivePulsarSenderFactory

ReactivePulsarTemplate 依赖于 ReactivePulsarSenderFactory 来实际创建底层发送器。

The ReactivePulsarTemplate relies on a ReactivePulsarSenderFactory to actually create the underlying sender.

Spring Boot 提供此发件人工厂,可以通过 {spring-boot-pulsar-config-props}[spring.pulsar.producer.*] 应用程序属性对其进行配置。

Spring Boot provides this sender factory which can be configured with any of the {spring-boot-pulsar-config-props}[spring.pulsar.producer.*] application properties.

如果不直接通过发送器工厂 API 指定主题信息,则 ReactivePulsarTemplate 使用的 topic resolution process 与一个例外相同,即“消息类型默认值”步骤为 omitted

If topic information is not specified when using the sender factory APIs directly, the same topic-resolution-process-reactive used by the ReactivePulsarTemplate is used with the one exception that the "Message type default" step is omitted.

Producer Caching

每个底层 Pulsar 生产者都会消耗资源。为了提高性能并避免持续创建生产者,底层 Apache Pulsar 响应式客户端中的 ReactiveMessageSenderCache 会缓存它创建的生产者。它们以 LRU 方式进行缓存,并且在配置的时间段内未被使用时会将它们逐出。

Each underlying Pulsar producer consumes resources. To improve performance and avoid continual creation of producers, the ReactiveMessageSenderCache in the underlying Apache Pulsar Reactive client caches the producers that it creates. They are cached in an LRU fashion and evicted when they have not been used within a configured time period.

可以通过指定 {spring-boot-pulsar-config-props}[spring.pulsar.producer.cache.*] 应用程序属性来配置缓存设置。

You can configure the cache settings by specifying any of the {spring-boot-pulsar-config-props}[spring.pulsar.producer.cache.*] application properties.

Message Consumption

@ReactivePulsarListener

说到 Pulsar 消费者,我们建议终端用户应用程序使用 ReactivePulsarListener 注释。若要使用 ReactivePulsarListener,则需要使用 @EnableReactivePulsar 注释。在使用 Spring Boot 支持时,它会自动启用此注释并配置所有必要的组件,例如消息侦听器基础架构(负责创建底层 Pulsar 消费者)。

When it comes to Pulsar consumers, we recommend that end-user applications use the ReactivePulsarListener annotation. To use ReactivePulsarListener, you need to use the @EnableReactivePulsar annotation. When you use Spring Boot support, it automatically enables this annotation and configures all necessary components, such as the message listener infrastructure (which is responsible for creating the underlying Pulsar consumer).

让我们重新审视我们在快速浏览部分中看到的 ReactivePulsarListener 代码段:

Let us revisit the ReactivePulsarListener code snippet we saw in the quick-tour section:

@ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}

侦听器方法返回一个 Mono<Void> 以指示消息是否已成功处理。Mono.empty() 表示成功(确认),Mono.error() 表示失败(否定确认)。

The listener method returns a Mono<Void> to signal whether the message was successfully processed. Mono.empty() indicates success (acknowledgment) and Mono.error() indicates failure (negative acknowledgment).

您还可以进一步简化此方法:

You can also further simplify this method:

@ReactivePulsarListener
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}

在此最基本的表单中,当没有直接提供 topics 时,将使用 [主题解析流程,主题解析流程 - 响应式] 来确定目标主题。同样,当未在 @ReactivePulsarListener 注释上提供 subscriptionName 时,将使用自动生成的订阅名称。

In this most basic form, when the topics are not directly provided, a topic-resolution-process-reactive is used to determine the destination topic. Likewise, when the subscriptionName is not provided on the @ReactivePulsarListener annotation an auto-generated subscription name will be used.

在前面所示的 ReactivePulsarListener 方法中,我们以 String 的形式接收数据,但是我们没有指定任何架构类型。框架内部依赖于 Pulsar 的架构机制将数据转换为所需类型。

In the ReactivePulsarListener method shown earlier, we receive the data as String, but we do not specify any schema types. Internally, the framework relies on Pulsar’s schema mechanism to convert the data to the required type.

框架检测到您期望 String 类型,然后根据该信息推断架构类型,并将该架构提供给消费者。框架对所有基本类型执行此推断。对于所有非基本类型,默认架构假定为 JSON。如果复杂类型使用 JSON 以外的任何内容(例如 AVRO 或 KEY_VALUE),则必须使用 schemaType 属性在注释中提供架构类型。

The framework detects that you expect the String type and then infers the schema type based on that information and provides that schema to the consumer. The framework does this inference for all primitive types. For all non-primitive types the default schema is assumed to be JSON. If a complex type is using anything besides JSON (such as AVRO or KEY_VALUE) you must provide the schema type on the annotation using the schemaType property.

此示例说明了如何从主题消费复杂类型:

This example shows how we can consume complex types from a topic:

@ReactivePulsarListener(topics = "my-topic-2", schemaType = SchemaType.JSON)
Mono<Void> listen(Foo message) {
    System.out.println(message);
    return Mono.empty();
}

让我们看看我们可以消费的更多方法。

Let us look at a few more ways we can consume.

此示例直接消耗 Pulsar 消息:

This example consumes the Pulsar message directly:

@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.apache.pulsar.client.api.Message<String> message) {
    System.out.println(message.getValue());
    return Mono.empty();
}

此示例使用 Spring 消息封套消耗记录:

This example consumes the record wrapped in a Spring messaging envelope:

@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.springframework.messaging.Message<String> message) {
    System.out.println(message.getPayload());
    return Mono.empty();
}

Streaming

以上所有示例都是逐个消耗单个记录的示例。然而,使用 Reactive 的一个令人信服的原因是具有反压支持的流功能。

All of the above are examples of consuming a single record one-by-one. However, one of the compelling reasons to use Reactive is for the streaming capability with backpressure support.

以下示例使用 ReactivePulsarListener 消耗 POJO 流:

The following example uses ReactivePulsarListener to consume a stream of POJOs:

@ReactivePulsarListener(topics = "streaming-1", stream = true)
Flux<MessageResult<Void>> listen(Flux<org.apache.pulsar.client.api.Message<String>> messages) {
    return messages
        .doOnNext((msg) -> System.out.println("Received: " + msg.getValue()))
        .map(MessageResult::acknowledge);

在这里,我们以 Pulsar 消息的 Flux 形式接收记录。此外,要在 ReactivePulsarListener 级别启用流消耗,您需要将注释中的 stream 属性设置为 true

Here we receive the records as a Flux of Pulsar messages. In addition, to enable stream consumption at the ReactivePulsarListener level, you need to set the stream property on the annotation to true.

监听程序方法返回一个 Flux<MessageResult<Void>>,其中每个元素都表示一个已处理消息并保存有消息 ID、值和是否已确认。MessageResult 有一组静态工厂方法,可用于创建适当的 MessageResult 实例。

The listener method returns a Flux<MessageResult<Void>> where each element represents a processed message and holds the message id, value and whether it was acknowledged. The MessageResult has a set of static factory methods that can be used to create the appropriate MessageResult instance.

根据 Flux 中消息的实际类型,框架尝试推断要使用的架构。如果它包含复杂类型,您仍然需要在 ReactivePulsarListener 上提供 schemaType

Based on the actual type of the messages in the Flux, the framework tries to infer the schema to use. If it contains a complex type, you still need to provide the schemaType on ReactivePulsarListener.

以下侦听器使用带有复杂类型的 Spring 消息 Message 封套:

The following listener uses the Spring messaging Message envelope with a complex type :

@ReactivePulsarListener(topics = "streaming-2", stream = true, schemaType = SchemaType.JSON)
Flux<MessageResult<Void>> listen2(Flux<org.springframework.messaging.Message<Foo>> messages) {
    return messages
        .doOnNext((msg) -> System.out.println("Received: " + msg.getPayload()))
        .map(MessageUtils::acknowledge);
}

监听程序方法返回一个 Flux<MessageResult<Void>>,其中每个元素都表示一个已处理消息并保存有消息 ID、值和是否已确认。Spring MessageUtils 有一组静态工厂方法,可用于根据 Spring 消息创建适当的 MessageResult 实例。

The listener method returns a Flux<MessageResult<Void>> where each element represents a processed message and holds the message id, value and whether it was acknowledged. The Spring MessageUtils has a set of static factory methods that can be used to create the appropriate MessageResult instance from a Spring message.

Configuration - Application Properties

侦听器依赖于 ReactivePulsarConsumerFactory`创建和管理它用于消费消息的底层 Pulsar 消费者。Spring Boot 提供此消费者工厂,您可以通过指定 `spring.pulsar.consumer.] application properties. *Most of the configured properties on the factory will be respected in the listener with the following exceptions

The listener relies on the ReactivePulsarConsumerFactory to create and manage the underlying Pulsar consumer that it uses to consume messages. Spring Boot provides this consumer factory which you can further configure by specifying the {spring-boot-pulsar-config-props}[spring.pulsar.consumer.] application properties. *Most of the configured properties on the factory will be respected in the listener with the following exceptions:

spring.pulsar.consumer.subscription.name 属性被忽略,如果在注解上未指定,则生成。

The spring.pulsar.consumer.subscription.name property is ignored and is instead generated when not specified on the annotation.

spring.pulsar.consumer.subscription-type 属性被忽略,相反从注解上的值获取。但是,您可以设置注解上的 subscriptionType = {} 以将属性值作为默认值。

The spring.pulsar.consumer.subscription-type property is ignored and is instead taken from the value on the annotation. However, you can set the subscriptionType = {} on the annotation to instead use the property value as the default.

Generic records with AUTO_CONSUME

如果无法提前知晓 Pulsar 主题的模式类型,则可以使用 AUTO_CONSUME 模式类型来使用常规记录。在这种情况下,主题会使用与主题关联的模式信息将消息反序列化为 GenericRecord 对象。

If there is no chance to know the type of schema of a Pulsar topic in advance, you can use the AUTO_CONSUME schema type to consume generic records. In this case, the topic deserializes messages into GenericRecord objects using the schema info associated with the topic.

要消耗通用记录,请在 @ReactivePulsarListener 上设置 schemaType = SchemaType.AUTO_CONSUME,并使用类型为 GenericRecord 的 Pulsar 消息作为消息参数,如下所示。

To consume generic records set the schemaType = SchemaType.AUTO_CONSUME on your @ReactivePulsarListener and use a Pulsar message of type GenericRecord as the message parameter as shown below.

@ReactivePulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
Mono<Void> listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
    GenericRecord record = message.getValue();
    record.getFields().forEach((f) ->
            System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
	return Mono.empty();
}

GenericRecord API 允许访问字段及其关联的值

The GenericRecord API allows access to the fields and their associated values

Consumer Customization

您可以指定一个 ReactivePulsarListenerMessageConsumerBuilderCustomizer 来配置底层 Pulsar 消费者构建器,该构建器最终构建侦听器用于接收消息的消费者。

You can specify a ReactivePulsarListenerMessageConsumerBuilderCustomizer to configure the underlying Pulsar consumer builder that ultimately constructs the consumer used by the listener to receive the messages.

请谨慎使用,因为它会完全访问使用者生成器,并且调用其中某些方法(如 create)可能会有意想不到的副作用。

Use with caution as this gives full access to the consumer builder and invoking some of its methods (such as create) may have unintended side effects.

例如,以下代码说明如何将订阅的初始位置设置为主题上的最早消息。

For example, the following code shows how to set the initial position of the subscription to the earliest messaage on the topic.

@ReactivePulsarListener(topics = "hello-pulsar-topic", consumerCustomizer = "myConsumerCustomizer")
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}

@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myConsumerCustomizer() {
    return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}

如果你的应用程序只注册了单个 @ReactivePulsarListener 和单个 ReactivePulsarListenerMessageConsumerBuilderCustomizer Bean,那么自定义项将自动应用。

If your application only has a single @ReactivePulsarListener and a single ReactivePulsarListenerMessageConsumerBuilderCustomizer bean registered then the customizer will be automatically applied.

您还可以使用定制器向消费者构建器提供直接的 Pulsar 消费者属性。如果您不想使用前面提到的 Boot 配置属性或具有配置不同的多个 ReactivePulsarListener 方法,这会很方便。

You can also use the customizer to provide direct Pulsar consumer properties to the consumer builder. This is convenient if you do not want to use the Boot configuration properties mentioned earlier or have multiple ReactivePulsarListener methods whose configuration varies.

以下定制器示例使用直接 Pulsar 消费者属性:

The following customizer example uses direct Pulsar consumer properties:

@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> directConsumerPropsCustomizer() {
    return b -> b.property("subscriptionName", "subscription-1").property("topicNames", "foo-1");
}

使用的属性是直接 Pulsar 消费者属性,不是 spring.pulsar.consumer Spring Boot 配置属性

The properties used are direct Pulsar consumer properties, not the spring.pulsar.consumer Spring Boot configuration properties

@1

Specifying Schema Information

Specifying Schema Information

如前所述,对于 Java 原语,Spring for Apache Pulsar 框架可以推断出在 PulsarListener 上使用的正确模式。对于非原语类型,如果在注解中未明确指定 Schema,Spring for Apache Pulsar 框架将尝试根据类型构建 Schema.JSON

As indicated earlier, for Java primitives, the Spring for Apache Pulsar framework can infer the proper Schema to use on the PulsarListener. For non-primitive types, if the Schema is not explicitly specified on the annotation, the Spring for Apache Pulsar framework will try to build a Schema.JSON from the type.

当前支持的复杂模式类型包括 JSON、AVRO、PROTOBUF、AUTO_CONSUME、KEY_VALUE 和 INLINE 编码。

Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, AUTO_CONSUME, KEY_VALUE w/ INLINE encoding.

Custom Schema Mapping

作为在 PulsarListener 上为复杂类型指定架构的替代方案,可以使用映射为类型的架构解析器进行配置。这消除了在侦听器上设置架构的需要,因为框架使用传入消息类型通过解析器进行查询。

As an alternative to specifying the schema on the PulsarListener for complex types, the schema resolver can be configured with mappings for the types. This removes the need to set the schema on the listener as the framework consults the resolver using the incoming message type.

@3

Configuration properties

Configuration properties

可以使用 spring.pulsar.defaults.type-mappings 属性配置架构映射。以下示例使用 application.ymlUserAddress 复杂对象添加映射,分别使用 AVROJSON 架构:

Schema mappings can be configured with the spring.pulsar.defaults.type-mappings property. The following example uses application.yml to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON

message-type 是消息类的限定名。

The message-type is the fully-qualified name of the message class.

Schema resolver customizer

添加映射的首选方法是通过上面提到的属性。但是,如果需要更多控制,则可以提供架构解析器自定义器来添加映射。

The preferred method of adding mappings is via the property mentioned above. However, if more control is needed you can provide a schema resolver customizer to add the mapping(s).

以下示例使用架构解析器自定义器分别为 UserAddress 复杂对象添加使用 AVROJSON 架构的映射:

The following example uses a schema resolver customizer to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

@4

Type mapping annotation

Type mapping annotation

为特定消息类型指定要使用的默认架构信息的另一种方法是使用 @PulsarMessage 注释标记消息类。可以将架构信息通过注释的 schemaType 属性指定。

Another option for specifying default schema information to use for a particular message type is to mark the message class with the @PulsarMessage annotation. The schema info can be specified via the schemaType attribute on the annotation.

以下示例将系统配置为使用 JSON 作为在生产或消费 Foo 类型消息时的默认架构:

The following example configures the system to use JSON as the default schema when producing or consuming messages of type Foo:

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}

实施此配置后,无需在监听器上设置模式,例如:

With this configuration in place, there is no need to set the schema on the listener, for example:

@5

@PulsarListener(subscriptionName = "user-sub", topics = "user-topic")
public void listen(User user) {
    System.out.println(user);
}

Message Listener Container Infrastructure

在大多数情况下,我们建议直接使用 ReactivePulsarListener 注释从 Pulsar 主题消费,因为该模型涵盖了广泛的应用程序用例。但是,了解 ReactivePulsarListener 在内部如何工作非常重要。

In most scenarios, we recommend using the ReactivePulsarListener annotation directly for consuming from a Pulsar topic as that model covers a broad set of application use cases. However, it is important to understand how ReactivePulsarListener works internally.

当您将 Spring 用于 Apache Pulsar 时,消息侦听器容器是消息消费的核心。ReactivePulsarListener 在幕后使用消息侦听器容器基础架构创建和管理底层 Pulsar 消费者。

The message listener container is at the heart of message consumption when you use Spring for Apache Pulsar. The ReactivePulsarListener uses the message listener container infrastructure behind the scenes to create and manage the underlying Pulsar consumer.

ReactivePulsarMessageListenerContainer

此消息侦听器容器的契约通过 ReactivePulsarMessageListenerContainer 提供,其默认实现创建一个反应式 Pulsar 消费者,并通过一个使用创建的消费者的反应式消息管道进行连接。

The contract for this message listener container is provided through ReactivePulsarMessageListenerContainer whose default implementation creates a reactive Pulsar consumer and wires up a reactive message pipeline that uses the created consumer.

ReactiveMessagePipeline

该管道是 Apache Pulsar 反应式客户端的基础功能,它以反应式方式接收数据,然后将其传递给提供的消息处理器。反应式消息侦听器容器实现相当简单,因为该管道处理了大部分工作。

The pipeline is a feature of the underlying Apache Pulsar Reactive client which does the heavy lifting of receiving the data in a reactive manner and then handing it over to the provided message handler. The reactive message listener container implementation is much simpler because the pipeline handles the majority of the work.

ReactivePulsarMessageHandler

“侦听器”方面由 ReactivePulsarMessageHandler 提供,ReactivePulsarMessageHandler 有两种提供的实现:

The "listener" aspect is provided by the ReactivePulsarMessageHandler of which there are two provided implementations:

  • ReactivePulsarOneByOneMessageHandler - handles a single message one-by-one

  • ReactivePulsarStreamingHandler - handles multiple messages via a Flux

如果在直接使用侦听器容器时未指定主题信息,则会将 ReactivePulsarListener 使用的相同 topic resolution process 与“消息类型默认”步骤的唯一例外 omitted 一起使用。

If topic information is not specified when using the listener containers directly, the same topic-resolution-process-reactive used by the ReactivePulsarListener is used with the one exception that the "Message type default" step is omitted.

Concurrency

在流模式下使用记录时(stream = true),并发性自然地通过客户端实现中的底层反应式支持而来。

When consuming records in streaming mode (stream = true) concurrency comes naturally via the underlying Reactive support in the client implementation.

但是,当逐个处理消息时,可以指定并发性以增加处理吞吐量。只需在 @ReactivePulsarListener 上设置 concurrency 属性。此外,当 concurrency > 1 时,可以通过在注释上设置 useKeyOrderedProcessing = "true" 来确保消息按键排序,因此发送到同一处理器。

However, when handling messages one-by-one, concurrency can be specified to increase processing throughput. Simply set the concurrency property on @ReactivePulsarListener. Additionally, when concurrency > 1 you can ensure messages are ordered by key and therefore sent to the same handler by setting useKeyOrderedProcessing = "true" on the annotation.

同样,ReactiveMessagePipeline 也会做大量的工作,我们只需在它上面设置属性即可。

Again, the ReactiveMessagePipeline does the heavy lifting, we simply set the properties on it.

[role="small"][.small]Reactive vs Imperative

反应式容器中的并发性不同于它的命令式对应项。后者创建多个线程(每个线程都有一个 Pulsar 消费者),而前者同时将消息分派到反应式并行调度器上的多个处理器实例。

Concurrency in the reactive container is different from its imperative counterpart. The latter creates multiple threads (each with a Pulsar consumer) whereas the former dispatches the messages to multiple handler instances concurrently on the Reactive parallel scheduler.

反应式并发性模型的一个优点是,它可以与“独占”订阅一起使用,而命令式并发性模型则不行。

One advantage of the reactive concurrency model is that it can be used with Exclusive subscriptions whereas the imperative concurrency model can not.

Pulsar Headers

Pulsar 消息元数据可以作为 Spring 消息头被使用。可用的标头列表可以在{github}/blob/main/spring-pulsar/src/main/java/org/springframework/pulsar/support/PulsarHeaders.java[PulsarHeaders.java]中找到。

The Pulsar message metadata can be consumed as Spring message headers. The list of available headers can be found in {github}/blob/main/spring-pulsar/src/main/java/org/springframework/pulsar/support/PulsarHeaders.java[PulsarHeaders.java].

Accessing In OneByOne Listener

以下示例显示如何在使用逐个消息侦听器时访问 Pulsar 头信息:

The following example shows how you can access Pulsar Headers when using a one-by-one message listener:

@ReactivePulsarListener(topics = "some-topic")
Mono<Void> listen(String data,
        @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
        @Header("foo") String foo) {
    System.out.println("Received " + data + " w/ id=" + messageId + " w/ foo=" + foo);
    return Mono.empty();
}

在前面的示例中,我们访问 messageId 消息元数据的值以及名为 foo 的自定义消息属性。Spring @Header 注释用于每个标头字段。

In the preceding example, we access the values for the messageId message metadata as well as a custom message property named foo. The Spring @Header annotation is used for each header field.

您还可以将 Pulsar 的 Message 用作承载负载的信封。在此过程中,用户可以直接调用 Pulsar 消息上的对应方法来检索元数据。但是,为了方便起见,您也可以使用 Header 注释来检索它。请注意,您还可以使用 Spring 消息 Message 信封来承载负载,然后通过 @Header 检索 Pulsar 头。

You can also use Pulsar’s Message as the envelope to carry the payload. When doing so, the user can directly call the corresponding methods on the Pulsar message for retrieving the metadata. However, as a convenience, you can also retrieve it by using the Header annotation. Note that you can also use the Spring messaging Message envelope to carry the payload and then retrieve the Pulsar headers by using @Header.

Accessing In Streaming Listener

在使用流消息侦听器时,标头支持有限。仅当 Flux 包含 Spring org.springframework.messaging.Message 元素时,才会填充标头。此外,Spring @Header 注释不能用于检索数据。您必须直接调用 Spring 消息上的相应方法来检索数据。

When using a streaming message listener the header support is limited. Only when the Flux contains Spring org.springframework.messaging.Message elements will the headers be populated. Additionally, the Spring @Header annotation can not be used to retrieve the data. You must directly call the corresponding methods on the Spring message to retrieve the data.

Message Acknowledgment

该框架会自动处理消息确认。但是,侦听器方法必须发送一个信号来指示消息是否已成功处理。然后,容器实现使用该信号来执行确认或否定操作。这与其命令式对应项略有不同,命令式对应项中的信号被暗示为肯定,除非该方法引发异常。

The framework automatically handles message acknowledgement. However, the listener method must send a signal indicating whether the message was successfully processed. The container implementation then uses that signal to perform the ack or nack operation. This is a slightly different from its imperative counterpart where the signal is implied as positive unless the method throws an exception.

OneByOne Listener

单个消息(又名 OneByOne)消息侦听器方法返回一个 Mono<Void> 来表示该消息是否已成功处理。 Mono.empty() 表示成功(确认),而 Mono.error() 表示失败(否定确认)。

The single message (aka OneByOne) message listener method returns a Mono<Void> to signal whether the message was successfully processed. Mono.empty() indicates success (acknowledgment) and Mono.error() indicates failure (negative acknowledgment).

Streaming Listener

流侦听器方法返回一个 Flux<MessageResult<Void>>,其中每个 MessageResult 元素表示已处理的消息并保存消息 ID、值以及是否确认。MessageResult 有一组 acknowledgenegativeAcknowledge 静态工厂方法,可用于创建适当的 MessageResult 实例。

The streaming listener method returns a Flux<MessageResult<Void>> where each MessageResult element represents a processed message and holds the message id, value and whether it was acknowledged. The MessageResult has a set of acknowledge and negativeAcknowledge static factory methods that can be used to create the appropriate MessageResult instance.

Message Redelivery and Error Handling

Apache Pulsar 为消息重新传递和错误处理提供了多种原生策略。我们将了解它们,并了解如何通过 Spring for Apache Pulsar 使用它们。

Apache Pulsar provides various native strategies for message redelivery and error handling. We will take a look at them and see how to use them through Spring for Apache Pulsar.

Acknowledgment Timeout

默认情况下,Pulsar 消费者不会重新投递消息,除非消费者崩溃,但您可以通过在 Pulsar 消费者上设置确认超时来更改此行为。如果确认超时属性的值大于零,并且 Pulsar 消费者在该超时时间内未确认消息,则会重新投递消息。

By default, Pulsar consumers do not redeliver messages unless the consumer crashes, but you can change this behavior by setting an ack timeout on the Pulsar consumer. If the ack timeout property has a value above zero and if the Pulsar consumer does not acknowledge a message within that timeout period, the message is redelivered.

您可以通过 reactive-consumer-customizer 直接指定此属性作为 Pulsar 消费者属性,例如:

You can specify this property directly as a Pulsar consumer property via a reactive-consumer-customizer such as:

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
    return b -> b.property("ackTimeout", "60s");
}

Negative Acknowledgment Redelivery Delay

否定确认时,Pulsar 消费者可让您指定应用程序希望如何重新传递消息。默认情况下,在一分钟内重新传递消息,但您可以通过 reactive-consumer-customizer 改变它,例如:

When acknowledging negatively, Pulsar consumer lets you specify how the application wants the message to be re-delivered. The default is to redeliver the message in one minute, but you can change it via a reactive-consumer-customizer such as:

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
    return b -> b.property("negativeAckRedeliveryDelay", "10ms");
}

Dead Letter Topic

Apache Pulsar 允许应用程序在具有`Shared`订阅类型的消费者上使用死信主题。对于`Exclusive`和`Failover`订阅类型,此功能不可用。基本思想是,如果某条消息被重试了一定次数(可能是由于确认超时或 Nack 重新投递),一旦重试次数用尽,消息就可以发送到一个名为死信队列 (DLQ) 的特殊主题中。让我们通过检查一些代码段来了解该功能的一些细节:

Apache Pulsar lets applications use a dead letter topic on consumers with a Shared subscription type. For the Exclusive and Failover subscription types, this feature is not available. The basic idea is that, if a message is retried a certain number of times (maybe due to an ack timeout or nack redelivery), once the number of retries are exhausted, the message can be sent to a special topic called the dead letter queue (DLQ). Let us see some details around this feature in action by inspecting some code snippets:

@Configuration(proxyBeanMethods = false)
class DeadLetterPolicyConfig {

    @ReactivePulsarListener(
            topics = "topic-with-dlp",
            subscriptionType = SubscriptionType.Shared,
            deadLetterPolicy = "myDeadLetterPolicy",
            consumerCustomizer = "ackTimeoutCustomizer" )
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @ReactivePulsarListener(topics = "my-dlq-topic")
    void listenDlq(String msg) {
        System.out.println("From DLQ: " + msg);
    }

    @Bean
    DeadLetterPolicy myDeadLetterPolicy() {
        return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
    }

    @Bean
    ReactiveMessageConsumerBuilderCustomizer<String> ackTimeoutCustomizer() {
        return b -> b.property("ackTimeout", "1s");
    }
}

首先,我们有一个 DeadLetterPolicy 的特殊 Bean,它命名为 deadLetterPolicy(可以根据需要取任何名称)。此 Bean 指定了多项内容,例如最大传递次数(本例中为 10)以及死信主题名称 — 本例中为 my-dlq-topic。如果您未指定 DLQ 主题名称,它在 Pulsar 中默认为 <topicname>-<subscriptionname>-DLQ。接下来,我们将此 Bean 名称提供给 ReactivePulsarListener,方法是设置 deadLetterPolicy 属性。请注意,ReactivePulsarListener 的订阅类型为 “共享”,因为 DLQ 特性仅适用于共享订阅。此代码主要用于演示目的,因此我们提供了 1 秒的 ackTimeout 值。这个想法是,代码引发异常,如果 Pulsar 在 1 秒内没有收到确认,它就会重试。如果该循环持续十次(因为这是 DeadLetterPolicy 中的最大重新传递次数),Pulsar 消费者就会将消息发布到 DLQ 主题。我们还有另一个 ReactivePulsarListener 监听 DLQ 主题,以便在数据发布到 DLQ 主题时接收数据。

First, we have a special bean for DeadLetterPolicy, and it is named as deadLetterPolicy (it can be any name as you wish). This bean specifies a number of things, such as the max delivery (10, in this case) and the name of the dead letter topic — my-dlq-topic, in this case. If you do not specify a DLQ topic name, it defaults to <topicname>-<subscriptionname>-DLQ in Pulsar. Next, we provide this bean name to ReactivePulsarListener by setting the deadLetterPolicy property. Note that the ReactivePulsarListener has a subscription type of Shared, as the DLQ feature only works with shared subscriptions. This code is primarily for demonstration purposes, so we provide an ackTimeout value of 1 second. The idea is that the code throws the exception and, if Pulsar does not receive an ack within 1 second, it does a retry. If that cycle continues ten times (as that is our max redelivery count in the DeadLetterPolicy), the Pulsar consumer publishes the messages to the DLQ topic. We have another ReactivePulsarListener that listens on the DLQ topic to receive data as it is published to the DLQ topic.

Special note on DLQ topics when using partitioned topics

如果主主题被分区,在后台,Pulsar 将每个分区视为单独主题。Pulsar 会将`partition-<n>`附加到主主题名称,其中`n`代表分区号。问题在于,如果您没有指定 DLQ 主题(与我们上面所做相反),Pulsar 会发布到默认主题名称,其中包含此 `partition-<n> 信息——例如:topic-with-dlp-partition-0-deadLetterPolicySubscription-DLQ。解决此问题的简单方法是始终提供 DLQ 主题名称。

If the main topic is partitioned, behind the scenes, each partition is treated as a separate topic by Pulsar. Pulsar appends partition-<n>, where n stands for the partition number to the main topic name. The problem is that, if you do not specify a DLQ topic (as opposed to what we did above), Pulsar publishes to a default topic name that has this `partition-<n> info in it — for example: topic-with-dlp-partition-0-deadLetterPolicySubscription-DLQ. The easy way to solve this is to provide a DLQ topic name always.

Pulsar Reader Support

该框架支持以响应方式使用 {apache-pulsar-docs}/concepts-clients/#reader-interface[Pulsar Reader],方法是通过 ReactivePulsarReaderFactory

The framework provides support for using {apache-pulsar-docs}/concepts-clients/#reader-interface[Pulsar Reader] in a Reactive fashion via the ReactivePulsarReaderFactory.

Spring Boot 提供此阅读器工厂,可以通过 {spring-boot-pulsar-config-props}[spring.pulsar.reader.*] 应用程序属性对其进行配置。

Spring Boot provides this reader factory which can be configured with any of the {spring-boot-pulsar-config-props}[spring.pulsar.reader.*] application properties.

Topic Resolution

@2

User specified

在生成或使用消息时需要目标主题。框架按以下顺序查找主题,以便确定主题(在首次找到时停止):

A destination topic is needed when producing or consuming messages. The framework looks in the following ordered locations to determine a topic (stopping at the first find):

  • User specified

  • Message type default

  • Global default

当通过其中一种默认机制找到主题时,无需在生成或使用 API 时指定主题。

When a topic is found via one of the default mechanisms, there is no need to specify the topic on the produce or consume API.

当未找到主题时,API 将相应抛出一个异常。

When a topic is not found, the API will throw an exception accordingly.

User specified

传递给所用 API 的主题具有最高优先级(例如,PulsarTemplate.send("my-topic", myMessage)@PulsarListener(topics = "my-topic")。

A topic passed into the API being used has the highest precedence (eg. PulsarTemplate.send("my-topic", myMessage) or @PulsarListener(topics = "my-topic").

Message type default

当未将主题传递给 API 时,系统会查找为正在生成或消费的消息类型配置到主题的映射。

When no topic is passed into the API, the system looks for a message type to topic mapping configured for the type of the message being produced or consumed.

可以使用 spring.pulsar.defaults.type-mappings 属性配置映射。以下示例使用 application.yml 为在消费或生成 FooBar 消息时使用默认主题配置:

Mappings can be configured with the spring.pulsar.defaults.type-mappings property. The following example uses application.yml to configure default topics to use when consuming or producing Foo or Bar messages:

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.Foo
          topic-name: foo-topic
        - message-type: com.acme.Bar
          topic-name: bar-topic

message-type 是消息类的限定名。

The message-type is the fully-qualified name of the message class.

如果消息(或 Publisher 输入的第一个消息)是 null,框架将无法从中确定主题。如果您的应用程序可能发送 null 消息,则应使用另一种方法指定主题。

If the message (or the first message of a Publisher input) is null, the framework won’t be able to determine the topic from it. Another method shall be used to specify the topic if your application is likely to send null messages.

Specified via annotation

当未将主题传递给 API 并且未配置自定义主题映射时,系统会在正在生成或消费的消息的类上查找 @PulsarMessage 批注。可以在批注上的 topic 特性中指定默认主题。

When no topic is passed into the API and there are no custom topic mappings configured, the system looks for a @PulsarMessage annotation on the class of the message being produced or consumed. The default topic can be specified via the topic attribute on the annotation.

以下示例配置在生成或消费 Foo 类型的消息时使用的默认主题:

The following example configures the default topic to use when producing or consuming messages of type Foo:

@PulsarMessage(topic = "foo-topic")
record Foo(String value) {
}

@PulsarMessage 批注中支持属性占位符和 SpEL 表达式,例如:

Property placeholders and SpEL expressions are supported in the @PulsarMessage annotation, for example:

@PulsarMessage(topic = "${app.topics.foo}")
record Foo(String value) {
}

@PulsarMessage(topic = "#{someBean.getTopic()}")
record Bar(String value) {
}

Custom topic resolver

添加映射的首选方法是通过上述属性。然而,如果需要更多控制,可以通过提供自己的实现来替换默认解析器,例如:

The preferred method of adding mappings is via the property mentioned above. However, if more control is needed you can replace the default resolver by proving your own implementation, for example:

@Bean
public MyTopicResolver topicResolver() {
	return new MyTopicResolver();
}

Producer global default

在(生成时)咨询的最终位置是系统范围的生成器默认主题。使用命令式 API 时,它通过 spring.pulsar.producer.topic-name 属性配置,使用反应式 API 时,通过 spring.pulsar.reactive.sender.topic-name 属性配置。

The final location consulted (when producing) is the system-wide producer default topic. It is configured via the spring.pulsar.producer.topic-name property when using the imperative API and the spring.pulsar.reactive.sender.topic-name property when using the reactive API.

Consumer global default

在(消费时)咨询的最终位置是系统范围的消费者默认主题。使用命令式 API 时,它通过 spring.pulsar.consumer.topicsspring.pulsar.consumer.topics-pattern 属性配置,使用反应式 API 时,它通过 spring.pulsar.reactive.consumer.topicsspring.pulsar.reactive.consumer.topics-pattern 属性中的一个属性配置。

The final location consulted (when consuming) is the system-wide consumer default topic. It is configured via the spring.pulsar.consumer.topics or spring.pulsar.consumer.topics-pattern property when using the imperative API and one of the spring.pulsar.reactive.consumer.topics or spring.pulsar.reactive.consumer.topics-pattern property when using the reactive API.