Apache Nifi 简明教程
Apache NiFi - Introduction
Apache NiFi 是一个功能强大、易于使用且可靠的系统,用于在不同的系统之间处理和分发数据。它基于由 NSA 开发的 Niagara Files 技术,并在此后 8 年后捐赠给 Apache 软件基金会。它根据 Apache 许可证版本 2.0 发行,日期为 2004 年 1 月。Apache NiFi 的最新版本是 1.7.1。
Apache NiFi 是一個实时数据采集平台,可以传输和管理不同来源和目标系统之间的数据传输。它支持多种数据格式,如日志、地理位置数据、社交动态等。它还支持许多协议,如 SFTP、HDFS 和 KAFKA 等。对各种数据源和协议的支持使该平台在许多 IT 组织中广受欢迎。
Apache NiFi- General Features
Apache NiFi 的一般特性如下:
-
Apache NiFi 提供了一个基于 Web 的用户界面,它在设计、控制、反馈和监控之间提供无缝体验。
-
它具有高度的可配置性。这有助于用户确保传递、低延迟、高吞吐量、动态优先级、背压以及在运行时修改流程。
-
它还提供数据来源模块来跟踪和监控数据从流程开始到结束的过程。
-
开发者可以根据他们的需要创建自己的自定义处理器和报告任务。
-
NiFi 还提供对 SSL、HTTPS、SSH 和其他加密等安全协议的支持。
-
它还支持用户和角色管理,也可以通过 LDAP 进行身份验证配置。
Apache NiFi -Key Concepts
Apache NiFi 的主要概念如下 −
-
Process Group − 它是 NiFi 流程组,有助于用户以分层方式管理和保持流程。
-
Flow − 它通过连接不同的处理器创建,以在需要时从一个或多个数据源传输和修改数据到其他目标数据源。
-
Processor − 处理器是一个 Java 模块,负责从源系统获取数据或将其存储在目标系统中。其他处理器还用于添加属性或更改流文件中的内容。
-
Flowfile − 这是 NiFi 的基本用法,它表示从 NiFi 中源系统选取的数据的单个对象。NiFi 处理器在流文件从源处理器移动到目标处理器时对其进行更改。不同的事件,例如创建、克隆、接收等,由流程中的不同处理器对流文件执行。
-
Event − 事件表示在遍历 NiFi 流程时流文件中的更改。这些事件在数据来源中被跟踪。
-
Data provenance − 这是一个存储库。它还具有一个用户界面,使用户能够检查有关流文件的信息,并帮助解决在处理流文件期间出现的任何问题。
Apache NiFi - Basic Concepts
Apache NiFi 由 Web 服务器、流控制器和处理器组成,在 Java 虚拟机上运行。它还有 3 个存储库流文件存储库、内容存储库和来源存储库,如下图所示。
Flowfile Repository
此存储库存储通过 apache NiFi 的数据流的每个流文件的当前状态和属性。此存储库的默认位置是在 apache NiFi 的根目录中。可以通过更改名为“nifi.flowfile.repository.directory”的属性来更改此存储库的位置。
Content Repository
此存储库包含 NiFi 的所有流文件中出现的所有内容。它的默认目录也位于 NiFi 的根目录中,可以使用“org.apache.nifi.controller.repository.FileSystemRepository”属性更改此目录。此目录会在磁盘中使用大量空间,因此建议在安装磁盘中有足够的空间。
Provenance Repository
存储库会跟踪并存储所有在 NiFi 中流动的流文件的所有事件。有两个源存储库 - volatile provenance repository (在该存储库中,所有源数据都会在重启后丢失)和 persistent provenance repository 。它的默认目录也在 NiFi 的根目录中,并且可以使用“org.apache.nifi.provenance.PersistentProvenanceRepository”和“org.apache.nifi.provenance.VolatileProvenanceRepositor”属性为相应的存储库更改它。
Apache NiFi - Environment Setup
在本节中,我们将了解 Apache NiFi 的环境设置。Apache NiFi 的安装步骤如下 −
Step 1 − 在您的计算机中安装当前版本的 Java。请在您的机器中设置 JAVA_HOME。您可以按照如下所示检查版本:
在 Windows 操作系统 (OS)(使用命令提示符)中 −
> java -version
在 UNIX OS (使用终端):
$ echo $JAVA_HOME
Step 2 − 从 https://nifi.apache.org/download.html 下载 Apache NiFi
-
对于 Windows OS 下载 ZIP 文件。
-
对于 UNIX OS 下载 TAR 文件
-
对于 Docker 映像,请转到以下链接 https://hub.docker.com/r/apache/nifi/.
Step 3 ——Apache NiFi 的安装过程非常简单。该过程因操作系统不同而异——
-
Windows OS ——解压 zip 包即会安装 Apache NiFi。
-
UNIX OS ——在任意位置解压 tar 文件即会安装 Logstash。
$tar -xvf nifi-1.6.0-bin.tar.gz
Step 4 ——打开命令提示符,转到 NiFi 的 bin 目录。例如,C:\nifi-1.7.1\bin,然后执行 run-nifi.bat 文件。
C:\nifi-1.7.1\bin>run-nifi.bat
Step 5 ——NiFi UI 需要几分钟启动。用户可以在 NiFi UI 启动后检查 nifi-app.log,然后用户可以输入 http://localhost:8080/nifi/ 来访问 UI。
Apache NiFi - User Interface
Apache 是用户可以通过 Web UI 访问的基于 Web 的平台。NiFi UI 非常具有交互性,并提供有关 NiFi 的广泛信息。如下面的图像所示,用户可以访问以下属性的信息——
-
Active Threads
-
Total queued data
-
Transmitting Remote Process Groups
-
不传输远程进程组
-
Running Components
-
Stopped Components
-
Invalid Components
-
Disabled Components
-
正在更新的版本化进程组
-
本地修改的版本化进程组
-
Stale Versioned Process Groups
-
本地修改且过时的版本化进程组
-
同步失败的版本化进程组
Apache NiFi - Processors
Apache NiFi 处理器是创建数据流的基本模块。每个处理器都有不同的功能,有助于创建输出流文件。下图所示的数据流正在使用 GetFile 处理器从一个目录中获取文件,并使用 PutFile 处理器将其存储在另一个目录中。
GetFile Properties
GetFile 提供多种属性,如下图所示,范围从强制性的属性(如输入目录和文件过滤器)到可选的属性(如路径过滤器和最大文件大小)。用户可以使用这些属性管理文件获取过程。
Apache NiFi - Processors Categorization
在本章中,我们将讨论 Apache NiFi 中的流程分类。
Data Ingestion Processors
数据收集类别下的处理器用于将数据导入到 NiFi 数据流中。这些通常是 Apache NiFi 中任何数据流的起点。该类别下的某些处理器有 GetFile、GetHTTP、GetFTP、GetKAFKA 等。
Routing and Mediation Processors
路由和中介处理器用于根据那些流文件的属性或内容中的信息将流文件路由到不同的处理器或数据流。这些处理器还负责控制 NiFi 数据流。该类别下的某些处理器有 RouteOnAttribute、RouteOnContent、ControlRate、RouteText 等。
Database Access Processors
该数据库访问类别下的处理器能够从数据库中选择或插入数据,或者执行并准备其他 SQL 语句。这些处理器主要使用 Apache NiFi 的数据连接池控制器设置。该类别下的某些处理器有 ExecuteSQL、PutSQL、PutDatabaseRecord、ListDatabaseTables 等。
Attribute Extraction Processors
属性提取处理器负责在 NiFi 数据流中提取、分析、更改流文件属性的处理。该类别下的某些处理器有 UpdateAttribute、EvaluateJSONPath、ExtractText、AttributesToJSON 等。
System Interaction Processors
系统交互处理器用于在任何操作系统中运行进程或命令。这些处理器还以多种语言运行脚本,以便与各种系统进行交互。该类别下的某些处理器有 ExecuteScript、ExecuteProcess、ExecuteGroovyScript、ExecuteStreamCommand 等。
Data Transformation Processors
属于数据转换的处理器能够更改流文件的内容。当用户必须将流文件作为 HTTP 正文发送到 invokeHTTP 处理器调用时,这些处理器通常可以用于完全替换流文件的数据。该类别下的某些处理器有 ReplaceText、JoltTransformJSON 等。
Sending Data Processors
发送数据处理器通常是数据流中的最终处理器。这些处理器负责将数据存储或发送到目标服务器。在成功存储或发送数据后,这些处理器将中断与流文件的关系。该类别下的某些处理器有 PutEmail、PutKafka、PutSFTP、PutFile、PutFTP 等。
Apache NiFi - Processors Relationship
在 Apache NiFi 数据流中,流文件通过通过处理器之间的关系进行验证的连接从一个处理器移动到另一个处理器。每当建立连接时,开发人员都会选择这些处理器之间的一个人或多个关系。
如上图所示,黑框中的复选框就是关系。如果开发人员选中这些复选框,则当关系成功、失败或同时成功和失败时,流文件将在该特定处理器中终止。
Apache NiFi - FlowFile
FlowFile 是 Apache NiFi 中一个基本的处理实体。它包含数据内容和属性,这些内容和属性由 NiFi 处理器用于处理数据。文件内容通常包含从源系统获取的数据。Apache NiFi FlowFile 的最常见属性如下 -
Apache NiFi - Queues
Apache NiFi 数据流连接有一个队列系统,用来处理大量的数据流入。这些队列可以处理非常大量的 FlowFile,以便处理器按顺序对其进行处理。
上图中的队列有 1 个通过成功关系传送的 FlowFile。用户可以通过从下拉列表中选择 List queue 选项来检查 FlowFile。如果发生任何过载或错误,用户还可以通过选择 empty queue 选项清除队列,然后用户可以重新启动流程以再次在数据流中获取那些文件。
队列中 FlowFile 的列表,包含位置、UUID、文件名、文件大小、队列持续时间和关系持续时间。用户可以通过单击 FlowFile 列表第一列中的信息图标来查看 FlowFile 的所有属性和内容。
Apache NiFi - Process Groups
在 Apache NiFi 中,用户可以在不同的流程组中维护不同的数据流。这些组可以基于不同的项目或 Apache NiFi 实例支持的组织。
NiFi 用户界面顶部的菜单中的第四个符号(如上图所示)用于在 NiFi 画布中添加一个流程组。名为“Tutorialspoint.com_ProcessGroup”的流程组包含一个数据流,其中四个处理器当前处于停止阶段,如您在上图中看到的那样。可以分层方式创建流程组,以更好地组织管理流程数据,以便于理解。
在 NiFi UI 的页脚中,您可以查看流程组,还可以返回到用户当前所在的流程组的顶部。
要查看 NiFi 中存在的流程组的完整列表,用户可以使用 NiFi UI 左上方的菜单转到摘要。在摘要中,有一个流程组选项卡,其中列出了所有流程组以及版本状态、已传输/大小、输入/大小、读取/写入、输出/大小等参数,如下图所示。
Apache NiFi - Labels
Apache NiFi 提供标签允许开发者编写有关 NiFi 画布中存在的组件的信息。NiFi UI 顶部菜单最左边的图标用于在 NiFi 画布中添加标签。
开发者可以通过右击标签并从菜单中选择适合的选项来改变标签的颜色和字体大小。
Apache NiFi - Configuration
Apache NiFi 是一个高度可配置的平台。conf 目录中的 nifi.properties 文件
包含大部分配置。
Apache NiFi 常用的属性如下 −
Core properties
此部分包含运行 NiFi 实例所需的属性。
S.No. |
Property name |
Default Value |
description |
1 |
nifi.flow.configuration.file |
./conf/flow.xml.gz |
此属性包含 flow.xml 文件的路径。此文件包含在 NiFi 中创建的所有数据流。 |
2 |
nifi.flow.configuration.archive.enabled |
true |
此属性用于启用或禁用 NiFi 中的归档。 |
3 |
nifi.flow.configuration.archive.dir |
./conf/archive/ |
此属性用于指定归档目录。 |
4 |
nifi.flow.configuration.archive.max.time |
30 days |
这用于指定归档内容的保留时间。 |
5 |
nifi.flow.configuration.archive.max.storage |
500 MB |
它包含归档目录可能增长的最大大小。 |
6 |
nifi.authorizer.configuration.file |
./conf/authorizers.xml |
指定用于用户授权的授权配置器文件。 |
7 |
nifi.login.identity.provider.configuration.file |
./conf/login-identity-providers.xml |
此属性包含登录身份提供商的配置, |
8 |
nifi.templates.directory |
./conf/templates |
此属性用于指定 NiFi 模板将存储到的目录。 |
9 |
nifi.nar.library.directory |
./lib |
此属性包含 NiFi 将用来加载所有使用存在于该 lib 文件夹中的 NAR 文件的组件的库路径。 |
10 |
nifi.nar.working.directory |
./work/nar/ |
一旦 NiFi 处理完 NAR 文件,该目录将存储解压缩后的 NAR 文件。 |
11 |
nifi.documentation.working.directory |
./work/docs/components |
此目录包含所有组件的文档。 |
State Management
这些属性用于存储组件状态,这些状态有助于启动处理,即组件在重新启动后剩余的状态和在下次计划运行时的状态。
S.No. |
Property name |
Default Value |
description |
1 |
nifi.state.management.configuration.file |
./conf/state-management.xml |
此属性包含 state-management.xml 文件的路径。此文件包含 NiFi 实例数据流中存在的所有组件状态。 |
2 |
nifi.state.management.provider.local |
local-provider |
它包含本地状态提供程序的 ID。 |
3 |
nifi.state.management.provider.cluster |
zk-provider |
此属性包含群集范围状态提供程序的 ID。如果 NiFi 未集群,则此项将被忽略,但如果在集群中运行,则必须填入。 |
4 |
nifi.state.management. embedded. zookeeper. start |
false |
该属性指定此 NiFi 实例是否应运行嵌入式 ZooKeeper 服务器。 |
5 |
nifi.state.management. embedded. zookeeper.properties |
./conf/zookeeper.properties |
此属性包含提供 ZooKeeper 属性的文件的路径,如果 <nifi.state.management. embedded. zookeeper. start> 设置为 true,则使用此属性。 |
FlowFile Repository
现在,我们来看一看 FlowFile 存储库的重要说明 –
S.No. |
Property name |
Default Value |
description |
1 |
nifi.flowfile.repository. implementation |
org.apache.nifi. controller. repository. WriteAhead FlowFileRepository |
此属性用于指定将 FlowFile 存储在内存中还是磁盘中。如果用户想把 FlowFile 存储在内存中,则更改为“org.apache.nifi.controller. repository.VolatileFlowFileRepository”。 |
2 |
nifi.flowfile.repository.directory |
./flowfile_repository |
要指定 FlowFile 存储库的目录。 |
Apache NiFi - Administration
Apache NiFi 为 Ambari、Zookeeper 等多个工具提供支持,以用于管理目的。NiFi 还在 nifi.properties 文件中提供了配置,以便为管理员设置 HTTPS 和其他内容。
zookeeper
NiFi 本身不处理群集中的投票过程。这意味着创建群集时,所有节点都是主节点和协调器。因此,Zookeeper 被配置为管理主节点和协调器的投票。nifi.properties 文件包含一些用于设置 Zookeeper 的属性。
S.No. |
Property name |
Default Value |
description |
1 |
nifi.state.management.embedded.zookeeper. properties |
./conf/zookeeper.properties |
要指定 Zookeeper 属性文件的路径和名称。 |
2 |
nifi.zookeeper.connect.string |
empty |
指定 zookeeper 的连接字符串。 |
3 |
nifi.zookeeper.connect.timeout |
3 secs |
使用 NiFi 指定 zookeeper 的连接超时时间。 |
4 |
nifi.zookeeper.session.timeout |
3 secs |
使用 NiFi 指定 zookeeper 的会话超时时间。 |
5 |
nifi.zookeeper.root.node |
/nifi |
指定 zookeeper 的根节点。 |
6 |
nifi.zookeeper.auth.type |
empty |
指定 zookeeper 的身份验证类型。 |
Enable HTTPS
如要通过 HTTPS 使用 NiFi,管理员必须生成密钥库和信任库,并在 nifi.properties 文件中设置一些属性。TLS 工具包可用于生成启用 Apache NiFi 中 HTTPS 所需的所有密钥。
S.No. |
Property name |
Default Value |
description |
1 |
nifi.web.https.port |
empty |
指定 https 端口号。 |
2 |
nifi.web.https.network.interface.default |
empty |
NiFi 中 https 的默认接口。 |
3 |
nifi.security.keystore |
empty |
指定密钥库的路径和文件名。 |
4 |
nifi.security.keystoreType |
empty |
指定密钥库类型,例如 JKS。 |
5 |
nifi.security.keystorePasswd |
empty |
To specify keystore password. |
6 |
nifi.security.truststore |
empty |
指定信任库的路径和文件名。 |
7 |
nifi.security.truststoreType |
empty |
指定信任库类型,例如 JKS。 |
8 |
nifi.security.truststorePasswd |
empty |
To specify truststore password. |
Other properties for administration
管理员还使用其他一些属性来管理 NiFi 及其服务的连续性。
S.No. |
Property name |
Default Value |
description |
1 |
nifi.flowcontroller.graceful.shutdown.period |
10 sec |
指定优雅关闭 NiFi 流控制器的所需时间。 |
2 |
nifi.administrative.yield.duration |
30 sec |
指定 NiFi 的管理生成持续时间。 |
3 |
nifi.authorizer.configuration.file |
./conf/authorizers.xml |
指定授权器配置文件的路径和文件名。 |
4 |
nifi.login.identity.provider.configuration.file |
./conf/login-identity-providers.xml |
指定登录身份提供程序配置文件的路径和文件名。 |
Apache NiFi - Creating Flows
Apache NiFi 提供大量组件,帮助开发人员为任何类型的协议或数据源创建数据流。若要创建流,开发人员可将组件从菜单栏拖动至画布,然后单击并拖动鼠标,将组件彼此连接。
通常,NiFi 在流的开始部分有类似 getfile 的侦听器组件,用于从源系统获取数据。在另一端有类似 putfile 的发送器组件,以及处理数据的中间组件。
例如,让我们创建一个流,该流从一个目录中获取一个空文件,然后在该文件中添加一些文本,并将它放入另一个目录中。
-
首先,将处理器图标拖动到 NiFi 画布中,然后从列表中选择 GetFile 处理器。
-
创建一个输入目录,如 c:\inputdir.
-
右键单击处理器并选择“配置”,然后在“属性”选项卡中添加 Input Directory (c:\inputdir) ,单击“应用”,然后返回画布。
-
将处理器图标拖到画布上,然后从列表中选择 ReplaceText 处理器。
-
右键单击处理器,然后选择配置。在 properties 选项卡中,在 “替换值” 文本框中添加一些文本,例如 “Hello tutorialspoint.com” ,然后单击 “应用”。
-
转到 “设置” 选项卡,勾选右侧的 “错误” 复选框,然后返回到画布。
-
将 GetFIle 处理器连接到 ReplaceText,以建立成功关系。
-
将处理器图标拖到画布上,然后从列表中选择 PutFile 处理器。
-
创建输出目录,例如 c:\outputdir 。
-
右键单击处理器,然后选择 “配置”。在 “属性” 选项卡中,添加 Directory (c:\outputdir) ,然后单击 “应用” 并返回到画布。
-
转到 “设置” 选项卡,勾选右侧的 “错误” 和 “成功” 复选框,然后返回到画布。
-
将 ReplaceText 处理器连接到 PutFile,以建立成功关系。
-
现在开始流程,并在输入目录中添加一个空文件,您将看到它将移动到输出目录,并且该文本将被添加到文件中。
通过遵循以上步骤,开发人员可以选择任何处理器和其他 NiFi 组件,为他们的组织或客户创建合适的流程。
Apache NiFi - Templates
Apache NiFi 提供了模板的概念,可以更轻松地重用和分发 NiFi 流程。这些流程可由其他开发人员或在其他 NiFi 集群中使用。它还有助于 NiFi 开发人员在 GitHub 等存储库中共享他们的工作。
Create Template
让我们为在章节 15 “Apache NiFi - 创建流程” 中创建的流程创建一个模板。
使用 Shift 键选择流程的所有组件,然后单击 NiFi 画布左侧的 “创建模板” 图标。您还可以看到一个工具箱,如下图所示。单击如下图中用蓝色标记的图标 create template 。输入模板的名称。开发人员还可以添加说明,这是可选的。
Download Template
然后转到 NiFi UI 右上角的菜单中的 NiFi 模板选项,如下图所示。
现在,单击您要下载的模板的下载图标(位于列表右侧)。将下载带有模板名称的 XML 文件。
Apache NiFi - API
NiFi 提供了许多 API,可帮助开发人员通过任何其他工具或自定义开发的应用程序来更改并获取 NiFi 的信息。在本教程中,我们将使用谷歌 Chrome 中的 postman 应用程序来讲解一些示例。
若要将 postman 添加到您的 Google Chrome,请访问下面提到的 URL,然后单击添加到 Chrome 按钮。您现在将看到一个添加到您 Google Chrome 的新应用程序。
NiFi rest API 的当前版本是 1.8.0,文档位于下面提到的 URL 中。
以下是使用最多的 NiFi rest API 模块:
-
[role="bare"]http://<nifi url>:<nifi port>/nifi-api/<*api-path*>
-
如果启用了 HTTPS [role="bare"] [role="bare"]https://<nifi url>:<nifi port>/nifi-api/<*api-path*>
S.No. |
API module Name |
api-path |
Description |
1 |
Access |
/access |
向用户进行身份验证,并从 NiFi 获取访问令牌。 |
2 |
Controller |
/controller |
管理集群并创建报告任务。 |
3 |
Controller Services |
/controller-services |
用于管理控制器服务和更新控制器服务引用。 |
4 |
Reporting Tasks |
/reporting-tasks |
To manage reporting tasks. |
5 |
Flow |
/flow |
获取数据流元数据和组件状态,并查询历史记录 |
6 |
Process Groups |
/process-groups |
上传和实例化模板,并创建组件。 |
7 |
Processors |
/processors |
创建和计划处理器,并设置其属性。 |
8 |
Connections |
/connections |
创建一个连接,设置队列优先级,并更新连接目的地 |
9 |
FlowFile Queues |
/flowfile-queues |
查看队列内容、下载流程文件内容,以及清空队列。 |
10 |
Remote Process Groups |
/remote-process-groups |
创建一个远程组并启用传输。 |
11 |
Provenance |
/provenance |
查询来源,并搜索事件谱系。 |
让我们现在考虑一个示例,并在 postman 上运行以获取有关运行的 NiFi 实例的详细信息。
Apache NiFi - Data Provenance
Apache NiFi 会记录和存储有关流中已摄取数据上发生的事件的每个信息。数据来源存储库会存储此信息,并提供界面来搜索此事件信息。既可以访问针对整个 NiFi 层级的数据来源,也可以访问针对处理程序层级的数据来源。
下表列出了 NiFi 数据来源事件列表中各个字段,这些字段包括:
S.No. |
Field Name |
Description |
1 |
Date/Time |
事件的日期和时间。 |
2 |
Type |
事件类型,例如“CREATE”。 |
3 |
FlowFileUuid |
对其执行操作的流程文件的 UUID。 |
4 |
Size |
Size of the flowfile. |
5 |
Component Name |
执行该事件的组件名称。 |
6 |
Component Type |
Type of the component. |
7 |
Show lineage |
最后一列具有显示谱系图标,用于查看流程文件谱系,如下所示。 |
要获取有关该事件的更多信息,用户可以单击 NiFi Data Provenance UI 第一列中显示的信息图标。
nifi.properties 文件中有一些属性用于管理 NiFi Data Provenance 存储库。
S.No. |
Property Name |
Default Value |
Description |
1 |
nifi.provenance.repository.directory.default |
./provenance_repository |
指定 NiFi 数据出处的默认路径。 |
2 |
nifi.provenance.repository.max.storage.time |
24 hours |
指定 NiFi 数据出处的最大保留时间。 |
3 |
nifi.provenance.repository.max.storage.size |
1 GB |
指定 NiFi 数据出处的最大存储空间。 |
4 |
nifi.provenance.repository.rollover.time |
30 secs |
指定 NiFi 数据出处的切换时间。 |
5 |
nifi.provenance.repository.rollover.size |
100 MB |
指定 NiFi 数据出处的切换大小。 |
6 |
nifi.provenance.repository.indexed.fields |
EventType、FlowFileUUID、Filename、ProcessorID、Relationship |
指定用于搜索和编制 NiFi 数据出处索引的字段。 |
Apache NiFi - Monitoring
在 Apache NiFi 中,有多种方式可以监控系统中的不同统计信息,例如错误、内存使用情况、CPU 使用情况、数据流统计信息等。我们将在本教程中讨论最流行的统计信息。
In built Monitoring
在本节中,我们将进一步了解 Apache NiFi 中内置的监控功能。
Bulletin Board
公告栏以实时方式显示 NiFi 处理器生成的最新 ERROR 和 WARNING。要访问公告栏,用户必须转到右边的下拉菜单,然后选择“公告栏”选项。它会自动刷新,用户还可以禁用它。用户还可以通过双击错误导航到实际的处理器。用户也可以通过以下方法过滤公告:
-
by message
-
by name
-
by id
-
by group id
Data provenance UI
要监控在任何特定处理器或整个 NiFi 中发生的事件,用户可以从公告栏的同一菜单访问数据来源。用户还可以通过以下字段过滤数据来源存储库中的事件:
-
by component name
-
by component type
-
by type
NiFi Summary UI
Apache NiFi 摘要也可以从公告栏的同一菜单访问。此 UI 包含有关该特定 NiFi 实例或集群的所有组件的信息。它们可以按名称、类型或 URI 进行筛选。不同的组件类型有不同的选项卡。以下是在 NiFi 摘要 UI 中可以监控的组件:
-
Processors
-
Input ports
-
Output ports
-
Remote process groups
-
Connections
-
Process groups
在此 UI 中,在右下角有一个名为系统诊断的链接,用于检查 JVM 统计信息。
Reporting Tasks
Apache NiFi 提供多项报告任务,以支持外部监控系统,如 Ambari、Grafana 等。开发人员可以创建一个自定义报告任务或配置内置的任务,以便将 NiFi 的指标发送到外部监控系统。下表列出了 NiFi 1.7.1 提供的报告任务。
S.No. |
Reporting Task Name |
Description |
1 |
AmbariReportingTask |
设置 NiFi 的 Ambari 度量标准服务。 |
2 |
ControllerStatusReportingTask |
报告过去 5 分钟内 NiFi 摘要 UI 中的信息。 |
3 |
MonitorDiskUsage |
报告并发出关于特定目录磁盘使用情况的警告。 |
4 |
MonitorMemory |
监视 JVM Java 内存池中使用的 Java 堆大小。 |
5 |
SiteToSiteBulletinReportingTask |
使用 Site to Site 协议报告公告中的错误和警告。 |
6 |
SiteToSiteProvenanceReportingTask |
使用 Site to Site 协议报告 NiFi 数据来源事件。 |
Response
{
"systemDiagnostics": {
"aggregateSnapshot": {
"totalNonHeap": "183.89 MB",
"totalNonHeapBytes": 192819200,
"usedNonHeap": "173.47 MB",
"usedNonHeapBytes": 181894560,
"freeNonHeap": "10.42 MB",
"freeNonHeapBytes": 10924640,
"maxNonHeap": "-1 bytes",
"maxNonHeapBytes": -1,
"totalHeap": "512 MB",
"totalHeapBytes": 536870912,
"usedHeap": "273.37 MB",
"usedHeapBytes": 286652264,
"freeHeap": "238.63 MB",
"freeHeapBytes": 250218648,
"maxHeap": "512 MB",
"maxHeapBytes": 536870912,
"heapUtilization": "53.0%",
"availableProcessors": 4,
"processorLoadAverage": -1,
"totalThreads": 71,
"daemonThreads": 31,
"uptime": "17:30:35.277",
"flowFileRepositoryStorageUsage": {
"freeSpace": "286.93 GB",
"totalSpace": "464.78 GB",
"usedSpace": "177.85 GB",
"freeSpaceBytes": 308090789888,
"totalSpaceBytes": 499057160192,
"usedSpaceBytes": 190966370304,
"utilization": "38.0%"
},
"contentRepositoryStorageUsage": [
{
"identifier": "default",
"freeSpace": "286.93 GB",
"totalSpace": "464.78 GB",
"usedSpace": "177.85 GB",
"freeSpaceBytes": 308090789888,
"totalSpaceBytes": 499057160192,
"usedSpaceBytes": 190966370304,
"utilization": "38.0%"
}
],
"provenanceRepositoryStorageUsage": [
{
"identifier": "default",
"freeSpace": "286.93 GB",
"totalSpace": "464.78 GB",
"usedSpace": "177.85 GB",
"freeSpaceBytes": 308090789888,
"totalSpaceBytes": 499057160192,
"usedSpaceBytes": 190966370304,
"utilization": "38.0%"
}
],
"garbageCollection": [
{
"name": "G1 Young Generation",
"collectionCount": 344,
"collectionTime": "00:00:06.239",
"collectionMillis": 6239
},
{
"name": "G1 Old Generation",
"collectionCount": 0,
"collectionTime": "00:00:00.000",
"collectionMillis": 0
}
],
"statsLastRefreshed": "09:30:20 SGT",
"versionInfo": {
"niFiVersion": "1.7.1",
"javaVendor": "Oracle Corporation",
"javaVersion": "1.8.0_151",
"osName": "Windows 7",
"osVersion": "6.1",
"osArchitecture": "amd64",
"buildTag": "nifi-1.7.1-RC1",
"buildTimestamp": "07/12/2018 12:54:43 SGT"
}
}
}
}
Apache NiFi - Upgrade
在开始升级 Apache NiFi 之前,请阅读发行说明以了解更改和新增功能。用户需要评估这些新增和更改对其当前 NiFi 安装的影响。以下链接可获取 Apache NiFi 新版本的发布说明。
在群集设置中,用户需要升级群集中每个节点的 NiFi 安装。按照以下步骤升级 Apache NiFi。
-
备份您当前 NiFi 或 lib 或任何其他文件夹中存在的自定义 NAR。
-
下载 Apache NiFi 的新版本。以下链接可下载最新 NiFi 版本的源代码和二进制文件。 https://nifi.apache.org/download.html
-
在当前 NiFi 的同一安装目录中创建一个新目录,并解压缩 Apache NiFi 的新版本。
-
优雅地停止 NiFi。首先停止所有处理器,并让数据流中存在的所有流文件得到处理。一旦不再有流文件,停止 NiFi。
-
将 authorizers.xml 的配置从当前 NiFi 安装复制到新版本。
-
从当前版本更新新 NiFi 版本的 bootstrap-notification-services.xml 和 bootstrap.conf 中的值。
-
将来自 logback.xml 的自定义日志添加到新的 NiFi 安装。
-
从当前版本配置 login-identity-providers.xml 中的登录身份提供程序。
-
从当前版本更新新 NiFi 安装的所有 nifi.properties 属性。
-
请确保新版本的用户和组与当前版本相同,以避免任何权限拒绝错误。
-
将配置从当前版本的 state-management.xml 复制到新版本。
-
将以下目录的内容从 NiFi 安装的当前版本复制到新版本中的相同目录。../conf/flow.xml.gz,还包括存档目录中的 flow.xml.gz。对于来源和内容存储库,将 nifi.properties 文件中的值更改为当前存储库。如果指定了任何其他外部目录,请从 ./state/local 复制状态或在 nifi.properties 中更改状态。
-
重新检查所执行的所有更改,并检查它们是否对新 NiFi 版本中添加的任何新更改产生了影响。如果有任何影响,请检查解决方案。
-
启动所有 NiFi 节点,并验证所有流程是否正常工作,并且存储库是否存储数据,UI 是否正在检索数据而不出现任何错误。
-
一段时间内监控公告,以检查是否有任何新的错误。
-
如果新版本工作正常,则当前版本可以从目录存档和删除。
Apache NiFi - Remote Process Group
Apache NiFi 远程进程组或 RPG 允许流程使用站点到站点协议将流程中的 FlowFile 直接到不同的 NiFi 实例。在 1.7.1 版本中,NiFi 不提供平衡关系,因此 RPG 用于在 NiFi 数据流中实现负载均衡。
开发者可以从 NiFi UI 的顶部工具栏添加 RPG,方法是将如图所示的图标拖到画布上。要配置 RPG,开发者必须添加以下字段:
S.No. |
Field Name |
Description |
1 |
URLs |
指定以逗号分隔的远程目标 NiFI URL。 |
2 |
Transport Protocol |
指定远程 NiFi 实例的传输协议。可以是 RAW 或 HTTP。 |
3 |
Local Network Interface |
指定用于发送/接收数据的本地网络接口。 |
4 |
HTTP Proxy Server Hostname |
指定代理服务器的主机名以用于 RPG 中的传输。 |
5 |
HTTP Proxy Server Port |
指定代理服务器的端口以用于 RPG 中的传输。 |
6 |
HTTP Proxy User |
这是一个可选字段,用于指定 HTTP 代理的用户名。 |
7 |
HTTP Proxy Password |
这是一个可选字段,用于指定上述用户名的密码。 |
开发者需要在像使用处理器之前启用它。
Apache NiFi - Controller Settings
Apache NiFi 提供共享服务,这些服务可以由处理器共享,而报告任务称为控制器设置。这些服务类似于数据库连接池,可供访问同一数据库的处理器使用。
要访问控制器设置,请使用 NiFi UI 右上角的下拉菜单,如下图所示。
Apache NiFi 提供了许多控制器设置,我们将讨论一个常用的设置,以及如何将其设置为 NiFi。
DBCPConnectionPool
单击“控制器设置”选项后,在 Nifi 设置页面中添加加号。然后从控制器设置列表中选择 DBCPConnectionPool。DBCPConnectionPool 将会添加到主 NiFi 设置页面中,如下图所示。
它包含以下关于控制器 setting:Name 的信息
-
Type
-
Bundle
-
State
-
Scope
-
Configure and delete icon
单击配置图标并填写所需字段。下表列出了这些字段 -
S.No. |
Field Name |
Default value |
description |
1 |
Database Connection URL |
empty |
指定到数据库的连接 URL。 |
2 |
Database Driver Class Name |
empty |
指定数据库的驱动程序类名,例如 mysql 的 com.mysql.jdbc.Driver。 |
3 |
Max Wait Time |
500 millis |
指定等待从连接获取数据库数据的时间。 |
4 |
Max Total Connections |
8 |
指定数据库连接池中分配的最大连接数。 |
要停止或配置控制器设置,首先应停止所有已连接的 NiFi 组件。NiFi 还在控制器设置中添加范围,以管理其配置。因此,只有共享相同设置的组件不会受到影响,并将使用相同的控制器设置。
Apache NiFi - Reporting Task
Apache NiFi 报告任务类似于控制器服务,它们在后台运行并发送或记录 NiFi 实例的统计数据。还可以在与控制器设置相同的页面从 NiFi 报告任务进行访问,但位于不同的选项卡中。
若要添加报告任务,开发人员需要单击报告任务页面右上角的加号按钮。这些报告任务主要用于监视 NiFi 实例的活动,无论是在公告中还是在来源中。主要而言,这些报告任务使用“站点对站点”将 NiFi 统计数据传输到其他节点或外部系统。
现在,让我们添加配置的报告任务以加深了解。
Apache NiFi - Custom Processor
Apache NiFi 是一个开源平台,为开发者提供了在 NiFi 库中添加其定制处理器的选项。按照这些步骤创建自定义处理器。
-
从下面给出的链接下载 Maven 最新版本。 https://maven.apache.org/download.cgi
-
添加一个环境变量名为 M2_HOME,并将其值设置为 Maven 的安装目录。
-
从以下链接下载 Eclipse IDE。 https://www.eclipse.org/downloads/download.php
-
打开命令提示符并执行 Maven Archetype 命令。
> mvn archetype:generate
-
在原型项目中搜索 nifi 类型。
-
Select org.apache.nifi:nifi-processor-bundle-archetype project.
-
然后从版本列表中选择最新版本,针对本教程,即 1.7.1。
-
输入groupId、artifactId、version、package和artifactBaseName等。
-
然后将创建一个具有两个目录的maven项目。nifi-<artifactBaseName>-processors nifi-<artifactBaseName>-nar
-
在nifi-<artifactBaseName>-processors目录中运行以下命令,以在eclipse中添加该项目。
mvn install eclipse:eclipse
-
打开 Eclipse,然后从文件菜单中选择导入。
-
然后选择“将现有项目添加到工作空间”,并从eclipse中的nifi-<artifactBaseName>-processors目录添加该项目。
-
在 public void onTrigger(ProcessContext context, ProcessSession session) 函数中添加你的代码,该函数在计划运行处理器时运行。
-
然后通过运行下面提到的命令将代码打包到NAR文件中。
mvn clean install
-
将在nifi—nar/target目录中创建一个NAR文件。
-
将NAR文件复制到Apache NiFi的lib文件夹并重新启动NiFi。
-
在 NiFi 成功重新启动后,检查处理器列表以查找新的自定义处理器。
-
对于任何错误,请检查./logs/nifi.log文件。
Apache NiFi - Custom Controllers Service
Apache NiFi 是一个开源平台,它为开发人员提供了在 Apache NiFi 中添加自定义控制器服务的选项。该步骤和工具与用于创建自定义处理器的步骤和工具几乎相同。
-
打开命令提示符并执行 Maven Archetype 命令。
> mvn archetype:generate
-
在原型项目中搜索 nifi 类型。
-
Select org.apache.nifi:nifi-service-bundle-archetype project.
-
然后从版本列表中,选择最新的版本 - 本教程是 1.7.1。
-
输入 groupId、artifactId、version、package 和 artifactBaseName 等。
-
创建一个具有目录的 maven 项目。nifi-<artifactBaseName>nifi-<artifactBaseName>-narnifi-<artifactBaseName>-apinifi-<artifactBaseName>-api-nar
-
在 nifi-<artifactBaseName> 和 nifi-<artifactBaseName>-api 目录中运行以下命令,以将这两个项目添加到 Eclipse 中。mvn install eclipse:eclipse
-
打开 Eclipse,然后从文件菜单中选择导入。
-
然后,选择“将现有项目导入到工作区”,并从 Eclipse 中的 nifi-<artifactBaseName> 和 nifi-<artifactBaseName>-api 目录添加项目。
-
在源文件中添加您的代码。
-
然后通过运行以下命令将代码打包到 NAR 文件中。mvn clean install
-
将创建两个 NAR 文件,分别位于每个 nifi-<artifactBaseName>/target 和 nifi-<artifactBaseName>-api/target 目录中。
-
将这些 NAR 文件复制到 Apache NiFi 的 lib 文件夹中,然后重新启动 NiFi。
-
在 NiFi 成功重新启动后,检查处理器列表以查找新的自定义处理器。
-
对于任何错误,请检查 ./logs/nifi.log 文件。
Apache NiFi - Logging
Apache NiFi使用logback库来处理其日志记录。NiFi的conf目录中有一个文件logback.xml,用于配置NiFi中的日志记录。日志记录在NiFi的logs文件夹中生成,日志文件如下所述。
nifi-app.log
这是nifi的主要日志文件,记录了apache NiFi应用程序的所有活动,从NAR文件加载到NiFi组件遇到的运行时错误或公告。下面是 logback.xml 文件中 nifi-app.log 文件的默认附加程序。
<appender name="APP_FILE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${org.apache.nifi.bootstrap.config.log.dir}/nifi-app.log</file>
<rollingPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>
${org.apache.nifi.bootstrap.config.log.dir}/
nifi-app_%d{yyyy-MM-dd_HH}.%i.log
</fileNamePattern>
<maxFileSize>100MB</maxFileSize>
<maxHistory>30</maxHistory>
</rollingPolicy>
<immediateFlush>true</immediateFlush>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%date %level [%thread] %logger{40} %msg%n</pattern>
</encoder>
</appender>
附加程序名称为APP_FILE,类为RollingFileAppender,这意味着logger正在使用回滚策略。默认情况下,最大文件大小为100 MB,可更改为所需大小。APP_FILE的最大保留时间为30个日志文件,可以根据用户要求进行更改。
nifi-user.log
此日志包含用户事件,如Web安全、Web API配置、用户授权等。下面是在logback.xml文件中nifi-user.log的附加程序。
<appender name="USER_FILE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${org.apache.nifi.bootstrap.config.log.dir}/nifi-user.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>
${org.apache.nifi.bootstrap.config.log.dir}/
nifi-user_%d.log
</fileNamePattern>
<maxHistory>30</maxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%date %level [%thread] %logger{40} %msg%n</pattern>
</encoder>
</appender>
附加程序名称为USER_FILE。它遵循轮换策略。USER_FILE的最大保留时间是30个日志文件。下面是nifi-user.log中存在的USER_FILE附加程序的默认记录器。
<logger name="org.apache.nifi.web.security" level="INFO" additivity="false">
<appender-ref ref="USER_FILE"/>
</logger>
<logger name="org.apache.nifi.web.api.config" level="INFO" additivity="false">
<appender-ref ref="USER_FILE"/>
</logger>
<logger name="org.apache.nifi.authorization" level="INFO" additivity="false">
<appender-ref ref="USER_FILE"/>
</logger>
<logger name="org.apache.nifi.cluster.authorization" level="INFO" additivity="false">
<appender-ref ref="USER_FILE"/>
</logger>
<logger name="org.apache.nifi.web.filter.RequestLogger" level="INFO" additivity="false">
<appender-ref ref="USER_FILE"/>
</logger>
nifi-bootstrap.log
此日志包含自举日志、apache NiFi的标准输出(所有主要用于调试在代码中编写的system.out)和标准错误(所有在代码中编写的system.err)。下面是logback.log中nifi-bootstrap.log的默认附加程序。
<appender name="BOOTSTRAP_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${org.apache.nifi.bootstrap.config.log.dir}/nifi-bootstrap.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>
${org.apache.nifi.bootstrap.config.log.dir}/nifi-bootstrap_%d.log
</fileNamePattern>
<maxHistory>5</maxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%date %level [%thread] %logger{40} %msg%n</pattern>
</encoder>
</appender>
nifi-bootstrap.log文件中的附加程序名称为BOOTSTRAP_FILE,它也遵循回滚策略。BOOTSTRAP_FILE附加程序的最大保留时间是5个日志文件。下面是nifi-bootstrap.log文件的默认记录器。
<logger name="org.apache.nifi.bootstrap" level="INFO" additivity="false">
<appender-ref ref="BOOTSTRAP_FILE" />
</logger>
<logger name="org.apache.nifi.bootstrap.Command" level="INFO" additivity="false">
<appender-ref ref="CONSOLE" />
<appender-ref ref="BOOTSTRAP_FILE" />
</logger>
<logger name="org.apache.nifi.StdOut" level="INFO" additivity="false">
<appender-ref ref="BOOTSTRAP_FILE" />
</logger>
<logger name="org.apache.nifi.StdErr" level="ERROR" additivity="false">
<appender-ref ref="BOOTSTRAP_FILE" />
</logger>