Apache Flume Tutorial

Apache Flume - Fetching Twitter Data

Using Flume, we can fetch data from various services and transport it to centralized stores (HDFS and HBase). This chapter explains how to fetch data from the Twitter service and store it in HDFS using Apache Flume.

As discussed in the Flume architecture, a web server generates log data and this data is collected by an agent in Flume. The channel buffers this data to a sink, which finally pushes it to the centralized stores.

In the example provided in this chapter, we will create a Twitter application and fetch tweets using the experimental Twitter source provided by Apache Flume. We will use the memory channel to buffer these tweets and the HDFS sink to push them into HDFS.

[Image: fetch data]

To fetch Twitter data, we will have to follow the steps given below −

  1. Create a Twitter application

  2. Install / Start HDFS

  3. Configure Flume

Creating a Twitter Application

In order to get the tweets from Twitter, you need to create a Twitter application. Follow the steps given below to create one.

Step 1

To create a Twitter application, click on the following link: https://apps.twitter.com/. Sign in to your Twitter account. You will land on the Twitter Application Management window, where you can create, delete, and manage Twitter apps.

image::https://www.iokays.com/tutorialspoint/apache_flume/_images/application_management_window.jpg [Application Management window]

Step 2

Click on the Create New App button. You will be redirected to a window with an application form in which you have to fill in your details in order to create the app. While filling in the website address, give the complete URL pattern, for example, http://example.com.

[Image: create an application]

Step 3

Fill in the details, accept the Developer Agreement when finished, and click on the Create your Twitter application button at the bottom of the page. If everything goes fine, an app will be created with the given details, as shown below.

[Image: application created]

Step 4

Under the Keys and Access Tokens tab at the bottom of the page, you can see a button named Create my access token. Click on it to generate the access token.

[Image: keys and access tokens]

Step 5

Finally, click on the Test OAuth button at the top right of the page. This leads to a page that displays your Consumer key, Consumer secret, Access token, and Access token secret. Copy these details; they are needed to configure the agent in Flume.

[Image: OAuth tool]

Starting HDFS

Since we are storing the data in HDFS, we need to install / verify Hadoop. Start Hadoop and create a folder in it to store Flume data. Follow the steps given below before configuring Flume.

Step 1: Install / Verify Hadoop

Install Hadoop. If Hadoop is already installed in your system, verify the installation using the hadoop version command, as shown below.

$ hadoop version

If your system contains Hadoop, and if you have set the path variable, then you will get the following output −

Hadoop 2.6.0
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r
e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1
Compiled by jenkins on 2014-11-13T21:10Z
Compiled with protoc 2.5.0
From source with checksum 18e43357c8f927c0695f1e9522859d6a
This command was run using /home/Hadoop/hadoop/share/hadoop/common/hadoop-common-2.6.0.jar

Step 2: Starting Hadoop

Browse through the sbin directory of Hadoop and start YARN and Hadoop DFS (the distributed file system) as shown below.

$ cd $Hadoop_Home/sbin/
$ start-dfs.sh
localhost: starting namenode, logging to
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-namenode-localhost.localdomain.out
localhost: starting datanode, logging to
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-datanode-localhost.localdomain.out
Starting secondary namenodes [0.0.0.0]
starting secondarynamenode, logging to
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-secondarynamenode-localhost.localdomain.out

$ start-yarn.sh
starting yarn daemons
starting resourcemanager, logging to
   /home/Hadoop/hadoop/logs/yarn-Hadoop-resourcemanager-localhost.localdomain.out
localhost: starting nodemanager, logging to
   /home/Hadoop/hadoop/logs/yarn-Hadoop-nodemanager-localhost.localdomain.out
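
As a quick sanity check, you can list the running Java processes with the jps utility. If the daemons started correctly, you should see entries like the following (the process IDs are illustrative and will differ on your machine):

$ jps
2209 NameNode
2340 DataNode
2518 SecondaryNameNode
2675 ResourceManager
2789 NodeManager
2856 Jps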

Step 3: Create a Directory in HDFS

In Hadoop DFS, you can create directories using the mkdir command. Browse through it and create a directory named twitter_data in the required path, as shown below.

$ cd $Hadoop_Home/bin/
$ hdfs dfs -mkdir hdfs://localhost:9000/user/Hadoop/twitter_data
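
To verify that the directory was created, you can list the parent path, using the same hdfs://localhost:9000 namenode address assumed above:

$ hdfs dfs -ls hdfs://localhost:9000/user/Hadoop/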

Configuring Flume

We have to configure the source, the channel, and the sink using the configuration file in the conf folder. The example given in this chapter uses the experimental source provided by Apache Flume named Twitter 1% Firehose, a memory channel, and an HDFS sink.

Twitter 1% Firehose Source

This source is highly experimental. It connects to the 1% sample Twitter Firehose using the streaming API, continuously downloads tweets, converts them to Avro format, and sends the Avro events to a downstream Flume sink.

This source ships by default with the Flume installation. The jar files corresponding to it can be found in the lib folder, as shown below.

[Image: Twitter jar files]
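
To confirm that the source is present in your installation, you can search the lib folder for the corresponding jar. In recent releases it is typically named flume-twitter-source-<version>.jar, but the exact file name depends on your Flume version:

$ ls $FLUME_HOME/lib | grep -i twitter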

Setting the classpath

Set the classpath variable to the lib folder of Flume in the flume-env.sh file, as shown below.

export CLASSPATH=$CLASSPATH:$FLUME_HOME/lib/*

This source needs details such as the Consumer key, Consumer secret, Access token, and Access token secret of a Twitter application. While configuring this source, you have to provide values for the following properties (a minimal sketch follows the list) −

  1. Channels

  2. Source type : org.apache.flume.source.twitter.TwitterSource

  3. consumerKey − The OAuth consumer key

  4. consumerSecret − OAuth consumer secret

  5. accessToken − OAuth access token

  6. accessTokenSecret − OAuth token secret

  7. maxBatchSize − Maximum number of Twitter messages that should be in a Twitter batch. The default value is 1000 (optional).

  8. maxBatchDurationMillis − Maximum number of milliseconds to wait before closing a batch. The default value is 1000 (optional).
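
As a minimal sketch, these properties map into the agent configuration file as shown below. The angle-bracket values are placeholders for your own credentials, and the two batch properties are shown at their default values; the complete file used in this chapter appears later.

TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = <your consumer key>
TwitterAgent.sources.Twitter.consumerSecret = <your consumer secret>
TwitterAgent.sources.Twitter.accessToken = <your access token>
TwitterAgent.sources.Twitter.accessTokenSecret = <your access token secret>
TwitterAgent.sources.Twitter.maxBatchSize = 1000
TwitterAgent.sources.Twitter.maxBatchDurationMillis = 1000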

Channel

We are using the memory channel. To configure it, you must provide a value for the channel type.

  1. type − It holds the type of the channel. In our example, the type is memory (the channel itself is named MemChannel in our configuration).

  2. capacity − The maximum number of events stored in the channel. Its default value is 100 (optional).

  3. transactionCapacity − The maximum number of events the channel accepts or sends per transaction. Its default value is 100 (optional).

HDFS Sink

This sink writes data into HDFS. To configure it, you must provide the following details.

  1. Channel

  2. type − hdfs

  3. hdfs.path − the path of the directory in HDFS where data is to be stored.

We can also provide some optional values depending on the scenario. Given below are the optional properties of the HDFS sink that we configure in our application.

  1. fileType − The required file format of our HDFS file. SequenceFile, DataStream, and CompressedStream are the three types available. In our example, we are using DataStream.

  2. writeFormat − Either Text or Writable.

  3. batchSize − The number of events written to a file before it is flushed into HDFS. Its default value is 100.

  4. rollSize − The file size, in bytes, that triggers a roll; a value of 0 disables rolling based on file size. Its default value is 1024.

  5. rollCount − The number of events written into the file before it is rolled. Its default value is 10.

Example – Configuration File

Given below is an example of the configuration file. Copy this content and save it as twitter.conf in the conf folder of Flume.

# Naming the components on the current agent.
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

# Describing/Configuring the source
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = Your OAuth consumer key
TwitterAgent.sources.Twitter.consumerSecret = Your OAuth consumer secret
TwitterAgent.sources.Twitter.accessToken = Your OAuth access token
TwitterAgent.sources.Twitter.accessTokenSecret = Your OAuth access token secret
TwitterAgent.sources.Twitter.keywords = tutorials point, java, bigdata, mapreduce, mahout, hbase, nosql

# Describing/Configuring the sink

TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/user/Hadoop/twitter_data/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000

# Describing/Configuring the channel
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

# Binding the source and sink to the channel
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channel = MemChannel

Execution

Browse through the Flume home directory and execute the application as shown below.

$ cd $FLUME_HOME
$ bin/flume-ng agent --conf ./conf/ -f conf/twitter.conf \
   -Dflume.root.logger=DEBUG,console -n TwitterAgent
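
If you prefer to keep the terminal free, a common variant is to run the agent in the background and redirect its output to a file (flume.log here is an arbitrary name, not something Flume requires):

$ bin/flume-ng agent --conf ./conf/ -f conf/twitter.conf \
   -Dflume.root.logger=DEBUG,console -n TwitterAgent > flume.log 2>&1 &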

If everything goes fine, tweets will start streaming into HDFS. Given below is a snapshot of the command prompt window while fetching tweets.

[Image: fetching tweets]

Verifying HDFS

You can access the Hadoop Administration Web UI using the URL given below.

http://localhost:50070/

Click on the dropdown named Utilities on the right-hand side of the page. You can see two options as shown in the snapshot given below.

[Image: verifying the HDFS]

Click on Browse the file system and enter the path of the HDFS directory where you have stored the tweets. In our example, the path is /user/Hadoop/twitter_data/. You can then see the list of Twitter log files stored in HDFS, as shown below.

[Image: browse file system]
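
Alternatively, you can verify the files from the command line. The file names are generated by the HDFS sink (by default they carry the FlumeData prefix), so the exact names will differ from run to run; note that the Twitter source emits Avro-encoded events, so the file contents are not plain text:

$ hdfs dfs -ls /user/Hadoop/twitter_data/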