Spring Cloud Stream Binder for Apache Pulsar

针对 Apache Pulsar 的 Spring 提供了一个针对 Spring Cloud Stream 的 Binder,我们可以使用该 Binder 来构建使用发布订阅范式的事件驱动的微服务。在本节中,我们将介绍这个 Binder 的基本信息。

Usage

我们需要在你自己的应用程序中包含下面的依赖项,以便针对 Spring Cloud Stream 使用 Apache Pulsar Binder。

  • Maven

  • Gradle

<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-spring-cloud-stream-binder</artifactId>
    </dependency>
</dependencies>
dependencies {
    implementation 'org.springframework.pulsar:spring-pulsar-spring-cloud-stream-binder'
}

Overview

用于 Apache Pulsar 的 Spring Cloud Stream binder 允许应用程序专注于业务逻辑,而不是处理管理和维护 Pulsar 的低级细节。binder 为应用程序开发人员处理所有这些细节。Spring Cloud Stream 带来了基于 Spring Cloud Function 的强大编程模型,允许应用程序开发人员使用函数式风格编写复杂的事件驱动型应用程序。应用程序可以从中间件中立的方式开始,然后通过 Spring Boot 配置属性将 Pulsar 主题映射为 Spring Cloud Stream 中的目的地。Spring Cloud Stream 构建在 Spring Boot 之上,在使用 Spring Cloud Stream 编写事件驱动的微服务时,您本质上是在编写 Boot 应用程序。以下是一个简单的 Spring Cloud Stream 应用程序。

@SpringBootApplication
public class SpringPulsarBinderSampleApp {

	private final Logger logger = LoggerFactory.getLogger(this.getClass());

	public static void main(String[] args) {
		SpringApplication.run(SpringPulsarBinderSampleApp.class, args);
	}

	@Bean
	public Supplier<Time> timeSupplier() {
		return () -> new Time(String.valueOf(System.currentTimeMillis()));
	}

	@Bean
	public Function<Time, EnhancedTime> timeProcessor() {
		return (time) -> {
			EnhancedTime enhancedTime = new EnhancedTime(time, "5150");
			this.logger.info("PROCESSOR: {} --> {}", time, enhancedTime);
			return enhancedTime;
		};
	}

	@Bean
	public Consumer<EnhancedTime> timeLogger() {
		return (time) -> this.logger.info("SINK:      {}", time);
	}

	record Time(String time) {
	}

	record EnhancedTime(Time time, String extra) {
	}

}

上面的示例应用程序是一个完全成熟的 Spring Boot 应用程序,需要一些解释。但是,在第一次尝试时,你可以看到它只是一些普通的 Java 代码和一些 Spring 和 Spring Boot 注释。我们这里有三个 Bean 方法 - 一个 java.util.function.Supplier,一个 java.util.function.Function,最后是一个 java.util.function.Consumer。供应商生成当前时间(以毫秒为单位),该函数接收此时间,然后通过添加一些随机数据来增强它,然后消费者记录增强的的时间。

为了简洁起见,我们忽略了所有导入,但整个应用程序中没有特定于 Spring Cloud Stream 的内容。它如何成为一个与 Apache Pulsar 交互的 Spring Cloud Stream 应用程序?你必须在应用程序中包含针对该 Binder 的上面的依赖项。在添加该依赖项后,你必须提供以下配置属性。

spring:
  cloud:
    function:
      definition: timeSupplier;timeProcessor;timeLogger;
    stream:
      bindings:
        timeProcessor-in-0:
          destination: timeSupplier-out-0
        timeProcessor-out-0:
          destination: timeProcessor-out-0
        timeLogger-in-0:
          destination: timeProcessor-out-0

