Pulsar Functions

Spring for Apache Pulsar provides basic support for {apache-pulsar-io-docs}/[Pulsar IO] (connectors) and {apache-pulsar-function-docs}[Pulsar Functions], which allow users to define stream processing pipelines made up of sources, processors, and sinks. The sources and sinks are modeled by Pulsar IO (connectors), and the processors are represented by Pulsar Functions.

Because connectors are just special functions, for simplicity we refer to sources, sinks, and functions collectively as "Pulsar Functions".

Pre-requisites

Familiarity: the audience is expected to be somewhat familiar with Pulsar IO and Pulsar Functions. If that is not the case, it may be helpful to read their getting started guides.

Feature enabled: to use these features, function support in Apache Pulsar must be enabled and configured (it is disabled by default). The built-in connectors may also need to be installed on the Pulsar cluster.

See the {apache-pulsar-io-docs}/[Pulsar IO] and {apache-pulsar-function-docs}[Pulsar Functions] docs for more details.

Pulsar Function Administration

The framework provides the PulsarFunctionAdministration component to manage Pulsar functions. When you use the Pulsar Spring Boot starter, you get the PulsarFunctionAdministration auto-configured.

By default, the application tries to connect to a local Pulsar instance at http://localhost:8080. Because it uses the already configured PulsarAdministration, the client options described in Pulsar Admin Client (including authentication) apply here as well. Additional configuration options are available through the {spring-boot-pulsar-config-props}[spring.pulsar.function.*] application properties.

Automatic Function Management

On application startup, the framework finds all PulsarFunction, PulsarSink, and PulsarSource beans in the application context. For each bean, the corresponding Pulsar function is either created or updated. The proper API is called based on function type, function config, and whether the function already exists.
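The create-or-update decision can be sketched as a small, self-contained model. This is not the framework's code. FunctionSpec, FakeFunctionRegistry, and StartupProcessor are hypothetical names, and an in-memory map stands in for the Pulsar server. The sketch covers only the "does it already exist" part of the decision. Pulsar functions live within a tenant and namespace, so the sketch keys them by tenant, namespace, and name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified function config (not Pulsar's FunctionConfig).
record FunctionSpec(String tenant, String namespace, String name, String jar) {}

// Hypothetical in-memory stand-in for the functions registered on the server.
class FakeFunctionRegistry {
    private final Map<String, FunctionSpec> functions = new LinkedHashMap<>();

    // Identity is tenant/namespace/name, so the name decides create vs update.
    private static String key(FunctionSpec spec) {
        return spec.tenant() + "/" + spec.namespace() + "/" + spec.name();
    }

    boolean exists(FunctionSpec spec) {
        return functions.containsKey(key(spec));
    }

    void save(FunctionSpec spec) {
        functions.put(key(spec), spec);
    }

    int size() {
        return functions.size();
    }
}

class StartupProcessor {
    // Returns "create" or "update" for each spec, in order, and records it as registered.
    static List<String> process(FakeFunctionRegistry registry, List<FunctionSpec> specs) {
        List<String> actions = new ArrayList<>();
        for (FunctionSpec spec : specs) {
            actions.add(registry.exists(spec) ? "update" : "create");
            registry.save(spec);
        }
        return actions;
    }
}
```

Because the key includes the name, a renamed config looks like a brand-new function to this model and is created rather than updated. The same applies to the real name-based identification described under Name Identifier.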

PulsarFunctionPulsarSinkPulsarSource bean 分别是 Apache Pulsar 配置对象 FunctionConfigSinkConfigSourceConfig 的简单包装器。由于支持的连接器数量众多(及其不同的配置),该框架不尝试创建配置属性层次结构以反映各种 Apache Pulsar 连接器。相反,由用户提供完整的配置对象,然后该框架使用提供的配置处理管理(创建/更新)。

The PulsarFunction, PulsarSink, and PulsarSource beans are simple wrappers around the Apache Pulsar config objects FunctionConfig, SinkConfig, and SourceConfig, respectively. Due to the large number of supported connectors (and their varied configurations) the framework does not attempt to create a configuration properties hierarchy to mirror the varied Apache Pulsar connectors. Instead, the burden is on the user to supply the full config object and then the framework handles the management (create/update) using the supplied config.

On application shutdown, all functions that were processed during application startup have their stop policy enforced and are either left alone, stopped, or deleted from the Pulsar server.
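The shutdown step can be modeled the same way. StopPolicy, ProcessedFunction, and ShutdownProcessor are hypothetical names chosen for illustration, not the framework's API. The three enum constants simply mirror the three outcomes described above: left alone, stopped, or deleted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical enum mirroring the three possible outcomes at shutdown.
enum StopPolicy { NONE, STOP, DELETE }

// Hypothetical record of a function handled at startup plus its stop policy.
record ProcessedFunction(String name, StopPolicy stopPolicy) {}

class ShutdownProcessor {
    // Describes what would happen to each processed function when the application stops.
    static List<String> enforce(List<ProcessedFunction> processed) {
        List<String> outcomes = new ArrayList<>();
        for (ProcessedFunction fn : processed) {
            String outcome = switch (fn.stopPolicy()) {
                case NONE -> "leave " + fn.name() + " running";
                case STOP -> "stop " + fn.name();
                case DELETE -> "delete " + fn.name();
            };
            outcomes.add(outcome);
        }
        return outcomes;
    }
}
```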

Limitations

No Magic Pulsar Functions

Pulsar functions and custom connectors are represented by custom application code (for example, a java.util.function.Function). There is no magic support that automatically registers the custom code. While this would be convenient, it poses some technical challenges and has not yet been implemented. As such, it is up to the user to ensure the function (or custom connector) is available at the location specified in the function config. For example, if the function config has a jar value of ./some/path/MyFunction.jar, then the function jar file must exist at the specified path.
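Nothing registers the custom code automatically. For local paths, one simple safeguard is to fail fast when the archive is missing. The following is a minimal sketch that assumes the archive value is a local path. ArchiveCheck and requireLocalArchive are hypothetical names. URL and builtin:// values would have to be filtered out before calling it.

```java
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical pre-flight check for a local function archive.
class ArchiveCheck {
    // Throws if the given local path does not point to an existing regular file.
    static void requireLocalArchive(String archivePath) {
        Path path = Path.of(archivePath);
        if (!Files.isRegularFile(path)) {
            throw new IllegalStateException("Function archive not found: " + path.toAbsolutePath());
        }
    }
}
```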

Name Identifier

The name property from the function config is used as the identifier to determine if a function already exists in order to decide if an update or create operation is performed. As such, the name should not be modified if function updates are desired.

Configuration

Pulsar Function Archive

Each Pulsar function is represented by an actual archive (for example, a jar file). The path to the archive is specified through the archive property for sources and sinks, and the jar property for functions.

The following rules determine the "type" of path:

  • The path is a URL when it starts with (file|http|https|function|sink|source)://

  • The path is built-in when it starts with builtin:// (it points to one of the connectors provided out of the box)

  • Otherwise, the path is local

The action taken during the create/update operation depends on the path "type" as follows:

  • When the path is a URL, the content is downloaded by the server

  • When the path is built-in, the content is already available on the server

  • When the path is local, the content is uploaded to the server
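The two sets of rules above can be sketched together in a few lines of Java. ArchivePathType and its of method are hypothetical names used only for illustration. The URL prefixes are exactly the ones listed in the first set of rules, and each constant carries the corresponding action from the second set.

```java
import java.util.regex.Pattern;

// Hypothetical classification of an archive/jar path and the resulting action.
enum ArchivePathType {
    URL("downloaded by the server"),
    BUILT_IN("already available on the server"),
    LOCAL("uploaded to the server");

    // Prefixes that make a path a URL, per the rules above.
    private static final Pattern URL_PREFIX =
            Pattern.compile("(file|http|https|function|sink|source)://");

    final String action;

    ArchivePathType(String action) {
        this.action = action;
    }

    static ArchivePathType of(String path) {
        if (URL_PREFIX.matcher(path).lookingAt()) { // prefix match from the start
            return URL;
        }
        if (path.startsWith("builtin://")) {
            return BUILT_IN;
        }
        return LOCAL;
    }
}
```

For example, ./some/path/MyFunction.jar classifies as LOCAL and builtin://rabbitmq as BUILT_IN. Both https://example.com/my-function.jar and function://public/default/my-function@1.0 classify as URL. The last two values are illustrative and not taken from this guide.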

Built-in Sources and Sinks

Apache Pulsar provides many source and sink connectors out of the box, also known as built-in connectors. To use a built-in connector, set the archive to builtin://<connector-type> (for example, builtin://rabbitmq).

Custom functions

Details on how to develop and package custom functions can be found in the {apache-pulsar-docs}/functions-develop[Pulsar docs]. At a high level, the requirements are as follows:

  • The code uses Java 8

  • The code implements either java.util.function.Function or org.apache.pulsar.functions.api.Function

  • The code is packaged as an uber jar
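To illustrate the second requirement, the function below implements java.util.function.Function and appends an exclamation mark to each input. The class name ExclamationFunction is hypothetical. Before it could be registered, it would still need to be compiled, packaged as an uber jar, and referenced from the function config's jar property. It is package-private here only so the snippet compiles on its own. In a real project it would typically be a public class in its own source file.

```java
import java.util.function.Function;

// Minimal custom function: appends "!" to every input string.
// Uses only java.util.function.Function, so it needs no Pulsar dependency to compile.
class ExclamationFunction implements Function<String, String> {
    @Override
    public String apply(String input) {
        return input + "!";
    }
}
```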

Once the function is built and packaged, there are several ways to make it available for function registration.

file://

The jar file can be uploaded to the server and then referenced via file:// in the jar property of the function config.

local

The jar file can remain local and then be referenced via the local path in the jar property of the function config.

http://

The jar file can be made available via an HTTP server and then referenced via http(s):// in the jar property of the function config.

function://

The jar file can be uploaded to the Pulsar package manager and then referenced via function:// in the jar property of the function config.

Examples

Here are some examples that show how to configure a PulsarSource bean which results in the PulsarFunctionAdministration auto-creating the backing Pulsar source connector.

PulsarSource using built-in Rabbit connector
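import java.util.HashMap;
import java.util.Map;

import org.apache.pulsar.common.io.SourceConfig;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.pulsar.function.PulsarSource;

@Configuration(proxyBeanMethods = false)
class RabbitSourceConfiguration {

    @Bean
    PulsarSource rabbitSource() {
        // Connector-specific settings passed through to the built-in RabbitMQ source
        Map<String, Object> configs = new HashMap<>();
        configs.put("host", "my.rabbit.host");
        configs.put("port", 5672);
        configs.put("virtualHost", "/");
        configs.put("username", "guest");
        configs.put("password", "guest");
        configs.put("queueName", "test_rabbit");
        configs.put("connectionName", "test-connection");
        SourceConfig sourceConfig = SourceConfig.builder()
                .tenant("public")
                .namespace("default")
                .name("rabbit-test-source")      // identifier used to decide create vs update
                .archive("builtin://rabbitmq")   // built-in connector, already on the server
                .topicName("incoming_rabbit")    // Pulsar topic the source publishes to
                .configs(configs).build();
        // no custom update options are supplied (null)
        return new PulsarSource(sourceConfig, null);
    }
}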

This next example is the same as the previous, except that it uses the Spring Boot auto-configured RabbitProperties to ease the configuration burden. This of course requires the application to be using Spring Boot with Rabbit auto-configuration enabled.

PulsarSource using built-in Rabbit connector and Spring Boot RabbitProperties
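import java.util.HashMap;
import java.util.Map;

import org.apache.pulsar.common.io.SourceConfig;

import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.pulsar.function.PulsarSource;

@Configuration(proxyBeanMethods = false)
class RabbitSourceWithBootPropsConfiguration {

    @Bean
    PulsarSource rabbitSourceWithBootProps(RabbitProperties props) {
        // Connection settings come from Spring Boot's auto-configured RabbitProperties
        Map<String, Object> configs = new HashMap<>();
        configs.put("host", props.determineHost());
        configs.put("port", props.determinePort());
        configs.put("virtualHost", props.determineVirtualHost());
        configs.put("username", props.determineUsername());
        configs.put("password", props.determinePassword());
        configs.put("queueName", "test_rabbit");
        configs.put("connectionName", "test-connection");
        SourceConfig sourceConfig = SourceConfig.builder()
                .tenant("public")
                .namespace("default")
                .name("rabbit-test-source")
                .archive("builtin://rabbitmq")
                .topicName("incoming_rabbit")
                .configs(configs).build();
        return new PulsarSource(sourceConfig, null);
    }
}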

For a more elaborate example, see the Sample Stream Pipeline with Pulsar Functions sample app.