Hcatalog 简明教程

HCatalog - Quick Guide

HCatalog - Introduction

What is HCatalog?

HCatalog 是 Hadoop 的表存储管理工具。它向其他 Hadoop 应用程序公开 Hive 元存储的表格数据。它使用户能够使用不同的数据处理工具(Pig、MapReduce)轻松地将数据写到网格中。它确保用户不必担心数据存储在何处或以何种格式存储。

HCatalog 作为一个 Hive 的关键组件,使用户可以存储任何格式和任何结构的数据。

Why HCatalog?

Enabling right tool for right Job

Hadoop 生态系统包含用于数据处理的不同工具,例如 Hive、Pig 和 MapReduce。虽然这些工具不需要元数据,但是当元数据存在时,它们仍然可以从中受益。共享元数据存储还使跨工具的用户能够更轻松地共享数据。一种非常常见的工作流程是使用 MapReduce 或 Pig 加载和规范化数据,然后通过 Hive 进行分析。如果所有这些工具共享一个元存储,那么每个工具的用户都可以立即访问使用另一个工具创建的数据。无需加载或传输步骤。

Capture processing states to enable sharing

HCatalog 可以发布您的分析结果。因此,其他程序员可以通过“REST”访问您的分析平台。您发布的模式对其他数据科学家也有用。其他数据科学家使用您的发现作为后续发现的输入。

Integrate Hadoop with everything

Hadoop 作为处理和存储环境为企业提供了许多机会;但是,为了促进采用,它必须与现有工具一起工作并对其进行扩展。Hadoop 应作为您分析平台的输入或与您的运营数据存储和 Web 应用程序集成。企业应该享受 Hadoop 的价值,而不必学习全新的工具集。REST 服务通过熟悉的 API 和类似 SQL 的语言向企业开放平台。企业数据管理系统使用 HCatalog 更深入地与 Hadoop 平台集成。

HCatalog Architecture

下图显示了 HCatalog 的整体架构。

architecture

HCatalog 支持为可以使用 SerDe (序列化器-反序列化器)编写的任何格式读写文件。默认情况下,HCatalog 支持 RCFile、CSV、JSON、SequenceFile 和 ORC 文件格式。要使用自定义格式,您必须提供 InputFormat、OutputFormat 和 SerDe。

HCatalog 建立在 Hive 元存储之上,并结合了 Hive 的 DDL。HCatalog 为 Pig 和 MapReduce 提供了读写接口,并使用 Hive 的命令行界面来发布数据定义和元数据探索命令。

HCatalog - Installation

所有 Hadoop 子项目(如 Hive、Pig 和 HBase)都支持 Linux 操作系统。因此,您需要在系统上安装 Linux 版本。HCatalog 已在 2013 年 3 月 26 日与 Hive 安装合并。从 Hive-0.11.0 版本开始,HCatalog 随 Hive 安装提供。因此,请按照以下步骤安装 Hive,进而自动在系统上安装 HCatalog。

Step 1: Verifying JAVA Installation

在安装 Hive 之前,必须在系统上安装 Java。您可以使用以下命令来检查系统上是否已安装 Java -

$ java –version

如果系统上已安装 Java,您将看到以下响应 -

java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)

如果系统上未安装 Java,则需要按照以下步骤操作。

Step 2: Installing Java

访问以下链接 http://www.oracle.com/ 下载 Java(JDK <latest version> - X64.tar.gz)

然后 jdk-7u71-linux-x64.tar.gz 将下载到您的系统。

通常,您会在 Downloads 文件夹中找到下载的 Java 文件。使用以下命令验证并解压 jdk-7u71-linux-x64.gz 文件。

$ cd Downloads/
$ ls
jdk-7u71-linux-x64.gz

$ tar zxf jdk-7u71-linux-x64.gz
$ ls
jdk1.7.0_71 jdk-7u71-linux-x64.gz

为了使得所有用户都能使用 Java,您需要将 Java 移动到“/usr/local/”位置。打开 root,然后键入以下命令。

$ su
password:
# mv jdk1.7.0_71 /usr/local/
# exit

为设置 PATHJAVA_HOME 变量,将以下命令添加到 ~/.bashrc 文件。

export JAVA_HOME=/usr/local/jdk1.7.0_71
export PATH=PATH:$JAVA_HOME/bin

现在,使用上述 from terminal java -version 命令验证安装。

Step 3: Verifying Hadoop Installation

在安装 Hive 之前,必须在您的系统上安装 Hadoop。让我们使用以下命令验证 Hadoop 安装:

$ hadoop version

如果已在您的系统上安装 Hadoop,则会收到以下回复:

Hadoop 2.4.1
Subversion https://svn.apache.org/repos/asf/hadoop/common -r 1529768
Compiled by hortonmu on 2013-10-07T06:28Z
Compiled with protoc 2.5.0
From source with checksum 79e53ce7994d1628b240f09af91e1af4

如果您的系统上未安装 Hadoop,请执行以下步骤:

Step 4: Downloading Hadoop

使用以下命令从 Apache 软件基金会下载并解压缩 Hadoop 2.4.1。

$ su
password:
# cd /usr/local
# wget http://apache.claz.org/hadoop/common/hadoop-2.4.1/
hadoop-2.4.1.tar.gz
# tar xzf hadoop-2.4.1.tar.gz
# mv hadoop-2.4.1/* to hadoop/
# exit

Step 5: Installing Hadoop in Pseudo Distributed Mode

以下步骤用于在伪分布式模式下安装 Hadoop 2.4.1

Setting up Hadoop

您可以通过将以下命令追加到 ~/.bashrc 文件来设置 Hadoop 环境变量。

export HADOOP_HOME=/usr/local/hadoop
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin

现在将所有更改应用到当前正在运行的系统中。

$ source ~/.bashrc

Hadoop Configuration

您可以在位置 “$HADOOP_HOME/etc/hadoop” 中找到所有 Hadoop 配置文件。根据您的 Hadoop 基础架构,您需要在这些配置文件中进行适当的更改。

$ cd $HADOOP_HOME/etc/hadoop

为了使用 Java 开发 Hadoop 程序,您必须通过将 JAVA_HOME 值替换为系统中 Java 的位置,来重置 hadoop-env.sh 文件中的 Java 环境变量。

export JAVA_HOME=/usr/local/jdk1.7.0_71

下面列出了您必须编辑以配置 Hadoop 的文件列表。

core-site.xml

core-site.xml 文件包含信息,例如用于 Hadoop 实例的端口号、分配给文件系统内存、用于存储数据的内存限制以及读/写缓冲区大小。

打开 core-site.xml,并在 <configuration> 和 </configuration> 标记之间添加以下属性。

<configuration>
   <property>
      <name>fs.default.name</name>
      <value>hdfs://localhost:9000</value>
   </property>
</configuration>

hdfs-site.xml

hdfs-site.xml 文件包含信息,例如复制数据的值、本地名节点的路径以及本地文件系统的数据节点路径。这意味着您要存储 Hadoop 基础架构的位置。

让我们假设以下数据。

dfs.replication (data replication value) = 1

(In the following path /hadoop/ is the user name.
hadoopinfra/hdfs/namenode is the directory created by hdfs file system.)

namenode path = //home/hadoop/hadoopinfra/hdfs/namenode

(hadoopinfra/hdfs/datanode is the directory created by hdfs file system.)
datanode path = //home/hadoop/hadoopinfra/hdfs/datanode

