Apache Flume 简明教程

Apache Flume - Fetching Twitter Data

使用 Flume,我们可以从各种服务获取数据并将其传输到集中化存储(HDFS 和 HBase)。本章介绍了如何从 Twitter 服务获取数据并使用 Apache Flume 将其存储在 HDFS 中。

如 Flume 体系结构中所述,Web 服务器生成日志数据,而这些数据由 Flume 中的代理收集。通道将这些数据缓冲到一个接收器中,而该接收器最终将其推送到集中化存储。

在本教程中提供的示例中,我们将创建一个应用程序并使用 Apache Flume 提供的实验性 Twitter 源从中获取推文。我们将使用内存通道缓冲这些推文,并使用 HDFS 接收器将这些推文推入 HDFS。

fetch data

要获取 Twitter 数据,我们必须执行以下步骤:

  1. Create a twitter Application

  2. Install / Start HDFS

  3. Configure Flume

Creating a Twitter Application

要从 Twitter 获取推文,则需要创建一个 Twitter 应用程序。请按照以下步骤创建 Twitter 应用程序。

Step 1

要创建 Twitter 应用程序,请单击以下链接 https://apps.twitter.com/ 。登录到您的 Twitter 账户。您将拥有一个 Twitter 应用程序管理窗口,您可以在其中创建、删除和管理 Twitter 应用程序。

image::https://www.iokays.com/tutorialspoint/apache_flume/_images/application_management_window.jpg [应用程序管理窗口]

Step 2

单击 Create New App 按钮。您将重定向到一个窗口,您将在其中获取一个应用程序表单,您必须在其中填写详细信息才能创建该应用程序。填写网站地址时,请提供完整的 URL 模式,例如 http://example.com.

create an application

Step 3

填入详细信息,完成后接受 Developer Agreement ,单击页面底部的 Create your Twitter application button 。如果一切顺利,将使用给定的详细信息创建一个应用程序,如下所示。

application created

Step 4

您可以在页面底部的 keys and Access Tokens 选项卡下,看到一个名为 Create my access token 的按钮。单击此按钮以生成访问令牌。

key access tokens

Step 5

最后,单击页面右上角的 Test OAuth 按钮。这将转到一个页面,其中显示您的 Consumer key, Consumer secret, Access token,Access token secret 。复制这些详细信息。它们可用于配置 Flume 中的代理。

oauth tool

Starting HDFS

由于我们会将数据存储在 HDFS 中,因此我们需要安装/验证 Hadoop。启动 Hadoop并在其中创建一个文件夹以存储 Flume 数据。在配置 Flume 之前,请执行以下步骤。

Step 1: Install / Verify Hadoop

安装 Hadoop 。如果您的系统中已安装 Hadoop,请使用 Hadoop 版本命令验证安装,如下所示。

$ hadoop version

如果您的系统中包含 Hadoop,并且您已设置路径变量,那么您将获得以下输出:

Hadoop 2.6.0
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r
e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1
Compiled by jenkins on 2014-11-13T21:10Z
Compiled with protoc 2.5.0
From source with checksum 18e43357c8f927c0695f1e9522859d6a
This command was run using /home/Hadoop/hadoop/share/hadoop/common/hadoop-common-2.6.0.jar

Step 2: Starting Hadoop

浏览 Hadoop 的 sbin 目录,按如下所示启动 yarn 和 Hadoop dfs(分布式文件系统)。

cd /$Hadoop_Home/sbin/
$ start-dfs.sh
localhost: starting namenode, logging to
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-namenode-localhost.localdomain.out
localhost: starting datanode, logging to
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-datanode-localhost.localdomain.out
Starting secondary namenodes [0.0.0.0]
starting secondarynamenode, logging to
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-secondarynamenode-localhost.localdomain.out

$ start-yarn.sh
starting yarn daemons
starting resourcemanager, logging to
   /home/Hadoop/hadoop/logs/yarn-Hadoop-resourcemanager-localhost.localdomain.out
localhost: starting nodemanager, logging to
   /home/Hadoop/hadoop/logs/yarn-Hadoop-nodemanager-localhost.localdomain.out

Step 3: Create a Directory in HDFS

在 Hadoop DFS 中,可以使用命令 mkdir 创建目录。浏览它,并在所需路径中创建名称为 twitter_data 的目录,如下所示。

$cd /$Hadoop_Home/bin/
$ hdfs dfs -mkdir hdfs://localhost:9000/user/Hadoop/twitter_data

Configuring Flume

