Apache Spark 简明教程

Apache Spark - Quick Guide

Apache Spark - Introduction

各行业都在广泛使用 Hadoop 来分析其数据集。原因在于 Hadoop 框架基于一个简单的编程模型(MapReduce),并且它提供了一个可扩展、灵活、容错且具有成本效益的计算解决方案。这里,主要顾虑是在处理大型数据集的速度,即查询之间的等待时间和运行程序的等待时间。

Apache 软件基金会推出了 Spark 来加速 Hadoop 计算软件流程。

与普遍的看法相反, Spark is not a modified version of Hadoop 并不真正依赖于 Hadoop,因为它有自己的集群管理。Hadoop 只是实现 Spark 的方式之一。

Spark 以两种方式使用 Hadoop - 一种是 storage ,第二种是 processing 。由于 Spark 有自己的集群管理计算,所以它只将 Hadoop 用于存储目的。

Apache Spark

Apache Spark 是一种闪电般快速的集群计算技术,专为快速计算而设计。它基于 Hadoop MapReduce,并扩展了 MapReduce 模型以有效地将其用于更多类型的计算,包括交互式查询和流处理。Spark 的主要特征是其 in-memory cluster computing ,它提高了应用程序的处理速度。

Spark 旨在涵盖广泛的工作负载,例如批处理应用程序、迭代算法、交互式查询和流处理。除了在各自系统中支持所有这些工作负载外,它还减轻了维护单独工具的管理负担。

Evolution of Apache Spark

Spark 是 Hadoop 的一个子项目,于 2009 年在加州大学伯克利分校 AMPLab 由 Matei Zaharia 开发。它于 2010 年根据 BSD 许可证开源。它于 2013 年捐赠给 Apache 软件基金会,现在 Apache Spark 已成为自 2014 年 2 月起的一个顶级 Apache 项目。

Features of Apache Spark

Apache Spark 具有以下功能。

  1. Speed - Spark 帮助在 Hadoop 集群中运行应用程序,速度在内存中提高多达 100 倍,在磁盘上运行时速度提高 10 倍。通过减少对磁盘的读/写操作次数,可以做到这一点。它将中间处理数据存储在内存中。

  2. Supports multiple languages - Spark 在 Java、Scala 或 Python 中提供了内置的 API。因此,你可以使用不同的语言编写应用程序。Spark 提供了 80 个高级运算符用于交互式查询。

  3. Advanced Analytics - Spark 不仅支持“映射”和“还原”。它还支持 SQL 查询、流数据、机器学习 (ML) 和图形算法。

Spark Built on Hadoop

下图展示了使用 Hadoop 组件构建 Spark 的三种方法。

spark built on hadoop

如以下说明所示,共有三种 Spark 部署方法。

  1. Standalone − Spark 独立部署表示 Spark 占据 HDFS(Hadoop 分布式文件系统)之上的位置,并明确分配空间给 HDFS。在此,Spark 和 MapReduce 将并排运行以覆盖群集上的所有 Spark 作业。

  2. Hadoop Yarn − Hadoop Yarn 部署仅仅意味着 Spark 在 Yarn 上运行,无需预安装或 root 访问权限。它有助于将 Spark 集成到 Hadoop 生态系统或 Hadoop 堆栈中。它允许其他组件在堆栈之上运行。

  3. Spark in MapReduce (SIMR) − Spark in MapReduce 用于在独立部署之外启动 spark 作业。通过 SIMR,用户可以启动 Spark 并使用其 shell,而无需任何管理访问权限。

Components of Spark

下图描绘了 Spark 的不同组件。

components of spark

Apache Spark Core

Spark 核心是 Spark 平台的基础通用执行引擎,所有其他功能都建立在其之上。它提供内存计算和外部存储系统中的引用数据集。

Spark SQL

Spark SQL 是 Spark 核心之上的一个组件,它引入了名为 SchemaRDD 的新数据抽象,该抽象为结构化和半结构化数据提供了支持。

Spark Streaming

Spark Streaming 利用 Spark 核心 的快速调度功能执行流分析。它以小批量的方式获取数据,并在这些数据小批量上执行 RDD(弹性分布式数据集)转换。

MLlib (Machine Learning Library)