通过此,以上 Spring 启动应用程序已经成为基于 Spring Cloud Stream 的端到端事件驱动应用程序。由于我们在 classpath 中有 Pulsar 绑定程序,应用程序与 Apache Pulsar 交互。如果应用程序中只有单一函数,那么我们无须告诉 Spring Cloud Stream 激活此函数进行执行,因为 Spring Cloud Stream 默认会执行此操作。如果应用程序中有多个此类函数,如我们示例中一样,我们需要告知 Spring Cloud Stream 我们希望激活哪些函数。在我们的用例中,我们需要激活所有函数,然后我们通过 spring.cloud.function.definition 属性进行此操作。Bean 名称默认成为 Spring Cloud Stream 绑定名称的一部分。绑定是 Spring Cloud Stream 中一个基本抽象概念,框架使用此概念与中间件目的地通信。Spring Cloud Stream 执行的几乎所有操作都会通过一个具体绑定进行。供应商仅有一个输出绑定;函数有输入和输出绑定,而使用者仅有输入绑定。让我们以供应商 bean 为例 - timeSupplier. 此供应商的默认绑定名称将是 timeSupplier-out-0。类似地,timeProcessor 函数的默认绑定名称在入站时为 timeProcessor-in-0,在出站时为 timeProcessor-out-0。有关如何更改默认绑定名称的详细信息,请参阅 Spring Cloud Stream 参考文档。在大多数情况下,使用默认绑定名称就足够了。如上所示,我们在绑定名称上设置目的地。如果未提供目的地,则绑定名称将成为目的地的值,如 timeSupplier-out-0 中所示。

运行以上应用程序时,您应该会看到供应商每秒执行一次,然后由函数使用并增强记录程序使用者使用的时间。

Message Conversion in Binder-based Applications

在上例应用程序中,我们没有提供用于消息转换的模式信息。这是因为在默认情况下,Spring Cloud Stream 使用消息转换机制,该机制使用通过 Spring Messaging 项目在 Spring 框架中建立的消息传递支持。除非另行指定,Spring Cloud Stream 在入站和出站绑定上将 application/json 用作消息转换的 content-type。在出站中,数据被序列化为 byte[],,然后 Pulsar binder 使用 Schema.BYTES 通过网络将其发送到 Pulsar 主题。类似地,在入站中,数据以 byte[] 的形式从 Pulsar 主题中使用,然后使用正确的消息转换器将其转换为目标类型。

Using Native Conversion in Pulsar using Pulsar Schema

虽然默认使用框架提供的消息转换,但 Spring Cloud Stream 允许每个绑定程序确定消息的转换方式。假设应用程序选择采用此方式。在那种情况下,Spring Cloud Stream 完全不使用任何 Spring 提供的消息转换工具,并传递其接收或生成的数据。Spring Cloud Stream 中的此功能称为生产方端的原生编码和消费者端的原生解码。这意味着编码和解码将在目标中间件中以原生方式进行,在本例中,在 Apache Pulsar 中进行。对于以上应用程序,我们可以使用以下配置来绕过框架转换并使用原生编码和解码。

spring:
  cloud:
    stream:
      bindings:
        timeSupplier-out-0:
          producer:
            use-native-encoding: true
        timeProcessor-in-0:
          destination: timeSupplier-out-0
          consumer:
            use-native-decoding: true
        timeProcessor-out-0:
          destination: timeProcessor-out-0
          producer:
            use-native-encoding: true
        timeLogger-in-0:
          destination: timeProcessor-out-0
          consumer:
            use-native-decoding: true
      pulsar:
        bindings:
          timeSupplier-out-0:
            producer:
              schema-type: JSON
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
          timeProcessor-in-0:
            consumer:
              schema-type: JSON
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
          timeProcessor-out-0:
            producer:
              schema-type: AVRO
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime
          timeLogger-in-0:
            consumer:
              schema-type: AVRO
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime

在生产方端启用原生编码的属性是核心 Spring Cloud Stream 中的一个绑定级别属性。您将它设置在生产方绑定上 - spring.cloud.stream.bindings.<binding-name>.producer.use-native-encoding,并将其设置为 true. 类似地,对于使用者绑定,使用 - spring.cloud.stream.bindings.<binding-name>.consumer.user-native-decoding 并将其设置为 true. 如果我们决定使用原生编码和解码,那么在 Pulsar 的情况下,我们需要设置相应的架构和底层消息类型信息。此信息作为扩展绑定属性提供。如您在上方的配置中看到,这些属性是 - spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.schema-type 用于架构信息,spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.message-type 用于实际目标类型。如果您在消息上有键和值,您可以使用 message-key-typemessage-value-type 来指定其目标类型。

在省略 schema-type 属性时,将咨询任何已配置的自定义模式映射。

Message Header Conversion

每个消息通常都有标题信息,需要在消息通过 Spring Cloud Stream 输入和输出绑定在 Pulsar 和 Spring Messaging 之间传递时随消息一起携带。为了支持此传递,框架将处理必要的消息标题转换。

Custom Header Mapper

Pulsar 绑定程序配置了默认标题映射器,可以通过提供您自己的 PulsarHeaderMapper bean 来替代此映射器。

