Spark Sql 简明教程

Spark SQL - Quick Guide

Spark - Introduction

各行业都在广泛使用 Hadoop 来分析其数据集。原因在于 Hadoop 框架基于一个简单的编程模型(MapReduce),并且它提供了一个可扩展、灵活、容错且具有成本效益的计算解决方案。这里,主要顾虑是在处理大型数据集的速度,即查询之间的等待时间和运行程序的等待时间。

Apache 软件基金会推出了 Spark 来加速 Hadoop 计算软件流程。

与普遍的看法相反, Spark is not a modified version of Hadoop 并不真正依赖于 Hadoop,因为它有自己的集群管理。Hadoop 只是实现 Spark 的方式之一。

Spark 以两种方式使用 Hadoop - 一种是 storage ,第二种是 processing 。由于 Spark 有自己的集群管理计算,所以它只将 Hadoop 用于存储目的。

Apache Spark

Apache Spark 是一种闪电般快速的集群计算技术,专为快速计算而设计。它基于 Hadoop MapReduce,并扩展了 MapReduce 模型以有效地将其用于更多类型的计算,包括交互式查询和流处理。Spark 的主要特征是其 in-memory cluster computing ,它提高了应用程序的处理速度。

Spark 旨在涵盖广泛的工作负载,例如批处理应用程序、迭代算法、交互式查询和流处理。除了在各自系统中支持所有这些工作负载外,它还减轻了维护单独工具的管理负担。

Evolution of Apache Spark

Spark 是 Hadoop 的一个子项目,于 2009 年在加州大学伯克利分校 AMPLab 由 Matei Zaharia 开发。它于 2010 年根据 BSD 许可证开源。它于 2013 年捐赠给 Apache 软件基金会,现在 Apache Spark 已成为自 2014 年 2 月起的一个顶级 Apache 项目。

Features of Apache Spark

Apache Spark 具有以下功能。

  1. Speed - Spark 帮助在 Hadoop 集群中运行应用程序,速度在内存中提高多达 100 倍,在磁盘上运行时速度提高 10 倍。通过减少对磁盘的读/写操作次数,可以做到这一点。它将中间处理数据存储在内存中。

  2. Supports multiple languages - Spark 在 Java、Scala 或 Python 中提供了内置的 API。因此,你可以使用不同的语言编写应用程序。Spark 提供了 80 个高级运算符用于交互式查询。

  3. Advanced Analytics - Spark 不仅支持“映射”和“还原”。它还支持 SQL 查询、流数据、机器学习 (ML) 和图形算法。

Spark Built on Hadoop

下图展示了使用 Hadoop 组件构建 Spark 的三种方法。

spark built on hadoop

如以下说明所示,共有三种 Spark 部署方法。

  1. Standalone − Spark 独立部署表示 Spark 占据 HDFS(Hadoop 分布式文件系统)之上的位置,并明确分配空间给 HDFS。在此,Spark 和 MapReduce 将并排运行以覆盖群集上的所有 Spark 作业。

  2. Hadoop Yarn − Hadoop Yarn 部署仅仅意味着 Spark 在 Yarn 上运行,无需预安装或 root 访问权限。它有助于将 Spark 集成到 Hadoop 生态系统或 Hadoop 堆栈中。它允许其他组件在堆栈之上运行。

  3. Spark in MapReduce (SIMR) − Spark in MapReduce 用于在独立部署之外启动 spark 作业。通过 SIMR,用户可以启动 Spark 并使用其 shell,而无需任何管理访问权限。

Components of Spark

下图描绘了 Spark 的不同组件。

components of spark

Apache Spark Core

Spark 核心是 Spark 平台的基础通用执行引擎,所有其他功能都建立在其之上。它提供内存计算和外部存储系统中的引用数据集。

Spark SQL

Spark SQL 是 Spark 核心之上的一个组件,它引入了名为 SchemaRDD 的新数据抽象,该抽象为结构化和半结构化数据提供了支持。

Spark Streaming

