Apache Flume 简明教程

Apache Flume - Quick Guide

Apache Flume - Introduction

What is Flume?

Apache Flume 是一种工具/服务/数据收集机制,用于从各种来源收集、聚集和传输大量流数据(例如日志文件、事件等)到集中式数据存储。

Flume 是一款可靠性高、可分布且可配置的工具。它主要设计为将各种 Web 服务器的流数据(日志数据)复制到 HDFS。

apache flume

Applications of Flume

假设一个电子商务 Web 应用程序想要分析特定区域的客户行为。为此,他们需要将可用的日志数据移至 Hadoop 进行分析。这里,Apache Flume 派上用场了。

Flume 用于以更高的速度将应用程序服务器生成日志数据移至 HDFS。

Advantages of Flume

以下是使用 Flume 的优点:

  1. 使用 Apache Flume,可以将数据存储到任何集中式存储(HBase、HDFS)。

  2. 当传入数据的速率超过将数据写入目标的速率时,Flume 充当数据生产者和集中式存储之间的中介,并在它们之间提供稳定的数据流。

  3. Flume 提供了 contextual routing 功能。

  4. Flume 中的事务基于通道,其中每个消息有两笔事务(一个发送方和一个接收方)。这可以确保可靠的消息传递。

  5. Flume 可靠、容错、可缩放、可管理且可自定义。

Features of Flume

以下是 Flume 的一些显着特性−

  1. Flume 可有效地将日志数据从多个网络服务器传入集中存储(HDFS、HBase)。

  2. 使用 Flume,我们可以立即从多个服务器获取数据并传入 Hadoop。

  3. 除了日志文件外,Flume 还用于导入由社交网站(如 Facebook 和 Twitter)和电子商务网站(如 Amazon 和 Flipkart)生成的大量事件数据。

  4. Flume 支持大量源类型和目标类型。

  5. Flume 支持多跳流、多路并入多路并出流、上下文路由等。

  6. Flume 可以横向扩展。

Apache Flume - Data Transfer In Hadoop

如我们所知, Big Data, 是一组无法使用传统计算技术处理的大型数据集。大数据经过分析后可得出有价值的结果。 Hadoop 是一个开源框架,允许使用简单的编程模型通过计算机集群在分布式环境中存储和处理大数据。

Streaming / Log Data

一般而言,要分析的大部分数据都是由各种数据源(如应用程序服务器、社交网站、云服务器和企业服务器)产生的。这些数据将采用 log filesevents 的形式。

Log file − 通常,日志文件是一种列出操作系统中发生的事件/操作的 file 。例如,Web 服务器会在日志文件中列出向服务器发出的每个请求。

收集此类日志数据后,我们可以获取有关以下方面的信息:

  1. 应用程序性能,以及找到各种软件和硬件故障。

  2. 用户行为,以及得出更好的业务见解。

将数据导入 HDFS 系统的传统方法是使用 put 命令。我们来看看如何使用 put 命令。

HDFS put Command

处理日志数据的主要挑战在于将多台服务器产生的这些日志移到 Hadoop 环境中。

Hadoop File System Shell 提供了将数据插入到 Hadoop 并从中读取数据的命令。您可以使用 put 命令将数据插入到 Hadoop 中,如下所示。

$ Hadoop fs –put /path of the required file  /path in HDFS where to save the file

Problem with put Command

我们可以使用 Hadoop 的 put 命令将数据从这些源传输到 HDFS。但它存在以下缺点:

  1. 使用 put 命令时,我们可以在数据生成器生成数据时以更高的速率传输 only one file at a time 。由于对较旧数据进行的分析不够准确,因此我们需要一个实时传输数据的解决方案。

  2. 如果我们使用 put 命令,则需要对数据进行打包并准备上传。由于 Web 服务器会生成持续的数据,因此这是一项非常困难的任务。

我们在这里需要一个解决方案,该解决方案可以克服 put 命令的缺点,并且可以将数据生成器中的“流式数据”传输到集中存储(尤其是 HDFS)中,且延迟较低。

Problem with HDFS

在 HDFS 中,文件以目录项的形式存在,并且在关闭文件之前,文件的长度将被视为零。例如,如果某个源正在向 HDFS 中写入数据,并且网络在操作期间中断(而没有关闭文件),那么写入该文件的数据将丢失。

因此,我们需要一个可靠、可配置且可维护的系统才能将日志数据传输到 HDFS 中。