打开此文件,并在此文件中在 <configuration>、</configuration> 标记之间添加以下属性。

<configuration>
   <property>
      <name>dfs.replication</name>
      <value>1</value>
   </property>

   <property>
      <name>dfs.name.dir</name>
      <value>file:///home/hadoop/hadoopinfra/hdfs/namenode</value>
   </property>

   <property>
      <name>dfs.data.dir</name>
      <value>file:///home/hadoop/hadoopinfra/hdfs/datanode</value>
   </property>
</configuration>

Note − 在上述文件中,所有属性值都是用户定义的,您可以根据 Hadoop 基础架构进行更改。

yarn-site.xml

此文件用于将 Yarn 配置到 Hadoop 中。打开 yarn-site.xml 文件并在该文件中的 <configuration>、</configuration> 标记之间添加以下属性。

<configuration>
   <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle</value>
   </property>
</configuration>

mapred-site.xml

此文件用于指定我们使用哪种 MapReduce 框架。默认情况下,Hadoop 包含一个 yarn-site.xml 模板。首先,您需要使用以下命令将文件从 mapred-site,xml.template 复制到 mapred-site.xml 文件。

$ cp mapred-site.xml.template mapred-site.xml

打开 mapred-site.xml 文件,并在此文件中在 <configuration>、</configuration> 标记之间添加以下属性。

<configuration>
   <property>
      <name>mapreduce.framework.name</name>
      <value>yarn</value>
   </property>
</configuration>

Step 6: Verifying Hadoop Installation

以下步骤用于验证 Hadoop 安装。

Namenode Setup

使用 “hdfs namenode -format” 命令设置 name 节点,如下所示:

$ cd ~
$ hdfs namenode -format

预期结果如下:

10/24/14 21:30:55 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = localhost/192.168.1.11
STARTUP_MSG: args = [-format]
STARTUP_MSG: version = 2.4.1
...
...
10/24/14 21:30:56 INFO common.Storage: Storage directory
/home/hadoop/hadoopinfra/hdfs/namenode has been successfully formatted.
10/24/14 21:30:56 INFO namenode.NNStorageRetentionManager: Going to retain 1
images with txid >= 0 10/24/14 21:30:56 INFO util.ExitUtil: Exiting with status 0
10/24/14 21:30:56 INFO namenode.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at localhost/192.168.1.11
************************************************************/

Verifying Hadoop DFS

以下命令用于启动 DFS。执行此命令将启动您的 Hadoop 文件系统。

$ start-dfs.sh

预期输出如下所示 −

10/24/14 21:37:56 Starting namenodes on [localhost]
localhost: starting namenode, logging to
/home/hadoop/hadoop-2.4.1/logs/hadoop-hadoop-namenode-localhost.out localhost:
starting datanode, logging to
   /home/hadoop/hadoop-2.4.1/logs/hadoop-hadoop-datanode-localhost.out
Starting secondary namenodes [0.0.0.0]

Verifying Yarn Script

以下命令用于启动 Yarn 脚本。执行此命令将启动您的 Yarn 守护程序。

$ start-yarn.sh

预期输出如下所示 −

starting yarn daemons
starting resourcemanager, logging to /home/hadoop/hadoop-2.4.1/logs/
yarn-hadoop-resourcemanager-localhost.out
localhost: starting nodemanager, logging to
   /home/hadoop/hadoop-2.4.1/logs/yarn-hadoop-nodemanager-localhost.out

Accessing Hadoop on Browser

访问 Hadoop 的默认端口号是 50070。使用以下 URL 在您的浏览器上获取 Hadoop 服务。

http://localhost:50070/
accessing hadoop

Verify all applications for cluster

访问集群所有应用程序的默认端口号为 8088。使用以下网址访问此服务。

http://localhost:8088/
cluster

Hadoop 安装完成后,执行下一步并在系统上安装 Hive。

Step 7: Downloading Hive

在本教程中,我们使用 hive-0.14.0。你可以访问以下链接下载: http://apache.petsads.us/hive/hive-0.14.0/ 。我们假设教程下载到了 /Downloads 目录。在这里,我们为此教程下载了名为“ apache-hive-0.14.0-bin.tar.gz ”的 Hive 归档文件。使用以下命令验证下载情况−

$ cd Downloads
$ ls

下载成功后,你会看到以下响应−

apache-hive-0.14.0-bin.tar.gz

Step 8: Installing Hive

若要在系统上安装 Hive,需要执行以下步骤。我们假设 Hive 归档文件下载到了 /Downloads 目录。

Extracting and Verifying Hive Archive

使用以下命令验证下载情况并提取 Hive 归档文件−

$ tar zxvf apache-hive-0.14.0-bin.tar.gz
$ ls

下载成功后,你会看到以下响应−

apache-hive-0.14.0-bin apache-hive-0.14.0-bin.tar.gz

Copying files to /usr/local/hive directory

我们需要从超级用户 “su -” 复制文件。以下命令用于将文件从提取的目录复制到 /usr/local/hive ”目录。

$ su -
passwd:
# cd /home/user/Download
# mv apache-hive-0.14.0-bin /usr/local/hive
# exit

Setting up the environment for Hive

可以通过向 ~/.bashrc 文件追加以下行来设置 Hive 环境−