Spark Streaming 利用 Spark 核心 的快速调度功能执行流分析。它以小批量的方式获取数据,并在这些数据小批量上执行 RDD(弹性分布式数据集)转换。

MLlib (Machine Learning Library)

MLlib 是一个分布式机器学习框架,位于 Spark 之上,因为其分布式基于内存的 Spark 架构。根据 MLlib 开发人员对交替最小二乘法 (ALS) 实现进行的基准测试,它比 Hadoop 基于磁盘的版本快九倍。 Apache Mahout (在 Mahout 获得 Spark 接口之前)。

GraphX

GraphX 是一个分布式图处理框架,位于 Spark 之上。它提供了一个用于表示图形计算的 API,该 API 可以使用 Pregel 抽象 API 对用户定义的图形进行建模。它还为这种抽象提供了优化的运行时。

Spark – RDD

Resilient Distributed Datasets

弹性分布式数据集 (RDD) 是 Spark 的一个基本数据结构。它是对象的不可变分布式集合。RDD 中的每个数据集都被划分为逻辑分区,这些分区可以在群集的不同节点上计算。RDD 可以包含任何类型的 Python、Java 或 Scala 对象,包括用户定义的类。

形式上,RDD 是只读的分区记录集合。RDD 可以通过对稳定存储器中的数据或其他 RDD 上的确定性操作来创建。RDD 是一个容错元素集合,可以并行对其进行操作。

有两种方法可以创建 RDD − parallelizing 驱动程序程序中的现有集合或 referencing a dataset 位于外部存储系统中,例如共享文件系统、HDFS、HBase 或提供 Hadoop 输入格式的任何数据源。

Spark 利用 RDD 的概念实现了更快速、高效的 MapReduce 操作。让我们首先讨论 MapReduce 操作如何进行以及为什么它们效率不高。

Data Sharing is Slow in MapReduce

MapReduce 被广泛采用,用于在群集上通过并行分布式算法处理和生成大型数据集。它允许用户编写并行计算,使用一组高级运算符,而无需担心工作分布和容错能力。

不幸的是,在大多数当前框架中,在计算之间重用数据(例如:在两个 MapReduce 作业之间)的唯一方法是将其写入外部稳定存储系统(例如:HDFS)。虽然此框架提供了大量抽象技术来访问集群的计算资源,但用户仍然希望获得更多。

IterativeInteractive 应用程序都需要更快的并行作业间数据共享。数据共享在 MapReduce 中很慢,原因在于 replicationserializationdisk IO 。关于存储系统,大多数 Hadoop 应用程序都会将 90% 以上的时间用于执行 HDFS 读写操作。

Iterative Operations on MapReduce

在多阶段应用程序中跨多个计算重用中间结果。下图解释了在 MapReduce 上执行迭代操作时当前框架如何工作。由于复制数据、磁盘 I/O 和序列化,这会产生大量的开销,从而导致系统变慢。

iterative operations on mapreduce

Interactive Operations on MapReduce

用户对相同的数据子集运行即席查询。每个查询都将在稳定存储上执行磁盘 I/O,这会占据应用程序执行时间的大部分。

下图说明了当前框架在 MapReduce 上执行交互式查询时如何工作。

interactive operations on mapreduce

Data Sharing using Spark RDD

数据共享在 MapReduce 中很慢,原因在于 replicationserializationdisk IO 。大多数 Hadoop 应用程序都会将 90% 以上的时间用于执行 HDFS 读写操作。

认识到这个问题后,研究人员开发了一个名为 Apache Spark 的专门框架。Spark 的关键思想是 *R*esilient *D*istributed *D*atasets (RDD);它支持内存内处理计算。这意味着它将内存的状态存储为作业间的对象,且此对象可在这些作业中共享。内存内的数据共享比网络和磁盘快 10 到 100 倍。

现在,我们尝试找出 Spark RDD 中迭代和交互式操作如何执行。

Iterative Operations on Spark RDD