MLlib 是一个分布式机器学习框架,位于 Spark 之上,因为其分布式基于内存的 Spark 架构。根据 MLlib 开发人员对交替最小二乘法 (ALS) 实现进行的基准测试,它比 Hadoop 基于磁盘的版本快九倍。 Apache Mahout (在 Mahout 获得 Spark 接口之前)。

GraphX

GraphX 是一个分布式图处理框架,位于 Spark 之上。它提供了一个用于表示图形计算的 API,该 API 可以使用 Pregel 抽象 API 对用户定义的图形进行建模。它还为这种抽象提供了优化的运行时。

Apache Spark - RDD

Resilient Distributed Datasets

弹性分布式数据集 (RDD) 是 Spark 的一个基本数据结构。它是对象的不可变分布式集合。RDD 中的每个数据集都被划分为逻辑分区,这些分区可以在群集的不同节点上计算。RDD 可以包含任何类型的 Python、Java 或 Scala 对象,包括用户定义的类。

形式上,RDD 是只读的分区记录集合。RDD 可以通过对稳定存储器中的数据或其他 RDD 上的确定性操作来创建。RDD 是一个容错元素集合,可以并行对其进行操作。

有两种方法可以创建 RDD − parallelizing 驱动程序程序中的现有集合或 referencing a dataset 位于外部存储系统中,例如共享文件系统、HDFS、HBase 或提供 Hadoop 输入格式的任何数据源。

Spark 利用 RDD 的概念实现了更快速、高效的 MapReduce 操作。让我们首先讨论 MapReduce 操作如何进行以及为什么它们效率不高。

Data Sharing is Slow in MapReduce

MapReduce 被广泛采用,用于在群集上通过并行分布式算法处理和生成大型数据集。它允许用户编写并行计算,使用一组高级运算符,而无需担心工作分布和容错能力。

不幸的是,在大多数当前框架中,在计算之间(例如两个 MapReduce 作业之间)重用数据的唯一方法是将其写入外部稳定存储系统(例如 HDFS)。尽管此框架提供了许多用于访问群集计算资源的抽象,但用户仍然希望获得更多。

IterativeInteractive 应用程序都需要并行作业之间更快的 data 共享。由于 replication, serializationdisk IO ,MapReduce 中 data 共享速度很慢。关于存储系统,大多数 Hadoop 应用程序花费超过 90% 的时间执行 HDFS 读写操作。

Iterative Operations on MapReduce

在多阶段应用程序中跨多个计算重用中间结果。下图解释了在 MapReduce 上执行迭代操作时当前框架如何工作。由于复制数据、磁盘 I/O 和序列化,这会产生大量的开销,从而导致系统变慢。

iterative operations on mapreduce

Interactive Operations on MapReduce

用户针对相同数据子集运行即席查询。每个查询将对持久性存储执行磁盘 I/O,这会占据应用执行时间的大部分。

下图说明了当前框架在 MapReduce 上执行交互式查询时如何工作。

interactive operations on mapreduce

Data Sharing using Spark RDD

由于 replication, serializationdisk IO ,MapReduce 中的数据共享较慢。大多数 Hadoop 应用在执行 HDFS 读写操作时会花费 90% 以上的时间。

认识到这个问题后,研究人员开发了一个名为 Apache Spark 的专门框架。Spark 的关键思想是 *R*esilient *D*istributed *D*atasets (RDD);它支持内存内处理计算。这意味着它将内存的状态存储为作业间的对象,且此对象可在这些作业中共享。内存内的数据共享比网络和磁盘快 10 到 100 倍。

现在,我们尝试找出 Spark RDD 中迭代和交互式操作如何执行。

Iterative Operations on Spark RDD

下图显示了 Spark RDD 上的迭代操作。它会将中间结果存储在分布式内存中,而不是持久性存储(磁盘)中,并使系统更快。

Note - 如果分布式内存(RAM)足以存储中间结果(作业状态),则它会将这些结果存储在磁盘上。

iterative operations on spark rdd

Interactive Operations on Spark RDD

此图显示了 Spark RDD 上的交互式操作。如果针对相同数据集反复运行不同的查询,则可以将这个特定数据保留在内存中,以获得更好的执行时间。

interactive operations on spark rdd

