Spark Sql 简明教程
Spark - Introduction
各行业都在广泛使用 Hadoop 来分析其数据集。原因在于 Hadoop 框架基于一个简单的编程模型(MapReduce),并且它提供了一个可扩展、灵活、容错且具有成本效益的计算解决方案。这里,主要顾虑是在处理大型数据集的速度,即查询之间的等待时间和运行程序的等待时间。
Apache 软件基金会推出了 Spark 来加速 Hadoop 计算软件流程。
与普遍的看法相反, Spark is not a modified version of Hadoop 并不真正依赖于 Hadoop,因为它有自己的集群管理。Hadoop 只是实现 Spark 的方式之一。
Spark 以两种方式使用 Hadoop - 一种是 storage ,第二种是 processing 。由于 Spark 有自己的集群管理计算,所以它只将 Hadoop 用于存储目的。
Apache Spark
Apache Spark 是一种闪电般快速的集群计算技术,专为快速计算而设计。它基于 Hadoop MapReduce,并扩展了 MapReduce 模型以有效地将其用于更多类型的计算,包括交互式查询和流处理。Spark 的主要特征是其 in-memory cluster computing ,它提高了应用程序的处理速度。
Spark 旨在涵盖广泛的工作负载,例如批处理应用程序、迭代算法、交互式查询和流处理。除了在各自系统中支持所有这些工作负载外,它还减轻了维护单独工具的管理负担。
Evolution of Apache Spark
Spark 是 Hadoop 的一个子项目,于 2009 年在加州大学伯克利分校 AMPLab 由 Matei Zaharia 开发。它于 2010 年根据 BSD 许可证开源。它于 2013 年捐赠给 Apache 软件基金会,现在 Apache Spark 已成为自 2014 年 2 月起的一个顶级 Apache 项目。
Features of Apache Spark
Apache Spark 具有以下功能。
-
Speed - Spark 帮助在 Hadoop 集群中运行应用程序,速度在内存中提高多达 100 倍,在磁盘上运行时速度提高 10 倍。通过减少对磁盘的读/写操作次数,可以做到这一点。它将中间处理数据存储在内存中。
-
Supports multiple languages - Spark 在 Java、Scala 或 Python 中提供了内置的 API。因此,你可以使用不同的语言编写应用程序。Spark 提供了 80 个高级运算符用于交互式查询。
-
Advanced Analytics - Spark 不仅支持“映射”和“还原”。它还支持 SQL 查询、流数据、机器学习 (ML) 和图形算法。
Spark Built on Hadoop
下图展示了使用 Hadoop 组件构建 Spark 的三种方法。
如以下说明所示,共有三种 Spark 部署方法。
-
Standalone − Spark 独立部署表示 Spark 占据 HDFS(Hadoop 分布式文件系统)之上的位置,并明确分配空间给 HDFS。在此,Spark 和 MapReduce 将并排运行以覆盖群集上的所有 Spark 作业。
-
Hadoop Yarn − Hadoop Yarn 部署仅仅意味着 Spark 在 Yarn 上运行,无需预安装或 root 访问权限。它有助于将 Spark 集成到 Hadoop 生态系统或 Hadoop 堆栈中。它允许其他组件在堆栈之上运行。
-
Spark in MapReduce (SIMR) − Spark in MapReduce 用于在独立部署之外启动 spark 作业。通过 SIMR,用户可以启动 Spark 并使用其 shell,而无需任何管理访问权限。
Spark – RDD
Resilient Distributed Datasets
弹性分布式数据集 (RDD) 是 Spark 的一个基本数据结构。它是对象的不可变分布式集合。RDD 中的每个数据集都被划分为逻辑分区,这些分区可以在群集的不同节点上计算。RDD 可以包含任何类型的 Python、Java 或 Scala 对象,包括用户定义的类。
形式上,RDD 是只读的分区记录集合。RDD 可以通过对稳定存储器中的数据或其他 RDD 上的确定性操作来创建。RDD 是一个容错元素集合,可以并行对其进行操作。
有两种方法可以创建 RDD − parallelizing 驱动程序程序中的现有集合或 referencing a dataset 位于外部存储系统中,例如共享文件系统、HDFS、HBase 或提供 Hadoop 输入格式的任何数据源。
Spark 利用 RDD 的概念实现了更快速、高效的 MapReduce 操作。让我们首先讨论 MapReduce 操作如何进行以及为什么它们效率不高。
Data Sharing is Slow in MapReduce
MapReduce 被广泛采用,用于在群集上通过并行分布式算法处理和生成大型数据集。它允许用户编写并行计算,使用一组高级运算符,而无需担心工作分布和容错能力。
不幸的是,在大多数当前框架中,在计算之间重用数据(例如:在两个 MapReduce 作业之间)的唯一方法是将其写入外部稳定存储系统(例如:HDFS)。虽然此框架提供了大量抽象技术来访问集群的计算资源,但用户仍然希望获得更多。
Iterative 和 Interactive 应用程序都需要更快的并行作业间数据共享。数据共享在 MapReduce 中很慢,原因在于 replication 、 serialization 和 disk IO 。关于存储系统,大多数 Hadoop 应用程序都会将 90% 以上的时间用于执行 HDFS 读写操作。
Iterative Operations on MapReduce
在多阶段应用程序中跨多个计算重用中间结果。下图解释了在 MapReduce 上执行迭代操作时当前框架如何工作。由于复制数据、磁盘 I/O 和序列化,这会产生大量的开销,从而导致系统变慢。
Interactive Operations on MapReduce
用户对相同的数据子集运行即席查询。每个查询都将在稳定存储上执行磁盘 I/O,这会占据应用程序执行时间的大部分。
下图说明了当前框架在 MapReduce 上执行交互式查询时如何工作。
Data Sharing using Spark RDD
数据共享在 MapReduce 中很慢,原因在于 replication 、 serialization 和 disk IO 。大多数 Hadoop 应用程序都会将 90% 以上的时间用于执行 HDFS 读写操作。
认识到这个问题后,研究人员开发了一个名为 Apache Spark 的专门框架。Spark 的关键思想是 *R*esilient *D*istributed *D*atasets (RDD);它支持内存内处理计算。这意味着它将内存的状态存储为作业间的对象,且此对象可在这些作业中共享。内存内的数据共享比网络和磁盘快 10 到 100 倍。
现在,我们尝试找出 Spark RDD 中迭代和交互式操作如何执行。
Spark - Installation
Spark 是 Hadoop 的子项目。因此,最好将 Spark 安装到基于 Linux 的系统中。以下步骤显示如何安装 Apache Spark。
Step1: Verifying Java Installation
Java 安装是安装 Spark 的必备事项之一。尝试以下命令以验证 JAVA 版本。
$java -version
如果系统中已安装 Java,您将看到以下响应:
java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)
如果您系统中尚未安装 Java,请在继续下一步操作之前安装 Java。
Step2: Verifying Scala Installation
您应使用 Scala 语言来实现 Spark。因此,让我们使用以下命令验证 Scala 安装。
$scala -version
如果系统中已安装 Scala,您将看到以下响应:
Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL
如果您系统中尚未安装 Scala,则继续执行下一步操作以安装 Scala。
Step3: Downloading Scala
访问以下链接下载最新版本的 Scala Download Scala 。在本教程中,我们使用 scala-2.11.6 版本。下载后,你会在下载文件夹中找到 Scala tar 文件。
Step4: Installing Scala
按照以下给定的步骤安装 Scala。
Step5: Downloading Apache Spark
访问以下链接 Download Spark 下载 Spark 最新版本。本教程中,我们使用 spark-1.3.1-bin-hadoop2.6 版本。下载后,你将在下载文件夹中找到 Spark tar 文件。
Step6: Installing Spark
按照以下步骤执行 Spark 安装。
Step7: Verifying the Spark Installation
输入以下命令打开 Spark shell。
$spark-shell
如果 Spark 安装成功,你将看到以下输出。
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>
Spark SQL - Introduction
Spark 引入了称为 Spark SQL 的结构化数据处理编程模块。它提供了一个称为 DataFrame 的编程抽象,并可以充当分布式 SQL 查询引擎。
Features of Spark SQL
以下为 Spark SQL 的功能 −
-
Integrated − 将 SQL 查询与 Spark 程序无缝混合。Spark SQL 允许您将结构化数据作为 Spark 中的分布式数据集(RDD)查询,并与 Python、Scala 和 Java 中的 API 集成。这种紧密集成使得运行 SQL查询以及复杂的分析算法变得非常容易。
-
Unified Data Access − 从各种来源加载和查询数据。Schema-RDD 提供了一个用于有效处理结构化数据的单一界面,包括 Apache Hive 表、parquet 文件和 JSON 文件。
-
Hive Compatibility − 在现有仓库上运行未经修改的 Hive 查询。Spark SQL 会重复使用 Hive 前端和 MetaStore,使您与现有 Hive 数据、查询和 UDF 完全兼容。只需将其与 Hive 一起安装。
-
Standard Connectivity − 通过 JDBC 或 ODBC 连接。Spark SQL 包含一个搭载行业标准 JDBC 和 ODBC 连接性的服务器模式。
-
Scalability − 同一引擎用于交互查询和长时间查询。Spark SQL 利用 RDD 模型支持查询故障中容,可将它扩展到大型作业。无需为历史数据使用不同的引擎。
Spark SQL Architecture
下图说明了 Spark SQL 的架构 −
此架构包含三层,分别为语言 API、模式 RDD 和数据源。
-
Language API − Spark 与不同的语言和 Spark SQL 兼容。还受以下语言支持:API (python、scala、java、HiveQL)。
-
Schema RDD − Spark 内核的特殊数据结构称为 RDD。一般来说,Spark SQL 处理模式、表格和记录。因此,我们可以将模式 RDD 用作临时表格。我们可以将此模式 RDD 称为数据帧。
-
Data Sources − 通常来说,Spark 内核的数据源是文本文件、Avro 文件等。然而,Spark SQL 的数据源有所不同。这些数据源是 Parquet 文件、JSON 文档、HIVE 表和 Cassandra 数据库。
我们将在后续章节中深入讨论这些内容。
Spark SQL - DataFrames
数据帧是一个分布式数据集合,按指定列排列。从概念上讲,它相当于关系表,带有良好的优化技术。
数据帧可基于不同源的数组构建,如 Hive 表、结构化数据文件、外部数据库或现有的 RDD。此 API 设计用于现代大数据和数据科学应用程序,灵感源自 DataFrame in R Programming 和 Pandas in Python 。
Features of DataFrame
以下是 DataFrame 的一些特征特性集合 −
-
在单个节点集群到大集群上处理千兆字节到 PB(拍字节)规模数据的能力。
-
支持不同的数据格式(Avro、csv、弹性搜索和 Cassandra)和存储系统(HDFS、HIVE 表、MySQL 等)。
-
通过 Spark SQL Catalyst 优化器(树变换框架)进行最先进的优化和代码生成。
-
可以通过 Spark Core 轻松与所有大数据工具和框架集成。
-
提供 Python、Java、Scala 和 R 编程的 API。
SQLContext
SQLContext 是一个类,用于初始化 Spark SQL 的功能。SparkContext 类对象 (sc) 用于初始化 SQLContext 类对象。
以下命令用于通过 spark-shell 初始化 SparkContext。
$ spark-shell
默认情况下,当 spark-shell 启动时,SparkContext 对象会使用名称 sc 初始化。
使用以下命令创建 SQLContext。
scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
Example
我们来考虑一个名为 employee.json 的 JSON 文件中的员工记录示例。使用以下命令创建 DataFrame (df) 并读取一个名为 employee.json 的 JSON 文档,其内容如下。
employee.json − 将此文件放在当前 scala> 指针所在的目录中。
{
{"id" : "1201", "name" : "satish", "age" : "25"}
{"id" : "1202", "name" : "krishna", "age" : "28"}
{"id" : "1203", "name" : "amith", "age" : "39"}
{"id" : "1204", "name" : "javed", "age" : "23"}
{"id" : "1205", "name" : "prudvi", "age" : "23"}
}
DataFrame Operations
DataFrame 提供了专门针对结构化数据处理的领域性语言。此处,我们包括使用 DataFrame 进行结构化数据处理的一些基本示例。
按照以下步骤执行 DataFrame 操作 −
Read the JSON Document
首先,我们必须读取 JSON 文档。基于此,生成名为 (dfs) 的 DataFrame。
使用以下命令读取名为 employee.json 的 JSON 文档。数据以一个包含字段 − id、name 和 age 的表格形式显示。
scala> val dfs = sqlContext.read.json("employee.json")
Output − 字段名称自动从 employee.json 中获取。
dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]
Show the Data
如果您想要查看 DataFrame 中的数据,请使用以下命令。
scala> dfs.show()
Output − 您可以在表格格式中查看员工数据。
<console>:22, took 0.052610 s
+----+------+--------+
|age | id | name |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith |
| 23 | 1204 | javed |
| 23 | 1205 | prudvi |
+----+------+--------+
Use printSchema Method
如果您想查看 DataFrame 的结构(模式),请使用以下命令。
scala> dfs.printSchema()
Output
root
|-- age: string (nullable = true)
|-- id: string (nullable = true)
|-- name: string (nullable = true)
Use Select Method
使用以下命令从 DataFrame 中三个列中获取 name 列。
scala> dfs.select("name").show()
Output − 您可以看到 name 列的值。
<console>:22, took 0.044023 s
+--------+
| name |
+--------+
| satish |
| krishna|
| amith |
| javed |
| prudvi |
+--------+
Running SQL Queries Programmatically
SQLContext 允许应用程序在运行 SQL 函数时以编程方式运行 SQL 查询,并将结果返回为 DataFrame。
通常,在后台,SparkSQL 支持两种不同的将现有 RDD 转换为 DataFrame 的方法 −
Sr. No |
Methods & Description |
1 |
Inferring the Schema using Reflection 此方法使用反射来生成包含特定类型对象的 RDD 的模式。 |
2 |
Programmatically Specifying the Schema 创建 DataFrame 的第二种方法是通过允许您构造模式然后将其应用于现有 RDD 的编程接口。 |
Spark SQL - Data Sources
一个数据帧接口允许不同的数据源在 Spark SQL 上运行。这是一个临时表,可以作为普通 RDD 进行操作。将数据帧注册为表格可让你对其数据运行 SQL 查询。
在本章中,我们将介绍使用不同 Spark 数据源加载和保存数据的一般方法。然后,我们将详细讨论内置数据源的可用特定选项。
SparkSQL 中有不同的数据源类型,其中一些列在下面:
Sr. No |
Data Sources |
1 |
JSON Datasets Spark SQL 可以自动捕获 JSON 数据集的模式并将其加载为数据帧。 |
2 |
Hive Tables Hive 随 HiveContext 与 Spark 库捆绑在一起,该库继承自 SQLContext。 |
3 |
Parquet Files Parquet 是一种由许多数据处理系统支持的列格式。 |