Spark SQL Tutorial

Spark SQL - Quick Guide

Spark - Introduction

Industries are using Hadoop extensively to analyze their data sets. The reason is that the Hadoop framework is based on a simple programming model (MapReduce) and it enables a computing solution that is scalable, flexible, fault-tolerant, and cost-effective. Here, the main concern is the speed of processing large datasets, in terms of the waiting time between queries and the waiting time to run a program.

Spark was introduced by the Apache Software Foundation to speed up the Hadoop computational process.

Contrary to a common belief, Spark is not a modified version of Hadoop and is not really dependent on Hadoop, because it has its own cluster management. Hadoop is just one of the ways to implement Spark.

Spark uses Hadoop in two ways – one is storage and the second is processing. Since Spark has its own cluster management computation, it uses Hadoop for storage purposes only.

Apache Spark

Apache Spark is a lightning-fast cluster computing technology, designed for fast computation. It is based on Hadoop MapReduce and extends the MapReduce model to efficiently use it for more types of computations, which include interactive queries and stream processing. The main feature of Spark is its in-memory cluster computing, which increases the processing speed of an application.

Spark is designed to cover a wide range of workloads such as batch applications, iterative algorithms, interactive queries, and streaming. Apart from supporting all these workloads in a single system, it reduces the management burden of maintaining separate tools.

Evolution of Apache Spark

Spark is one of Hadoop’s sub-projects, developed in 2009 in UC Berkeley’s AMPLab by Matei Zaharia. It was open-sourced in 2010 under a BSD license. It was donated to the Apache Software Foundation in 2013, and Apache Spark became a top-level Apache project in February 2014.

Features of Apache Spark

Apache Spark has the following features.

  1. Speed − Spark helps to run an application in a Hadoop cluster up to 100 times faster in memory, and 10 times faster when running on disk. This is possible by reducing the number of read/write operations to disk. It stores the intermediate processing data in memory.

  2. Supports multiple languages − Spark provides built-in APIs in Java, Scala, and Python. Therefore, you can write applications in different languages. Spark also comes with 80 high-level operators for interactive querying.

  3. Advanced Analytics − Spark not only supports ‘Map’ and ‘Reduce’. It also supports SQL queries, streaming data, machine learning (ML), and graph algorithms.

Spark Built on Hadoop

The following diagram shows three ways of how Spark can be built with Hadoop components.

[Figure: Spark built on Hadoop]

There are three ways of Spark deployment as explained below.

  1. Standalone − Spark Standalone deployment means Spark occupies the place on top of HDFS (Hadoop Distributed File System) and space is allocated for HDFS explicitly. Here, Spark and MapReduce run side by side to cover all Spark jobs on the cluster.

  2. Hadoop Yarn − Hadoop Yarn deployment means, simply, that Spark runs on Yarn without any pre-installation or root access required. It helps to integrate Spark into the Hadoop ecosystem or Hadoop stack. It allows other components to run on top of the stack.

  3. Spark in MapReduce (SIMR) − Spark in MapReduce is used to launch Spark jobs in addition to standalone deployment. With SIMR, the user can start Spark and use its shell without any administrative access.

Components of Spark

The following illustration depicts the different components of Spark.

[Figure: Components of Spark]

Apache Spark Core

Spark Core is the underlying general execution engine for the Spark platform upon which all other functionality is built. It provides in-memory computing and the ability to reference datasets in external storage systems.

Spark SQL

Spark SQL is a component on top of Spark Core that introduces a new data abstraction called SchemaRDD, which provides support for structured and semi-structured data.

Spark Streaming

Spark Streaming leverages Spark Core’s fast scheduling capability to perform streaming analytics. It ingests data in mini-batches and performs RDD (Resilient Distributed Datasets) transformations on those mini-batches of data.
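
For illustration only, here is a minimal spark-shell sketch (not part of this tutorial's setup) that counts words arriving on a network socket in 10-second mini-batches; the host name localhost and port 9999 are arbitrary placeholders.

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Cut the input stream into 10-second mini-batches (sc is the spark-shell SparkContext).
val ssc = new StreamingContext(sc, Seconds(10))

// Each mini-batch of lines arrives as an RDD, so the usual RDD-style transformations apply.
val lines = ssc.socketTextStream("localhost", 9999)   // placeholder host and port
val counts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts.print()

ssc.start()
ssc.awaitTermination()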

MLlib (Machine Learning Library)

MLlib is a distributed machine learning framework above Spark, owing to the distributed memory-based Spark architecture. According to benchmarks done by the MLlib developers against the Alternating Least Squares (ALS) implementation, Spark MLlib is nine times as fast as the Hadoop disk-based version of Apache Mahout (before Mahout gained a Spark interface).
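
As a rough illustration of the ALS algorithm mentioned above, the hedged sketch below trains a tiny recommendation model in spark-shell; the rating triples are made-up sample data and the chosen rank and iteration count are arbitrary.

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// Made-up (user, product, rating) triples just to exercise the API.
val ratings = sc.parallelize(Seq(
  Rating(1, 10, 4.0), Rating(1, 20, 3.0),
  Rating(2, 10, 5.0), Rating(2, 30, 1.0)
))

// Train a matrix-factorization model with rank 5 and 10 iterations.
val model = ALS.train(ratings, 5, 10)

// Predict how user 1 would rate product 30.
println(model.predict(1, 30))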

GraphX

GraphX is a distributed graph-processing framework on top of Spark. It provides an API for expressing graph computation that can model the user-defined graphs by using Pregel abstraction API. It also provides an optimized runtime for this abstraction.
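
For a flavour of the API, the sketch below loads a graph and runs GraphX's built-in PageRank algorithm (rather than the raw Pregel API); the file edges.txt is a hypothetical edge-list file with one "srcId dstId" pair per line.

import org.apache.spark.graphx.GraphLoader

// Build a graph from a hypothetical edge-list file.
val graph = GraphLoader.edgeListFile(sc, "edges.txt")

// Run PageRank until convergence and print a few vertex ranks.
val ranks = graph.pageRank(0.0001).vertices
ranks.take(5).foreach(println)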

Spark – RDD

Resilient Distributed Datasets

Resilient Distributed Datasets (RDD) is a fundamental data structure of Spark. It is an immutable distributed collection of objects. Each dataset in RDD is divided into logical partitions, which may be computed on different nodes of the cluster. RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes.

Formally, an RDD is a read-only, partitioned collection of records. RDDs can be created through deterministic operations on either data on stable storage or other RDDs. RDD is a fault-tolerant collection of elements that can be operated on in parallel.

There are two ways to create RDDs − parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared file system, HDFS, HBase, or any data source offering a Hadoop Input Format.
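
For illustration, a minimal spark-shell sketch of both approaches is shown below; the file name data.txt is just a placeholder for any text file reachable from the cluster.

// 1. Parallelizing an existing collection in the driver program
val numbers = sc.parallelize(Seq(1, 2, 3, 4, 5))
println(numbers.reduce(_ + _))            // 15

// 2. Referencing a dataset in an external storage system (hypothetical file)
val lines = sc.textFile("data.txt")
println(lines.count())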

Spark makes use of the concept of RDD to achieve faster and more efficient MapReduce operations. Let us first discuss how MapReduce operations take place and why they are not so efficient.

Data Sharing is Slow in MapReduce

MapReduce is widely adopted for processing and generating large datasets with a parallel, distributed algorithm on a cluster. It allows users to write parallel computations, using a set of high-level operators, without having to worry about work distribution and fault tolerance.

Unfortunately, in most current frameworks, the only way to reuse data between computations (Ex: between two MapReduce jobs) is to write it to an external stable storage system (Ex: HDFS). Although this framework provides numerous abstractions for accessing a cluster’s computational resources, users still want more.

Both iterative and interactive applications require faster data sharing across parallel jobs. Data sharing is slow in MapReduce due to replication, serialization, and disk IO. Regarding the storage system, most Hadoop applications spend more than 90% of their time doing HDFS read-write operations.

Iterative Operations on MapReduce

Reusing intermediate results across multiple computations is common in multi-stage applications. The following illustration explains how the current framework works while doing iterative operations on MapReduce. This incurs substantial overhead due to data replication, disk I/O, and serialization, which makes the system slow.

[Figure: Iterative operations on MapReduce]

Interactive Operations on MapReduce

The user runs ad-hoc queries on the same subset of data. Each query will do disk I/O on the stable storage, which can dominate application execution time.

The following illustration explains how the current framework works while doing the interactive queries on MapReduce.

[Figure: Interactive operations on MapReduce]

Data Sharing using Spark RDD

Data sharing is slow in MapReduce due to replication, serialization, and disk IO. Most Hadoop applications spend more than 90% of their time doing HDFS read-write operations.

Recognizing this problem, researchers developed a specialized framework called Apache Spark. The key idea of Spark is Resilient Distributed Datasets (RDD); it supports in-memory processing computation. This means it stores the state of memory as an object across the jobs, and the object is sharable between those jobs. Data sharing in memory is 10 to 100 times faster than network and disk.

Let us now try to find out how iterative and interactive operations take place in Spark RDD.

Iterative Operations on Spark RDD

The illustration given below shows the iterative operations on Spark RDD. It will store intermediate results in distributed memory instead of stable storage (disk) and make the system faster.

Note − If the distributed memory (RAM) is not sufficient to store intermediate results (the state of the job), then it will store those results on the disk.

[Figure: Iterative operations on Spark RDD]

Interactive Operations on Spark RDD

This illustration shows interactive operations on Spark RDD. If different queries are run on the same set of data repeatedly, this particular data can be kept in memory for better execution times.

[Figure: Interactive operations on Spark RDD]

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicating them across multiple nodes.
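
As a small sketch of persisting an RDD in spark-shell (assuming a hypothetical log file data.txt), note how the first action materializes the cached data and later actions reuse it.

// Filter a hypothetical log file and keep the result in memory.
val logs = sc.textFile("data.txt")
val errors = logs.filter(_.contains("ERROR"))

errors.persist()                          // or errors.cache(); keeps the elements in memory
println(errors.count())                   // first action computes and caches the RDD
println(errors.first())                   // later actions reuse the cached data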

Spark - Installation

Spark is a sub-project of Hadoop. Therefore, it is better to install Spark on a Linux-based system. The following steps show how to install Apache Spark.

Step1: Verifying Java Installation

Java installation is one of the mandatory prerequisites for installing Spark. Try the following command to verify the Java version.

$java -version

If Java is already installed on your system, you will see the following response −

java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)

In case you do not have Java installed on your system, then install Java before proceeding to the next step.

Step2: Verifying Scala Installation

You need the Scala language to implement Spark. So let us verify the Scala installation using the following command.

$scala -version

If Scala is already installed on your system, you will see the following response −

Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

In case you do not have Scala installed on your system, then proceed to the next step for Scala installation.

Step3: Downloading Scala

Download the latest version of Scala by visiting the following link: Download Scala. For this tutorial, we are using the scala-2.11.6 version. After downloading, you will find the Scala tar file in the download folder.

Step4: Installing Scala

Follow the steps given below for installing Scala.

Extract the Scala tar file

Type the following command for extracting the Scala tar file.

$ tar xvf scala-2.11.6.tgz

Move Scala software files

Use the following commands for moving the Scala software files to the respective directory (/usr/local/scala).

$ su -
Password:
# cd /home/Hadoop/Downloads/
# mv scala-2.11.6 /usr/local/scala
# exit

Set PATH for Scala

Use the following command for setting PATH for Scala.

$ export PATH=$PATH:/usr/local/scala/bin

Verifying Scala Installation

After installation, it is better to verify it. Use the following command for verifying the Scala installation.

$scala -version

If Scala is already installed on your system, you will see the following response −

Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

Step5: Downloading Apache Spark

Download the latest version of Spark by visiting the following link: Download Spark. For this tutorial, we are using the spark-1.3.1-bin-hadoop2.6 version. After downloading it, you will find the Spark tar file in the download folder.

Step6: Installing Spark

Follow the steps given below for installing Spark.

Extracting Spark tar

Use the following command for extracting the Spark tar file.

$ tar xvf spark-1.3.1-bin-hadoop2.6.tgz

Moving Spark software files

Use the following commands for moving the Spark software files to the respective directory (/usr/local/spark).

$ su -
Password:
# cd /home/Hadoop/Downloads/
# mv spark-1.3.1-bin-hadoop2.6 /usr/local/spark
# exit

Setting up the environment for Spark

Add the following line to the ~/.bashrc file. It means adding the location where the Spark software files are located to the PATH variable.

export PATH=$PATH:/usr/local/spark/bin

Use the following command for sourcing the ~/.bashrc file.

$ source ~/.bashrc

Step7: Verifying the Spark Installation

Type the following command to open the Spark shell.

$spark-shell

If Spark is installed successfully, then you will find the following output.

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
    ____             __
   / __/__ ___ _____/ /__
   _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\ version 1.4.0
      /_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>

Spark SQL - Introduction

Spark introduces a programming module for structured data processing called Spark SQL. It provides a programming abstraction called DataFrame and can act as a distributed SQL query engine.

Features of Spark SQL

The following are the features of Spark SQL −

  1. Integrated − Seamlessly mix SQL queries with Spark programs. Spark SQL lets you query structured data as a distributed dataset (RDD) in Spark, with integrated APIs in Python, Scala and Java. This tight integration makes it easy to run SQL queries alongside complex analytic algorithms.

  2. Unified Data Access − Load and query data from a variety of sources. Schema-RDDs provide a single interface for efficiently working with structured data, including Apache Hive tables, parquet files and JSON files.

  3. Hive Compatibility − Run unmodified Hive queries on existing warehouses. Spark SQL reuses the Hive frontend and MetaStore, giving you full compatibility with existing Hive data, queries, and UDFs. Simply install it alongside Hive.

  4. Standard Connectivity − Connect through JDBC or ODBC. Spark SQL includes a server mode with industry standard JDBC and ODBC connectivity.

  5. Scalability − Use the same engine for both interactive and long queries. Spark SQL takes advantage of the RDD model to support mid-query fault tolerance, letting it scale to large jobs too. Do not worry about using a different engine for historical data.

Spark SQL Architecture

The following illustration explains the architecture of Spark SQL −

[Figure: Spark SQL architecture]

This architecture contains three layers namely, Language API, Schema RDD, and Data Sources.

  1. Language API − Spark is compatible with different languages, and Spark SQL is also supported through these language APIs (Python, Scala, Java, HiveQL).

  2. Schema RDD − Spark Core is designed with a special data structure called RDD. Generally, Spark SQL works on schemas, tables, and records. Therefore, we can use the Schema RDD as a temporary table. We can call this Schema RDD a DataFrame.

  3. Data Sources − Usually the data source for Spark Core is a text file, an Avro file, etc. However, the data sources for Spark SQL are different. They are Parquet files, JSON documents, Hive tables, and the Cassandra database.

We will discuss more about these in the subsequent chapters.

Spark SQL - DataFrames

A DataFrame is a distributed collection of data organized into named columns. Conceptually, it is equivalent to a relational table with good optimization techniques.

A DataFrame can be constructed from an array of different sources such as Hive tables, structured data files, external databases, or existing RDDs. This API was designed for modern Big Data and data science applications, taking inspiration from DataFrame in R Programming and Pandas in Python.

Features of DataFrame

Here is a set of few characteristic features of DataFrame −

  1. Ability to process data ranging in size from kilobytes to petabytes, on clusters ranging from a single node to large clusters.

  2. Supports different data formats (Avro, CSV, Elasticsearch, and Cassandra) and storage systems (HDFS, Hive tables, MySQL, etc.).

  3. State-of-the-art optimization and code generation through the Spark SQL Catalyst optimizer (a tree transformation framework).

  4. Can be easily integrated with all Big Data tools and frameworks via Spark-Core.

  5. Provides APIs for Python, Java, Scala, and R programming.

SQLContext

SQLContext is a class used for initializing the functionalities of Spark SQL. A SparkContext class object (sc) is required for initializing a SQLContext class object.

The following command is used for initializing the SparkContext through spark-shell.

$ spark-shell

By default, the SparkContext object is initialized with the name sc when the spark-shell starts.

Use the following command to create SQLContext.

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Example

Let us consider an example of employee records in a JSON file named employee.json. Use the following commands to create a DataFrame (dfs) and read a JSON document named employee.json with the following content.

employee.json − Place this file in the directory where the current scala> pointer is located.

{"id" : "1201", "name" : "satish", "age" : "25"}
{"id" : "1202", "name" : "krishna", "age" : "28"}
{"id" : "1203", "name" : "amith", "age" : "39"}
{"id" : "1204", "name" : "javed", "age" : "23"}
{"id" : "1205", "name" : "prudvi", "age" : "23"}

DataFrame Operations

DataFrame provides a domain-specific language for structured data manipulation. Here, we include some basic examples of structured data processing using DataFrames.

Follow the steps given below to perform DataFrame operations −

Read the JSON Document

First, we have to read the JSON document. Based on this, generate a DataFrame named (dfs).

Use the following command to read the JSON document named employee.json. The data is shown as a table with the fields − id, name, and age.

scala> val dfs = sqlContext.read.json("employee.json")

Output − The field names are taken automatically from employee.json.

dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]

Show the Data

If you want to see the data in the DataFrame, then use the following command.

scala> dfs.show()

Output − You can see the employee data in a tabular format.

<console>:22, took 0.052610 s
+----+------+--------+
|age | id   |  name  |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
| 23 | 1204 | javed  |
| 23 | 1205 | prudvi |
+----+------+--------+

Use printSchema Method

If you want to see the Structure (Schema) of the DataFrame, then use the following command.

scala> dfs.printSchema()

Output

root
   |-- age: string (nullable = true)
   |-- id: string (nullable = true)
   |-- name: string (nullable = true)

Use Select Method

Use the following command to fetch the name column from among the three columns of the DataFrame.

scala> dfs.select("name").show()

Output − You can see the values of the name column.

<console>:22, took 0.044023 s
+--------+
|  name  |
+--------+
| satish |
| krishna|
| amith  |
| javed  |
| prudvi |
+--------+

Use Age Filter

Use the following command for finding the employees whose age is greater than 23 (age > 23).

scala> dfs.filter(dfs("age") > 23).show()

Output

<console>:22, took 0.078670 s
+----+------+--------+
|age | id   | name   |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
+----+------+--------+

Use groupBy Method

Use the following command for counting the number of employees who are of the same age.

scala> dfs.groupBy("age").count().show()

Output − Two employees have an age of 23.

<console>:22, took 5.196091 s
+----+-----+
|age |count|
+----+-----+
| 23 |  2  |
| 25 |  1  |
| 28 |  1  |
| 39 |  1  |
+----+-----+

Running SQL Queries Programmatically

The sql function on an SQLContext enables applications to run SQL queries programmatically and returns the result as a DataFrame.
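
A minimal sketch of this, reusing the dfs DataFrame and sqlContext from the previous chapter, might look as follows; the temporary table name employee is an arbitrary choice.

// Expose the DataFrame as a temporary table so that SQL can be run over it.
dfs.registerTempTable("employee")

val adults = sqlContext.sql("SELECT id, name, age FROM employee WHERE age > 23")
adults.show()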

Generally, in the background, SparkSQL supports two different methods for converting existing RDDs into DataFrames −

  1. Inferring the Schema using Reflection − This method uses reflection to generate the schema of an RDD that contains specific types of objects (see the sketch after this list).

  2. Programmatically Specifying the Schema − The second method for creating a DataFrame is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD.
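
As a hedged sketch of the first (reflection-based) method in spark-shell, assuming a hypothetical comma-separated file employee.txt with id, name, and age fields:

import sqlContext.implicits._

// The case class fields become the column names and types of the inferred schema.
case class Employee(id: String, name: String, age: Int)

val empDF = sc.textFile("employee.txt")      // hypothetical file: id,name,age per line
   .map(_.split(","))
   .map(e => Employee(e(0), e(1), e(2).trim.toInt))
   .toDF()

empDF.show()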

Spark SQL - Data Sources

The DataFrame interface allows different data sources to work with Spark SQL. It is a temporary table and can be operated on as a normal RDD. Registering a DataFrame as a table allows you to run SQL queries over its data.

In this chapter, we will describe the general methods for loading and saving data using different Spark DataSources. Thereafter, we will discuss in detail the specific options that are available for the built-in data sources.

There are different types of data sources available in SparkSQL, some of which are listed below −

  1. JSON Datasets − Spark SQL can automatically capture the schema of a JSON dataset and load it as a DataFrame.

  2. Hive Tables − Hive comes bundled with the Spark library as HiveContext, which inherits from SQLContext.

  3. Parquet Files − Parquet is a columnar format, supported by many data processing systems.