export HIVE_HOME=/usr/local/hive
export PATH=$PATH:$HIVE_HOME/bin
export CLASSPATH=$CLASSPATH:/usr/local/Hadoop/lib/*:.
export CLASSPATH=$CLASSPATH:/usr/local/hive/lib/*:.

使用以下命令执行 ~/.bashrc 文件。

$ source ~/.bashrc

Step 9: Configuring Hive

要将 Hive 与 Hadoop 配置在一起,需要编辑位于 $HIVE_HOME/conf 目录的 hive-env.sh 文件。以下命令重定向到 Hive config 文件夹并复制模板文件 −

$ cd $HIVE_HOME/conf
$ cp hive-env.sh.template hive-env.sh

通过追加以下行编辑 hive-env.sh 文件 −

export HADOOP_HOME=/usr/local/hadoop

这样,Hive 安装就完成了。现在你需要一个外部数据库服务器来配置元存储。我们使用 Apache Derby 数据库。

Step 10: Downloading and Installing Apache Derby

按照以下步骤下载并安装 Apache Derby −

Downloading Apache Derby

使用以下命令下载 Apache Derby。下载需要一些时间。

$ cd ~
$ wget http://archive.apache.org/dist/db/derby/db-derby-10.4.2.0/db-derby-10.4.2.0-bin.tar.gz

使用以下命令验证下载情况−

$ ls

下载成功后,你会看到以下响应−

db-derby-10.4.2.0-bin.tar.gz

Extracting and Verifying Derby Archive

以下命令用于提取和验证 Derby 归档文件 −

$ tar zxvf db-derby-10.4.2.0-bin.tar.gz
$ ls

下载成功后,你会看到以下响应−

db-derby-10.4.2.0-bin db-derby-10.4.2.0-bin.tar.gz

Copying Files to /usr/local/derby Directory

我们需要从超级用户 “su -” 复制文件。以下命令用于将文件从提取的目录复制到 /usr/local/derby 目录 −

$ su -
passwd:
# cd /home/user
# mv db-derby-10.4.2.0-bin /usr/local/derby
# exit

Setting up the Environment for Derby

可以通过向 ~/.bashrc 文件追加以下行来设置 Derby 环境 −

export DERBY_HOME=/usr/local/derby
export PATH=$PATH:$DERBY_HOME/bin
export CLASSPATH=$CLASSPATH:$DERBY_HOME/lib/derby.jar:$DERBY_HOME/lib/derbytools.jar

使用以下命令执行 ~/.bashrc file

$ source ~/.bashrc

Create a Directory for Metastore

在 $DERBY_HOME 目录中创建一个名为 data 的目录来存储元存储数据。

$ mkdir $DERBY_HOME/data

Derby 安装和环境设置已完成。

Step 11: Configuring the Hive Metastore

配置元数据存储库表示指定数据库存储在 Hive 中的位置。可以通过编辑 hive-site.xml 文件(位于 $HIVE_HOME/conf 目录中)来完成此操作。首先,使用以下命令复制模板文件 −

$ cd $HIVE_HOME/conf
$ cp hive-default.xml.template hive-site.xml

编辑 hive-site.xml ,并将以下行追加到 <configuration> 和 </configuration> 标记之间 −

<property>
   <name>javax.jdo.option.ConnectionURL</name>
   <value>jdbc:derby://localhost:1527/metastore_db;create = true</value>
   <description>JDBC connect string for a JDBC metastore</description>
</property>

创建名为 jpox.properties 的文件,并向其中添加以下行 −

javax.jdo.PersistenceManagerFactoryClass = org.jpox.PersistenceManagerFactoryImpl

org.jpox.autoCreateSchema = false
org.jpox.validateTables = false
org.jpox.validateColumns = false
org.jpox.validateConstraints = false

org.jpox.storeManagerType = rdbms
org.jpox.autoCreateSchema = true
org.jpox.autoStartMechanismMode = checked
org.jpox.transactionIsolation = read_committed

javax.jdo.option.DetachAllOnCommit = true
javax.jdo.option.NontransactionalRead = true
javax.jdo.option.ConnectionDriverName = org.apache.derby.jdbc.ClientDriver
javax.jdo.option.ConnectionURL = jdbc:derby://hadoop1:1527/metastore_db;create = true
javax.jdo.option.ConnectionUserName = APP
javax.jdo.option.ConnectionPassword = mine

Step 12: Verifying Hive Installation

在运行 Hive 之前,您需要在 HDFS 中创建 /tmp 文件夹和一个单独的 Hive 文件夹。在此处,我们使用 /user/hive/warehouse 文件夹。您需要为这些新创建的文件夹设置写入权限,如下所示 −

chmod g+w

现在,在验证 Hive 之前,请在 HDFS 中设置它们。使用以下命令 −

$ $HADOOP_HOME/bin/hadoop fs -mkdir /tmp
$ $HADOOP_HOME/bin/hadoop fs -mkdir /user/hive/warehouse
$ $HADOOP_HOME/bin/hadoop fs -chmod g+w /tmp
$ $HADOOP_HOME/bin/hadoop fs -chmod g+w /user/hive/warehouse

以下命令用于验证 Hive 安装 −

$ cd $HIVE_HOME
$ bin/hive

在成功安装 Hive 后,您将看到以下响应 −

Logging initialized using configuration in
   jar:file:/home/hadoop/hive-0.9.0/lib/hive-common-0.9.0.jar!/
hive-log4j.properties Hive history
   =/tmp/hadoop/hive_job_log_hadoop_201312121621_1494929084.txt
………………….
hive>

您可以执行以下示例命令来显示所有表 −

hive> show tables;
OK Time taken: 2.798 seconds
hive>

Step 13: Verify HCatalog Installation

使用以下命令为 HCatalog 主目录设置系统变量 HCAT_HOME

export HCAT_HOME = $HiVE_HOME/HCatalog

使用以下命令验证 HCatalog 安装。

cd $HCAT_HOME/bin
./hcat

如果安装成功,您将看到以下输出 −

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
usage: hcat { -e "<query>" | -f "<filepath>" }
   [ -g "<group>" ] [ -p "<perms>" ]
   [ -D"<name> = <value>" ]

-D <property = value>    use hadoop value for given property
-e <exec>                hcat command given from command line
-f <file>                hcat commands in file
-g <group>               group for the db/table specified in CREATE statement
-h,--help                Print help information
-p <perms>               permissions for the db/table specified in CREATE statement

HCatalog - CLI

HCatalog 命令行界面 (CLI) 可以从命令 $HIVE_HOME/HCatalog/bin/hcat 中调用,其中 $HIVE_HOME 是 Hive 的主目录。 hcat 是用于初始化 HCatalog 服务器的命令。

使用以下命令初始化 HCatalog 命令行。

cd $HCAT_HOME/bin
./hcat

如果安装已正确完成,则您将获得以下输出 −

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
usage: hcat { -e "<query>" | -f "<filepath>" }
   [ -g "<group>" ] [ -p "<perms>" ]
   [ -D"<name> = <value>" ]

-D <property = value>    use hadoop value for given property
-e <exec>                hcat command given from command line
-f <file>                hcat commands in file
-g <group>               group for the db/table specified in CREATE statement
-h,--help                Print help information
-p <perms>               permissions for the db/table specified in CREATE statement

HCatalog CLI 支持这些命令行选项 −

Sr.No

Option

Example & Description

1

-g

hcat -g mygroup &#8230;&#8203; 要创建的表必须具有“mygroup”组。

2

-p

hcat -p rwxr-xr-x &#8230;&#8203; 要创建的表必须具有读、写和执行权限。

3

-f

hcat -f myscript.HCatalog &#8230;&#8203; myscript.HCatalog 是一个包含要执行的 DDL 命令的脚本文件。

4

-e

hcat -e 'create table mytable(a int);' &#8230;&#8203; 将以下字符串视为 DDL 命令并执行它。

5

-D

hcat -Dkey = value &#8230;&#8203; 将键值对作为 Java 系统属性传递给 HCatalog。

6

-

hcat Prints a usage message.

Note −

  1. -g-p 选项不是必需的。

  2. 一次,可以提供 -e-f 选项,但不是两者。

  3. 选项的顺序无关紧要;您可以按任何顺序指定选项。

Sr.No

DDL Command & Description

1

CREATE TABLE 使用 HCatalog 创建表。如果你使用 CLUSTERED BY 子句创建表,你将无法使用 Pig 或 MapReduce 向其中写入。

2

ALTER TABLE 支持,但 REBUILD 和 CONCATENATE 选项除外。它的行为与 Hive 中相同。

3

DROP TABLE 支持。与 Hive 相同的行为(删除完整的表和结构)。

4

CREATE/ALTER/DROP VIEW 支持。与 Hive 的行为相同。 Note − Pig 和 MapReduce 不能从视图中读取或向视图中写入数据。

5

SHOW TABLES 显示表列表。

6

SHOW PARTITIONS 显示分区列表。

7

Create/Drop Index CREATE 和 DROP FUNCTION 操作受支持,但创建的函数仍必须在 Pig 中注册并将其置于 MapReduce 的 CLASSPATH 中。

8

DESCRIBE 支持。与 Hive 相同的行为。描述结构。

上表中的一些命令在后续章节中进行了说明。

HCatalog - Create Table

本章介绍了如何创建表以及如何向其中插入数据。在 HCatalog 中创建表的约定与使用 Hive 创建表非常相似。

Create Table Statement

Create Table 是一个用于使用 HCatalog 在 Hive Metastore 中创建表的语句。它的语法和示例如下 −

Syntax

CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.] table_name
[(col_name data_type [COMMENT col_comment], ...)]
[COMMENT table_comment]
[ROW FORMAT row_format]
[STORED AS file_format]

Example

让我们假设您需要使用 CREATE TABLE 语句创建名为 employee 的表。下表列出了 employee 表中的字段及其数据类型 −

Sr.No

Field Name

Data Type

1

Eid

int

2

Name

String

3

Salary

Float

4

Designation

string

以下数据定义了 Comment 等受支持字段、行格式字段例如 Field terminatorLines terminatorStored File type

COMMENT ‘Employee details’
FIELDS TERMINATED BY ‘\t’
LINES TERMINATED BY ‘\n’
STORED IN TEXT FILE

以下查询使用上述数据创建名为 employee 的表。

./hcat –e "CREATE TABLE IF NOT EXISTS employee ( eid int, name String,
   salary String, destination String) \
COMMENT 'Employee details' \
ROW FORMAT DELIMITED \
FIELDS TERMINATED BY ‘\t’ \
LINES TERMINATED BY ‘\n’ \
STORED AS TEXTFILE;"

如果添加选项 IF NOT EXISTS ,则在表已存在的情况下,HCatalog 忽略该声明。

当表创建成功时,您可以看到以下响应:

OK
Time taken: 5.905 seconds

Load Data Statement

总体上,在 SQL 中创建一个表之后,我们可以使用 Insert 声明插入数据。但在 HCatalog 中,我们使用 LOAD DATA 声明插入数据。

向 HCatalog 插入数据时,最好使用 LOAD DATA 来存储批量记录。有两种方法可用于加载数据:一种是从 local file system ,另一种是从 Hadoop file system

Syntax

LOAD DATA 的语法如下:

LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename
[PARTITION (partcol1=val1, partcol2=val2 ...)]
  1. LOCAL 是指定本地路径的标识符。它可选。

  2. OVERWRITE 可选,可覆盖表中的数据。

  3. PARTITION is optional.

Example

我们将向表中插入以下数据。它是一个文本文件,名为 sample.txt ,位于 /home/user 目录中。

1201  Gopal        45000    Technical manager
1202  Manisha      45000    Proof reader
1203  Masthanvali  40000    Technical writer
1204  Kiran        40000    Hr Admin
1205  Kranthi      30000    Op Admin

以下查询将给定的文本加载到表中。

./hcat –e "LOAD DATA LOCAL INPATH '/home/user/sample.txt'
OVERWRITE INTO TABLE employee;"

下载成功后,你会看到以下响应−

OK
Time taken: 15.905 seconds

HCatalog - Alter Table

本章介绍如何修改表的属性,例如更改表名、更改列名、添加列以及删除或替换列。

Alter Table Statement

您可以使用 ALTER TABLE 语句来修改 Hive 中的表。

Syntax

该语句根据我们希望在表中修改哪些属性采用以下任一语法。

ALTER TABLE name RENAME TO new_name
ALTER TABLE name ADD COLUMNS (col_spec[, col_spec ...])
ALTER TABLE name DROP [COLUMN] column_name
ALTER TABLE name CHANGE column_name new_name new_type
ALTER TABLE name REPLACE COLUMNS (col_spec[, col_spec ...])

下面介绍一些场景。

Rename To… Statement

以下查询将表 employee 重命名为 emp

./hcat –e "ALTER TABLE employee RENAME TO emp;"

Change Statement

下表包含 employee 表的字段,并显示要更改的字段(以粗体显示)。

Field Name

Convert from Data Type

Change Field Name

Convert to Data Type

eid

int

eid

int

name

String

ename

String

salary

Float

salary

Double

designation

String

designation

String

以下查询使用上述数据重命名列名和列数据类型 −

./hcat –e "ALTER TABLE employee CHANGE name ename String;"
./hcat –e "ALTER TABLE employee CHANGE salary salary Double;"

Add Columns Statement

以下查询向 employee 表中添加一个名为 dept 的列。

./hcat –e "ALTER TABLE employee ADD COLUMNS (dept STRING COMMENT 'Department name');"

Replace Statement

以下查询从 employee 表中删除所有列,并用 empname 列替换它们 −

./hcat – e "ALTER TABLE employee REPLACE COLUMNS ( eid INT empid Int, ename STRING name String);"

Drop Table Statement

本章介绍如何在 HCatalog 中删除表。当您从元存储中删除表时,它会删除表/列数据及其元数据。它可以是普通表(存储在元存储中)或外部表(存储在本地文件系统中);无论类型如何,HCatalog 都以相同的方式处理它们。

其语法如下:

DROP TABLE [IF EXISTS] table_name;

以下查询删除名为 employee 的表格−

./hcat –e "DROP TABLE IF EXISTS employee;"

成功执行查询后,你会看到以下响应−

OK
Time taken: 5.3 seconds

HCatalog - View

本章介绍如何在HCatalog中创建并管理 view 。数据库视图使用 CREATE VIEW 语句创建。视图可以从单个表格、多个表格或另一个视图创建。

要创建视图,用户必须根据具体实现具有适当的系统权限。

Create View Statement

CREATE VIEW 创建一个具有给定名称的视图。如果已经存在具有相同名称的表或视图,则会引发错误。可以使用 IF NOT EXISTS 来跳过该错误。

如果没有提供列名,则视图的列名将自动从 defining SELECT expression 派生。

Note − 如果 SELECT 包含未别名的标量表达式,例如 x+y,则生成的视图列名将采用 _C0、_C1 等形式。

重命名列时,还可以提供列注释。注释不会自动从基础列继承。

如果视图的 defining SELECT expression 无效,则 CREATE VIEW 语句将失败。

Syntax

CREATE VIEW [IF NOT EXISTS] [db_name.]view_name [(column_name [COMMENT column_comment], ...) ]
[COMMENT view_comment]
[TBLPROPERTIES (property_name = property_value, ...)]
AS SELECT ...;

Example

以下是员工表数据。现在让我们看看如何创建一个名为 Emp_Deg_View 的视图,其中包含薪水高于 35000 的员工的 ID、姓名、职位和薪水字段。

+------+-------------+--------+-------------------+-------+
|  ID  |    Name     | Salary |    Designation    | Dept  |
+------+-------------+--------+-------------------+-------+
| 1201 |    Gopal    | 45000  | Technical manager |  TP   |
| 1202 |   Manisha   | 45000  | Proofreader       |  PR   |
| 1203 | Masthanvali | 30000  | Technical writer  |  TP   |
| 1204 |    Kiran    | 40000  | Hr Admin          |  HR   |
| 1205 |   Kranthi   | 30000  | Op Admin          | Admin |
+------+-------------+--------+-------------------+-------+

以下是基于以上给定数据创建视图的命令。

./hcat –e "CREATE VIEW Emp_Deg_View (salary COMMENT ' salary more than 35,000')
   AS SELECT id, name, salary, designation FROM employee WHERE salary ≥ 35000;"

Output

OK
Time taken: 5.3 seconds

Drop View Statement

DROP VIEW 删除指定视图的元数据。删除其他视图引用的视图时,不会给出警告(依赖视图会一直处于无效状态,并且必须由用户删除或重新创建)。

Syntax

DROP VIEW [IF EXISTS] view_name;

Example

以下命令用于删除名为 Emp_Deg_View 的视图。

DROP VIEW Emp_Deg_View;

HCatalog - Show Tables

您通常希望列出数据库中的所有表或列出表中的所有列。显然,每个数据库都有自己列出表和列的语法。

Show Tables 语句显示所有表的名称。默认情况下,它会列出当前数据库中的表,或者在 IN 子句中,在指定数据库中列出表。

本节描述如何列出 HCatalog 中当前数据库中的所有表。

Show Tables Statement

SHOW TABLES 的语法如下 −

SHOW TABLES [IN database_name] ['identifier_with_wildcards'];

以下查询显示表列表 −

./hcat –e "Show tables;"

成功执行查询后,你会看到以下响应−

OK
emp
employee
Time taken: 5.3 seconds

HCatalog - Show Partitions

分区是用于创建单独表格或视图的表格数据条件。SHOW PARTITIONS列出给定基础表的所有现有分区。分区按字母顺序列出。在Hive 0.6之后,还可以指定分区规范的部分以筛选结果列表。

可以使用SHOW PARTITIONS命令查看特定表格中存在的分区。本章介绍如何列出HCatalog中特定表格的分区。

Show Partitions Statement

其语法如下:

SHOW PARTITIONS table_name;

以下查询删除名为 employee 的表格−

./hcat –e "Show partitions employee;"

成功执行查询后,你会看到以下响应−

OK
Designation = IT
Time taken: 5.3 seconds

Dynamic Partition

HCatalog将表格组织成分区。这是一种基于分区列,如日期、城市和部门的值,将表格划分为相关部分的方式。使用分区,很容易查询部分数据。

例如,名为 Tab1 的表格包含员工数据,如id、姓名、部门和yoj(即参加年份)。假设你需要检索2012年加入的所有员工的详细信息。查询搜索整个表格以获取所需信息。然而,如果您使用年份对员工数据进行分区并将其存储在单独的文件中,则会减少查询处理时间。以下示例显示如何对文件及其数据进行分区 −

以下文件包含 employeedata 表格。

/tab1/employeedata/file1

id, name,   dept, yoj
1,  gopal,   TP, 2012
2,  kiran,   HR, 2012
3,  kaleel,  SC, 2013
4, Prasanth, SC, 2013

上面数据使用年份分成两个文件。

/tab1/employeedata/2012/file2

1, gopal, TP, 2012
2, kiran, HR, 2012

/tab1/employeedata/2013/file3

3, kaleel,   SC, 2013
4, Prasanth, SC, 2013

Adding a Partition

我们可以通过更改表格来向表格添加分区。让我们假设我们有一个名为 employee 的表格,其中包含诸如Id、姓名、工资、职务、部门和yoj等字段。

Syntax

ALTER TABLE table_name ADD [IF NOT EXISTS] PARTITION partition_spec
[LOCATION 'location1'] partition_spec [LOCATION 'location2'] ...;
partition_spec:
: (p_column = p_col_value, p_column = p_col_value, ...)

以下查询用于向 employee 表格添加分区。

./hcat –e "ALTER TABLE employee ADD PARTITION (year = '2013') location '/2012/part2012';"

Renaming a Partition

可以使用RENAME-TO命令重命名分区。它的语法如下−

./hact –e "ALTER TABLE table_name PARTITION partition_spec RENAME TO PARTITION partition_spec;"

以下查询用于重命名分区 −

./hcat –e "ALTER TABLE employee PARTITION (year=’1203’) RENAME TO PARTITION (Yoj='1203');"

Dropping a Partition

用于删除分区的命令的语法如下−

./hcat –e "ALTER TABLE table_name DROP [IF EXISTS] PARTITION partition_spec,.
   PARTITION partition_spec,...;"

以下查询用于删除分区−

./hcat –e "ALTER TABLE employee DROP [IF EXISTS] PARTITION (year=’1203’);"

HCatalog - Indexes

Creating an Index

索引实际上只是表某个特定列上的指针。创建索引是指在表的特定列上创建一个指针。其语法如下:

CREATE INDEX index_name
ON TABLE base_table_name (col_name, ...)
AS 'index.handler.class.name'
[WITH DEFERRED REBUILD]
[IDXPROPERTIES (property_name = property_value, ...)]
[IN TABLE index_table_name]
[PARTITIONED BY (col_name, ...)][
   [ ROW FORMAT ...] STORED AS ...
   | STORED BY ...
]
[LOCATION hdfs_path]
[TBLPROPERTIES (...)]

Example

让我们举一个例子来理解索引的概念。使用我们先前使用过的、包含字段 Id、Name、Salary、Designation 和 Dept 的相同 employee 表。在 employee 表的 salary 列上创建一个名为 index_salary 的索引。

以下查询创建了一个索引:

./hcat –e "CREATE INDEX inedx_salary ON TABLE employee(salary)
AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler';"

它是 salary 列的指针。如果该列被修改,则更改将使用索引值进行存储。

Dropping an Index

以下语法用于删除索引:

DROP INDEX <index_name> ON <table_name>

以下查询将删除索引 index_salary −

./hcat –e "DROP INDEX index_salary ON employee;"

HCatalog - Reader Writer

HCatalog 包含一个数据传输 API,用于在不使用 MapReduce 的情况下实现并行输入和输出。此 API 使用表的存储和行的基本抽象从 Hadoop 集群读取数据,并向其中写入数据。

数据传输 API 主要包含三个类:

  1. HCatReader - 读取 Hadoop 集群中的数据。

  2. HCatWriter - 向 Hadoop 集群中写入数据。

  3. DataTransferFactory - 生成读取器和写入器实例。

此 API 适合主从节点设置。让我们进一步讨论 HCatReaderHCatWriter

HCatReader

HCatReader 是 HCatalog 的一个内部抽象类,它抽象了从其检索记录的底层系统中的复杂性。

S. No.

Method Name & Description

1

Public abstract ReaderContext prepareRead() throws HCatException 应在主节点上调用此内容来获得 ReaderContext,之后应序列化并发送从属于节点。

2

Public abstract Iterator &lt;HCatRecorder&gt; read() throws HCaException 应在从属于节点上调用此内容来读取 HCatRecords。

3

Public Configuration getConf() 它将返回配置类对象。

HCatReader 类用于读取 HDFS 中的数据。阅读是一个两步过程,其中第一步发生在外部系统的 master 节点上。第二步在多个 slave 节点上并行执行。

读取在 ReadEntity 上完成。在开始读取之前,你需要定义一个 ReadEntity 用于读取。可以通过 ReadEntity.Builder 完成。你可以指定一个数据库名称、表名称、分区和过滤字符串。例如 −

ReadEntity.Builder builder = new ReadEntity.Builder();
ReadEntity entity = builder.withDatabase("mydb").withTable("mytbl").build(); 10.

上面的代码片段定义了一个 ReadEntity 对象(“entity”),它包含一个名为 mytbl 的表和一个名为 mydb 的数据库,可用于读取该表的所有行。请注意,此表必须在该操作开始之前存在于 HCatalog 中。

在定义 ReadEntity 之后,你可以使用 ReadEntity 和集群配置获取 HCatReader 的实例 −

HCatReader reader = DataTransferFactory.getHCatReader(entity, config);

下一步是从 reader 获取一个 ReaderContext,如下所示:

ReaderContext cntxt = reader.prepareRead();

HCatWriter

该抽象是 HCatalog 内部实现。这便于从外部系统写入 HCatalog。请勿尝试直接实例化它。相反,请使用 DataTransferFactory。

Sr.No.

Method Name & Description

1

Public abstract WriterContext prepareRead() throws HCatException 外部系统应从 master 节点调用此方法一次。它返回一个 WriterContext 。应该对它进行序列化并将其发送到 slave 节点,以在该处构建 HCatWriter

2

Public abstract void write(Iterator&lt;HCatRecord&gt; recordItr) throws HCaException 此方法应在 slave 节点上使用,用于执行写操作。recordItr 是一个迭代器对象,它包含需要写入 HCatalog 中的记录集合。

3

Public abstract void abort(WriterContext cntxt) throws HCatException 此方法应在 master 节点上调用。此方法的主要目的是在出现故障的情况下进行清理。

4

public abstract void commit(WriterContext cntxt) throws HCatException 此方法应在 master 节点上调用。此方法的目的是进行元数据提交。

与阅读类似,写入也是一个两步过程,其中第一步发生在 master 节点上。随后,第二步在 slave 节点上并行执行。

WriteEntity 上执行写操作,可以按照与读取类似的方式构建 −

WriteEntity.Builder builder = new WriteEntity.Builder();
WriteEntity entity = builder.withDatabase("mydb").withTable("mytbl").build();

上面的代码创建了一个 WriteEntity 对象 entity,可以用来写到数据库 mydb 中名为 mytbl 的表。

在创建 WriteEntity 之后,下一步是获取一个 WriterContext −

HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
WriterContext info = writer.prepareWrite();

上述所有步骤都在 master 节点上发生。然后,master 节点将 WriterContext 对象序列化,并使所有 slave 可以使用它。

在 slave 节点上,你需要使用 WriterContext 获取一个 HCatWriter,如下所示:

HCatWriter writer = DataTransferFactory.getHCatWriter(context);

然后, writer 将迭代器作为写方法的参数 −

writer.write(hCatRecordItr);

然后, writer 在循环中对该迭代器调用 getNext() ,并写出附加到迭代器上的所有记录。

TestReaderWriter.java 文件用于测试 HCatreader 和 HCatWriter 类。以下程序演示如何使用 HCatReader 和 HCatWriter API 从源文件读取数据,并随后将其写入目标文件。

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.mapreduce.InputSplit;

import org.apache.hive.HCatalog.common.HCatException;
import org.apache.hive.HCatalog.data.transfer.DataTransferFactory;
import org.apache.hive.HCatalog.data.transfer.HCatReader;
import org.apache.hive.HCatalog.data.transfer.HCatWriter;
import org.apache.hive.HCatalog.data.transfer.ReadEntity;
import org.apache.hive.HCatalog.data.transfer.ReaderContext;
import org.apache.hive.HCatalog.data.transfer.WriteEntity;
import org.apache.hive.HCatalog.data.transfer.WriterContext;
import org.apache.hive.HCatalog.mapreduce.HCatBaseTest;

import org.junit.Assert;
import org.junit.Test;

public class TestReaderWriter extends HCatBaseTest {
   @Test
   public void test() throws MetaException, CommandNeedRetryException,
      IOException, ClassNotFoundException {

      driver.run("drop table mytbl");
      driver.run("create table mytbl (a string, b int)");

      Iterator<Entry<String, String>> itr = hiveConf.iterator();
      Map<String, String> map = new HashMap<String, String>();

      while (itr.hasNext()) {
         Entry<String, String> kv = itr.next();
         map.put(kv.getKey(), kv.getValue());
      }

      WriterContext cntxt = runsInMaster(map);
      File writeCntxtFile = File.createTempFile("hcat-write", "temp");
      writeCntxtFile.deleteOnExit();

      // Serialize context.
      ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(writeCntxtFile));
      oos.writeObject(cntxt);
      oos.flush();
      oos.close();

      // Now, deserialize it.
      ObjectInputStream ois = new ObjectInputStream(new FileInputStream(writeCntxtFile));
      cntxt = (WriterContext) ois.readObject();
      ois.close();
      runsInSlave(cntxt);
      commit(map, true, cntxt);

      ReaderContext readCntxt = runsInMaster(map, false);
      File readCntxtFile = File.createTempFile("hcat-read", "temp");
      readCntxtFile.deleteOnExit();
      oos = new ObjectOutputStream(new FileOutputStream(readCntxtFile));
      oos.writeObject(readCntxt);
      oos.flush();
      oos.close();

      ois = new ObjectInputStream(new FileInputStream(readCntxtFile));
      readCntxt = (ReaderContext) ois.readObject();
      ois.close();

      for (int i = 0; i < readCntxt.numSplits(); i++) {
         runsInSlave(readCntxt, i);
      }
   }

   private WriterContext runsInMaster(Map<String, String> config) throws HCatException {
      WriteEntity.Builder builder = new WriteEntity.Builder();
      WriteEntity entity = builder.withTable("mytbl").build();

      HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
      WriterContext info = writer.prepareWrite();
      return info;
   }

   private ReaderContext runsInMaster(Map<String, String> config,
      boolean bogus) throws HCatException {
      ReadEntity entity = new ReadEntity.Builder().withTable("mytbl").build();
      HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
      ReaderContext cntxt = reader.prepareRead();
      return cntxt;
   }

   private void runsInSlave(ReaderContext cntxt, int slaveNum) throws HCatException {
      HCatReader reader = DataTransferFactory.getHCatReader(cntxt, slaveNum);
      Iterator<HCatRecord> itr = reader.read();
      int i = 1;

      while (itr.hasNext()) {
         HCatRecord read = itr.next();
         HCatRecord written = getRecord(i++);

         // Argh, HCatRecord doesnt implement equals()
         Assert.assertTrue("Read: " + read.get(0) + "Written: " + written.get(0),
         written.get(0).equals(read.get(0)));

         Assert.assertTrue("Read: " + read.get(1) + "Written: " + written.get(1),
         written.get(1).equals(read.get(1)));

         Assert.assertEquals(2, read.size());
      }

      //Assert.assertFalse(itr.hasNext());
   }

   private void runsInSlave(WriterContext context) throws HCatException {
      HCatWriter writer = DataTransferFactory.getHCatWriter(context);
      writer.write(new HCatRecordItr());
   }

   private void commit(Map<String, String> config, boolean status,
      WriterContext context) throws IOException {
      WriteEntity.Builder builder = new WriteEntity.Builder();
      WriteEntity entity = builder.withTable("mytbl").build();
      HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);

      if (status) {
         writer.commit(context);
      } else {
         writer.abort(context);
      }
   }

   private static HCatRecord getRecord(int i) {
      List<Object> list = new ArrayList<Object>(2);
      list.add("Row #: " + i);
      list.add(i);
      return new DefaultHCatRecord(list);
   }

   private static class HCatRecordItr implements Iterator<HCatRecord> {
      int i = 0;

      @Override
      public boolean hasNext() {
         return i++ < 100 ? true : false;
      }

      @Override
      public HCatRecord next() {
         return getRecord(i);
      }

      @Override
      public void remove() {
         throw new RuntimeException();
      }
   }
}

以上程序读取 HDFS 中的数据记录,并将记录数据写入 mytable

HCatalog - Input Output Format

HCatInputFormatHCatOutputFormat 用于读取 HDFS 的数据,并在处理后使用 MapReduce 作业将结果数据写入 HDFS。我们来详细介绍输入和输出格式接口。

HCatInputFormat

HCatInputFormat 用于与 MapReduce 作业一起,从 HCatalog 管理的表读取数据。HCatInputFormat 暴露出 Hadoop 0.20 MapReduce API,用于读取数据,就像已发表到表中一样。

Sr.No.

Method Name & Description

1

public static HCatInputFormat setInput(Job job, String dbName, String tableName)throws IOException 为作业设置要使用的输入。它使用给定的输入规范查询信息仓库,并将匹配的分区序列化到 MapReduce 任务的作业配置中。

2

public static HCatInputFormat setInput(Configuration conf, String dbName, String tableName) throws IOException 为作业设置要使用的输入。它使用给定的输入规范查询信息仓库,并将匹配的分区序列化到 MapReduce 任务的作业配置中。

3

public HCatInputFormat setFilter(String filter)throws IOException 设置对输入表的筛选器。

4

public HCatInputFormat setProperties(Properties properties) throws IOException 为输入格式设置属性。

HCatInputFormat API 包括以下方法 -

  1. setInput

  2. setOutputSchema

  3. getTableSchema

要使用 HCatInputFormat 读取数据,请首先使用正在读取的表的必要信息实例化一个 InputJobInfo ,然后使用 InputJobInfo 调用 setInput

可以使用 setOutputSchema 方法包含 projection schema ,以指定输出字段。如果没有指定架构,则会返回表中的所有列。可以使用 getTableSchema 方法来确定指定输入表表的架构。

HCatOutputFormat

HCatOutputFormat 用于向 HCatalog 管理的表写入数据的 MapReduce 作业。HCatOutputFormat 暴露出 Hadoop 0.20 MapReduce API,用于将数据写入表。当 MapReduce 作业使用 HCatOutputFormat 写入输出时,将使用为表配置的默认 OutputFormat,并在作业完成后将新分区发布到表中。

Sr.No.

Method Name & Description

1

public static void setOutput (Configuration conf, Credentials credentials, OutputJobInfo outputJobInfo) throws IOException 为作业设置要写入的输出信息。它会查询元数据服务器来查找可用于该表的 StorageHandler。如果分区已发布,它会抛出一个错误。

2

public static void setSchema (Configuration conf, HCatSchema schema) throws IOException 为写入分区的数据设置架构。如果未调用此方法,则表架构将默认用于分区。

3

public RecordWriter &lt;WritableComparable&lt;?&gt;, HCatRecord &gt; getRecordWriter (TaskAttemptContext context)throws IOException, InterruptedException 获取作业的记录编写器。它使用 StorageHandler 的默认 OutputFormat 来获取记录编写器。

4

public OutputCommitter getOutputCommitter (TaskAttemptContext context) throws IOException, InterruptedException 获取此输出格式的输出提交者。它确保输出正确提交。

HCatOutputFormat API 包括以下方法 -

  1. setOutput

  2. setSchema

  3. getTableSchema

HCatOutputFormat 中的第一个调用必须是 setOutput ;其他任何调用都会引发一个异常,表明输出格式未初始化。

通过 setSchema 方法指定待写入数据的架构。必须调用此方法,提供正在写入数据的架构。如果您的数据与表架构具有相同的架构,则可以使用 HCatOutputFormat.getTableSchema() 获取表架构,然后将其传递给 setSchema()

Example

以下 MapReduce 程序从一个表中读取数据,它假定第二列(“列 1”)中有整数,并统计找到的每个不同值的实例数。也就是说,它执行了“ select col1, count( from $table group by col1;*”的等价操作。

例如,如果第二列中的值为 {1, 1, 1, 3, 3, 5},那么程序将生成以下值和次数输出 −

1, 3
3, 2
5, 1

我们现在来看一下程序代码 −

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.HCatalog.common.HCatConstants;
import org.apache.HCatalog.data.DefaultHCatRecord;
import org.apache.HCatalog.data.HCatRecord;
import org.apache.HCatalog.data.schema.HCatSchema;

import org.apache.HCatalog.mapreduce.HCatInputFormat;
import org.apache.HCatalog.mapreduce.HCatOutputFormat;
import org.apache.HCatalog.mapreduce.InputJobInfo;
import org.apache.HCatalog.mapreduce.OutputJobInfo;

public class GroupByAge extends Configured implements Tool {

   public static class Map extends Mapper<WritableComparable,
      HCatRecord, IntWritable, IntWritable> {
      int age;

      @Override
      protected void map(
         WritableComparable key, HCatRecord value,
         org.apache.hadoop.mapreduce.Mapper<WritableComparable,
         HCatRecord, IntWritable, IntWritable>.Context context
      )throws IOException, InterruptedException {
         age = (Integer) value.get(1);
         context.write(new IntWritable(age), new IntWritable(1));
      }
   }

   public static class Reduce extends Reducer<IntWritable, IntWritable,
      WritableComparable, HCatRecord> {
      @Override
      protected void reduce(
         IntWritable key, java.lang.Iterable<IntWritable> values,
         org.apache.hadoop.mapreduce.Reducer<IntWritable, IntWritable,
         WritableComparable, HCatRecord>.Context context
      )throws IOException ,InterruptedException {
         int sum = 0;
         Iterator<IntWritable> iter = values.iterator();

         while (iter.hasNext()) {
            sum++;
            iter.next();
         }

         HCatRecord record = new DefaultHCatRecord(2);
         record.set(0, key.get());
         record.set(1, sum);
         context.write(null, record);
      }
   }

   public int run(String[] args) throws Exception {
      Configuration conf = getConf();
      args = new GenericOptionsParser(conf, args).getRemainingArgs();

      String serverUri = args[0];
      String inputTableName = args[1];
      String outputTableName = args[2];
      String dbName = null;
      String principalID = System

      .getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL);
      if (principalID != null)
      conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID);
      Job job = new Job(conf, "GroupByAge");
      HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null));

      // initialize HCatOutputFormat
      job.setInputFormatClass(HCatInputFormat.class);
      job.setJarByClass(GroupByAge.class);
      job.setMapperClass(Map.class);
      job.setReducerClass(Reduce.class);

      job.setMapOutputKeyClass(IntWritable.class);
      job.setMapOutputValueClass(IntWritable.class);
      job.setOutputKeyClass(WritableComparable.class);
      job.setOutputValueClass(DefaultHCatRecord.class);

      HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
      HCatSchema s = HCatOutputFormat.getTableSchema(job);
      System.err.println("INFO: output schema explicitly set for writing:" + s);
      HCatOutputFormat.setSchema(job, s);
      job.setOutputFormatClass(HCatOutputFormat.class);
      return (job.waitForCompletion(true) ? 0 : 1);
   }

   public static void main(String[] args) throws Exception {
      int exitCode = ToolRunner.run(new GroupByAge(), args);
      System.exit(exitCode);
   }
}

在编译上述程序之前,您必须先下载一些 jars 并将它们添加到此应用程序的 classpath 。您需要下载所有 Hive jar 和 HCatalog jar(HCatalog-core-0.5.0.jar、hive-metastore-0.10.0.jar、libthrift-0.7.0.jar、hive-exec-0.10.0.jar、libfb303-0.7.0.jar、jdo2-api-2.3-ec.jar、slf4j-api-1.6.1.jar)。

使用以下命令将 jar 文件从 local 复制到 HDFS 并将它们添加到 classpath

bin/hadoop fs -copyFromLocal $HCAT_HOME/share/HCatalog/HCatalog-core-0.5.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmp

export LIB_JARS=hdfs:///tmp/HCatalog-core-0.5.0.jar,
hdfs:///tmp/hive-metastore-0.10.0.jar,
hdfs:///tmp/libthrift-0.7.0.jar,
hdfs:///tmp/hive-exec-0.10.0.jar,
hdfs:///tmp/libfb303-0.7.0.jar,
hdfs:///tmp/jdo2-api-2.3-ec.jar,
hdfs:///tmp/slf4j-api-1.6.1.jar

使用以下命令编译并执行给定的程序。

$HADOOP_HOME/bin/hadoop jar GroupByAge tmp/hive

现在,检查输出目录 (hdfs: user/tmp/hive) 以查看输出 (part_0000, part_0001)。

HCatalog - Loader & Storer

HCatLoaderHCatStorer API 与 Pig 脚本一起用于读写 HCatalog 管理的表中的数据。这些接口不需要 HCatalog 特定的设置。

最好了解一些 Apache Pig 脚本的知识,才能更好地理解本章。更多参考信息,请参阅我们的 Apache Pig 教程。

HCatloader

HCatLoader 与 Pig 脚本一起用于从 HCatalog 管理的表中读取数据。使用以下语法以使用 HCatloader 将数据加载到 HDFS 中。

A = LOAD 'tablename' USING org.apache.HCatalog.pig.HCatLoader();

您必须用单引号指定表名: LOAD 'tablename' 。如果您正在使用非默认数据库,那么您必须将您的输入指定为 ' dbname.tablename'

Hive 元存储允许您在不指定数据库的情况下创建表。如果您以这种方式创建表,那么数据库名称为 'default' ,并且在为 HCatLoader 指定表时不需要该名称。

下表包含 HCatloader 类的重要方法及其描述。

Sr.No.

Method Name & Description

1

public InputFormat&lt;?,?&gt; getInputFormat()throws IOException 使用 HCatloader 类读取加载数据的输入格式。

2

public String relativeToAbsolutePath(String location, Path curDir) throws IOException 返回 Absolute path 的字符串格式。

3

public void setLocation(String location, Job job) throws IOException 设置可执行作业的位置。

4

public Tuple getNext() throws IOException 从循环中返回当前元组 ( keyvalue )。

HCatStorer

HCatStorer 与 Pig 脚本一起用于将数据写入 HCatalog 管理的表中。对于存储操作,使用以下语法。

A = LOAD ...
B = FOREACH A ...
...
...
my_processed_data = ...

STORE my_processed_data INTO 'tablename' USING org.apache.HCatalog.pig.HCatStorer();

您必须用单引号指定表名: LOAD 'tablename' 。在运行您的 Pig 脚本之前,必须创建数据库和表。如果您正在使用非默认数据库,那么您必须将您的输入指定为 'dbname.tablename'

Hive 元存储允许您在不指定数据库的情况下创建表。如果您以这种方式创建表,那么数据库名称为 'default' ,您不需要在 store 语句中指定数据库名称。

对于 USING 语句,您可以有一个表示分区键值对的字符串参数。当您写入分区表而分区列不在输出列中时,这是一个强制参数。分区键的值不应加引号。

下表包含 HCatStorer 类的重要方法及其说明。

Sr.No.

Method Name & Description

1

public OutputFormat getOutputFormat() throws IOException 使用 HCatStorer 类读取存储数据的输出格式。

2

public void setStoreLocation (String location, Job job) throws IOException 设置执行此 store 应用程序的位置。

3

public void storeSchema (ResourceSchema schema, String arg1, Job job) throws IOException 存储架构。

4

public void prepareToWrite (RecordWriter writer) throws IOException 可帮助使用 RecordWriter 将数据写入特定文件。

5

public void putNext (Tuple tuple) throws IOException 将元组数据写入文件。

Running Pig with HCatalog

Pig 不会自动获取 HCatalog jar。若要引入必要的 jar,可使用 Pig 命令中的标志或设置 PIG_CLASSPATHPIG_OPTS 环境变量,如下所示。

若要引入用于处理 HCatalog 的适当 jar,只需包含以下标志 −

pig –useHCatalog <Sample pig scripts file>

Setting the CLASSPATH for Execution

使用以下 CLASSPATH 设置同步 HCatalog 与 Apache Pig。

export HADOOP_HOME = <path_to_hadoop_install>
export HIVE_HOME = <path_to_hive_install>
export HCAT_HOME = <path_to_hcat_install>

export PIG_CLASSPATH = $HCAT_HOME/share/HCatalog/HCatalog-core*.jar:\
$HCAT_HOME/share/HCatalog/HCatalog-pig-adapter*.jar:\
$HIVE_HOME/lib/hive-metastore-*.jar:$HIVE_HOME/lib/libthrift-*.jar:\
$HIVE_HOME/lib/hive-exec-*.jar:$HIVE_HOME/lib/libfb303-*.jar:\
$HIVE_HOME/lib/jdo2-api-*-ec.jar:$HIVE_HOME/conf:$HADOOP_HOME/conf:\
$HIVE_HOME/lib/slf4j-api-*.jar

Example

假设我们在 HDFS 中有一个文件 student_details.txt ,内容如下。

student_details.txt

001, Rajiv,    Reddy,       21, 9848022337, Hyderabad
002, siddarth, Battacharya, 22, 9848022338, Kolkata
003, Rajesh,   Khanna,      22, 9848022339, Delhi
004, Preethi,  Agarwal,     21, 9848022330, Pune
005, Trupthi,  Mohanthy,    23, 9848022336, Bhuwaneshwar
006, Archana,  Mishra,      23, 9848022335, Chennai
007, Komal,    Nayak,       24, 9848022334, trivendram
008, Bharathi, Nambiayar,   24, 9848022333, Chennai

我们还有同一个 HDFS 目录中的一个样例脚本,名为 sample_script.pig 。该文件包含对 student 关系执行操作和转换的语句,如下所示。

student = LOAD 'hdfs://localhost:9000/pig_data/student_details.txt' USING
   PigStorage(',') as (id:int, firstname:chararray, lastname:chararray,
   phone:chararray, city:chararray);

student_order = ORDER student BY age DESC;
STORE student_order INTO 'student_order_table' USING org.apache.HCatalog.pig.HCatStorer();
student_limit = LIMIT student_order 4;
Dump student_limit;
  1. 该脚本的第一条语句将以 student_details.txt 命名的文件中数据载入到名为 student 的关系中。

  2. 该脚本的第二条语句将按照年龄对该关系的元组进行降序排列,并将其存储为 student_order

  3. 第三个语句将已处理数据 student_order 结果存储在名为 student_order_table 的单独表中。

  4. 脚本的第四个语句将 student_order 的前四个元组存储为 student_limit

  5. 最后,第五个语句将转储关系 student_limit 的内容。

现在让我们按照如下所示执行 sample_script.pig

$./pig -useHCatalog hdfs://localhost:9000/pig_data/sample_script.pig

现在,检查输出目录 (hdfs: user/tmp/hive) 以查看输出 (part_0000, part_0001)。