Note − 在 POSIX 文件系统中,每当我们访问某个文件(比如执行写操作)时,其他程序仍然可以读取此文件(至少是已保存的部分)。这是因为文件在关闭之前已存在于磁盘上。

Available Solutions

要将来自各种来源的流式数据(日志文件、事件等)发送到 HDFS,我们可以使用以下工具:

Facebook’s Scribe

Scribe 是一种非常受欢迎的工具,用于聚合和流式传输日志数据。它的设计可以扩展到极大量的节点,并且对于网络和节点故障具有鲁棒性。

Apache Kafka

Kafka 已由 Apache 软件基金会开发。它是一个开源消息代理。使用 Kafka,我们可以处理高吞吐量、低延迟的信息源。

Apache Flume

Apache Flume 是一个用于收集、聚合和传输大量流式数据(如日志数据、事件等)的工具/服务/数据提取机制,从各种 Web 服务到集中化数据存储。

它是一个高度可靠、可分布和可配置的工具,主要旨在传输来自各种来源的流式数据到 HDFS。

在本教程中,我们将详细讨论如何使用 Flume,并使用一些示例。

Apache Flume - Architecture

下图描述了 Flume 的基本架构。如示意图所示, data generators (如 Facebook、Twitter)会生成数据,这些数据由在这些数据上运行的各个 Flume agents 收集。此后, data collector (也是一个代理)会从代理中收集数据,这些数据会聚合并推送到 HDFS 或 HBase 等集中存储中。

flume architecture

Flume Event

eventFlume 中传输的数据的基本单位。它包含一个字节数组有效负载,该有效负载将随可选标题一起从源传输到目标。典型的 Flume 事件将具有以下结构 −

flume event

Flume Agent

在 Flume 中, agent 是一个独立的守护进程 (JVM)。它从客户端或其他代理接收数据(事件)并将其转发到其下一个目的地(汇或代理)。Flume 可以有多个代理。下图表示 Flume Agent

flume agent1

如该图所示,Flume 代理包含三个主要组件,即 sourcechannelsink

Source

source 是代理的一个组件,它从数据生成器处接收数据并将其作为 Flume 事件的形式传输到一个或多个信道。

Apache Flume 支持多种类型的源,每个源都会从指定的数据生成器接收事件。

Example − Avro 源、Thrift 源、twitter 1% 源等。

Channel

channel 是一个瞬态存储,它从源接收事件并在汇使用它们之前对其进行缓冲。它充当源和汇之间的桥梁。

这些信道是完全事务性的,并且它们可以与任何数量的源和汇协同工作。

Example − JDBC 信道、文件系统信道、内存信道等。

Sink

sink 将数据存储到 HBase 和 HDFS 等集中存储中。它使用来自信道的数据(事件)并将其传递到目的地。汇的目的地可能是另一个代理或集中存储。

Example − HDFS 汇

Note − 一个 Flume 代理可以有多源、多汇、多信道。我们在本教程的 Flume 配置章节中列出了所有受支持的源、汇和信道。

Additional Components of Flume Agent

我们在上面讨论的是代理的基本组件。除此之外,我们还有更多组件在将事件从数据生成器传输到集中存储中起着至关重要的作用。

Interceptors

拦截器用于更改/检查在源和信道之间传输的 Flume 事件。

Channel Selectors

在多个信道的情况下,这些用于确定要选择哪个信道来传输数据。信道选择器有两种类型 −

  1. Default channel selectors − 它们也称为复制信道选择器,它们会复制每个信道中的所有事件。

  2. Multiplexing channel selectors − 它们根据该事件标题中的地址来决定要发送事件的信道。

Sink Processors

这些用于从所选汇组中调用特定的汇。它们用于为您的汇创建故障转移路径,或者在信道中跨多个汇负载平衡事件。

Apache Flume - Data Flow

Flume 是一个用于将日志数据移动到 HDFS 的框架。通常,事件和日志数据由日志服务器生成,并且这些服务器运行着 Flume 代理。这些代理从数据生成器中接收数据。

通过名为“ Collector ”的中间节点收集这些代理中的数据。和代理一样,Flume 中可以有多个收集器。

最后,所有这些收集器中的数据都会被聚合起来并推送到集中存储,例如 HBase 或 HDFS。下图阐明了 Flume 中的数据流。

flume dataflow

Multi-hop Flow