默认情况下,每次针对转换后的 RDD 执行操作时,都会重新计算该 RDD。但是,您也可以选择将 RDD persist 在内存中,在这种情况下,Spark 将把元素保留在集群中以便下次查询时更快地访问。此外,还支持将 RDD 持久保留在磁盘上或跨多个节点复制。

Apache Spark - Installation

Spark 是 Hadoop 的子项目。因此,最好将 Spark 安装到基于 Linux 的系统中。以下步骤显示如何安装 Apache Spark。

Step 1: Verifying Java Installation

Java 安装是安装 Spark 的必备事项之一。尝试以下命令以验证 JAVA 版本。

$java -version

如果系统中已安装 Java,您将看到以下响应:

java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)

如果您系统中尚未安装 Java,请在继续下一步操作之前安装 Java。

Step 2: Verifying Scala installation

您应使用 Scala 语言来实现 Spark。因此,让我们使用以下命令验证 Scala 安装。

$scala -version

如果系统中已安装 Scala,您将看到以下响应:

Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

如果您系统中尚未安装 Scala,则继续执行下一步操作以安装 Scala。

Step 3: Downloading Scala

下载 Scala 的最新版本,请访问以下链接 Download Scala 。对于本教程,我们使用 scala-2.11.6 版本。下载后,您将在下载文件夹中找到 Scala tar 文件。

Step 4: Installing Scala

按照以下给定的步骤安装 Scala。

Extract the Scala tar file

键入以下命令以解压 Scala tar 文件。

$ tar xvf scala-2.11.6.tgz

Move Scala software files

使用以下命令将 Scala 软件文件移动到相应的目录 (/usr/local/scala)

$ su –
Password:
# cd /home/Hadoop/Downloads/
# mv scala-2.11.6 /usr/local/scala
# exit

Set PATH for Scala

使用以下命令为 Scala 设置 PATH。

$ export PATH = $PATH:/usr/local/scala/bin

Verifying Scala Installation

安装后,最好对其进行验证。使用以下命令验证 Scala 安装。

$scala -version

如果系统中已安装 Scala,您将看到以下响应:

Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

Step 5: Downloading Apache Spark

访问以下链接 Download Spark 下载 Spark 最新版本。本教程中,我们使用 spark-1.3.1-bin-hadoop2.6 版本。下载后,你将在下载文件夹中找到 Spark tar 文件。

Step 6: Installing Spark

按照以下步骤执行 Spark 安装。

Extracting Spark tar

以下命令用于解压 spark tar 文件。

$ tar xvf spark-1.3.1-bin-hadoop2.6.tgz

Moving Spark software files

以下命令用于将 Spark 软件文件移动到相应的目录 (/usr/local/spark)

$ su –
Password:

# cd /home/Hadoop/Downloads/
# mv spark-1.3.1-bin-hadoop2.6 /usr/local/spark
# exit

Setting up the environment for Spark

将以下行添加到 ~ /.bashrc 文件中。这意味着将 Spark 软件文件所在的路径添加到 PATH 变量。

export PATH=$PATH:/usr/local/spark/bin

使用以下命令加载 ~/.bashrc 文件。

$ source ~/.bashrc

Step 7: Verifying the Spark Installation

输入以下命令打开 Spark shell。

$spark-shell

如果 Spark 安装成功,你将看到以下输出。

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
   ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>

Apache Spark - Core Programming

Spark Core 是整个项目的底层。它提供了分布式任务分发、调度和基本 I/O 功能。Spark 使用一种称为 RDD(弹性分布式数据集)的特殊基本数据结构,它是分布在多台机器上的数据的逻辑集合。RDD 可以通过两种方式创建:一是引用外部存储系统中的数据集,二是针对现有的 RDD 应用转换(例如,map、filter、reducer、join)。

通过语言集成的 API 公开了 RDD 抽象。这简化了编程复杂度,这是因为应用程序处理 RDD 的方式类似于处理本地数据集合。

Spark Shell

Spark 提供了一个交互式 shell——一种交互式分析数据的强大工具。此工具既可以在 Scala 中使用,也可以在 Python 语言中使用。Spark 的主要抽象是称为弹性分布式数据集 (RDD) 的分布式项集合。RDD 可以从 Hadoop 输入格式(例如,HDFS 文件)或通过转换其他 RDD 创建。

Open Spark Shell