在以下示例中,配置了一个 JSON 标题映射器,此映射器:

  • 映射所有入站头(除了那些带有 “top” 或 “secret” 键的头)

  • 映射出站头(除了那些带有 “id”、“timestamp” 或 “userId” 键的头)

  • 仅信任 “com.acme” 包中的对象用于出站反序列化

  • 对任意 “com.acme.Money” 头值使用简单 toString() 编码进行反序列化

@Bean
public PulsarHeaderMapper customPulsarHeaderMapper() {
    return JsonPulsarHeaderMapper.builder()
            .inboundPatterns("!top", "!secret", "*")
            .outboundPatterns("!id", "!timestamp", "!userId", "*")
            .trustedPackages("com.acme")
            .toStringClasses("com.acme.Money")
            .build();
}

Using Pulsar Properties in the Binder

此绑定程序使用 Spring 对 Apache Pulsar 框架中的基本组件构建其生产方和使用者绑定。由于基于绑定程序的应用程序是 Spring Boot 应用程序,因此绑定程序默认使用 Spring 对 Apache Pulsar 的 Spring Boot 自动配置。因此,在核心框架级别可用的一切 Pulsar Spring Boot 属性,也可以通过绑定程序使用。例如,您可以使用前缀 spring.pulsar.producer…​spring.pulsar.consumer…​ 等的属性。此外,您还可以在绑定程序级别设置这些 Pulsar 属性。例如,下面这些也会起作用 - spring.cloud.stream.pulsar.binder.producer…​spring.cloud.stream.pulsar.binder.consumer…​

以上任一种方法都很好,但当使用像这些一样的属性时,它们将应用于整个应用程序。如果应用程序有多个函数,它们都将获得相同的属性。您还可以在扩展绑定属性级别设置这些 Pulsar 属性来解决此问题。扩展绑定属性应用于绑定本身。例如,如果您有输入和输出绑定,并且两者都需要一组独立的 Pulsar 属性,您必须在扩展绑定中设置它们。生产方绑定的模式是 spring.cloud.stream.pulsar.bindings.<output-binding-name>.producer…​。类似地,对于使用者绑定,模式是 spring.cloud.stream.pulsar.bindings.<input-binding-name>.consumer…​。这样,您可以让同一应用程序中不同的绑定应用一组独立的 Pulsar 属性。

最高优先级是扩展绑定属性。在绑定程序中应用属性的优先级顺序是 extended binding properties → binder properties → Spring Boot properties.(从最高到最低)。

以下是用于依赖的资源,以便了解通过 Pulsar 绑定程序可用的更多属性。

Pulsar producer binding configuration。这些属性需要 spring.cloud.stream.bindings.<binding-name>.producer 前缀。Spring Boot 提供的所有 Pulsar producer properties 也可通过此配置类获得。

Pulsar consumer binding configuration。这些属性需要 spring.cloud.stream.bindings.<binding-name>.consumer 前缀。Spring Boot 提供的所有 Pulsar consumer properties 也可通过此配置类获得。

有关 Pulsar binder 的常见特定配置属性,请参见 this。这些属性需要 spring.cloud.stream.pulsar.binder 前缀。在 binder 中可以使用上面的指定生产者和消费者属性(包括 Spring Boot 的属性),使用 spring.cloud.stream.pulsar.binder.producerspring.cloud.stream.pulsar.binder.consumer 前缀。

Pulsar Topic Provisioner

Apache Pulsar 的 Spring Cloud Stream 绑定程序带有一个开箱即用的 Pulsar 主题配置程序。运行应用程序时,如果没有必要的主题,Pulsar 会为您创建这些主题。然而,这是一个简单的非分区主题,如果您想要创建分区主题等高级特性,您可以依赖绑定程序中的主题配置程序。Pulsar 主题配置程序使用框架中的 PulsarAdministration,它使用 PulsarAdminBuilder. 因此,您需要设置 spring.pulsar.administration.service-url 属性,除非您在默认服务器和端口上运行 Pulsar。

Specifying partition count when creating the topic

创建主题时,您可以通过两种方式设置分区数量。首先,您可以使用属性 spring.cloud.stream.pulsar.binder.partition-count 在绑定程序级别设置它。如我们上面看到的一样,这样做的方式会使应用程序创建的所有主题继承此属性。假设您想要在绑定级别对设置分区进行精细控制。在这种情况下,您可以使用格式 spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.partition-count 为每个绑定设置 partition-count 属性。这样,相同应用程序中不同函数创建的各个主题将根据应用程序要求有不同的分区。