下图显示了 Spark RDD 上的迭代操作。它会将中间结果存储在分布式内存中,而不是持久性存储(磁盘)中,并使系统更快。

Note − 如果分布式内存(RAM)足以存储中间结果(作业状态),则会将这些结果存储在磁盘上

iterative operations on spark rdd

Interactive Operations on Spark RDD

此图显示了 Spark RDD 上的交互式操作。如果针对相同数据集反复运行不同的查询,则可以将这个特定数据保留在内存中,以获得更好的执行时间。

interactive operations on spark rdd

默认情况下,每次针对转换后的 RDD 执行操作时,都会重新计算该 RDD。但是,您也可以选择将 RDD persist 在内存中,在这种情况下,Spark 将把元素保留在集群中以便下次查询时更快地访问。此外,还支持将 RDD 持久保留在磁盘上或跨多个节点复制。

Spark - Installation

Spark 是 Hadoop 的子项目。因此,最好将 Spark 安装到基于 Linux 的系统中。以下步骤显示如何安装 Apache Spark。

Step1: Verifying Java Installation

Java 安装是安装 Spark 的必备事项之一。尝试以下命令以验证 JAVA 版本。

$java -version

如果系统中已安装 Java,您将看到以下响应:

java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)

如果您系统中尚未安装 Java,请在继续下一步操作之前安装 Java。

Step2: Verifying Scala Installation

您应使用 Scala 语言来实现 Spark。因此,让我们使用以下命令验证 Scala 安装。

$scala -version

如果系统中已安装 Scala,您将看到以下响应:

Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

如果您系统中尚未安装 Scala,则继续执行下一步操作以安装 Scala。

Step3: Downloading Scala

访问以下链接下载最新版本的 Scala Download Scala 。在本教程中,我们使用 scala-2.11.6 版本。下载后,你会在下载文件夹中找到 Scala tar 文件。

Step4: Installing Scala

按照以下给定的步骤安装 Scala。

Extract the Scala tar file

键入以下命令以解压 Scala tar 文件。

$ tar xvf scala-2.11.6.tgz

Move Scala software files

使用以下命令将 Scala 软件文件移动到相应的目录 (/usr/local/scala)

$ su –
Password:
# cd /home/Hadoop/Downloads/
# mv scala-2.11.6 /usr/local/scala
# exit

Set PATH for Scala

使用以下命令为 Scala 设置 PATH。

$ export PATH = $PATH:/usr/local/scala/bin

Verifying Scala Installation

安装后,最好对其进行验证。使用以下命令验证 Scala 安装。

$scala -version

如果系统中已安装 Scala,您将看到以下响应:

Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

Step5: Downloading Apache Spark

访问以下链接 Download Spark 下载 Spark 最新版本。本教程中,我们使用 spark-1.3.1-bin-hadoop2.6 版本。下载后,你将在下载文件夹中找到 Spark tar 文件。

Step6: Installing Spark

按照以下步骤执行 Spark 安装。

Extracting Spark tar

以下命令用于解压 spark tar 文件。

$ tar xvf spark-1.3.1-bin-hadoop2.6.tgz

Moving Spark software files

以下命令用于将 Spark 软件文件移动到相应的目录 (/usr/local/spark)

$ su –
Password:
# cd /home/Hadoop/Downloads/
# mv spark-1.3.1-bin-hadoop2.6 /usr/local/spark
# exit

Setting up the environment for Spark

将以下行添加到 ~ /.bashrc 文件中。这意味着将 Spark 软件文件所在的路径添加到 PATH 变量。

export PATH = $PATH:/usr/local/spark/bin

使用以下命令加载 ~/.bashrc 文件。

$ source ~/.bashrc

Step7: Verifying the Spark Installation

输入以下命令打开 Spark shell。

$spark-shell

如果 Spark 安装成功,你将看到以下输出。

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
    ____             __
   / __/__ ___ _____/ /__
   _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\ version 1.4.0
      /_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>

Spark SQL - Introduction

