Pulsar Functions
Spring for Apache Pulsar provides basic support for {apache-pulsar-io-docs}/[Pulsar IO] (connectors) and {apache-pulsar-function-docs}[Pulsar Functions], which allow users to define stream processing pipelines made up of `sources`, `processors`, and `sinks`.
The `sources` and `sinks` are modeled by _Pulsar IO (connectors)_ and the `processors` are represented by _Pulsar Functions_.
Because connectors are just special functions, we refer to sources, sinks, and functions collectively as "Pulsar Functions" for simplicity.
Familiarity - the audience is expected to be somewhat familiar with Pulsar IO and Pulsar Functions. If that is not the case, it may be helpful to read their getting started guides first.
Feature enabled - to use these features, the functions support in Apache Pulsar must be enabled and configured (it is disabled by default). The built-in connectors may also need to be installed on the Pulsar cluster.
See the {apache-pulsar-io-docs}/[Pulsar IO] and {apache-pulsar-function-docs}[Pulsar Functions] docs for more details.
Pulsar Function Administration
The framework provides the `PulsarFunctionAdministration` component to manage Pulsar functions.
When you use the Pulsar Spring Boot starter, the `PulsarFunctionAdministration` is auto-configured for you.
By default, the application tries to connect to a local Pulsar instance at `http://localhost:8080`.
Because it leverages the already configured `PulsarAdministration`, the client options described in Pulsar Admin Client (including authentication) apply here as well.
Additional configuration options are available with the {spring-boot-pulsar-config-props}[`spring.pulsar.function.*`] application properties.
Automatic Function Management
On application startup, the framework finds all `PulsarFunction`, `PulsarSink`, and `PulsarSource` beans in the application context.
For each bean, the corresponding Pulsar function is either created or updated.
The proper API is called based on function type, function config, and whether the function already exists.
On application shutdown, all functions that were processed during application startup have their stop policy enforced and are either left alone, stopped, or deleted from the Pulsar server.
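The three shutdown outcomes can be pictured with a small simulation. This is only a sketch, not the framework's implementation: the `StopPolicy` and `State` enums, the `enforce` method, and the in-memory map standing in for the Pulsar server are all hypothetical names introduced for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical simulation of stop-policy enforcement; none of these types come from the framework.
final class StopPolicySketch {

    enum StopPolicy { NONE, STOP, DELETE }

    enum State { RUNNING, STOPPED }

    private StopPolicySketch() {
    }

    // Applies the policy to one function in a simulated server registry (name -> state).
    static void enforce(Map<String, State> server, String name, StopPolicy policy) {
        switch (policy) {
            case STOP:
                server.put(name, State.STOPPED);
                break;
            case DELETE:
                server.remove(name);
                break;
            default:
                // NONE: the function is left alone.
                break;
        }
    }

    public static void main(String[] args) {
        Map<String, State> server = new LinkedHashMap<>();
        server.put("fn-a", State.RUNNING);
        server.put("fn-b", State.RUNNING);
        server.put("fn-c", State.RUNNING);

        enforce(server, "fn-a", StopPolicy.NONE);
        enforce(server, "fn-b", StopPolicy.STOP);
        enforce(server, "fn-c", StopPolicy.DELETE);

        System.out.println(server);
    }
}
```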
Limitations
No Magic Pulsar Functions
Pulsar functions and custom connectors are represented by custom application code (e.g. a `java.util.function.Function`).
There is no magic support to automatically register the custom code.
While this would be amazing, it poses some technical challenges and has not yet been implemented.
As such, it is up to the user to ensure the function (or custom connector) is available at the location specified in the function config.
For example, if the function config has a `jar` value of `./some/path/MyFunction.jar`, then the function jar file must exist at the specified path.
Name Identifier
The `name` property from the function config is used as the identifier to determine whether a function already exists, which in turn decides whether an update or a create operation is performed.
As such, the name should not be modified if function updates are desired.
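The consequence of using the name as the identifier can be sketched as follows. This is an illustrative simplification, not the framework's code: the `decide` method, the `Operation` enum, and the set of existing names are hypothetical stand-ins for the lookup the framework performs against the Pulsar server.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: the create-or-update decision keyed only on the function name.
final class CreateOrUpdateByName {

    enum Operation { CREATE, UPDATE }

    private CreateOrUpdateByName() {
    }

    // A name already known to the server means update; any other name means create.
    static Operation decide(Set<String> existingNames, String name) {
        return existingNames.contains(name) ? Operation.UPDATE : Operation.CREATE;
    }

    public static void main(String[] args) {
        Set<String> existing = new HashSet<>();
        existing.add("rabbit-test-source");

        // Same name: the existing function is updated.
        System.out.println(decide(existing, "rabbit-test-source"));

        // Changed name: treated as a brand new function, so no update happens.
        System.out.println(decide(existing, "rabbit-test-source-v2"));
    }
}
```

In this sketch, a renamed function is indistinguishable from a new one, which is why the name has to stay stable for updates to take effect.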
Configuration
Pulsar Function Archive
Each Pulsar function is represented by an actual archive (e.g. a jar file).
The path to the archive is specified via the `archive` property for sources and sinks, and the `jar` property for functions.
The following rules determine the "type" of path:
- The path is a URL when it starts with `(file|http|https|function|sink|source)://`
- The path is built-in when it starts with `builtin://` (points to one of the provided out-of-the-box connectors)
- The path is local otherwise.
The action that occurs during the create/update operation depends on the path "type" as follows:
- When the path is a URL, the content is downloaded by the server
- When the path is built-in, the content is already available on the server
- When the path is local, the content is uploaded to the server
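The two rule sets above can be sketched in plain Java. This is a minimal illustration under stated assumptions rather than the framework's implementation: the `ArchivePaths` class, its `PathType` enum, and both methods are hypothetical names introduced here.

```java
import java.util.regex.Pattern;

// Hypothetical helper that mirrors the documented path rules; not part of the framework.
final class ArchivePaths {

    enum PathType { URL, BUILT_IN, LOCAL }

    // Scheme prefixes that the documented rules treat as a URL.
    private static final Pattern URL_PREFIX =
            Pattern.compile("^(file|http|https|function|sink|source)://");

    private ArchivePaths() {
    }

    // Determines the path type from the prefix of the path.
    static PathType classify(String path) {
        if (path.startsWith("builtin://")) {
            return PathType.BUILT_IN;
        }
        if (URL_PREFIX.matcher(path).lookingAt()) {
            return PathType.URL;
        }
        return PathType.LOCAL;
    }

    // Describes what happens to the archive content during create/update.
    static String describeAction(PathType type) {
        switch (type) {
            case URL:
                return "downloaded by the server";
            case BUILT_IN:
                return "already available on the server";
            default:
                return "uploaded to the server";
        }
    }

    public static void main(String[] args) {
        String[] paths = {
            "https://example.com/MyFunction.jar",
            "builtin://rabbitmq",
            "./some/path/MyFunction.jar"
        };
        for (String path : paths) {
            PathType type = classify(path);
            System.out.println(path + " -> " + type + " (" + describeAction(type) + ")");
        }
    }
}
```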
Built-in Source and Sinks
Apache Pulsar provides many source and sink connectors out-of-the-box, also known as built-in connectors. To use a built-in connector, set the `archive` to `builtin://<connector-type>` (e.g. `builtin://rabbitmq`).
Custom functions
The details on how to develop and package custom functions can be found in the {apache-pulsar-docs}/functions-develop[Pulsar docs]. However, at a high level, the requirements are as follows:
- Code uses Java 8
- Code implements either `java.util.function.Function` or `org.apache.pulsar.functions.api.Function`
- Packaged as an uber jar
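As a rough illustration of the simplest option in the list above, a custom function can be an ordinary class that implements the JDK `java.util.function.Function` interface. The class name `ExclamationFunction` and its behavior are hypothetical, and the `main` method exists only for a quick local check; packaging and registration are not shown.

```java
import java.util.function.Function;

// Hypothetical custom function: appends an exclamation mark to each incoming message.
class ExclamationFunction implements Function<String, String> {

    @Override
    public String apply(String input) {
        return input + "!";
    }

    // Local sanity check only; a deployed function is invoked by the Pulsar runtime instead.
    public static void main(String[] args) {
        System.out.println(new ExclamationFunction().apply("hello"));
    }
}
```

Implementing `org.apache.pulsar.functions.api.Function` instead would add a dependency on the Pulsar functions API, so the JDK interface keeps this sketch dependency-free.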
Once the function is built and packaged, there are several ways to make it available for function registration.
file://
The jar file can be uploaded to the server and then referenced via `file://` in the `jar` property of the function config.
local
The jar file can remain local and then be referenced via its local path in the `jar` property of the function config.
Examples
Here are some examples that show how to configure a `PulsarSource` bean, which results in the `PulsarFunctionAdministration` auto-creating the backing Pulsar source connector.
@Bean
PulsarSource rabbitSource() {
    // Connector-specific settings passed through to the RabbitMQ source
    Map<String, Object> configs = new HashMap<>();
    configs.put("host", "my.rabbit.host");
    configs.put("port", 5672);
    configs.put("virtualHost", "/");
    configs.put("username", "guest");
    configs.put("password", "guest");
    configs.put("queueName", "test_rabbit");
    configs.put("connectionName", "test-connection");
    // The name identifies the source; the builtin:// archive selects the built-in connector
    SourceConfig sourceConfig = SourceConfig.builder()
            .tenant("public")
            .namespace("default")
            .name("rabbit-test-source")
            .archive("builtin://rabbitmq")
            .topicName("incoming_rabbit")
            .configs(configs).build();
    return new PulsarSource(sourceConfig, null);
}
This next example is the same as the previous one, except that it uses the Spring Boot auto-configured `RabbitProperties` to ease the configuration burden. This requires the application to use Spring Boot with Rabbit auto-configuration enabled.
@Bean
PulsarSource rabbitSourceWithBootProps(RabbitProperties props) {
    // Connection settings come from the auto-configured RabbitProperties
    Map<String, Object> configs = new HashMap<>();
    configs.put("host", props.determineHost());
    configs.put("port", props.determinePort());
    configs.put("virtualHost", props.determineVirtualHost());
    configs.put("username", props.determineUsername());
    configs.put("password", props.determinePassword());
    configs.put("queueName", "test_rabbit");
    configs.put("connectionName", "test-connection");
    SourceConfig sourceConfig = SourceConfig.builder()
            .tenant("public")
            .namespace("default")
            .name("rabbit-test-source")
            .archive("builtin://rabbitmq")
            .topicName("incoming_rabbit")
            .configs(configs).build();
    return new PulsarSource(sourceConfig, null);
}
For a more elaborate example, see the Sample Stream Pipeline with Pulsar Functions sample app.