我们必须使用 conf 文件夹中的配置文件配置源、通道和接收器。本章中给出的示例使用 Apache Flume 提供的实验性源,称为 Twitter 1% Firehose 内存通道和 HDFS 接收器。

Twitter 1% Firehose Source

该源具有高度实验性。它使用流式 API 连接到 1% 示例 Twitter Firehose,并连续下载推文,将它们转换为 Avro 格式,并将 Avro 事件发送到下游的 Flume 接收器。

安装 Flume 后,我们默认会得到该源。对应于该源的 jar 文件可以位于 lib 文件夹中,如下所示。

twitter jarfiles

Setting the classpath

classpath 变量设置为 Flume-env.sh 文件中 Flume 的 lib 文件夹,如下所示。

export CLASSPATH=$CLASSPATH:/FLUME_HOME/lib/*

该源需要详细信息,如 Twitter 应用程序的 Consumer key, Consumer secret, Access token,Access token secret 。配置该源时,您必须向以下属性提供值 −

  1. Channels

  2. Source type : org.apache.flume.source.twitter.TwitterSource

  3. consumerKey − OAuth 消费密钥

  4. consumerSecret − OAuth 消费密钥

  5. accessToken − OAuth 访问令牌

  6. accessTokenSecret − OAuth 令牌密钥

  7. maxBatchSize − Twitter 批次中应该存在的 Twitter 消息的最大数量。默认值为 1000(可选)。

  8. maxBatchDurationMillis − 关闭批次前等待的最大毫秒数。默认值为 1000(可选)。

Channel

我们正在使用内存通道。要配置内存通道,您必须向通道类型提供值。

  1. type − 它保存通道的类型。在我们的示例中,类型为 MemChannel

  2. Capacity − 这是存储在通道中的最大事件数。其默认值为 100(可选)。

  3. TransactionCapacity − 这是通道接受或发送的最大事件数。其默认值为 100(可选)。

HDFS Sink

该接收器将数据写入 HDFS。要配置该接收器,您必须提供以下详细信息。

  1. Channel

  2. type − hdfs

  3. hdfs.path − 存储数据的 HDFS 中目录的路径。

我们可以根据场景提供一些可选值。以下是我们在应用程序中配置的 HDFS 接收器的可选属性。

  1. fileType − 这是 HDFS 文件所需的文件格式。 SequenceFile, DataStreamCompressedStream 是此流中可用的三种类型。在我们的示例中,我们使用 DataStream

  2. writeFormat − 可以是文本或可写。

  3. batchSize − 写入 HDFS 文件前写入文件中的事件数量。其默认值为 100。

  4. rollsize − 触发滚动所需的的文件大小。其默认值为 100。

  5. rollCount − 滚动前写入文件中的事件数量。其默认值为 10。

Example – Configuration File

以下是配置文件的一个示例。复制此内容,并将其保存为 Flume conf 文件夹中的 twitter.conf

# Naming the components on the current agent.
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

# Describing/Configuring the source
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = Your OAuth consumer key
TwitterAgent.sources.Twitter.consumerSecret = Your OAuth consumer secret
TwitterAgent.sources.Twitter.accessToken = Your OAuth consumer key access token
TwitterAgent.sources.Twitter.accessTokenSecret = Your OAuth consumer key access token secret
TwitterAgent.sources.Twitter.keywords = tutorials point,java, bigdata, mapreduce, mahout, hbase, nosql

# Describing/Configuring the sink

TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/user/Hadoop/twitter_data/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000

# Describing/Configuring the channel
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

# Binding the source and sink to the channel
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channel = MemChannel

Execution

浏览 Flume 主目录,并如下所示执行应用程序。

$ cd $FLUME_HOME
$ bin/flume-ng agent --conf ./conf/ -f conf/twitter.conf
Dflume.root.logger=DEBUG,console -n TwitterAgent

如果一切顺利,开始向 HDFS 中流式传输推文。以下是获取推文时的命令提示符窗口的快照。

fetching tweets

Verifying HDFS

可以使用以下 URL 访问 Hadoop 管理 Web UI。

http://localhost:50070/

点击页面右侧名为 Utilities 的下拉菜单。你可以看到两个选项,如下面的快照所示。

verifying the hdfs

单击 Browse the file system ,然后输入你在其中存储推文的 HDFS 目录路径。在我们的示例中,路径为 /user/Hadoop/twitter_data/ 。然后,你可以看到如下所示,存储在 HDFS 中的 Twitter 日志文件列表。

browse file system