Spark 引入了称为 Spark SQL 的结构化数据处理编程模块。它提供了一个称为 DataFrame 的编程抽象,并可以充当分布式 SQL 查询引擎。

Features of Spark SQL

以下为 Spark SQL 的功能 −

  1. Integrated − 将 SQL 查询与 Spark 程序无缝混合。Spark SQL 允许您将结构化数据作为 Spark 中的分布式数据集(RDD)查询,并与 Python、Scala 和 Java 中的 API 集成。这种紧密集成使得运行 SQL查询以及复杂的分析算法变得非常容易。

  2. Unified Data Access − 从各种来源加载和查询数据。Schema-RDD 提供了一个用于有效处理结构化数据的单一界面,包括 Apache Hive 表、parquet 文件和 JSON 文件。

  3. Hive Compatibility − 在现有仓库上运行未经修改的 Hive 查询。Spark SQL 会重复使用 Hive 前端和 MetaStore,使您与现有 Hive 数据、查询和 UDF 完全兼容。只需将其与 Hive 一起安装。

  4. Standard Connectivity − 通过 JDBC 或 ODBC 连接。Spark SQL 包含一个搭载行业标准 JDBC 和 ODBC 连接性的服务器模式。

  5. Scalability − 同一引擎用于交互查询和长时间查询。Spark SQL 利用 RDD 模型支持查询故障中容,可将它扩展到大型作业。无需为历史数据使用不同的引擎。

Spark SQL Architecture

下图说明了 Spark SQL 的架构 −

spark sql architecture

此架构包含三层,分别为语言 API、模式 RDD 和数据源。

  1. Language API − Spark 与不同的语言和 Spark SQL 兼容。还受以下语言支持:API (python、scala、java、HiveQL)。

  2. Schema RDD − Spark 内核的特殊数据结构称为 RDD。一般来说,Spark SQL 处理模式、表格和记录。因此,我们可以将模式 RDD 用作临时表格。我们可以将此模式 RDD 称为数据帧。

  3. Data Sources − 通常来说,Spark 内核的数据源是文本文件、Avro 文件等。然而,Spark SQL 的数据源有所不同。这些数据源是 Parquet 文件、JSON 文档、HIVE 表和 Cassandra 数据库。

我们将在后续章节中深入讨论这些内容。

Spark SQL - DataFrames

数据帧是一个分布式数据集合,按指定列排列。从概念上讲,它相当于关系表,带有良好的优化技术。

数据帧可基于不同源的数组构建,如 Hive 表、结构化数据文件、外部数据库或现有的 RDD。此 API 设计用于现代大数据和数据科学应用程序,灵感源自 DataFrame in R ProgrammingPandas in Python

Features of DataFrame

以下是 DataFrame 的一些特征特性集合 −

  1. 在单个节点集群到大集群上处理千兆字节到 PB(拍字节)规模数据的能力。

  2. 支持不同的数据格式(Avro、csv、弹性搜索和 Cassandra)和存储系统(HDFS、HIVE 表、MySQL 等)。

  3. 通过 Spark SQL Catalyst 优化器(树变换框架)进行最先进的优化和代码生成。

  4. 可以通过 Spark Core 轻松与所有大数据工具和框架集成。

  5. 提供 Python、Java、Scala 和 R 编程的 API。

SQLContext

SQLContext 是一个类,用于初始化 Spark SQL 的功能。SparkContext 类对象 (sc) 用于初始化 SQLContext 类对象。

以下命令用于通过 spark-shell 初始化 SparkContext。

$ spark-shell

默认情况下,当 spark-shell 启动时,SparkContext 对象会使用名称 sc 初始化。

使用以下命令创建 SQLContext。

scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

Example

我们来考虑一个名为 employee.json 的 JSON 文件中的员工记录示例。使用以下命令创建 DataFrame (df) 并读取一个名为 employee.json 的 JSON 文档,其内容如下。

employee.json − 将此文件放在当前 scala> 指针所在的目录中。

