Spring Cloud Stream Binder for Apache Pulsar

Spring for Apache Pulsar provides a binder for Spring Cloud Stream that we can use to build event-driven microservices using pub-sub paradigms. In this section, we will go through the basic details of this binder.

Usage

You need to include the following dependency in your application to use the Apache Pulsar binder for Spring Cloud Stream.

  • Maven

<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-spring-cloud-stream-binder</artifactId>
    </dependency>
</dependencies>

  • Gradle

dependencies {
    implementation 'org.springframework.pulsar:spring-pulsar-spring-cloud-stream-binder'
}

Overview

The Spring Cloud Stream binder for Apache Pulsar allows applications to focus on business logic rather than dealing with the lower-level details of managing and maintaining Pulsar. The binder takes care of all those details for the application developer. Spring Cloud Stream brings a powerful programming model based on Spring Cloud Function that allows the app developer to write complex event-driven applications using a functional style. Applications can start in a middleware-neutral manner and then map Pulsar topics as destinations in Spring Cloud Stream through Spring Boot configuration properties. Spring Cloud Stream is built on top of Spring Boot, so when you write an event-driven microservice using Spring Cloud Stream, you are essentially writing a Boot application. Here is a straightforward Spring Cloud Stream application.

@SpringBootApplication
public class SpringPulsarBinderSampleApp {

	private final Logger logger = LoggerFactory.getLogger(this.getClass());

	public static void main(String[] args) {
		SpringApplication.run(SpringPulsarBinderSampleApp.class, args);
	}

	@Bean
	public Supplier<Time> timeSupplier() {
		return () -> new Time(String.valueOf(System.currentTimeMillis()));
	}

	@Bean
	public Function<Time, EnhancedTime> timeProcessor() {
		return (time) -> {
			EnhancedTime enhancedTime = new EnhancedTime(time, "5150");
			this.logger.info("PROCESSOR: {} --> {}", time, enhancedTime);
			return enhancedTime;
		};
	}

	@Bean
	public Consumer<EnhancedTime> timeLogger() {
		return (time) -> this.logger.info("SINK:      {}", time);
	}

	record Time(String time) {
	}

	record EnhancedTime(Time time, String extra) {
	}

}

The above sample application, a full-blown Spring Boot application, deserves a few explanations. However, on a first pass, you can see that this is just plain Java and a few Spring and Spring Boot annotations. We have three Bean methods here: a java.util.function.Supplier, a java.util.function.Function, and finally, a java.util.function.Consumer. The supplier produces the current time in milliseconds, the function takes this time and enhances it by adding some arbitrary extra data, and the consumer logs the enhanced time.
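
To make the data flow concrete, here is a minimal plain-Java sketch that chains the same three functional types by hand, with no Spring or Pulsar involved. The class name LocalPipelineSketch and the helper runOnce are hypothetical and exist only for illustration; in the real application the binder moves each value through a Pulsar topic instead of a direct method call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical illustration only: wires supplier -> function -> consumer in-process.
class LocalPipelineSketch {

	record Time(String time) {
	}

	record EnhancedTime(Time time, String extra) {
	}

	// Runs a single supplier -> function -> consumer cycle without any middleware.
	static <A, B> void runOnce(Supplier<A> source, Function<A, B> processor, Consumer<B> sink) {
		sink.accept(processor.apply(source.get()));
	}

	public static void main(String[] args) {
		List<EnhancedTime> received = new ArrayList<>();

		Supplier<Time> timeSupplier = () -> new Time(String.valueOf(System.currentTimeMillis()));
		Function<Time, EnhancedTime> timeProcessor = (time) -> new EnhancedTime(time, "5150");
		Consumer<EnhancedTime> timeLogger = received::add;

		runOnce(timeSupplier, timeProcessor, timeLogger);
		System.out.println("SINK: " + received.get(0));
	}
}
```

The point of the sketch is that the three beans carry no messaging code at all; everything that connects them is supplied from the outside.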

We omitted all the imports for brevity, but there is nothing Spring Cloud Stream specific in the entire application. How does it become a Spring Cloud Stream application that interacts with Apache Pulsar? You must include the above dependency for the binder in the application. Once that dependency is added, you must provide the following configuration properties.

spring:
  cloud:
    function:
      definition: timeSupplier;timeProcessor;timeLogger;
    stream:
      bindings:
        timeProcessor-in-0:
          destination: timeSupplier-out-0
        timeProcessor-out-0:
          destination: timeProcessor-out-0
        timeLogger-in-0:
          destination: timeProcessor-out-0

With this, the above Spring Boot application becomes an end-to-end event-driven application based on Spring Cloud Stream. Because we have the Pulsar binder on the classpath, the application interacts with Apache Pulsar. If there is only one function in the application, we don’t need to tell Spring Cloud Stream to activate it, since it does that by default. If there is more than one such function in the application, as in our example, we need to tell Spring Cloud Stream which functions to activate. In our case, we need all of them to be activated, and we do that through the spring.cloud.function.definition property. The bean name becomes part of the Spring Cloud Stream binding name by default. A binding is a fundamental abstraction in Spring Cloud Stream through which the framework communicates with the middleware destination. Almost everything that Spring Cloud Stream does occurs over a concrete binding. A supplier has only an output binding, a function has input and output bindings, and a consumer has only an input binding. Take our supplier bean, timeSupplier, as an example. The default binding name for this supplier is timeSupplier-out-0. Similarly, the default binding names for the timeProcessor function are timeProcessor-in-0 on the inbound and timeProcessor-out-0 on the outbound. Please refer to the Spring Cloud Stream reference docs for details on how you can change the default binding names. In most situations, using the default binding names is enough. We set the destination on the binding names, as shown above. If a destination is not provided, the binding name becomes the value for the destination, as in the case of timeSupplier-out-0.
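
The naming convention and the destination fallback described above can be sketched in plain Java. This is not the framework's implementation; the class BindingNameSketch and its methods are hypothetical helpers that only mirror the rules stated in the text (bean name plus -in-0 or -out-0, and the binding name doubling as the destination when none is configured).

```java
import java.util.Map;

// Hypothetical illustration of the default binding-name and destination rules.
class BindingNameSketch {

	// Default input binding name for a function bean: <beanName>-in-<index>.
	static String inputBinding(String beanName, int index) {
		return beanName + "-in-" + index;
	}

	// Default output binding name for a function bean: <beanName>-out-<index>.
	static String outputBinding(String beanName, int index) {
		return beanName + "-out-" + index;
	}

	// Falls back to the binding name itself when no destination is configured.
	static String resolveDestination(String bindingName, Map<String, String> configuredDestinations) {
		return configuredDestinations.getOrDefault(bindingName, bindingName);
	}

	public static void main(String[] args) {
		// Mirrors the destinations from the sample configuration.
		Map<String, String> destinations = Map.of(
				"timeProcessor-in-0", "timeSupplier-out-0",
				"timeProcessor-out-0", "timeProcessor-out-0",
				"timeLogger-in-0", "timeProcessor-out-0");

		String[] bindings = {
				outputBinding("timeSupplier", 0),
				inputBinding("timeProcessor", 0),
				outputBinding("timeProcessor", 0),
				inputBinding("timeLogger", 0) };

		for (String binding : bindings) {
			System.out.println(binding + " -> " + resolveDestination(binding, destinations));
		}
	}
}
```

Note how timeSupplier-out-0 has no configured destination, so the supplier writes to a topic named after its binding, and timeProcessor-in-0 reads from that same topic.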

When running the above app, you should see the supplier execute every second. Its output is consumed by the function, which enhances the time, and the enhanced time is then consumed by the logger consumer.

Message Conversion in Binder-based Applications

In the above sample application, we provided no schema information for message conversion. That is because, by default, Spring Cloud Stream uses its own message conversion mechanism, built on the messaging support established in Spring Framework through the Spring Messaging project. Unless specified otherwise, Spring Cloud Stream uses application/json as the content-type for message conversion on both inbound and outbound bindings. On the outbound, the data is serialized as byte[], and the Pulsar binder then uses Schema.BYTES to send it over the wire to the Pulsar topic. Similarly, on the inbound, the data is consumed as byte[] from the Pulsar topic and then converted into the target type using the proper message converter.
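
The byte[] round trip described above can be illustrated with a small hand-written sketch. It is only a stand-in: the real conversion is done by the framework's message converters, not by code like this. The class JsonBytesSketch, its methods, and the naive regular-expression parsing (which ignores JSON escaping) are assumptions made purely for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for the framework's JSON conversion; not production code.
class JsonBytesSketch {

	record Time(String time) {
	}

	// Naive matcher for {"time":"..."}; it does not handle escaped quotes.
	private static final Pattern TIME_FIELD = Pattern.compile("\"time\"\\s*:\\s*\"([^\"]*)\"");

	// Outbound: the object becomes JSON text, and the text becomes raw bytes.
	static byte[] toPayload(Time value) {
		String json = "{\"time\":\"" + value.time() + "\"}";
		return json.getBytes(StandardCharsets.UTF_8);
	}

	// Inbound: raw bytes become JSON text, which is converted to the target type.
	static Time fromPayload(byte[] payload) {
		String json = new String(payload, StandardCharsets.UTF_8);
		Matcher matcher = TIME_FIELD.matcher(json);
		if (!matcher.find()) {
			throw new IllegalArgumentException("No time field in payload: " + json);
		}
		return new Time(matcher.group(1));
	}

	public static void main(String[] args) {
		byte[] payload = toPayload(new Time(String.valueOf(System.currentTimeMillis())));
		System.out.println(payload.length + " bytes -> " + fromPayload(payload));
	}
}
```

Because only opaque bytes cross the wire in this mode, the topic itself carries no information about the Time type; the knowledge of how to decode it lives entirely in the application.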

Using Native Conversion in Pulsar using Pulsar Schema

Although the default is to use the framework-provided message conversion, Spring Cloud Stream allows each binder to determine how the message should be converted. If the application chooses to go this route, Spring Cloud Stream steers clear of using any Spring-provided message conversion facility and passes around the data it receives or produces unchanged. This feature in Spring Cloud Stream is known as native encoding on the producer side and native decoding on the consumer side. This means that the encoding and decoding occur natively on the target middleware, in our case, on Apache Pulsar. For the above application, we can use the following configuration to bypass the framework conversion and use native encoding and decoding.

spring:
  cloud:
    stream:
      bindings:
        timeSupplier-out-0:
          producer:
            use-native-encoding: true
        timeProcessor-in-0:
          destination: timeSupplier-out-0
          consumer:
            use-native-decoding: true
        timeProcessor-out-0:
          destination: timeProcessor-out-0
          producer:
            use-native-encoding: true
        timeLogger-in-0:
          destination: timeProcessor-out-0
          consumer:
            use-native-decoding: true
      pulsar:
        bindings:
          timeSupplier-out-0:
            producer:
              schema-type: JSON
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
          timeProcessor-in-0:
            consumer:
              schema-type: JSON
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
          timeProcessor-out-0:
            producer:
              schema-type: AVRO
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime
          timeLogger-in-0:
            consumer:
              schema-type: AVRO
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime

The property to enable native encoding on the producer side is a binding-level property from core Spring Cloud Stream. You set it on the producer binding as spring.cloud.stream.bindings.<binding-name>.producer.use-native-encoding and set it to true. Similarly, for consumer bindings, set spring.cloud.stream.bindings.<binding-name>.consumer.use-native-decoding to true. If we decide to use native encoding and decoding, in the case of Pulsar, we need to set the corresponding schema and the underlying message type information. This information is provided as extended binding properties. As you can see in the configuration above, the properties are spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.schema-type for schema information and spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.message-type for the actual target type. If you have both keys and values on the message, you can use message-key-type and message-value-type to specify their target types.

Any configured custom schema mappings will be consulted when the schema-type property is omitted.

Message Header Conversion

Each message typically has header information that needs to be carried along as the message traverses between Pulsar and Spring Messaging via Spring Cloud Stream input and output bindings. To support this traversal, the framework handles the necessary message header conversion.

Custom Header Mapper

The Pulsar binder is configured with a default header mapper that can be overridden by providing your own PulsarHeaderMapper bean.

In the following example, a JSON header mapper is configured that:

  • maps all inbound headers (except those with keys “top” or “secret”)

  • maps outbound headers (except those with keys “id”, “timestamp”, or “userId”)

  • only trusts objects in the “com.acme” package for outbound deserialization

  • serializes and deserializes any “com.acme.Money” header values with simple toString() encoding

@Bean
public PulsarHeaderMapper customPulsarHeaderMapper() {
    return JsonPulsarHeaderMapper.builder()
            .inboundPatterns("!top", "!secret", "*")
            .outboundPatterns("!id", "!timestamp", "!userId", "*")
            .trustedPackages("com.acme")
            .toStringClasses("com.acme.Money")
            .build();
}
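
To see why the pattern lists above exclude some headers and accept the rest, here is a plain-Java sketch of ordered pattern matching. It assumes that patterns are evaluated in order, that the first matching pattern decides, that a leading "!" negates a pattern, and that "*" is a wildcard; these semantics are an assumption for illustration and are not taken from the mapper's source. The class HeaderPatternSketch and its methods are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical illustration of ordered, first-match-wins header patterns.
class HeaderPatternSketch {

	// Returns true if the header should be mapped under the assumed semantics.
	static boolean shouldMap(String headerName, List<String> patterns) {
		for (String pattern : patterns) {
			boolean negated = pattern.startsWith("!");
			String glob = negated ? pattern.substring(1) : pattern;
			if (globMatches(glob, headerName)) {
				// The first pattern that matches decides the outcome.
				return !negated;
			}
		}
		// No pattern matched, so the header is not mapped.
		return false;
	}

	// Translates a simple '*' wildcard pattern into a regular expression.
	static boolean globMatches(String glob, String value) {
		String regex = Arrays.stream(glob.split("\\*", -1))
				.map(Pattern::quote)
				.collect(Collectors.joining(".*"));
		return Pattern.matches(regex, value);
	}

	public static void main(String[] args) {
		List<String> inbound = List.of("!top", "!secret", "*");
		List<String> outbound = List.of("!id", "!timestamp", "!userId", "*");

		for (String header : List.of("top", "secret", "traceId")) {
			System.out.println("inbound " + header + ": " + shouldMap(header, inbound));
		}
		for (String header : List.of("id", "userId", "orderId")) {
			System.out.println("outbound " + header + ": " + shouldMap(header, outbound));
		}
	}
}
```

Under these assumptions the trailing "*" acts as a catch-all, which is why the exclusions must come before it in each list.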

Using Pulsar Properties in the Binder

The binder uses basic components from the Spring for Apache Pulsar framework to build its producer and consumer bindings. Since binder-based applications are Spring Boot applications, the binder, by default, uses the Spring Boot autoconfiguration for Spring for Apache Pulsar. Therefore, all Pulsar Spring Boot properties available at the core framework level are also available through the binder. For example, you can use properties with the prefix spring.pulsar.producer…, spring.pulsar.consumer…, and so on. In addition, you can also set these Pulsar properties at the binder level. For instance, spring.cloud.stream.pulsar.binder.producer… or spring.cloud.stream.pulsar.binder.consumer… will also work.

Either of the above approaches is fine, but properties set this way apply to the whole application. If you have multiple functions in the application, they all get the same properties. To address this, you can also set these Pulsar properties at the extended binding properties level. Extended binding properties are applied on the binding itself. For instance, if you have an input and an output binding, and each requires a separate set of Pulsar properties, you must set them on the extended binding. The pattern for a producer binding is spring.cloud.stream.pulsar.bindings.<output-binding-name>.producer…. Similarly, for a consumer binding, the pattern is spring.cloud.stream.pulsar.bindings.<input-binding-name>.consumer…. This way, you can have a separate set of Pulsar properties applied to different bindings in the same application.

Extended binding properties have the highest precedence. The precedence order in which the binder applies properties, from highest to lowest, is extended binding properties → binder properties → Spring Boot properties.
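
The precedence order can be pictured as a lookup through three layers, where the first layer that defines a property wins. The sketch below is not how the binder is implemented; the class PropertyPrecedenceSketch, the resolve method, and the property names send-timeout and batching-enabled are hypothetical and used only to show the ordering.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration of layered property resolution, highest precedence first.
class PropertyPrecedenceSketch {

	// Returns the value from the first layer that defines the key, if any.
	static Optional<String> resolve(String key, List<Map<String, String>> layersHighestFirst) {
		return layersHighestFirst.stream()
				.filter(layer -> layer.containsKey(key))
				.map(layer -> layer.get(key))
				.findFirst();
	}

	public static void main(String[] args) {
		// The keys below are made up for the example; they are not real property names.
		Map<String, String> extendedBinding = Map.of("send-timeout", "5s");
		Map<String, String> binder = Map.of("send-timeout", "10s", "batching-enabled", "false");
		Map<String, String> springBoot = Map.of("send-timeout", "30s", "batching-enabled", "true");

		List<Map<String, String>> layers = List.of(extendedBinding, binder, springBoot);

		// Defined on the binding, so the binding value wins.
		System.out.println(resolve("send-timeout", layers));
		// Not defined on the binding, so the binder value wins over Spring Boot.
		System.out.println(resolve("batching-enabled", layers));
	}
}
```

A practical consequence is that a value set at the Spring Boot level acts as an application-wide default that any single binding can still override.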

The following resources describe more of the properties available through the Pulsar binder.

Pulsar producer binding configuration. These properties need the spring.cloud.stream.pulsar.bindings.<binding-name>.producer prefix. All the Spring Boot provided Pulsar producer properties are also available through this configuration class.

Pulsar consumer binding configuration. These properties need the spring.cloud.stream.pulsar.bindings.<binding-name>.consumer prefix. All the Spring Boot provided Pulsar consumer properties are also available through this configuration class.

Common Pulsar binder specific configuration properties require a prefix of spring.cloud.stream.pulsar.binder. The producer and consumer properties specified above (including the Spring Boot ones) can be used at the binder level with the spring.cloud.stream.pulsar.binder.producer or spring.cloud.stream.pulsar.binder.consumer prefix.

Pulsar Topic Provisioner

The Spring Cloud Stream binder for Apache Pulsar comes with an out-of-the-box provisioner for Pulsar topics. When running an application, if the necessary topics are absent, Pulsar creates the topics for you. However, such a topic is a basic non-partitioned topic, and if you want advanced features like creating a partitioned topic, you can rely on the topic provisioner in the binder. The Pulsar topic provisioner uses PulsarAdministration from the framework, which uses the PulsarAdminBuilder. For this reason, you need to set the spring.pulsar.administration.service-url property unless you are running Pulsar on the default server and port.

Specifying partition count when creating the topic

When creating the topic, you can set the partition count in two ways. First, you can set it at the binder level using the property spring.cloud.stream.pulsar.binder.partition-count. As we saw above, doing it this way makes all the topics created by the application inherit this property. If you want granular control over partitions at the binding level, you can set the partition-count property per binding using the format spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.partition-count. This way, topics created by different functions in the same application can have different partition counts based on the application requirements.