在 Flume 中,可以有多个代理,而且在到达最终目的地之前,一个事件可能会通过多个代理。这称为“ multi-hop flow ”。

Fan-out Flow

从一个源到多个通道的数据流称为“ fan-out flow ”。它有两种类型 −

  1. Replicating − 数据流,其中的数据将在所有配置的通道中复制。

  2. Multiplexing − 数据流,其中的数据将被发送到事件标头中提到的所选通道。

Fan-in Flow

将数据从许多源传输到一个通道的数据流称为“ fan-in flow ”。

Failure Handling

在 Flume 中,每个事件都会进行两个事务:一个在发送方,一个在接收方。发送方将事件发送到接收方。在收到数据后,接收方立即提交它自己的事务,并向发送方发送“已收到”信号。在收到信号后,发送方提交自己的事务。(在收到接收方的信号之前,发送方不会提交其事务。)

Apache Flume - Environment

我们已经在上一章讨论了 Flume 的架构。在本章中,让我们看看如何下载并设置 Apache Flume。

在继续操作之前,您的系统中需要有 Java 环境。所以首先,请确保您的系统中已安装 Java。在本教程中的一些示例中,我们使用了 Hadoop HDFS(作为接收层)。因此,我们建议您和 Java 一起安装 Hadoop。要收集更多信息,请访问以下链接 − https://www.tutorialspoint.com/hadoop/hadoop_enviornment_setup.htm

Installing Flume

首先,从网站 https://flume.apache.org/ 下载最新版本的 Apache Flume 软件。

Step 1

打开该网站。点击主页左侧的 download 链接。它将带您到 Apache Flume 的下载页面。

installing flume

Step 2

在“下载”页面中,您可以看到 Apache Flume 的二进制文件和源文件的链接。点击 apache-flume-1.6.0-bin.tar.gz 链接

您将被重新定向到一个镜像列表,您可以通过点击其中任何一个镜像开始下载。同样,您可以通过点击 apache-flume-1.6.0-src.tar.gz 下载 Apache Flume 的源代码。

Step 3

在安装 HadoopHBase 和其他软件的安装目录所在的同一个目录中创建一个名为 Flume 的目录(如果您已经安装了任何软件),如下所示。

$ mkdir Flume

Step 4

解压已下载的 tar 文件,如下所示。

$ cd Downloads/
$ tar zxvf apache-flume-1.6.0-bin.tar.gz
$ tar zxvf apache-flume-1.6.0-src.tar.gz

Step 5

将 apache- flume-1.6.0-bin.tar 文件的内容移动到前面创建的 Flume 目录中,如下所示。(假设我们在名为 Hadoop 的本地用户中创建了 Flume 目录。)