{
   {"id" : "1201", "name" : "satish", "age" : "25"}
   {"id" : "1202", "name" : "krishna", "age" : "28"}
   {"id" : "1203", "name" : "amith", "age" : "39"}
   {"id" : "1204", "name" : "javed", "age" : "23"}
   {"id" : "1205", "name" : "prudvi", "age" : "23"}
}

DataFrame Operations

DataFrame 提供了专门针对结构化数据处理的领域性语言。此处,我们包括使用 DataFrame 进行结构化数据处理的一些基本示例。

按照以下步骤执行 DataFrame 操作 −

Read the JSON Document

首先,我们必须读取 JSON 文档。基于此,生成名为 (dfs) 的 DataFrame。

使用以下命令读取名为 employee.json 的 JSON 文档。数据以一个包含字段 − id、name 和 age 的表格形式显示。

scala> val dfs = sqlContext.read.json("employee.json")

Output − 字段名称自动从 employee.json 中获取。

dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]

Show the Data

如果您想要查看 DataFrame 中的数据,请使用以下命令。

scala> dfs.show()

Output − 您可以在表格格式中查看员工数据。

<console>:22, took 0.052610 s
+----+------+--------+
|age | id   |  name  |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
| 23 | 1204 | javed  |
| 23 | 1205 | prudvi |
+----+------+--------+

Use printSchema Method

如果您想查看 DataFrame 的结构(模式),请使用以下命令。

scala> dfs.printSchema()

Output

root
   |-- age: string (nullable = true)
   |-- id: string (nullable = true)
   |-- name: string (nullable = true)

Use Select Method

使用以下命令从 DataFrame 中三个列中获取 name 列。

scala> dfs.select("name").show()

Output − 您可以看到 name 列的值。

<console>:22, took 0.044023 s
+--------+
|  name  |
+--------+
| satish |
| krishna|
| amith  |
| javed  |
| prudvi |
+--------+

Use Age Filter

使用以下命令查找年龄大于 23 岁(age > 23)的员工。

scala> dfs.filter(dfs("age") > 23).show()

Output

<console>:22, took 0.078670 s
+----+------+--------+
|age | id   | name   |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
+----+------+--------+

Use groupBy Method

使用以下命令计算同一年龄的员工人数。

scala> dfs.groupBy("age").count().show()

Output − 两名员工的年龄为 23 岁。

<console>:22, took 5.196091 s
+----+-----+
|age |count|
+----+-----+
| 23 |  2  |
| 25 |  1  |
| 28 |  1  |
| 39 |  1  |
+----+-----+

Running SQL Queries Programmatically

SQLContext 允许应用程序在运行 SQL 函数时以编程方式运行 SQL 查询,并将结果返回为 DataFrame。

通常,在后台,SparkSQL 支持两种不同的将现有 RDD 转换为 DataFrame 的方法 −

Sr. No

Methods & Description

1

Inferring the Schema using Reflection 此方法使用反射来生成包含特定类型对象的 RDD 的模式。

2

Programmatically Specifying the Schema 创建 DataFrame 的第二种方法是通过允许您构造模式然后将其应用于现有 RDD 的编程接口。

Spark SQL - Data Sources

一个数据帧接口允许不同的数据源在 Spark SQL 上运行。这是一个临时表,可以作为普通 RDD 进行操作。将数据帧注册为表格可让你对其数据运行 SQL 查询。

在本章中,我们将介绍使用不同 Spark 数据源加载和保存数据的一般方法。然后,我们将详细讨论内置数据源的可用特定选项。

SparkSQL 中有不同的数据源类型,其中一些列在下面:

Sr. No

Data Sources

1

JSON Datasets Spark SQL 可以自动捕获 JSON 数据集的模式并将其加载为数据帧。

2

Hive Tables Hive 随 HiveContext 与 Spark 库捆绑在一起,该库继承自 SQLContext。

3

Parquet Files Parquet 是一种由许多数据处理系统支持的列格式。