以下命令用于打开 Spark shell。

$ spark-shell

Create simple RDD

让我们从文本文件中创建一个简单的 RDD。使用以下命令创建简单的 RDD。

scala> val inputfile = sc.textFile(“input.txt”)

上述命令的输出为

inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12

Spark RDD API 引入了少数 Transformations 和少数 Actions 来操作 RDD。

RDD Transformations

RDD 转换返回对新 RDD 的指针,并允许你创建 RDD 之间的依赖关系。依赖关系链(依赖关系串)中的每个 RDD 都具有一个用于计算其数据的函数,并具有对其父 RDD 的指针(依赖关系)。

Spark 比较懒,所以在调用一些将触发作业创建和执行的转换或动作之前,不会执行任何操作。查看单词计数示例的以下代码段。

因此,RDD 转换不是一组数据,而是程序中的一个步骤(可能是唯一的步骤),告诉 Spark 如何获取数据并对其进行处理。

Actions

Programming with RDD

我们借助示例,来看看 RDD 编程中一些 RDD 转换和动作的实现。

Example

考虑一个单词计数的示例——它计算一个文档中出现的每个单词。将以下文本视为一个输入,在主目录中将其保存为 input.txt 文件。

input.txt ——输入文件。

people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful  as they love,
as they care as they share.

按照以下给出的过程执行给定的示例。

Open Spark-Shell

使用以下命令打开 Spark Shell。通常,Spark 是使用 Scala 构建的。因此,Spark 程序在 Scala 环境中运行。

$ spark-shell

如果 Spark Shell 顺利打开,那么将会看到以下输出。查看输出的最后一行“Spark context available as sc”,这意味着 Spark 容器已自动创建名为 sc 的 Spark context 对象。在开始程序的第一步之前,应该创建 SparkContext 对象。

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
   ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>

Create an RDD

首先,我们必须使用 Spark-Scala API 读取输入文件并创建一个 RDD。

以下命令用于从给定位置读取文件。此处,使用 inputfile 的名称创建了新的 RDD。在 textFile(“”) 方法中作为参数提供的 String 是输入文件名所在绝对路径。但是,如果只给出了文件名,则表示输入文件位于当前位置。

scala> val inputfile = sc.textFile("input.txt")

Execute Word count Transformation

我们的目标是统计文件中出现的单词。创建一个平面映射来将每行拆分到词组中( flatMap(line ⇒ line.split(“ ”) )。

接下来,使用映射功能( map(word ⇒ (word, 1) )将每个单词作为具有值 ‘1’ 的键读取(<key, value> = <word,1>)。

最后,通过添加类似键的键值来缩减这些键( reduceByKey(+) )。

以下命令用于执行单词计数逻辑。执行此操作之后,不会看到任何输出,因为这不是一个动作,而是一个转换;指向新的 RDD 或告诉 Spark 如何处理给定数据。

scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);

Current RDD

在使用 RDD 时,如果想要了解当前 RDD,那么使用以下命令。它将显示有关当前 RDD 的描述及用于调试的依赖项。

scala> counts.toDebugString

Caching the Transformations

可以使用 persist() 或 cache() 方法将 RDD 标记为持久化。在动作中首次对其进行计算时,它将保存在节点的内存中。使用以下命令将中间转换存储在内存中。

scala> counts.cache()

Applying the Action

应用一个动作,例如存储所有转换,会生成一个文本文件。saveAsTextFile(“ ”) 方法的 String 参数是输出文件夹的绝对路径。尝试以下命令,将输出存储到文本文件中。在以下示例中,“output”文件夹位于当前位置。

scala> counts.saveAsTextFile("output")

Checking the Output

打开另一个终端进入主目录(在另一个终端中执行 Spark 的地方)。使用以下命令检查输出目录。

[hadoop@localhost ~]$ cd output/
[hadoop@localhost output]$ ls -1

part-00000
part-00001
_SUCCESS

以下命令用于查看 Part-00000 文件的输出。

[hadoop@localhost output]$ cat part-00000

Output

(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)

以下命令用于查看 Part-00001 文件的输出。

[hadoop@localhost output]$ cat part-00001

Output

(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)

UN Persist the Storage

在取消持久化之前,如果你想查看该应用程序使用的存储空间,请在浏览器中使用以下 URL。

http://localhost:4040

以下屏幕将显示该应用程序使用的存储空间,这些应用程序正在 Spark shell 上运行。

storage space

如果你要取消持久化特定 RDD 的存储空间,请使用以下命令。

Scala> counts.unpersist()

你将看到如下输出:

15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810)
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106)
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14

