Pulsar Functions

Spring for Apache Pulsar 为 {apache-pulsar-io-docs}/[Pulsar IO](连接器)和 {apache-pulsar-function-docs}[Pulsar 函数] 提供基本支持,它们允许用户定义包含 sources、`processors`和 `sinks`的流处理管道。`sources`和 `sinks`由 _Pulsar IO (connectors)_建模,`processors`由 _Pulsar Functions_表示。

由于连接器仅仅是特殊功能,为了简单起见,我们将源、接收器和函数统称为“Pulsar Functions”。

Pre-requisites

熟悉 - 预计受众对Pulsar IO和Pulsar Functions有一定程度的了解。如果不是这种情况,查看其入门指南可能会有所帮助。 启用的功能 - 要使用这些功能,必须启用并配置 Apache Pulsar 中的功能支持(默认情况下禁用)。内置连接器也可能需要安装在 Pulsar 集群上。 请参阅 {apache-pulsar-io-docs}/[Pulsar IO] 和 {apache-pulsar-function-docs}[Pulsar 函数] 文档了解更多详细信息。

Pulsar Function Administration

该框架提供了 PulsarFunctionAdministration 组件来管理 Pulsar 函数。当你使用 Pulsar Spring Boot starter 时,你会得到自动配置的 PulsarFunctionAdministration

默认情况下,应用程序会尝试连接到 http://localhost:8080 上的本地 Pulsar 实例。但是,因为它利用已经配置的 PulsarAdministration,有关可用的客户端选项(包括身份验证),请参阅 Pulsar Admin Client。通过 {spring-boot-pulsar-config-props}[spring.pulsar.function.*] 应用程序属性提供更多配置选项。

Automatic Function Management

在应用程序启动时,框架将在应用程序上下文中找到所有 PulsarFunctionPulsarSinkPulsarSource bean。对于每个 bean,将创建或更新对应的 Pulsar 函数。正确的 API 是基于函数类型、函数配置以及函数是否已存在的。

PulsarFunctionPulsarSinkPulsarSource bean 分别是 Apache Pulsar 配置对象 FunctionConfigSinkConfigSourceConfig 的简单包装器。由于支持的连接器数量众多(及其不同的配置),该框架不尝试创建配置属性层次结构以反映各种 Apache Pulsar 连接器。相反,由用户提供完整的配置对象,然后该框架使用提供的配置处理管理(创建/更新)。

在应用程序关闭时,将在应用程序启动期间处理的所有函数执行其停止策略,并且将其保留、停止或从 Pulsar 服务器中删除。

Limitations

No Magic Pulsar Functions

Pulsar 函数和自定义连接器由自定义应用程序代码(例如 java.util.Function)表示。没有自动注册自定义代码的魔术支持。虽然这很神奇,但它有一些技术挑战,尚未实现。因此,由用户来确保函数(或自定义连接器)在函数配置中指定的位置可用。例如,如果函数配置的 jar 值为 ./some/path/MyFunction.jar,则函数 jar 文件必须存在于指定路径中。

Name Identifier

函数配置中的 name 属性用作标识符,以确定函数是否已存在以便决定执行更新操作还是创建操作。因此,如果需要函数更新,则不应修改名称。

Configuration

Pulsar Function Archive

每个 Pulsar 函数都由实际存档(例如 jar 文件)表示。存档的路径通过源和汇的 archive 属性以及函数的 jar 属性指定。

以下规则确定路径的“类型”:

  • (file|http|https|function|sink|source):// 开头时,路径是 URL

  • builtin:// 开头时,路径是 built-in(指向预先提供的开箱即用型连接器之一)

  • 否则,路径是 local

在创建/更新操作期间发生的活动取决于路径“类型”,如下所示:

  • 当路径是 URL 时,服务器下载内容

  • 当路径是 built-in 时,内容已在服务器上可用

  • 当路径是 local 时,内容上传到服务器

Built-in Source and Sinks

Apache Pulsar 提供了许多开箱即用的源和汇连接器,又称内置连接器。要使用内置连接器,只需将 archive 设置为 builtin://<connector-type>(例如 builtin://rabbit)。

Custom functions

可以在 {apache-pulsar-docs}/functions-develop[Pulsar 文档] 中找到有关如何开发和打包自定义函数的详细信息。但从高层面上来说,要求如下:

  • Code uses Java8

  • 代码实现 java.util.Functionorg.apache.pulsar.functions.api.Function

  • Packaged as uber jar

函数构建并打包后,有几种方法可以使其可用于函数注册。

file://

可以将 jar 文件上传到服务器,然后通过函数配置的 jar 属性中的 file:// 引用它。

local

jar 文件可以保持为本地文件,然后通过函数配置的 jar 属性中的本地路径引用它。

http://

可以通过 HTTP 服务器使 jar 文件可用,然后通过函数配置的 jar 属性中的 http(s):// 引用它。

function://

jar 文件可以上传到 Pulsar 包管理器,然后通过函数配置的 jar 属性中的 function:// 引用它。

Examples

以下是一些示例,演示如何配置 PulsarSource bean,这将导致 PulsarFunctionAdministration 自动创建支持的 Pulsar 源连接器。

PulsarSource using built-in Rabbit connector
@Bean
PulsarSource rabbitSource() {
    Map<String, Object> configs = new HashMap<>();
    configs.put("host", "my.rabbit.host");
    configs.put("port", 5672);
    configs.put("virtualHost", "/");
    configs.put("username", "guest");
    configs.put("password", "guest");
    configs.put("queueName", "test_rabbit");
    configs.put("connectionName", "test-connection");
    SourceConfig sourceConfig = SourceConfig.builder()
            .tenant("public")
            .namespace("default")
            .name("rabbit-test-source")
            .archive("builtin://rabbitmq")
            .topicName("incoming_rabbit")
            .configs(configs).build();
    return new PulsarSource(sourceConfig, null);
}

下一个示例与上一个相同,但它使用 Spring Boot 自动配置的 RabbitProperties 来减轻配置负担。当然,这要求应用程序使用启用了 Rabbit 自动配置的 Spring Boot。

PulsarSource using built-in Rabbit connector and Spring Boot RabbitProperties
@Bean
PulsarSource rabbitSourceWithBootProps(RabbitProperties props) {
    Map<String, Object> configs = new HashMap<>();
    configs.put("host", props.determineHost());
    configs.put("port", props.determinePort());
    configs.put("virtualHost", props.determineVirtualHost());
    configs.put("username", props.determineUsername());
    configs.put("password", props.determinePassword());
    configs.put("queueName", "test_rabbit");
    configs.put("connectionName", "test-connection");
    SourceConfig sourceConfig = SourceConfig.builder()
            .tenant("public")
            .namespace("default")
            .name("rabbit-test-source")
            .archive("builtin://rabbitmq")
            .topicName("incoming_rabbit")
            .configs(configs).build();
    return new PulsarSource(sourceConfig, null);
}

有关更详细的示例,请参阅 Sample Stream Pipeline with Pulsar Functions 示例应用程序