Apache Spark Tutorial
Apache Spark - Introduction
Industries use Hadoop extensively to analyze their data sets, because the Hadoop framework is based on a simple programming model (MapReduce) and provides a computing solution that is scalable, flexible, fault-tolerant and cost-effective. The main concern, however, is speed in processing large datasets, in terms of the waiting time between queries and the waiting time to run a program.
Spark was introduced by the Apache Software Foundation to speed up the Hadoop computational process.
Contrary to a common belief, Spark is not a modified version of Hadoop and does not really depend on Hadoop, because it has its own cluster management. Hadoop is just one of the ways to implement Spark.
Spark uses Hadoop in two ways – one is storage and the second is processing. Since Spark has its own cluster management computation, it uses Hadoop for storage purposes only.
Apache Spark
Apache Spark is a lightning-fast cluster computing technology, designed for fast computation. It is based on Hadoop MapReduce and extends the MapReduce model to use it efficiently for more types of computations, including interactive queries and stream processing. The main feature of Spark is its in-memory cluster computing, which increases the processing speed of an application.
Spark is designed to cover a wide range of workloads such as batch applications, iterative algorithms, interactive queries and streaming. Apart from supporting all these workloads in a single system, it reduces the management burden of maintaining separate tools.
Evolution of Apache Spark
Spark is one of Hadoop's sub-projects, developed in 2009 in UC Berkeley's AMPLab by Matei Zaharia. It was open-sourced in 2010 under a BSD license, donated to the Apache Software Foundation in 2013, and became a top-level Apache project in February 2014.
Features of Apache Spark
Apache Spark has the following features.
-
Speed − Spark helps to run an application in a Hadoop cluster up to 100 times faster in memory, and 10 times faster when running on disk. This is possible by reducing the number of read/write operations to disk and storing the intermediate processing data in memory.
-
Supports multiple languages − Spark provides built-in APIs in Java, Scala, and Python, so you can write applications in different languages. Spark also comes with 80 high-level operators for interactive querying.
-
Advanced Analytics − Spark supports not only 'Map' and 'Reduce' but also SQL queries, streaming data, machine learning (ML), and graph algorithms.
Spark Built on Hadoop
The following diagram shows three ways in which Spark can be built with Hadoop components.
There are three ways of Spark deployment, as explained below.
-
Standalone − Spark Standalone deployment means Spark occupies the place on top of HDFS (Hadoop Distributed File System) and space is allocated for HDFS explicitly. Here, Spark and MapReduce run side by side to cover all Spark jobs on the cluster.
-
Hadoop Yarn − Hadoop Yarn deployment means, simply, that Spark runs on Yarn without any pre-installation or root access required. It helps to integrate Spark into the Hadoop ecosystem or Hadoop stack, and it allows other components to run on top of the stack.
-
Spark in MapReduce (SIMR) − Spark in MapReduce is used to launch a Spark job in addition to standalone deployment. With SIMR, the user can start Spark and use its shell without any administrative access.
Components of Spark
The following illustration depicts the different components of Spark.
Apache Spark Core
Spark Core is the underlying general execution engine for the Spark platform that all other functionality is built upon. It provides in-memory computing and the ability to reference datasets in external storage systems.
Spark SQL
Spark SQL is a component on top of Spark Core that introduces a new data abstraction called SchemaRDD, which provides support for structured and semi-structured data.
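As a minimal, illustrative sketch of how this component is used from the Spark shell (assuming a Spark 1.x SQLContext and a hypothetical people.json file containing records with name and age fields):
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val people = sqlContext.jsonFile("people.json")      // infers a schema from the JSON records (hypothetical file)
people.registerTempTable("people")                   // make it queryable by name
val adults = sqlContext.sql("SELECT name FROM people WHERE age >= 18")
adults.collect().foreach(println)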
Spark Streaming
Spark Streaming leverages Spark Core's fast scheduling capability to perform streaming analytics. It ingests data in mini-batches and performs RDD (Resilient Distributed Datasets) transformations on those mini-batches of data.
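The following hedged sketch shows the mini-batch model, assuming a plain-text stream is available on a local socket (port 9999 is just an illustrative choice):
import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sc, Seconds(10))        // 10-second micro-batches on top of the existing SparkContext
val lines = ssc.socketTextStream("localhost", 9999)    // hypothetical socket source
val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()                                     // print a sample of counts for every batch
ssc.start()
ssc.awaitTermination()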
MLlib (Machine Learning Library)
MLlib is a distributed machine learning framework above Spark, made possible by the distributed memory-based Spark architecture. According to benchmarks done by the MLlib developers against the Alternating Least Squares (ALS) implementations, Spark MLlib is nine times as fast as the Hadoop disk-based version of Apache Mahout (before Mahout gained a Spark interface).
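As a rough illustration only (not the benchmark code itself), an ALS recommendation model can be trained roughly as follows, assuming a hypothetical ratings.txt file with "user,product,rating" lines:
import org.apache.spark.mllib.recommendation.{ALS, Rating}
val ratings = sc.textFile("ratings.txt").map { line =>
   val Array(user, product, rate) = line.split(',')    // parse "user,product,rating" (hypothetical format)
   Rating(user.toInt, product.toInt, rate.toDouble)
}
val model = ALS.train(ratings, 10, 10, 0.01)           // rank = 10, iterations = 10, lambda = 0.01
val prediction = model.predict(1, 2)                   // predicted rating of product 2 by user 1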
GraphX
GraphX is a distributed graph-processing framework on top of Spark. It provides an API for expressing graph computation that can model user-defined graphs by using the Pregel abstraction API. It also provides an optimized runtime for this abstraction.
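A minimal sketch of the GraphX API, assuming a hypothetical edges.txt file containing one source/destination vertex-id pair per line:
import org.apache.spark.graphx.GraphLoader
val graph = GraphLoader.edgeListFile(sc, "edges.txt")           // load an edge list (hypothetical file)
val ranks = graph.pageRank(0.0001).vertices                     // run PageRank to a 0.0001 convergence tolerance
ranks.sortBy(_._2, ascending = false).take(5).foreach(println)  // five highest-ranked vertices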
Apache Spark - RDD
Resilient Distributed Datasets
Resilient Distributed Datasets (RDD) is a fundamental data structure of Spark. It is an immutable distributed collection of objects. Each dataset in an RDD is divided into logical partitions, which may be computed on different nodes of the cluster. RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes.
Formally, an RDD is a read-only, partitioned collection of records. RDDs can be created through deterministic operations on either data in stable storage or other RDDs. An RDD is a fault-tolerant collection of elements that can be operated on in parallel.
There are two ways to create RDDs − parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared file system, HDFS, HBase, or any data source offering a Hadoop InputFormat.
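Both styles can be tried directly in the Spark shell; this is a small sketch in which data.txt is just a hypothetical file path:
scala> val data = Array(1, 2, 3, 4, 5)
scala> val distData = sc.parallelize(data)      // 1. parallelizing an existing collection
scala> val distFile = sc.textFile("data.txt")   // 2. referencing an external dataset (hypothetical path)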
Spark makes use of the concept of RDD to achieve faster and more efficient MapReduce operations. Let us first discuss how MapReduce operations take place and why they are not so efficient.
Data Sharing is Slow in MapReduce
MapReduce is widely adopted for processing and generating large datasets with a parallel, distributed algorithm on a cluster. It allows users to write parallel computations using a set of high-level operators, without having to worry about work distribution and fault tolerance.
Unfortunately, in most current frameworks, the only way to reuse data between computations (for example, between two MapReduce jobs) is to write it to an external stable storage system (for example, HDFS). Although this framework provides numerous abstractions for accessing a cluster's computational resources, users still want more.
Both iterative and interactive applications require faster data sharing across parallel jobs. Data sharing is slow in MapReduce due to replication, serialization, and disk IO. Regarding the storage system, most Hadoop applications spend more than 90% of their time doing HDFS read-write operations.
Iterative Operations on MapReduce
Multi-stage applications reuse intermediate results across multiple computations. The following illustration explains how the current framework works while doing iterative operations on MapReduce. This incurs substantial overheads due to data replication, disk I/O, and serialization, which makes the system slow.
Interactive Operations on MapReduce
The user runs ad-hoc queries on the same subset of data. Each query does disk I/O on the stable storage, which can dominate application execution time.
The following illustration explains how the current framework works while doing interactive queries on MapReduce.
Data Sharing using Spark RDD
Data sharing is slow in MapReduce due to replication, serialization, and disk IO. Most Hadoop applications spend more than 90% of their time doing HDFS read-write operations.
Recognizing this problem, researchers developed a specialized framework called Apache Spark. The key idea of Spark is Resilient Distributed Datasets (RDD); it supports in-memory processing computation. This means it stores the state of memory as an object across the jobs, and the object is shareable between those jobs. Data sharing in memory is 10 to 100 times faster than network and disk.
Let us now try to find out how iterative and interactive operations take place in Spark RDD.
Iterative Operations on Spark RDD
The illustration given below shows the iterative operations on Spark RDD. It stores intermediate results in distributed memory instead of stable storage (disk), which makes the system faster.
Note − If the distributed memory (RAM) is not sufficient to store intermediate results (the state of the job), then it will store those results on the disk.
Interactive Operations on Spark RDD
This illustration shows interactive operations on Spark RDD. If different queries are run on the same set of data repeatedly, this particular data can be kept in memory for better execution times.
By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicating them across multiple nodes.
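As a small illustrative sketch (reusing the input.txt file introduced later in this tutorial), a storage level can be chosen explicitly when persisting:
scala> import org.apache.spark.storage.StorageLevel
scala> val words = sc.textFile("input.txt").flatMap(_.split(" "))
scala> words.persist(StorageLevel.MEMORY_AND_DISK)   // keep partitions in RAM, spill to disk if needed; MEMORY_ONLY_2 would replicate them on two nodes
scala> words.count()                                 // the first action computes and caches the RDD
scala> words.count()                                 // later actions reuse the cached partitions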
Apache Spark - Installation
Spark is Hadoop's sub-project. Therefore, it is better to install Spark on a Linux-based system. The following steps show how to install Apache Spark.
Step 1: Verifying Java Installation
Java installation is one of the mandatory prerequisites for installing Spark. Try the following command to verify the Java version.
$java -version
If Java is already installed on your system, you will see the following response −
java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)
In case you do not have Java installed on your system, install Java before proceeding to the next step.
Step 2: Verifying Scala installation
You should use the Scala language to implement Spark. So let us verify the Scala installation using the following command.
$scala -version
If Scala is already installed on your system, you will see the following response −
Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL
In case you don't have Scala installed on your system, then proceed to the next step for Scala installation.
Step 3: Downloading Scala
Download the latest version of Scala by visiting the following link: Download Scala. For this tutorial, we are using the scala-2.11.6 version. After downloading, you will find the Scala tar file in the download folder.
Step 4: Installing Scala
Follow the steps given below for installing Scala.
Extract the Scala tar file
Type the following command for extracting the Scala tar file.
$ tar xvf scala-2.11.6.tgz
Move Scala software files
Use the following commands for moving the Scala software files to the respective directory (/usr/local/scala).
$ su –
Password:
# cd /home/Hadoop/Downloads/
# mv scala-2.11.6 /usr/local/scala
# exit
Set PATH for Scala
Use the following command for setting PATH for Scala.
$ export PATH=$PATH:/usr/local/scala/bin
Verifying Scala Installation
After installation, it is better to verify it. Use the following command for verifying the Scala installation.
$scala -version
If Scala is installed correctly, you will see the following response −
Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL
Step 5: Downloading Apache Spark
Download the latest version of Spark by visiting the following link: Download Spark. For this tutorial, we are using the spark-1.3.1-bin-hadoop2.6 version. After downloading it, you will find the Spark tar file in the download folder.
Step 6: Installing Spark
Follow the steps given below for installing Spark.
Extracting Spark tar
The following command is used for extracting the Spark tar file.
$ tar xvf spark-1.3.1-bin-hadoop2.6.tgz
Moving Spark software files
The following commands are used for moving the Spark software files to the respective directory (/usr/local/spark).
$ su –
Password:
# cd /home/Hadoop/Downloads/
# mv spark-1.3.1-bin-hadoop2.6 /usr/local/spark
# exit
Setting up the environment for Spark
Add the following line to the ~/.bashrc file. This adds the location where the Spark software files are located to the PATH variable.
export PATH=$PATH:/usr/local/spark/bin
Use the following command for sourcing the ~/.bashrc file.
$ source ~/.bashrc
Step 7: Verifying the Spark Installation
Write the following command for opening the Spark shell.
$spark-shell
If Spark is installed successfully, then you will find the following output.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>
Apache Spark - Core Programming
Spark Core is the base of the whole project. It provides distributed task dispatching, scheduling, and basic I/O functionalities. Spark uses a specialized fundamental data structure known as RDD (Resilient Distributed Datasets) that is a logical collection of data partitioned across machines. RDDs can be created in two ways; one is by referencing datasets in external storage systems and the second is by applying transformations (e.g. map, filter, reduceByKey, join) on existing RDDs.
The RDD abstraction is exposed through a language-integrated API. This simplifies programming complexity because the way applications manipulate RDDs is similar to manipulating local collections of data.
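For instance, the following shell snippet (a sketch with arbitrary numbers) manipulates an RDD with the same filter/map style you would use on a local Scala collection:
scala> val nums = sc.parallelize(1 to 100)
scala> val squaresOfEven = nums.filter(_ % 2 == 0).map(n => n * n)   // reads like local collection code
scala> squaresOfEven.take(5)                                         // Array(4, 16, 36, 64, 100)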
Spark Shell
Spark provides an interactive shell − a powerful tool to analyze data interactively. It is available in either the Scala or Python language. Spark's primary abstraction is a distributed collection of items called a Resilient Distributed Dataset (RDD). RDDs can be created from Hadoop InputFormats (such as HDFS files) or by transforming other RDDs.
Open Spark Shell
The following command is used to open the Spark shell.
$ spark-shell
Create simple RDD
Let us create a simple RDD from a text file. Use the following command to create a simple RDD.
scala> val inputfile = sc.textFile("input.txt")
The output for the above command is
inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12
The Spark RDD API introduces a few Transformations and a few Actions to manipulate RDDs.
RDD Transformations
RDD transformations return a pointer to a new RDD and allow you to create dependencies between RDDs. Each RDD in a dependency chain (string of dependencies) has a function for calculating its data and a pointer (dependency) to its parent RDD.
Spark is lazy, so nothing is executed unless you call some transformation or action that triggers job creation and execution. Look at the following snippet of the word-count example.
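A sketch of that snippet (the same logic is built up step by step later in this chapter) − the first two lines only record transformations, and nothing is computed until the final action:
scala> val textFile = sc.textFile("input.txt")
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
scala> wordCounts.saveAsTextFile("output")   // only this action triggers the actual computation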
Therefore, an RDD transformation is not a set of data but a step in a program (possibly the only step) telling Spark how to get data and what to do with it.
Programming with RDD
Let us see the implementation of a few RDD transformations and actions in RDD programming with the help of an example.
Example
Consider a word count example − it counts each word appearing in a document. Consider the following text as an input, saved as an input.txt file in the home directory.
input.txt − input file.
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
Follow the procedure given below to execute the given example.
Open Spark-Shell
The following command is used to open the Spark shell. Generally, Spark is built using Scala. Therefore, a Spark program runs in a Scala environment.
$ spark-shell
If the Spark shell opens successfully, then you will find the following output. Look at the last line of the output, "Spark context available as sc", which means the Spark shell has automatically created a SparkContext object with the name sc. The SparkContext object must exist before the first step of a program can be started.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>
Create an RDD
First, we have to read the input file using the Spark-Scala API and create an RDD.
The following command is used for reading a file from the given location. Here, a new RDD is created with the name inputfile. The String given as an argument in the textFile("") method is the absolute path of the input file. However, if only the file name is given, the input file is assumed to be in the current location.
scala> val inputfile = sc.textFile("input.txt")
Execute Word count Transformation
Our aim is to count the words in a file. Create a flat map for splitting each line into words (flatMap(line ⇒ line.split(" "))).
Next, read each word as a key with the value '1' (<key, value> = <word, 1>) using the map function (map(word ⇒ (word, 1))).
Finally, reduce those keys by adding the values of similar keys (reduceByKey(_+_)).
The following command is used for executing the word count logic. After executing this, you will not find any output because this is not an action but a transformation; it points to a new RDD and tells Spark what to do with the given data.
scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);
Current RDD
While working with an RDD, if you want to know about the current RDD, use the following command. It shows a description of the current RDD and its dependencies for debugging.
scala> counts.toDebugString
Caching the Transformations
You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Use the following command to store the intermediate transformations in memory.
scala> counts.cache()
Applying the Action
Applying an action, such as saveAsTextFile, stores the result of all the transformations in a text file. The String argument of the saveAsTextFile(" ") method is the absolute path of the output folder. Try the following command to save the output in a text file. In the following example, the 'output' folder is in the current location.
scala> counts.saveAsTextFile("output")
Checking the Output
Open another terminal and go to the home directory (where Spark is executed in the other terminal). Use the following commands for checking the output directory.
[hadoop@localhost ~]$ cd output/
[hadoop@localhost output]$ ls -1
part-00000
part-00001
_SUCCESS
The following command is used to see the output in the part-00000 file.
[hadoop@localhost output]$ cat part-00000
Un-persist the Storage
Before un-persisting, if you want to see the storage space that is used for this application, then use the following URL in your browser.
http://localhost:4040
You will see the following screen, which shows the storage space used by the applications running on the Spark shell.
If you want to un-persist the storage space of a particular RDD, then use the following command.
scala> counts.unpersist()
You will see the output as follows −
15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810)
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106)
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14
For verifying the storage space in the browser, use the following URL.
http://localhost:4040/
You will see the following screen. It shows the storage space used by the applications running on the Spark shell.
Apache Spark - Deployment
spark-submit is a shell command used to deploy a Spark application on a cluster. It works with all the respective cluster managers through a uniform interface. Therefore, you do not have to configure your application for each one.
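The general shape of the command is the same regardless of the cluster manager; only the --master URL changes (for example local, spark://host:7077 or yarn-cluster). The placeholders below are illustrative:
spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <client-or-cluster> \
  <application-jar> [application-arguments]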
Example
Let us take the same word count example that we used before, this time using shell commands. Here, we consider the same example as a Spark application.
Sample Input
The following text is the input data, and the file is named in.txt.
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
Look at the following program −
SparkWordCount.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._
object SparkWordCount {
   def main(args: Array[String]) {

      val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map())
      /* local = master URL; Word Count = application name; */
      /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */
      /* Map = variables to work nodes */

      /* creating an inputRDD to read the text file (in.txt) through the Spark context */
      val input = sc.textFile("in.txt")

      /* Transform the inputRDD into countRDD */
      val count = input.flatMap(line ⇒ line.split(" "))
         .map(word ⇒ (word, 1))
         .reduceByKey(_ + _)

      /* saveAsTextFile is an action applied on the RDD */
      count.saveAsTextFile("outfile")
      System.out.println("OK");
   }
}
Save the above program into a file named SparkWordCount.scala and place it in a user-defined directory named spark-application.
Note − While transforming the inputRDD into countRDD, we use flatMap() for tokenizing the lines (from the text file) into words, the map() method for counting the word frequency, and the reduceByKey() method for counting the repetitions of each word.
Use the following steps to submit this application. Execute all the steps in the spark-application directory through the terminal.
Step 1: Download Spark Jar
The Spark core jar is required for compilation. Therefore, download spark-core_2.10-1.3.0.jar from the following link: Spark core jar, and move the jar file from the download directory to the spark-application directory.
Step 2: Compile program
Compile the above program using the command given below. This command should be executed from the spark-application directory. Here, /usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar is a Hadoop support jar taken from the Spark library.
$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkWordCount.scala
Step 3: Create a JAR
Create a jar file of the Spark application using the following command. Here, wordcount is the file name of the jar file.
jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar /usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar
Step 4: Submit spark application
Submit the Spark application using the following command −
spark-submit --class SparkWordCount --master local wordcount.jar
If it is executed successfully, you will find the output given below. The OK in the following output is for user identification; it is printed by the last line of the program. If you read the following output carefully, you will find different things, such as −
-
successfully started service 'sparkDriver' on port 42954
-
MemoryStore started with capacity 267.3 MB
-
Started SparkUI at http://192.168.1.217:4040
-
Added JAR file:/home/hadoop/piapplication/count.jar
-
ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
-
Stopped Spark web UI at http://192.168.1.217:4040
-
MemoryStore cleared
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.217:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver
(MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
Step 5: Checking output
After successful execution of the program, you will find a directory named outfile in the spark-application directory.
The following commands are used for opening and checking the list of files in the outfile directory.
$ cd outfile
$ ls
part-00000 part-00001 _SUCCESS
The commands for checking the output in the part-00000 file are −
$ cat part-00000
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
The commands for checking the output in the part-00001 file are −
$ cat part-00001
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)
Go through the following section to know more about the 'spark-submit' command.