Pulsar Functions
Spring for Apache Pulsar 为 {apache-pulsar-io-docs}/[Pulsar IO](连接器)和 {apache-pulsar-function-docs}[Pulsar 函数] 提供基本支持,它们允许用户定义包含 sources
、`processors`和 `sinks`的流处理管道。`sources`和 `sinks`由 _Pulsar IO (connectors)_建模,`processors`由 _Pulsar Functions_表示。
由于连接器仅仅是特殊功能,为了简单起见,我们将源、接收器和函数统称为“Pulsar Functions”。 |
熟悉 - 预计受众对Pulsar IO和Pulsar Functions有一定程度的了解。如果不是这种情况,查看其入门指南可能会有所帮助。 启用的功能 - 要使用这些功能,必须启用并配置 Apache Pulsar 中的功能支持(默认情况下禁用)。内置连接器也可能需要安装在 Pulsar 集群上。 请参阅 {apache-pulsar-io-docs}/[Pulsar IO] 和 {apache-pulsar-function-docs}[Pulsar 函数] 文档了解更多详细信息。
Pulsar Function Administration
该框架提供了 PulsarFunctionAdministration
组件来管理 Pulsar 函数。当你使用 Pulsar Spring Boot starter 时,你会得到自动配置的 PulsarFunctionAdministration
。
默认情况下,应用程序会尝试连接到 http://localhost:8080
上的本地 Pulsar 实例。但是,因为它利用已经配置的 PulsarAdministration
,有关可用的客户端选项(包括身份验证),请参阅 Pulsar Admin Client。通过 {spring-boot-pulsar-config-props}[spring.pulsar.function.*
] 应用程序属性提供更多配置选项。
Automatic Function Management
在应用程序启动时,框架将在应用程序上下文中找到所有 PulsarFunction
、PulsarSink
和 PulsarSource
bean。对于每个 bean,将创建或更新对应的 Pulsar 函数。正确的 API 是基于函数类型、函数配置以及函数是否已存在的。
|
在应用程序关闭时,将在应用程序启动期间处理的所有函数执行其停止策略,并且将其保留、停止或从 Pulsar 服务器中删除。
Limitations
Configuration
Pulsar Function Archive
每个 Pulsar 函数都由实际存档(例如 jar 文件)表示。存档的路径通过源和汇的 archive
属性以及函数的 jar
属性指定。
以下规则确定路径的“类型”:
-
以
(file|http|https|function|sink|source)://
开头时,路径是 URL -
以
builtin://
开头时,路径是 built-in(指向预先提供的开箱即用型连接器之一) -
否则,路径是 local。
在创建/更新操作期间发生的活动取决于路径“类型”,如下所示:
-
当路径是 URL 时,服务器下载内容
-
当路径是 built-in 时,内容已在服务器上可用
-
当路径是 local 时,内容上传到服务器
Custom functions
可以在 {apache-pulsar-docs}/functions-develop[Pulsar 文档] 中找到有关如何开发和打包自定义函数的详细信息。但从高层面上来说,要求如下:
-
Code uses Java8
-
代码实现
java.util.Function
或org.apache.pulsar.functions.api.Function
-
Packaged as uber jar
函数构建并打包后,有几种方法可以使其可用于函数注册。
Examples
以下是一些示例,演示如何配置 PulsarSource
bean,这将导致 PulsarFunctionAdministration
自动创建支持的 Pulsar 源连接器。
@Bean
PulsarSource rabbitSource() {
Map<String, Object> configs = new HashMap<>();
configs.put("host", "my.rabbit.host");
configs.put("port", 5672);
configs.put("virtualHost", "/");
configs.put("username", "guest");
configs.put("password", "guest");
configs.put("queueName", "test_rabbit");
configs.put("connectionName", "test-connection");
SourceConfig sourceConfig = SourceConfig.builder()
.tenant("public")
.namespace("default")
.name("rabbit-test-source")
.archive("builtin://rabbitmq")
.topicName("incoming_rabbit")
.configs(configs).build();
return new PulsarSource(sourceConfig, null);
}
下一个示例与上一个相同,但它使用 Spring Boot 自动配置的 RabbitProperties
来减轻配置负担。当然,这要求应用程序使用启用了 Rabbit 自动配置的 Spring Boot。
@Bean
PulsarSource rabbitSourceWithBootProps(RabbitProperties props) {
Map<String, Object> configs = new HashMap<>();
configs.put("host", props.determineHost());
configs.put("port", props.determinePort());
configs.put("virtualHost", props.determineVirtualHost());
configs.put("username", props.determineUsername());
configs.put("password", props.determinePassword());
configs.put("queueName", "test_rabbit");
configs.put("connectionName", "test-connection");
SourceConfig sourceConfig = SourceConfig.builder()
.tenant("public")
.namespace("default")
.name("rabbit-test-source")
.archive("builtin://rabbitmq")
.topicName("incoming_rabbit")
.configs(configs).build();
return new PulsarSource(sourceConfig, null);
}
有关更详细的示例,请参阅 Sample Stream Pipeline with Pulsar Functions 示例应用程序 |