$ mv apache-flume-1.6.0-bin.tar/* /home/Hadoop/Flume/

Configuring Flume

若要配置 Flume,我们必须修改三个文件,即 flume-env.sh, flumeconf.properties,bash.rc

Setting the Path / Classpath

.bashrc 文件中,为 Flume 设置主页文件夹、路径和类路径,如下所示。

setting the path

conf Folder

如果你打开 Apache Flume 的 conf ,你将有以下四个文件 −

  1. flume-conf.properties.template,

  2. flume-env.sh.template,

  3. flume-env.ps1.template, and

  4. log4j.properties.

conf folder

现在将

  1. flume-conf.properties.template 文件重命名为 flume-conf.properties

  2. flume-env.sh.template as flume-env.sh

flume-env.sh

打开 flume-env.sh 文件并将 JAVA_Home 设置到你在系统里安装 Java 的文件夹。

flume env sh

Verifying the Installation

通过浏览 bin 文件夹并输入以下命令,确认 Apache Flume 的安装。

$ ./flume-ng

如果你已经成功安装 Flume,你将得到一个如图所示的 Flume 帮助提示。

verifying the installation

Apache Flume - Configuration

安装 Flume 后,我们需要使用配置文件对其进行配置,配置文件是一个具有 key-value pairs 的 Java 属性文件。我们需要将值传递到文件中的键。

在 Flume 配置文件中,我们需要 −

  1. 命名当前代理的组件。

  2. Describe/Configure the source.

  3. Describe/Configure the sink.

  4. Describe/Configure the channel.

  5. 将源和汇聚绑定到通道。

通常我们可以在 Flume 中有多个代理。我们可以使用唯一名称来区分每个代理。并使用此名称,我们必须配置每个代理。

Naming the Components

首先,您需要对组件命名/列出组件,例如代理的源、汇聚和通道,如下所示。

agent_name.sources = source_name
agent_name.sinks = sink_name
agent_name.channels = channel_name

Flume 支持各种源、汇聚和通道。它们列在下面的表格中。

Sources

Channels

Sinks

Avro 源Thrift 源Exec 源JMS 源Spooling 目录源Twitter 1% firehose 源Kafka 源NetCat 源Sequence Generator 源Syslog 源Syslog TCP 源Multiport Syslog TCP 源Syslog UDP 源HTTP 源Stress 源Legacy 源Thrift Legacy 源Custom 源Scribe 源

Memory 通道JDBC 通道Kafka 通道File 通道Spillable Memory 通道Pseudo Transaction 通道

HDFS 汇聚Hive 汇聚Logger 汇聚Avro 汇聚Thrift 汇聚IRC 汇聚File Roll 汇聚Null 汇聚HBase 汇聚AsyncHBase 汇聚MorphlineSolr 汇聚ElasticSearch 汇聚Kite Dataset 汇聚Kafka 汇聚

您可以使用其中的任何一个。例如,如果使用 Twitter 源通过内存通道将 Twitter 数据传输到 HDFS 收集器,且代理名称 ID 为 TwitterAgent ,则:

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

列出代理组件后,您必须通过向属性提供值来描述源、收集器和通道。

Describing the Source

每个源将有一个单独的属性列表。名为“type”的属性是每个源的共有属性,用于指定我们正在使用的源类型。

除属性“type”外,还需要提供特定源的所有 required 属性的值才能对其进行配置,如下所示。

agent_name.sources. source_name.type = value
agent_name.sources. source_name.property2 = value
agent_name.sources. source_name.property3 = value

例如,如果我们考虑 twitter source ,则以下属性是我们必须为其提供值才能对其进行配置的属性。

TwitterAgent.sources.Twitter.type = Twitter (type name)
TwitterAgent.sources.Twitter.consumerKey =
TwitterAgent.sources.Twitter.consumerSecret =
TwitterAgent.sources.Twitter.accessToken =
TwitterAgent.sources.Twitter.accessTokenSecret =

Describing the Sink

与源一样,每个收集器都将有一个单独的属性列表。名为“type”的属性是每个收集器的共有属性,用于指定我们正在使用的收集器类型。除属性“type”外,还需要提供特定收集器所有 required 属性的值才能对其进行配置,如下所示。

agent_name.sinks. sink_name.type = value
agent_name.sinks. sink_name.property2 = value
agent_name.sinks. sink_name.property3 = value

例如,如果我们考虑 HDFS sink ,则以下属性是我们必须为其提供值才能对其进行配置的属性。

TwitterAgent.sinks.HDFS.type = hdfs (type name)
TwitterAgent.sinks.HDFS.hdfs.path = HDFS directory’s Path to store the data

Describing the Channel

Flume 提供了多种通道可以在源和收集器之间传输数据。因此,除了源和通道之外,还需要描述代理中使用的通道。

要描述每个通道,您需要设置必需的属性,如下所示。

agent_name.channels.channel_name.type = value
agent_name.channels.channel_name. property2 = value
agent_name.channels.channel_name. property3 = value

例如,如果我们考虑 memory channel ,则以下属性是我们必须为其提供值才能对其进行配置的属性。

TwitterAgent.channels.MemChannel.type = memory (type name)

Binding the Source and the Sink to the Channel

由于通道连接了源和收集器,因此需要将两者都绑定到通道,如下所示。

agent_name.sources.source_name.channels = channel_name
agent_name.sinks.sink_name.channels = channel_name

以下示例展示了如何将源和收集器绑定到通道。在此,我们考虑 twitter source, memory channel,HDFS sink

TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channels = MemChannel

Starting a Flume Agent

配置完成后,我们必须启动 Flume 代理。执行方式如下:

$ bin/flume-ng agent --conf ./conf/ -f conf/twitter.conf
Dflume.root.logger=DEBUG,console -n TwitterAgent

其中:

  1. agent - 启动 Flume 代理的命令

  2. --conf ,-c<conf> - 使用 conf 目录中的配置文件

  3. -f<file> - 如果缺少,指定配置文件路径

  4. --name, -n <name> - twitter 代理的名称

  5. -D property =value - 设置 Java 系统属性值。

Apache Flume - Fetching Twitter Data

使用 Flume,我们可以从各种服务获取数据并将其传输到集中化存储(HDFS 和 HBase)。本章介绍了如何从 Twitter 服务获取数据并使用 Apache Flume 将其存储在 HDFS 中。

如 Flume 体系结构中所述,Web 服务器生成日志数据,而这些数据由 Flume 中的代理收集。通道将这些数据缓冲到一个接收器中,而该接收器最终将其推送到集中化存储。

在本教程中提供的示例中,我们将创建一个应用程序并使用 Apache Flume 提供的实验性 Twitter 源从中获取推文。我们将使用内存通道缓冲这些推文,并使用 HDFS 接收器将这些推文推入 HDFS。

fetch data

要获取 Twitter 数据,我们必须执行以下步骤:

  1. Create a twitter Application

  2. Install / Start HDFS

  3. Configure Flume

Creating a Twitter Application

要从 Twitter 获取推文,则需要创建一个 Twitter 应用程序。请按照以下步骤创建 Twitter 应用程序。

Step 1

要创建 Twitter 应用程序,点击以下链接 https://apps.twitter.com/ 。登录你的 Twitter 账号。你将有一个 Twitter 应用程序管理窗口,你可以在该窗口创建、删除和管理 Twitter 应用程序。

image::https://www.iokays.com/tutorialspoint/apache_flume/_images/application_management_window.jpg [应用程序管理窗口]

Step 2

单击 Create New App 按钮。您将重定向到一个窗口,您将在其中获取一个应用程序表单,您必须在其中填写详细信息才能创建该应用程序。填写网站地址时,请提供完整的 URL 模式,例如 http://example.com.

create an application

Step 3

填入详细信息,完成后接受 Developer Agreement ,单击页面底部的 Create your Twitter application button 。如果一切顺利,将使用给定的详细信息创建一个应用程序,如下所示。

application created

Step 4

您可以在页面底部的 keys and Access Tokens 选项卡下,看到一个名为 Create my access token 的按钮。单击此按钮以生成访问令牌。

key access tokens

Step 5

最后,单击页面右上角的 Test OAuth 按钮。这将转到一个页面,其中显示您的 Consumer key, Consumer secret, Access token,Access token secret 。复制这些详细信息。它们可用于配置 Flume 中的代理。

oauth tool

Starting HDFS

由于我们会将数据存储在 HDFS 中,因此我们需要安装/验证 Hadoop。启动 Hadoop并在其中创建一个文件夹以存储 Flume 数据。在配置 Flume 之前,请执行以下步骤。

Step 1: Install / Verify Hadoop

安装 Hadoop 。如果您的系统中已安装 Hadoop,请使用 Hadoop 版本命令验证安装,如下所示。

$ hadoop version

如果您的系统中包含 Hadoop,并且您已设置路径变量,那么您将获得以下输出:

Hadoop 2.6.0
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r
e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1
Compiled by jenkins on 2014-11-13T21:10Z
Compiled with protoc 2.5.0
From source with checksum 18e43357c8f927c0695f1e9522859d6a
This command was run using /home/Hadoop/hadoop/share/hadoop/common/hadoop-common-2.6.0.jar

Step 2: Starting Hadoop

浏览 Hadoop 的 sbin 目录,按如下所示启动 yarn 和 Hadoop dfs(分布式文件系统)。

cd /$Hadoop_Home/sbin/
$ start-dfs.sh
localhost: starting namenode, logging to
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-namenode-localhost.localdomain.out
localhost: starting datanode, logging to
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-datanode-localhost.localdomain.out
Starting secondary namenodes [0.0.0.0]
starting secondarynamenode, logging to
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-secondarynamenode-localhost.localdomain.out

$ start-yarn.sh
starting yarn daemons
starting resourcemanager, logging to
   /home/Hadoop/hadoop/logs/yarn-Hadoop-resourcemanager-localhost.localdomain.out
localhost: starting nodemanager, logging to
   /home/Hadoop/hadoop/logs/yarn-Hadoop-nodemanager-localhost.localdomain.out

Step 3: Create a Directory in HDFS

在 Hadoop DFS 中,可以使用命令 mkdir 创建目录。浏览它,并在所需路径中创建名称为 twitter_data 的目录,如下所示。

$cd /$Hadoop_Home/bin/
$ hdfs dfs -mkdir hdfs://localhost:9000/user/Hadoop/twitter_data

Configuring Flume

我们必须使用 conf 文件夹中的配置文件配置源、通道和接收器。本章中给出的示例使用 Apache Flume 提供的实验性源,称为 Twitter 1% Firehose 内存通道和 HDFS 接收器。

Twitter 1% Firehose Source

该源具有高度实验性。它使用流式 API 连接到 1% 示例 Twitter Firehose,并连续下载推文,将它们转换为 Avro 格式,并将 Avro 事件发送到下游的 Flume 接收器。

安装 Flume 后,我们默认会得到该源。对应于该源的 jar 文件可以位于 lib 文件夹中,如下所示。

twitter jarfiles

Setting the classpath

classpath 变量设置为 Flume-env.sh 文件中 Flume 的 lib 文件夹,如下所示。

export CLASSPATH=$CLASSPATH:/FLUME_HOME/lib/*

该源需要详细信息,如 Twitter 应用程序的 Consumer key, Consumer secret, Access token,Access token secret 。配置该源时,您必须向以下属性提供值 −

  1. Channels

  2. Source type : org.apache.flume.source.twitter.TwitterSource

  3. consumerKey − OAuth 消费密钥

  4. consumerSecret − OAuth 消费密钥

  5. accessToken − OAuth 访问令牌

  6. accessTokenSecret − OAuth 令牌密钥

  7. maxBatchSize − Twitter 批次中应该存在的 Twitter 消息的最大数量。默认值为 1000(可选)。

  8. maxBatchDurationMillis − 关闭批次前等待的最大毫秒数。默认值为 1000(可选)。

Channel

我们正在使用内存通道。要配置内存通道,您必须向通道类型提供值。

  1. type − 它保存通道的类型。在我们的示例中,类型为 MemChannel

  2. Capacity − 这是存储在通道中的最大事件数。其默认值为 100(可选)。

  3. TransactionCapacity − 这是通道接受或发送的最大事件数。其默认值为 100(可选)。

HDFS Sink

该接收器将数据写入 HDFS。要配置该接收器,您必须提供以下详细信息。

  1. Channel

  2. type − hdfs

  3. hdfs.path − 存储数据的 HDFS 中目录的路径。

我们可以根据场景提供一些可选值。以下是我们在应用程序中配置的 HDFS 接收器的可选属性。

  1. fileType − 这是 HDFS 文件所需的文件格式。 SequenceFile, DataStreamCompressedStream 是此流中可用的三种类型。在我们的示例中,我们使用 DataStream

  2. writeFormat − 可以是文本或可写。

  3. batchSize − 写入 HDFS 文件前写入文件中的事件数量。其默认值为 100。

  4. rollsize − 触发滚动所需的的文件大小。其默认值为 100。

  5. rollCount − 滚动前写入文件中的事件数量。其默认值为 10。

Example – Configuration File

以下是配置文件的一个示例。复制此内容,并将其保存为 Flume conf 文件夹中的 twitter.conf

# Naming the components on the current agent.
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

# Describing/Configuring the source
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = Your OAuth consumer key
TwitterAgent.sources.Twitter.consumerSecret = Your OAuth consumer secret
TwitterAgent.sources.Twitter.accessToken = Your OAuth consumer key access token
TwitterAgent.sources.Twitter.accessTokenSecret = Your OAuth consumer key access token secret
TwitterAgent.sources.Twitter.keywords = tutorials point,java, bigdata, mapreduce, mahout, hbase, nosql

# Describing/Configuring the sink

TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/user/Hadoop/twitter_data/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000

# Describing/Configuring the channel
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

# Binding the source and sink to the channel
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channel = MemChannel

Execution

浏览 Flume 主目录,并如下所示执行应用程序。

$ cd $FLUME_HOME
$ bin/flume-ng agent --conf ./conf/ -f conf/twitter.conf
Dflume.root.logger=DEBUG,console -n TwitterAgent

如果一切顺利,开始向 HDFS 中流式传输推文。以下是获取推文时的命令提示符窗口的快照。

fetching tweets

Verifying HDFS

可以使用以下 URL 访问 Hadoop 管理 Web UI。

http://localhost:50070/

点击页面右侧名为 Utilities 的下拉菜单。你可以看到两个选项,如下面的快照所示。

verifying the hdfs

单击 Browse the file system ,然后输入你在其中存储推文的 HDFS 目录路径。在我们的示例中,路径为 /user/Hadoop/twitter_data/ 。然后,你可以看到如下所示,存储在 HDFS 中的 Twitter 日志文件列表。

browse file system

Apache Flume - Sequence Generator Source

在上一章,我们已经了解到如何从 twitter 来源获取数据到 HDFS。本章将会讲解如何从 Sequence generator 获取数据。

Prerequisites

要运行本章提供的示例,你需要安装 HDFSFlume 。因此,在继续阅读后续内容之前,请确认 Hadoop 安装并启动 HDFS。(参考上一章来学习如何启动 HDFS)。

Configuring Flume

我们必须使用 conf 文件夹中的配置文件来配置来源、通道和接收器。本章中的示例使用一个 sequence generator source 、一个 memory channel 和一个 HDFS sink

Sequence Generator Source

它是不断产生事件的来源。它维护了一个从 0 开始并按 1 递增的计数器。它用于测试目的。在配置此来源时,你必须为以下属性提供值 −

  1. Channels

  2. Source type − seq

Channel

我们正在使用 memory 通道。若要配置内存通道,您必须为通道类型提供值。以下是配置内存通道时您需要提供的属性列表 −

  1. type − 它保存通道类型。在我们的示例中,类型为 MemChannel。

  2. Capacity − 通道中存储的最大事件数。其默认值为 100。(可选)

  3. TransactionCapacity − 通道接受或发送的最大事件数。它的默认值为 100。(可选)。

HDFS Sink

该接收器将数据写入 HDFS。要配置该接收器,您必须提供以下详细信息。

  1. Channel

  2. type − hdfs

  3. hdfs.path − 存储数据的 HDFS 中目录的路径。

我们可以根据场景提供一些可选值。以下是我们在应用程序中配置的 HDFS 接收器的可选属性。

  1. fileType − 这是 HDFS 文件所需的文件格式。 SequenceFile, DataStreamCompressedStream 是此流中可用的三种类型。在我们的示例中,我们使用 DataStream

  2. writeFormat − 可以是文本或可写。

  3. batchSize − 写入 HDFS 文件前写入文件中的事件数量。其默认值为 100。

  4. rollsize − 触发滚动所需的的文件大小。其默认值为 100。

  5. rollCount − 滚动前写入文件中的事件数量。其默认值为 10。

Example – Configuration File

以下是配置文件的示例。复制此内容并将其保存为 Flume 的 conf 文件夹中的 seq_gen .conf

# Naming the components on the current agent

SeqGenAgent.sources = SeqSource
SeqGenAgent.channels = MemChannel
SeqGenAgent.sinks = HDFS

# Describing/Configuring the source
SeqGenAgent.sources.SeqSource.type = seq

# Describing/Configuring the sink
SeqGenAgent.sinks.HDFS.type = hdfs
SeqGenAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/user/Hadoop/seqgen_data/
SeqGenAgent.sinks.HDFS.hdfs.filePrefix = log
SeqGenAgent.sinks.HDFS.hdfs.rollInterval = 0
SeqGenAgent.sinks.HDFS.hdfs.rollCount = 10000
SeqGenAgent.sinks.HDFS.hdfs.fileType = DataStream

# Describing/Configuring the channel
SeqGenAgent.channels.MemChannel.type = memory
SeqGenAgent.channels.MemChannel.capacity = 1000
SeqGenAgent.channels.MemChannel.transactionCapacity = 100

# Binding the source and sink to the channel
SeqGenAgent.sources.SeqSource.channels = MemChannel
SeqGenAgent.sinks.HDFS.channel = MemChannel

Execution

浏览 Flume 主目录,并如下所示执行应用程序。

$ cd $FLUME_HOME
$./bin/flume-ng agent --conf $FLUME_CONF --conf-file $FLUME_CONF/seq_gen.conf
   --name SeqGenAgent

如果一切顺利,来源开始生成序列号,这些序列号将以日志文件形式推送到 HDFS。

以下是命令提示符窗口的截图,该窗口将序列号生成器生成的数据获取到 HDFS。

data generated

Verifying the HDFS

你可以使用以下 URL 访问 Hadoop 管理 Web UI: −

http://localhost:50070/

在页面的右侧点击名为 Utilities 的下拉栏。如以下图表所示,你可以看到两个选项。

verifying the hdfs

点击 Browse the file system 并输入已将序列号生成器生成的数据存储至 HDFS 目录的路径。

在我们的示例中,路径将为 /user/Hadoop/ seqgen_data / 。然后,您可以看到由序列发生器生成的日志文件列表,如下所示,该列表存储在 HDFS 中。

browse file system

Verifying the Contents of the File

所有这些日志文件都按顺序格式包含数字。您可以使用 cat 命令验证文件系统中这些文件的内容,如下所示。

verifying the contents of file

Apache Flume - NetCat Source

本章提供一个示例,说明如何生成事件并在随后将其记录到控制台。为此,我们使用了 NetCat 源和 logger 汇。

Prerequisites

要运行本章中提供的示例,您需要安装 Flume

Configuring Flume

我们必须使用 conf 文件夹中的配置文件配置源、通道和汇。本章中给出的示例使用了 NetCat Source, Memory channellogger sink

NetCat Source

在配置 NetCat 源时,配置源的同时我们必须指定一个端口。现在源(NetCat 源)侦听给定的端口,并将我们输入该端口的每行接收为单个事件,并通过指定通道将其传输到汇。

在配置此源的同时,您必须为以下属性提供值−

  1. channels

  2. Source type − netcat

  3. bind − 要绑定的主机名或 IP 地址。

  4. port − 我们希望源侦听的端口号。

Channel

我们正在使用 memory 通道。若要配置内存通道,您必须为通道类型提供值。以下是配置内存通道时您需要提供的属性列表 −

  1. type − 它保存通道的类型。在我们的示例中,类型为 MemChannel

  2. Capacity − 通道中存储的最大事件数。其默认值为 100。(可选)

  3. TransactionCapacity − 这是通道接受或发送的最大事件数。其默认值为 100。(可选)。

Logger Sink

此汇聚记录传递给它的所有事件。通常,它用于测试或调试目的。若要配置此汇聚,您必须提供以下详细信息。

  1. Channel

  2. type − logger

Example Configuration File

下面给出了配置文件的一个示例。复制此内容并将其保存在 Flume 的 conf 文件夹中的 netcat.conf 中。

# Naming the components on the current agent
NetcatAgent.sources = Netcat
NetcatAgent.channels = MemChannel
NetcatAgent.sinks = LoggerSink

# Describing/Configuring the source
NetcatAgent.sources.Netcat.type = netcat
NetcatAgent.sources.Netcat.bind = localhost
NetcatAgent.sources.Netcat.port = 56565

# Describing/Configuring the sink
NetcatAgent.sinks.LoggerSink.type = logger

# Describing/Configuring the channel
NetcatAgent.channels.MemChannel.type = memory
NetcatAgent.channels.MemChannel.capacity = 1000
NetcatAgent.channels.MemChannel.transactionCapacity = 100

# Bind the source and sink to the channel
NetcatAgent.sources.Netcat.channels = MemChannel
NetcatAgent.sinks. LoggerSink.channel = MemChannel

Execution

浏览 Flume 主目录,并如下所示执行应用程序。

$ cd $FLUME_HOME
$ ./bin/flume-ng agent --conf $FLUME_CONF --conf-file $FLUME_CONF/netcat.conf
   --name NetcatAgent -Dflume.root.logger=INFO,console

如果一切都正常,则源将开始侦听给定的端口。在此情况下,它是 56565 。下面给出了已启动并正在侦听端口 56565 的 NetCat 源的命令提示符窗口的快照。

execution

Passing Data to the Source

若要将数据传递到 NetCat 源,您必须打开配置文件中给出的端口。打开一个单独的终端并使用 curl 命令连接到源 (56565)。当连接成功后,您会收到一条 connected 消息,如下所示。

$ curl telnet://localhost:56565
connected

现在您可以逐行输入您的数据(在每一行后,您必须按 Enter)。NetCat 源将每行作为单独的事件接收,您将收到一条 OK 已接收消息。

每当您完成数据传递时,您都可以按 ( Ctrl+C ) 退出控制台。下面给出了我们使用 curl 命令连接到的源的控制台的快照。

passing data

在上述控制台中输入的每一行都将被源作为单独的事件接收。由于我们使用了 Logger 汇聚,这些事件将被记录到控制台(源控制台),通过指定的通道(在本例中为内存通道)。

以下快照显示了 NetCat 控制台,事件被记录在那里。

netcat console