Spark Sql 简明教程
Spark SQL - DataFrames
数据帧是一个分布式数据集合,按指定列排列。从概念上讲,它相当于关系表,带有良好的优化技术。
数据帧可基于不同源的数组构建,如 Hive 表、结构化数据文件、外部数据库或现有的 RDD。此 API 设计用于现代大数据和数据科学应用程序,灵感源自 DataFrame in R Programming 和 Pandas in Python 。
Features of DataFrame
以下是 DataFrame 的一些特征特性集合 −
-
在单个节点集群到大集群上处理千兆字节到 PB(拍字节)规模数据的能力。
-
支持不同的数据格式(Avro、csv、弹性搜索和 Cassandra)和存储系统(HDFS、HIVE 表、MySQL 等)。
-
通过 Spark SQL Catalyst 优化器(树变换框架)进行最先进的优化和代码生成。
-
可以通过 Spark Core 轻松与所有大数据工具和框架集成。
-
提供 Python、Java、Scala 和 R 编程的 API。
SQLContext
SQLContext 是一个类,用于初始化 Spark SQL 的功能。SparkContext 类对象 (sc) 用于初始化 SQLContext 类对象。
以下命令用于通过 spark-shell 初始化 SparkContext。
$ spark-shell
默认情况下,当 spark-shell 启动时,SparkContext 对象会使用名称 sc 初始化。
使用以下命令创建 SQLContext。
scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
Example
我们来考虑一个名为 employee.json 的 JSON 文件中的员工记录示例。使用以下命令创建 DataFrame (df) 并读取一个名为 employee.json 的 JSON 文档,其内容如下。
employee.json − 将此文件放在当前 scala> 指针所在的目录中。
{
{"id" : "1201", "name" : "satish", "age" : "25"}
{"id" : "1202", "name" : "krishna", "age" : "28"}
{"id" : "1203", "name" : "amith", "age" : "39"}
{"id" : "1204", "name" : "javed", "age" : "23"}
{"id" : "1205", "name" : "prudvi", "age" : "23"}
}
DataFrame Operations
DataFrame 提供了专门针对结构化数据处理的领域性语言。此处,我们包括使用 DataFrame 进行结构化数据处理的一些基本示例。
按照以下步骤执行 DataFrame 操作 −
Read the JSON Document
首先,我们必须读取 JSON 文档。基于此,生成名为 (dfs) 的 DataFrame。
使用以下命令读取名为 employee.json 的 JSON 文档。数据以一个包含字段 − id、name 和 age 的表格形式显示。
scala> val dfs = sqlContext.read.json("employee.json")
Output − 字段名称自动从 employee.json 中获取。
dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]
Show the Data
如果您想要查看 DataFrame 中的数据,请使用以下命令。
scala> dfs.show()
Output − 您可以在表格格式中查看员工数据。
<console>:22, took 0.052610 s
+----+------+--------+
|age | id | name |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith |
| 23 | 1204 | javed |
| 23 | 1205 | prudvi |
+----+------+--------+
Use printSchema Method
如果您想查看 DataFrame 的结构(模式),请使用以下命令。
scala> dfs.printSchema()
Output
root
|-- age: string (nullable = true)
|-- id: string (nullable = true)
|-- name: string (nullable = true)
Use Select Method
使用以下命令从 DataFrame 中三个列中获取 name 列。
scala> dfs.select("name").show()
Output − 您可以看到 name 列的值。
<console>:22, took 0.044023 s
+--------+
| name |
+--------+
| satish |
| krishna|
| amith |
| javed |
| prudvi |
+--------+
Running SQL Queries Programmatically
SQLContext 允许应用程序在运行 SQL 函数时以编程方式运行 SQL 查询,并将结果返回为 DataFrame。
通常,在后台,SparkSQL 支持两种不同的将现有 RDD 转换为 DataFrame 的方法 −
Sr. No |
Methods & Description |
1 |
Inferring the Schema using Reflection 此方法使用反射来生成包含特定类型对象的 RDD 的模式。 |
2 |
Programmatically Specifying the Schema 创建 DataFrame 的第二种方法是通过允许您构造模式然后将其应用于现有 RDD 的编程接口。 |