Spark SQL Tutorial
Spark SQL - DataFrames
A DataFrame is a distributed collection of data organized into named columns. Conceptually, it is equivalent to a relational table, with good optimization techniques.
A DataFrame can be constructed from a wide array of sources such as Hive tables, structured data files, external databases, or existing RDDs. The API was designed for modern Big Data and data science applications, taking inspiration from DataFrames in R and pandas in Python.
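For instance, once a SQLContext is available (one is created in the next section), each kind of source maps onto a different reader call. The following is only a rough sketch; the Hive table name and the JDBC settings are hypothetical placeholders −

scala> val fromJson = sqlContext.read.json("employee.json")    // structured data file
scala> val fromHive = sqlContext.table("employees")            // Hive table (requires a HiveContext)
scala> val fromJdbc = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://host/db").option("dbtable", "employee").load()    // external database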
Features of DataFrame
Here is a set of the characteristic features of DataFrame −
- Ability to process data ranging in size from kilobytes to petabytes, on anything from a single-node cluster to a large cluster.
- Supports different data formats (Avro, CSV, Elasticsearch, Cassandra) and storage systems (HDFS, Hive tables, MySQL, etc.).
- State-of-the-art optimization and code generation through the Spark SQL Catalyst optimizer (a tree transformation framework).
- Can be easily integrated with all Big Data tools and frameworks via Spark Core.
- Provides APIs for Python, Java, Scala, and R.
SQLContext
SQLContext is a class used for initializing the functionalities of Spark SQL. The SparkContext class object (sc) is required to initialize a SQLContext class object.
The following command is used to initialize the SparkContext through the spark-shell.
$ spark-shell
By default, the SparkContext object is initialized with the name sc when the spark-shell starts.
Use the following command to create a SQLContext.
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
Example
Let us consider an example of employee records in a JSON file named employee.json. Use the commands in the steps below to create a DataFrame (dfs) from this JSON document, whose content is as follows.
employee.json − Place this file in the directory where the current scala> prompt is running. Each line contains one self-contained JSON object, which is the format Spark's JSON reader expects.
{"id" : "1201", "name" : "satish", "age" : "25"}
{"id" : "1202", "name" : "krishna", "age" : "28"}
{"id" : "1203", "name" : "amith", "age" : "39"}
{"id" : "1204", "name" : "javed", "age" : "23"}
{"id" : "1205", "name" : "prudvi", "age" : "23"}
DataFrame Operations
DataFrame provides a domain-specific language for structured data manipulation. Here, we include some basic examples of structured data processing using DataFrames.
Follow the steps given below to perform DataFrame operations −
Read the JSON Document
First, we have to read the JSON document. Based on this, we generate a DataFrame named dfs.
Use the following command to read the JSON document named employee.json. The data is shown as a table with the fields id, name, and age.
scala> val dfs = sqlContext.read.json("employee.json")
Output − The field names are taken automatically from employee.json.
dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]
Show the Data
If you want to see the data in the DataFrame, then use the following command.
scala> dfs.show()
Output − You can see the employee data in a tabular format.
<console>:22, took 0.052610 s
+----+------+--------+
|age | id | name |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith |
| 23 | 1204 | javed |
| 23 | 1205 | prudvi |
+----+------+--------+
Use printSchema Method
If you want to see the structure (schema) of the DataFrame, then use the following command.
scala> dfs.printSchema()
Output
root
|-- age: string (nullable = true)
|-- id: string (nullable = true)
|-- name: string (nullable = true)
Use Select Method
Use the following command to fetch the name column from among the three columns of the DataFrame.
scala> dfs.select("name").show()
Output − You can see the values of the name column.
<console>:22, took 0.044023 s
+--------+
| name |
+--------+
| satish |
| krishna|
| amith |
| javed |
| prudvi |
+--------+
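Note that select is not limited to a single column name; it also accepts several columns as well as column expressions. A small sketch of both forms follows (the cast is needed because age is stored as a string in this schema) −

scala> dfs.select("id", "name").show()
scala> dfs.select(dfs("name"), dfs("age").cast("int") + 1).show()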
Use Age Filter
Use the following command to find the employees whose age is greater than 23 (age > 23).
scala> dfs.filter(dfs("age") > 23).show()
Output
<console>:22, took 0.078670 s
+----+------+--------+
|age | id | name |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith |
+----+------+--------+
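filter also accepts a SQL expression string, which can be more convenient for simple predicates. The following sketch is equivalent to the command above −

scala> dfs.filter("age > 23").show()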
Use groupBy Method
Use the following command to count the number of employees who have the same age.
scala> dfs.groupBy("age").count().show()
Output − Two employees have age 23.
<console>:22, took 5.196091 s
+----+-----+
|age |count|
+----+-----+
| 23 | 2 |
| 25 | 1 |
| 28 | 1 |
| 39 | 1 |
+----+-----+
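Since the result of count() is itself a DataFrame, it can be chained with further operations, for example sorting by age. A small sketch −

scala> dfs.groupBy("age").count().orderBy("age").show()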
Running SQL Queries Programmatically
The sql function on a SQLContext enables applications to run SQL queries programmatically and returns the result as a DataFrame.
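For example, the DataFrame created earlier can be registered as a temporary table and then queried with plain SQL. A minimal sketch using the Spark 1.x API −

scala> dfs.registerTempTable("employee")
scala> val results = sqlContext.sql("SELECT name, age FROM employee WHERE age > 23")
scala> results.show()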
Generally, in the background, Spark SQL supports two different methods for converting existing RDDs into DataFrames −
1. Inferring the Schema using Reflection − This method uses reflection to generate the schema of an RDD that contains specific types of objects.
2. Programmatically Specifying the Schema − This method lets you construct a schema through a programmatic interface and then apply it to an existing RDD.
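As an illustration of the first method, the following is a minimal sketch that infers the schema by reflection from a Scala case class. The case class, the file employee.txt, and its comma-separated layout are assumptions made for this example −

scala> import sqlContext.implicits._
scala> case class Employee(id: String, name: String, age: Int)
scala> val employeeRDD = sc.textFile("employee.txt").map(_.split(",")).map(e => Employee(e(0), e(1), e(2).trim.toInt))
scala> val employeeDF = employeeRDD.toDF()
scala> employeeDF.show()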