要验证浏览器中的存储空间,请使用以下 URL。

http://localhost:4040/

你将看到以下屏幕。它显示了该应用程序使用的存储空间,这些应用程序正在 Spark shell 上运行。

storage space for application

Apache Spark - Deployment

Spark 应用程序使用 spark-submit,这是一个 shell 命令,用于在群集上部署 Spark 应用程序。它通过统一的界面,使用所有各自的群集管理器。因此,你无需为每个管理器配置应用程序。

Example

我们使用 shell 命令,来看看之前使用的单词计数的相同示例。在这里,我们考虑相同的示例作为 Spark 应用程序。

Sample Input

以下文本是输入数据,该文件名为 in.txt

people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful  as they love,
as they care as they share.

查看以下程序:

SparkWordCount.scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._

object SparkWordCount {
   def main(args: Array[String]) {

      val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map())

      /* local = master URL; Word Count = application name; */
      /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */
      /* Map = variables to work nodes */
      /*creating an inputRDD to read text file (in.txt) through Spark context*/
      val input = sc.textFile("in.txt")
      /* Transform the inputRDD into countRDD */

      val count = input.flatMap(line ⇒ line.split(" "))
      .map(word ⇒ (word, 1))
      .reduceByKey(_ + _)

      /* saveAsTextFile method is an action that effects on the RDD */
      count.saveAsTextFile("outfile")
      System.out.println("OK");
   }
}

将上述程序保存在名为 SparkWordCount.scala 的文件中,并将其放置在名为 spark-application 的用户定义目录中。

Note - 将 inputRDD 转换为 countRDD 时,我们使用 flatMap() 对文本文件中的行进行标记化,使用 map() 方法计算单词频率,并使用 reduceByKey() 方法计算每个单词的重复次数。

使用以下步骤提交该应用程序。通过终端执行 spark-application 目录中的所有步骤。

Step 1: Download Spark Ja

编译需要 Spark 核心 jar,因此,请从以下链接下载 spark-core_2.10-1.3.0.jar Spark core jar ,并将 jar 文件从下载目录移动到 spark-application 目录。

Step 2: Compile program

使用以下命令编译上面的程序。该命令应从 spark-application 目录执行。在这里, /usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar 是从 Spark 库获取的 Hadoop 支持 jar。

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala

Step 3: Create a JAR

使用以下命令创建 spark 应用程序的 jar 文件。在这里, wordcount 是 jar 文件的文件名。

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

Step 4: Submit spark application

使用以下命令提交 spark 应用程序:

spark-submit --class SparkWordCount --master local wordcount.jar

如果执行成功,你将找到给定的以下输出。以下输出中出现的 OK 是为了识别用户,这是程序的最后一行。如果你仔细阅读以下输出,你将发现不同信息,例如:

  1. 在端口 42954 上成功启动服务 'sparkDriver'

  2. MemoryStore 已启动,容量为 267.3 MB

  3. Started SparkUI at [role="bare"]http://192.168.1.217:4040

  4. Added JAR file:/home/hadoop/piapplication/count.jar

  5. ResultStage 1(SparkPi.scala:11 处的 saveAsTextFile)在 0.566 秒内完成

  6. 已在 [role="bare"] [role="bare"]http://192.168.1.217:4040 处停止 Spark 网络 UI

  7. MemoryStore cleared

15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.217:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver
 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

Step 5: Checking output

程序成功执行后,你将在 spark-application 目录中找到名为 outfile 的目录。

以下命令用于打开和检查 outfile 目录中的文件列表。

$ cd outfile
$ ls
Part-00000 part-00001 _SUCCESS

检查 part-00000 文件中输出的命令为 −

$ cat part-00000
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)

检查 part-00001 文件中输出的命令为 −

$ cat part-00001
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)

转到以下部分详细了解 'spark-submit' 命令。

Spark-submit Syntax

spark-submit [options] <app jar | python file> [app arguments]

Options