Hcatalog 简明教程
HCatalog - Introduction
What is HCatalog?
HCatalog 是 Hadoop 的表存储管理工具。它向其他 Hadoop 应用程序公开 Hive 元存储的表格数据。它使用户能够使用不同的数据处理工具(Pig、MapReduce)轻松地将数据写到网格中。它确保用户不必担心数据存储在何处或以何种格式存储。
HCatalog 作为一个 Hive 的关键组件,使用户可以存储任何格式和任何结构的数据。
Why HCatalog?
Enabling right tool for right Job
Hadoop 生态系统包含用于数据处理的不同工具,例如 Hive、Pig 和 MapReduce。虽然这些工具不需要元数据,但是当元数据存在时,它们仍然可以从中受益。共享元数据存储还使跨工具的用户能够更轻松地共享数据。一种非常常见的工作流程是使用 MapReduce 或 Pig 加载和规范化数据,然后通过 Hive 进行分析。如果所有这些工具共享一个元存储,那么每个工具的用户都可以立即访问使用另一个工具创建的数据。无需加载或传输步骤。
HCatalog Architecture
下图显示了 HCatalog 的整体架构。
HCatalog 支持为可以使用 SerDe (序列化器-反序列化器)编写的任何格式读写文件。默认情况下,HCatalog 支持 RCFile、CSV、JSON、SequenceFile 和 ORC 文件格式。要使用自定义格式,您必须提供 InputFormat、OutputFormat 和 SerDe。
HCatalog 建立在 Hive 元存储之上,并结合了 Hive 的 DDL。HCatalog 为 Pig 和 MapReduce 提供了读写接口,并使用 Hive 的命令行界面来发布数据定义和元数据探索命令。
HCatalog - Installation
所有 Hadoop 子项目(如 Hive、Pig 和 HBase)都支持 Linux 操作系统。因此,您需要在系统上安装 Linux 版本。HCatalog 已在 2013 年 3 月 26 日与 Hive 安装合并。从 Hive-0.11.0 版本开始,HCatalog 随 Hive 安装提供。因此,请按照以下步骤安装 Hive,进而自动在系统上安装 HCatalog。
Step 1: Verifying JAVA Installation
在安装 Hive 之前,必须在系统上安装 Java。您可以使用以下命令来检查系统上是否已安装 Java -
$ java –version
如果系统上已安装 Java,您将看到以下响应 -
java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)
如果系统上未安装 Java,则需要按照以下步骤操作。
Step 2: Installing Java
访问以下链接 http://www.oracle.com/ 下载 Java(JDK <latest version> - X64.tar.gz)
然后 jdk-7u71-linux-x64.tar.gz 将下载到您的系统。
通常,您会在 Downloads 文件夹中找到下载的 Java 文件。使用以下命令验证并解压 jdk-7u71-linux-x64.gz 文件。
$ cd Downloads/
$ ls
jdk-7u71-linux-x64.gz
$ tar zxf jdk-7u71-linux-x64.gz
$ ls
jdk1.7.0_71 jdk-7u71-linux-x64.gz
为了使得所有用户都能使用 Java,您需要将 Java 移动到“/usr/local/”位置。打开 root,然后键入以下命令。
$ su
password:
# mv jdk1.7.0_71 /usr/local/
# exit
为设置 PATH 和 JAVA_HOME 变量,将以下命令添加到 ~/.bashrc 文件。
export JAVA_HOME=/usr/local/jdk1.7.0_71
export PATH=PATH:$JAVA_HOME/bin
现在,使用上述 from terminal java -version 命令验证安装。
Step 3: Verifying Hadoop Installation
在安装 Hive 之前,必须在您的系统上安装 Hadoop。让我们使用以下命令验证 Hadoop 安装:
$ hadoop version
如果已在您的系统上安装 Hadoop,则会收到以下回复:
Hadoop 2.4.1
Subversion https://svn.apache.org/repos/asf/hadoop/common -r 1529768
Compiled by hortonmu on 2013-10-07T06:28Z
Compiled with protoc 2.5.0
From source with checksum 79e53ce7994d1628b240f09af91e1af4
如果您的系统上未安装 Hadoop,请执行以下步骤:
Step 4: Downloading Hadoop
使用以下命令从 Apache 软件基金会下载并解压缩 Hadoop 2.4.1。
$ su
password:
# cd /usr/local
# wget http://apache.claz.org/hadoop/common/hadoop-2.4.1/
hadoop-2.4.1.tar.gz
# tar xzf hadoop-2.4.1.tar.gz
# mv hadoop-2.4.1/* to hadoop/
# exit
Step 5: Installing Hadoop in Pseudo Distributed Mode
以下步骤用于在伪分布式模式下安装 Hadoop 2.4.1 。
Setting up Hadoop
您可以通过将以下命令追加到 ~/.bashrc 文件来设置 Hadoop 环境变量。
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin
现在将所有更改应用到当前正在运行的系统中。
$ source ~/.bashrc
Hadoop Configuration
您可以在位置 “$HADOOP_HOME/etc/hadoop” 中找到所有 Hadoop 配置文件。根据您的 Hadoop 基础架构,您需要在这些配置文件中进行适当的更改。
$ cd $HADOOP_HOME/etc/hadoop
为了使用 Java 开发 Hadoop 程序,您必须通过将 JAVA_HOME 值替换为系统中 Java 的位置,来重置 hadoop-env.sh 文件中的 Java 环境变量。
export JAVA_HOME=/usr/local/jdk1.7.0_71
下面列出了您必须编辑以配置 Hadoop 的文件列表。
core-site.xml
core-site.xml 文件包含信息,例如用于 Hadoop 实例的端口号、分配给文件系统内存、用于存储数据的内存限制以及读/写缓冲区大小。
打开 core-site.xml,并在 <configuration> 和 </configuration> 标记之间添加以下属性。
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
hdfs-site.xml
hdfs-site.xml 文件包含信息,例如复制数据的值、本地名节点的路径以及本地文件系统的数据节点路径。这意味着您要存储 Hadoop 基础架构的位置。
让我们假设以下数据。
dfs.replication (data replication value) = 1
(In the following path /hadoop/ is the user name.
hadoopinfra/hdfs/namenode is the directory created by hdfs file system.)
namenode path = //home/hadoop/hadoopinfra/hdfs/namenode
(hadoopinfra/hdfs/datanode is the directory created by hdfs file system.)
datanode path = //home/hadoop/hadoopinfra/hdfs/datanode
打开此文件,并在此文件中在 <configuration>、</configuration> 标记之间添加以下属性。
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.name.dir</name>
<value>file:///home/hadoop/hadoopinfra/hdfs/namenode</value>
</property>
<property>
<name>dfs.data.dir</name>
<value>file:///home/hadoop/hadoopinfra/hdfs/datanode</value>
</property>
</configuration>
Note − 在上述文件中,所有属性值都是用户定义的,您可以根据 Hadoop 基础架构进行更改。
yarn-site.xml
此文件用于将 Yarn 配置到 Hadoop 中。打开 yarn-site.xml 文件并在该文件中的 <configuration>、</configuration> 标记之间添加以下属性。
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>
mapred-site.xml
此文件用于指定我们使用哪种 MapReduce 框架。默认情况下,Hadoop 包含一个 yarn-site.xml 模板。首先,您需要使用以下命令将文件从 mapred-site,xml.template 复制到 mapred-site.xml 文件。
$ cp mapred-site.xml.template mapred-site.xml
打开 mapred-site.xml 文件,并在此文件中在 <configuration>、</configuration> 标记之间添加以下属性。
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
Step 6: Verifying Hadoop Installation
以下步骤用于验证 Hadoop 安装。
Namenode Setup
使用 “hdfs namenode -format” 命令设置 name 节点,如下所示:
$ cd ~
$ hdfs namenode -format
预期结果如下:
10/24/14 21:30:55 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = localhost/192.168.1.11
STARTUP_MSG: args = [-format]
STARTUP_MSG: version = 2.4.1
...
...
10/24/14 21:30:56 INFO common.Storage: Storage directory
/home/hadoop/hadoopinfra/hdfs/namenode has been successfully formatted.
10/24/14 21:30:56 INFO namenode.NNStorageRetentionManager: Going to retain 1
images with txid >= 0 10/24/14 21:30:56 INFO util.ExitUtil: Exiting with status 0
10/24/14 21:30:56 INFO namenode.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at localhost/192.168.1.11
************************************************************/
Verifying Hadoop DFS
以下命令用于启动 DFS。执行此命令将启动您的 Hadoop 文件系统。
$ start-dfs.sh
预期输出如下所示 −
10/24/14 21:37:56 Starting namenodes on [localhost]
localhost: starting namenode, logging to
/home/hadoop/hadoop-2.4.1/logs/hadoop-hadoop-namenode-localhost.out localhost:
starting datanode, logging to
/home/hadoop/hadoop-2.4.1/logs/hadoop-hadoop-datanode-localhost.out
Starting secondary namenodes [0.0.0.0]
Verifying Yarn Script
以下命令用于启动 Yarn 脚本。执行此命令将启动您的 Yarn 守护程序。
$ start-yarn.sh
预期输出如下所示 −
starting yarn daemons
starting resourcemanager, logging to /home/hadoop/hadoop-2.4.1/logs/
yarn-hadoop-resourcemanager-localhost.out
localhost: starting nodemanager, logging to
/home/hadoop/hadoop-2.4.1/logs/yarn-hadoop-nodemanager-localhost.out
Step 7: Downloading Hive
在本教程中,我们使用 hive-0.14.0。你可以访问以下链接下载: http://apache.petsads.us/hive/hive-0.14.0/ 。我们假设教程下载到了 /Downloads 目录。在这里,我们为此教程下载了名为“ apache-hive-0.14.0-bin.tar.gz ”的 Hive 归档文件。使用以下命令验证下载情况−
$ cd Downloads
$ ls
下载成功后,你会看到以下响应−
apache-hive-0.14.0-bin.tar.gz
Step 8: Installing Hive
若要在系统上安装 Hive,需要执行以下步骤。我们假设 Hive 归档文件下载到了 /Downloads 目录。
Extracting and Verifying Hive Archive
使用以下命令验证下载情况并提取 Hive 归档文件−
$ tar zxvf apache-hive-0.14.0-bin.tar.gz
$ ls
下载成功后,你会看到以下响应−
apache-hive-0.14.0-bin apache-hive-0.14.0-bin.tar.gz
Step 9: Configuring Hive
要将 Hive 与 Hadoop 配置在一起,需要编辑位于 $HIVE_HOME/conf 目录的 hive-env.sh 文件。以下命令重定向到 Hive config 文件夹并复制模板文件 −
$ cd $HIVE_HOME/conf
$ cp hive-env.sh.template hive-env.sh
通过追加以下行编辑 hive-env.sh 文件 −
export HADOOP_HOME=/usr/local/hadoop
这样,Hive 安装就完成了。现在你需要一个外部数据库服务器来配置元存储。我们使用 Apache Derby 数据库。
Step 10: Downloading and Installing Apache Derby
按照以下步骤下载并安装 Apache Derby −
Downloading Apache Derby
使用以下命令下载 Apache Derby。下载需要一些时间。
$ cd ~
$ wget http://archive.apache.org/dist/db/derby/db-derby-10.4.2.0/db-derby-10.4.2.0-bin.tar.gz
使用以下命令验证下载情况−
$ ls
下载成功后,你会看到以下响应−
db-derby-10.4.2.0-bin.tar.gz
Extracting and Verifying Derby Archive
以下命令用于提取和验证 Derby 归档文件 −
$ tar zxvf db-derby-10.4.2.0-bin.tar.gz
$ ls
下载成功后,你会看到以下响应−
db-derby-10.4.2.0-bin db-derby-10.4.2.0-bin.tar.gz
Copying Files to /usr/local/derby Directory
我们需要从超级用户 “su -” 复制文件。以下命令用于将文件从提取的目录复制到 /usr/local/derby 目录 −
$ su -
passwd:
# cd /home/user
# mv db-derby-10.4.2.0-bin /usr/local/derby
# exit
Step 11: Configuring the Hive Metastore
配置元数据存储库表示指定数据库存储在 Hive 中的位置。可以通过编辑 hive-site.xml 文件(位于 $HIVE_HOME/conf 目录中)来完成此操作。首先,使用以下命令复制模板文件 −
$ cd $HIVE_HOME/conf
$ cp hive-default.xml.template hive-site.xml
编辑 hive-site.xml ,并将以下行追加到 <configuration> 和 </configuration> 标记之间 −
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:derby://localhost:1527/metastore_db;create = true</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
创建名为 jpox.properties 的文件,并向其中添加以下行 −
javax.jdo.PersistenceManagerFactoryClass = org.jpox.PersistenceManagerFactoryImpl
org.jpox.autoCreateSchema = false
org.jpox.validateTables = false
org.jpox.validateColumns = false
org.jpox.validateConstraints = false
org.jpox.storeManagerType = rdbms
org.jpox.autoCreateSchema = true
org.jpox.autoStartMechanismMode = checked
org.jpox.transactionIsolation = read_committed
javax.jdo.option.DetachAllOnCommit = true
javax.jdo.option.NontransactionalRead = true
javax.jdo.option.ConnectionDriverName = org.apache.derby.jdbc.ClientDriver
javax.jdo.option.ConnectionURL = jdbc:derby://hadoop1:1527/metastore_db;create = true
javax.jdo.option.ConnectionUserName = APP
javax.jdo.option.ConnectionPassword = mine
Step 12: Verifying Hive Installation
在运行 Hive 之前,您需要在 HDFS 中创建 /tmp 文件夹和一个单独的 Hive 文件夹。在此处,我们使用 /user/hive/warehouse 文件夹。您需要为这些新创建的文件夹设置写入权限,如下所示 −
chmod g+w
现在,在验证 Hive 之前,请在 HDFS 中设置它们。使用以下命令 −
$ $HADOOP_HOME/bin/hadoop fs -mkdir /tmp
$ $HADOOP_HOME/bin/hadoop fs -mkdir /user/hive/warehouse
$ $HADOOP_HOME/bin/hadoop fs -chmod g+w /tmp
$ $HADOOP_HOME/bin/hadoop fs -chmod g+w /user/hive/warehouse
以下命令用于验证 Hive 安装 −
$ cd $HIVE_HOME
$ bin/hive
在成功安装 Hive 后,您将看到以下响应 −
Logging initialized using configuration in
jar:file:/home/hadoop/hive-0.9.0/lib/hive-common-0.9.0.jar!/
hive-log4j.properties Hive history
=/tmp/hadoop/hive_job_log_hadoop_201312121621_1494929084.txt
………………….
hive>
您可以执行以下示例命令来显示所有表 −
hive> show tables;
OK Time taken: 2.798 seconds
hive>
Step 13: Verify HCatalog Installation
使用以下命令为 HCatalog 主目录设置系统变量 HCAT_HOME 。
export HCAT_HOME = $HiVE_HOME/HCatalog
使用以下命令验证 HCatalog 安装。
cd $HCAT_HOME/bin
./hcat
如果安装成功,您将看到以下输出 −
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
usage: hcat { -e "<query>" | -f "<filepath>" }
[ -g "<group>" ] [ -p "<perms>" ]
[ -D"<name> = <value>" ]
-D <property = value> use hadoop value for given property
-e <exec> hcat command given from command line
-f <file> hcat commands in file
-g <group> group for the db/table specified in CREATE statement
-h,--help Print help information
-p <perms> permissions for the db/table specified in CREATE statement
HCatalog - CLI
HCatalog 命令行界面 (CLI) 可以从命令 $HIVE_HOME/HCatalog/bin/hcat 中调用,其中 $HIVE_HOME 是 Hive 的主目录。 hcat 是用于初始化 HCatalog 服务器的命令。
使用以下命令初始化 HCatalog 命令行。
cd $HCAT_HOME/bin
./hcat
如果安装已正确完成,则您将获得以下输出 −
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
usage: hcat { -e "<query>" | -f "<filepath>" }
[ -g "<group>" ] [ -p "<perms>" ]
[ -D"<name> = <value>" ]
-D <property = value> use hadoop value for given property
-e <exec> hcat command given from command line
-f <file> hcat commands in file
-g <group> group for the db/table specified in CREATE statement
-h,--help Print help information
-p <perms> permissions for the db/table specified in CREATE statement
HCatalog CLI 支持这些命令行选项 −
Sr.No |
Option |
Example & Description |
1 |
-g |
hcat -g mygroup …​ 要创建的表必须具有“mygroup”组。 |
2 |
-p |
hcat -p rwxr-xr-x …​ 要创建的表必须具有读、写和执行权限。 |
3 |
-f |
hcat -f myscript.HCatalog …​ myscript.HCatalog 是一个包含要执行的 DDL 命令的脚本文件。 |
4 |
-e |
hcat -e 'create table mytable(a int);' …​ 将以下字符串视为 DDL 命令并执行它。 |
5 |
-D |
hcat -Dkey = value …​ 将键值对作为 Java 系统属性传递给 HCatalog。 |
6 |
- |
hcat Prints a usage message. |
Note −
-
-g 和 -p 选项不是必需的。
-
一次,可以提供 -e 或 -f 选项,但不是两者。
-
选项的顺序无关紧要;您可以按任何顺序指定选项。
Sr.No |
DDL Command & Description |
1 |
CREATE TABLE 使用 HCatalog 创建表。如果你使用 CLUSTERED BY 子句创建表,你将无法使用 Pig 或 MapReduce 向其中写入。 |
2 |
ALTER TABLE 支持,但 REBUILD 和 CONCATENATE 选项除外。它的行为与 Hive 中相同。 |
3 |
DROP TABLE 支持。与 Hive 相同的行为(删除完整的表和结构)。 |
4 |
CREATE/ALTER/DROP VIEW 支持。与 Hive 的行为相同。 Note − Pig 和 MapReduce 不能从视图中读取或向视图中写入数据。 |
5 |
SHOW TABLES 显示表列表。 |
6 |
SHOW PARTITIONS 显示分区列表。 |
7 |
Create/Drop Index CREATE 和 DROP FUNCTION 操作受支持,但创建的函数仍必须在 Pig 中注册并将其置于 MapReduce 的 CLASSPATH 中。 |
8 |
DESCRIBE 支持。与 Hive 相同的行为。描述结构。 |
上表中的一些命令在后续章节中进行了说明。
HCatalog - Create Table
本章介绍了如何创建表以及如何向其中插入数据。在 HCatalog 中创建表的约定与使用 Hive 创建表非常相似。
Create Table Statement
Create Table 是一个用于使用 HCatalog 在 Hive Metastore 中创建表的语句。它的语法和示例如下 −
Syntax
CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.] table_name
[(col_name data_type [COMMENT col_comment], ...)]
[COMMENT table_comment]
[ROW FORMAT row_format]
[STORED AS file_format]
Example
让我们假设您需要使用 CREATE TABLE 语句创建名为 employee 的表。下表列出了 employee 表中的字段及其数据类型 −
Sr.No |
Field Name |
Data Type |
1 |
Eid |
int |
2 |
Name |
String |
3 |
Salary |
Float |
4 |
Designation |
string |
以下数据定义了 Comment 等受支持字段、行格式字段例如 Field terminator 、 Lines terminator 和 Stored File type 。
COMMENT ‘Employee details’
FIELDS TERMINATED BY ‘\t’
LINES TERMINATED BY ‘\n’
STORED IN TEXT FILE
以下查询使用上述数据创建名为 employee 的表。
./hcat –e "CREATE TABLE IF NOT EXISTS employee ( eid int, name String,
salary String, destination String) \
COMMENT 'Employee details' \
ROW FORMAT DELIMITED \
FIELDS TERMINATED BY ‘\t’ \
LINES TERMINATED BY ‘\n’ \
STORED AS TEXTFILE;"
如果添加选项 IF NOT EXISTS ,则在表已存在的情况下,HCatalog 忽略该声明。
当表创建成功时,您可以看到以下响应:
OK
Time taken: 5.905 seconds
Load Data Statement
总体上,在 SQL 中创建一个表之后,我们可以使用 Insert 声明插入数据。但在 HCatalog 中,我们使用 LOAD DATA 声明插入数据。
向 HCatalog 插入数据时,最好使用 LOAD DATA 来存储批量记录。有两种方法可用于加载数据:一种是从 local file system ,另一种是从 Hadoop file system 。
Syntax
LOAD DATA 的语法如下:
LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename
[PARTITION (partcol1=val1, partcol2=val2 ...)]
-
LOCAL 是指定本地路径的标识符。它可选。
-
OVERWRITE 可选,可覆盖表中的数据。
-
PARTITION is optional.
Example
我们将向表中插入以下数据。它是一个文本文件,名为 sample.txt ,位于 /home/user 目录中。
1201 Gopal 45000 Technical manager
1202 Manisha 45000 Proof reader
1203 Masthanvali 40000 Technical writer
1204 Kiran 40000 Hr Admin
1205 Kranthi 30000 Op Admin
以下查询将给定的文本加载到表中。
./hcat –e "LOAD DATA LOCAL INPATH '/home/user/sample.txt'
OVERWRITE INTO TABLE employee;"
下载成功后,你会看到以下响应−
OK
Time taken: 15.905 seconds
HCatalog - Alter Table
本章介绍如何修改表的属性,例如更改表名、更改列名、添加列以及删除或替换列。
Change Statement
下表包含 employee 表的字段,并显示要更改的字段(以粗体显示)。
Field Name |
Convert from Data Type |
Change Field Name |
Convert to Data Type |
eid |
int |
eid |
int |
name |
String |
ename |
String |
salary |
Float |
salary |
Double |
designation |
String |
designation |
String |
以下查询使用上述数据重命名列名和列数据类型 −
./hcat –e "ALTER TABLE employee CHANGE name ename String;"
./hcat –e "ALTER TABLE employee CHANGE salary salary Double;"
Add Columns Statement
以下查询向 employee 表中添加一个名为 dept 的列。
./hcat –e "ALTER TABLE employee ADD COLUMNS (dept STRING COMMENT 'Department name');"
HCatalog - View
本章介绍如何在HCatalog中创建并管理 view 。数据库视图使用 CREATE VIEW 语句创建。视图可以从单个表格、多个表格或另一个视图创建。
要创建视图,用户必须根据具体实现具有适当的系统权限。
Create View Statement
CREATE VIEW 创建一个具有给定名称的视图。如果已经存在具有相同名称的表或视图,则会引发错误。可以使用 IF NOT EXISTS 来跳过该错误。
如果没有提供列名,则视图的列名将自动从 defining SELECT expression 派生。
Note − 如果 SELECT 包含未别名的标量表达式,例如 x+y,则生成的视图列名将采用 _C0、_C1 等形式。
重命名列时,还可以提供列注释。注释不会自动从基础列继承。
如果视图的 defining SELECT expression 无效,则 CREATE VIEW 语句将失败。
Syntax
CREATE VIEW [IF NOT EXISTS] [db_name.]view_name [(column_name [COMMENT column_comment], ...) ]
[COMMENT view_comment]
[TBLPROPERTIES (property_name = property_value, ...)]
AS SELECT ...;
Example
以下是员工表数据。现在让我们看看如何创建一个名为 Emp_Deg_View 的视图,其中包含薪水高于 35000 的员工的 ID、姓名、职位和薪水字段。
+------+-------------+--------+-------------------+-------+
| ID | Name | Salary | Designation | Dept |
+------+-------------+--------+-------------------+-------+
| 1201 | Gopal | 45000 | Technical manager | TP |
| 1202 | Manisha | 45000 | Proofreader | PR |
| 1203 | Masthanvali | 30000 | Technical writer | TP |
| 1204 | Kiran | 40000 | Hr Admin | HR |
| 1205 | Kranthi | 30000 | Op Admin | Admin |
+------+-------------+--------+-------------------+-------+
以下是基于以上给定数据创建视图的命令。
./hcat –e "CREATE VIEW Emp_Deg_View (salary COMMENT ' salary more than 35,000')
AS SELECT id, name, salary, designation FROM employee WHERE salary ≥ 35000;"
HCatalog - Show Tables
您通常希望列出数据库中的所有表或列出表中的所有列。显然,每个数据库都有自己列出表和列的语法。
Show Tables 语句显示所有表的名称。默认情况下,它会列出当前数据库中的表,或者在 IN 子句中,在指定数据库中列出表。
本节描述如何列出 HCatalog 中当前数据库中的所有表。
HCatalog - Show Partitions
分区是用于创建单独表格或视图的表格数据条件。SHOW PARTITIONS列出给定基础表的所有现有分区。分区按字母顺序列出。在Hive 0.6之后,还可以指定分区规范的部分以筛选结果列表。
可以使用SHOW PARTITIONS命令查看特定表格中存在的分区。本章介绍如何列出HCatalog中特定表格的分区。
Show Partitions Statement
其语法如下:
SHOW PARTITIONS table_name;
以下查询删除名为 employee 的表格−
./hcat –e "Show partitions employee;"
成功执行查询后,你会看到以下响应−
OK
Designation = IT
Time taken: 5.3 seconds
Dynamic Partition
HCatalog将表格组织成分区。这是一种基于分区列,如日期、城市和部门的值,将表格划分为相关部分的方式。使用分区,很容易查询部分数据。
例如,名为 Tab1 的表格包含员工数据,如id、姓名、部门和yoj(即参加年份)。假设你需要检索2012年加入的所有员工的详细信息。查询搜索整个表格以获取所需信息。然而,如果您使用年份对员工数据进行分区并将其存储在单独的文件中,则会减少查询处理时间。以下示例显示如何对文件及其数据进行分区 −
以下文件包含 employeedata 表格。
Adding a Partition
我们可以通过更改表格来向表格添加分区。让我们假设我们有一个名为 employee 的表格,其中包含诸如Id、姓名、工资、职务、部门和yoj等字段。
Syntax
ALTER TABLE table_name ADD [IF NOT EXISTS] PARTITION partition_spec
[LOCATION 'location1'] partition_spec [LOCATION 'location2'] ...;
partition_spec:
: (p_column = p_col_value, p_column = p_col_value, ...)
以下查询用于向 employee 表格添加分区。
./hcat –e "ALTER TABLE employee ADD PARTITION (year = '2013') location '/2012/part2012';"
HCatalog - Indexes
Creating an Index
索引实际上只是表某个特定列上的指针。创建索引是指在表的特定列上创建一个指针。其语法如下:
CREATE INDEX index_name
ON TABLE base_table_name (col_name, ...)
AS 'index.handler.class.name'
[WITH DEFERRED REBUILD]
[IDXPROPERTIES (property_name = property_value, ...)]
[IN TABLE index_table_name]
[PARTITIONED BY (col_name, ...)][
[ ROW FORMAT ...] STORED AS ...
| STORED BY ...
]
[LOCATION hdfs_path]
[TBLPROPERTIES (...)]
Example
让我们举一个例子来理解索引的概念。使用我们先前使用过的、包含字段 Id、Name、Salary、Designation 和 Dept 的相同 employee 表。在 employee 表的 salary 列上创建一个名为 index_salary 的索引。
以下查询创建了一个索引:
./hcat –e "CREATE INDEX inedx_salary ON TABLE employee(salary)
AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler';"
它是 salary 列的指针。如果该列被修改,则更改将使用索引值进行存储。
HCatalog - Reader Writer
HCatalog 包含一个数据传输 API,用于在不使用 MapReduce 的情况下实现并行输入和输出。此 API 使用表的存储和行的基本抽象从 Hadoop 集群读取数据,并向其中写入数据。
数据传输 API 主要包含三个类:
-
HCatReader - 读取 Hadoop 集群中的数据。
-
HCatWriter - 向 Hadoop 集群中写入数据。
-
DataTransferFactory - 生成读取器和写入器实例。
此 API 适合主从节点设置。让我们进一步讨论 HCatReader 和 HCatWriter 。
HCatReader
HCatReader 是 HCatalog 的一个内部抽象类,它抽象了从其检索记录的底层系统中的复杂性。
S. No. |
Method Name & Description |
1 |
Public abstract ReaderContext prepareRead() throws HCatException 应在主节点上调用此内容来获得 ReaderContext,之后应序列化并发送从属于节点。 |
2 |
Public abstract Iterator <HCatRecorder> read() throws HCaException 应在从属于节点上调用此内容来读取 HCatRecords。 |
3 |
Public Configuration getConf() 它将返回配置类对象。 |
HCatReader 类用于读取 HDFS 中的数据。阅读是一个两步过程,其中第一步发生在外部系统的 master 节点上。第二步在多个 slave 节点上并行执行。
读取在 ReadEntity 上完成。在开始读取之前,你需要定义一个 ReadEntity 用于读取。可以通过 ReadEntity.Builder 完成。你可以指定一个数据库名称、表名称、分区和过滤字符串。例如 −
ReadEntity.Builder builder = new ReadEntity.Builder();
ReadEntity entity = builder.withDatabase("mydb").withTable("mytbl").build(); 10.
上面的代码片段定义了一个 ReadEntity 对象(“entity”),它包含一个名为 mytbl 的表和一个名为 mydb 的数据库,可用于读取该表的所有行。请注意,此表必须在该操作开始之前存在于 HCatalog 中。
在定义 ReadEntity 之后,你可以使用 ReadEntity 和集群配置获取 HCatReader 的实例 −
HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
下一步是从 reader 获取一个 ReaderContext,如下所示:
ReaderContext cntxt = reader.prepareRead();
HCatWriter
该抽象是 HCatalog 内部实现。这便于从外部系统写入 HCatalog。请勿尝试直接实例化它。相反,请使用 DataTransferFactory。
Sr.No. |
Method Name & Description |
1 |
Public abstract WriterContext prepareRead() throws HCatException 外部系统应从 master 节点调用此方法一次。它返回一个 WriterContext 。应该对它进行序列化并将其发送到 slave 节点,以在该处构建 HCatWriter 。 |
2 |
Public abstract void write(Iterator<HCatRecord> recordItr) throws HCaException 此方法应在 slave 节点上使用,用于执行写操作。recordItr 是一个迭代器对象,它包含需要写入 HCatalog 中的记录集合。 |
3 |
Public abstract void abort(WriterContext cntxt) throws HCatException 此方法应在 master 节点上调用。此方法的主要目的是在出现故障的情况下进行清理。 |
4 |
public abstract void commit(WriterContext cntxt) throws HCatException 此方法应在 master 节点上调用。此方法的目的是进行元数据提交。 |
与阅读类似,写入也是一个两步过程,其中第一步发生在 master 节点上。随后,第二步在 slave 节点上并行执行。
在 WriteEntity 上执行写操作,可以按照与读取类似的方式构建 −
WriteEntity.Builder builder = new WriteEntity.Builder();
WriteEntity entity = builder.withDatabase("mydb").withTable("mytbl").build();
上面的代码创建了一个 WriteEntity 对象 entity,可以用来写到数据库 mydb 中名为 mytbl 的表。
在创建 WriteEntity 之后,下一步是获取一个 WriterContext −
HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
WriterContext info = writer.prepareWrite();
上述所有步骤都在 master 节点上发生。然后,master 节点将 WriterContext 对象序列化,并使所有 slave 可以使用它。
在 slave 节点上,你需要使用 WriterContext 获取一个 HCatWriter,如下所示:
HCatWriter writer = DataTransferFactory.getHCatWriter(context);
然后, writer 将迭代器作为写方法的参数 −
writer.write(hCatRecordItr);
然后, writer 在循环中对该迭代器调用 getNext() ,并写出附加到迭代器上的所有记录。
TestReaderWriter.java 文件用于测试 HCatreader 和 HCatWriter 类。以下程序演示如何使用 HCatReader 和 HCatWriter API 从源文件读取数据,并随后将其写入目标文件。
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hive.HCatalog.common.HCatException;
import org.apache.hive.HCatalog.data.transfer.DataTransferFactory;
import org.apache.hive.HCatalog.data.transfer.HCatReader;
import org.apache.hive.HCatalog.data.transfer.HCatWriter;
import org.apache.hive.HCatalog.data.transfer.ReadEntity;
import org.apache.hive.HCatalog.data.transfer.ReaderContext;
import org.apache.hive.HCatalog.data.transfer.WriteEntity;
import org.apache.hive.HCatalog.data.transfer.WriterContext;
import org.apache.hive.HCatalog.mapreduce.HCatBaseTest;
import org.junit.Assert;
import org.junit.Test;
public class TestReaderWriter extends HCatBaseTest {
@Test
public void test() throws MetaException, CommandNeedRetryException,
IOException, ClassNotFoundException {
driver.run("drop table mytbl");
driver.run("create table mytbl (a string, b int)");
Iterator<Entry<String, String>> itr = hiveConf.iterator();
Map<String, String> map = new HashMap<String, String>();
while (itr.hasNext()) {
Entry<String, String> kv = itr.next();
map.put(kv.getKey(), kv.getValue());
}
WriterContext cntxt = runsInMaster(map);
File writeCntxtFile = File.createTempFile("hcat-write", "temp");
writeCntxtFile.deleteOnExit();
// Serialize context.
ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(writeCntxtFile));
oos.writeObject(cntxt);
oos.flush();
oos.close();
// Now, deserialize it.
ObjectInputStream ois = new ObjectInputStream(new FileInputStream(writeCntxtFile));
cntxt = (WriterContext) ois.readObject();
ois.close();
runsInSlave(cntxt);
commit(map, true, cntxt);
ReaderContext readCntxt = runsInMaster(map, false);
File readCntxtFile = File.createTempFile("hcat-read", "temp");
readCntxtFile.deleteOnExit();
oos = new ObjectOutputStream(new FileOutputStream(readCntxtFile));
oos.writeObject(readCntxt);
oos.flush();
oos.close();
ois = new ObjectInputStream(new FileInputStream(readCntxtFile));
readCntxt = (ReaderContext) ois.readObject();
ois.close();
for (int i = 0; i < readCntxt.numSplits(); i++) {
runsInSlave(readCntxt, i);
}
}
private WriterContext runsInMaster(Map<String, String> config) throws HCatException {
WriteEntity.Builder builder = new WriteEntity.Builder();
WriteEntity entity = builder.withTable("mytbl").build();
HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
WriterContext info = writer.prepareWrite();
return info;
}
private ReaderContext runsInMaster(Map<String, String> config,
boolean bogus) throws HCatException {
ReadEntity entity = new ReadEntity.Builder().withTable("mytbl").build();
HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
ReaderContext cntxt = reader.prepareRead();
return cntxt;
}
private void runsInSlave(ReaderContext cntxt, int slaveNum) throws HCatException {
HCatReader reader = DataTransferFactory.getHCatReader(cntxt, slaveNum);
Iterator<HCatRecord> itr = reader.read();
int i = 1;
while (itr.hasNext()) {
HCatRecord read = itr.next();
HCatRecord written = getRecord(i++);
// Argh, HCatRecord doesnt implement equals()
Assert.assertTrue("Read: " + read.get(0) + "Written: " + written.get(0),
written.get(0).equals(read.get(0)));
Assert.assertTrue("Read: " + read.get(1) + "Written: " + written.get(1),
written.get(1).equals(read.get(1)));
Assert.assertEquals(2, read.size());
}
//Assert.assertFalse(itr.hasNext());
}
private void runsInSlave(WriterContext context) throws HCatException {
HCatWriter writer = DataTransferFactory.getHCatWriter(context);
writer.write(new HCatRecordItr());
}
private void commit(Map<String, String> config, boolean status,
WriterContext context) throws IOException {
WriteEntity.Builder builder = new WriteEntity.Builder();
WriteEntity entity = builder.withTable("mytbl").build();
HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
if (status) {
writer.commit(context);
} else {
writer.abort(context);
}
}
private static HCatRecord getRecord(int i) {
List<Object> list = new ArrayList<Object>(2);
list.add("Row #: " + i);
list.add(i);
return new DefaultHCatRecord(list);
}
private static class HCatRecordItr implements Iterator<HCatRecord> {
int i = 0;
@Override
public boolean hasNext() {
return i++ < 100 ? true : false;
}
@Override
public HCatRecord next() {
return getRecord(i);
}
@Override
public void remove() {
throw new RuntimeException();
}
}
}
以上程序读取 HDFS 中的数据记录,并将记录数据写入 mytable
HCatalog - Input Output Format
HCatInputFormat 和 HCatOutputFormat 用于读取 HDFS 的数据,并在处理后使用 MapReduce 作业将结果数据写入 HDFS。我们来详细介绍输入和输出格式接口。
HCatInputFormat
HCatInputFormat 用于与 MapReduce 作业一起,从 HCatalog 管理的表读取数据。HCatInputFormat 暴露出 Hadoop 0.20 MapReduce API,用于读取数据,就像已发表到表中一样。
Sr.No. |
Method Name & Description |
1 |
public static HCatInputFormat setInput(Job job, String dbName, String tableName)throws IOException 为作业设置要使用的输入。它使用给定的输入规范查询信息仓库,并将匹配的分区序列化到 MapReduce 任务的作业配置中。 |
2 |
public static HCatInputFormat setInput(Configuration conf, String dbName, String tableName) throws IOException 为作业设置要使用的输入。它使用给定的输入规范查询信息仓库,并将匹配的分区序列化到 MapReduce 任务的作业配置中。 |
3 |
public HCatInputFormat setFilter(String filter)throws IOException 设置对输入表的筛选器。 |
4 |
public HCatInputFormat setProperties(Properties properties) throws IOException 为输入格式设置属性。 |
HCatInputFormat API 包括以下方法 -
-
setInput
-
setOutputSchema
-
getTableSchema
要使用 HCatInputFormat 读取数据,请首先使用正在读取的表的必要信息实例化一个 InputJobInfo ,然后使用 InputJobInfo 调用 setInput 。
可以使用 setOutputSchema 方法包含 projection schema ,以指定输出字段。如果没有指定架构,则会返回表中的所有列。可以使用 getTableSchema 方法来确定指定输入表表的架构。
HCatOutputFormat
HCatOutputFormat 用于向 HCatalog 管理的表写入数据的 MapReduce 作业。HCatOutputFormat 暴露出 Hadoop 0.20 MapReduce API,用于将数据写入表。当 MapReduce 作业使用 HCatOutputFormat 写入输出时,将使用为表配置的默认 OutputFormat,并在作业完成后将新分区发布到表中。
Sr.No. |
Method Name & Description |
1 |
public static void setOutput (Configuration conf, Credentials credentials, OutputJobInfo outputJobInfo) throws IOException 为作业设置要写入的输出信息。它会查询元数据服务器来查找可用于该表的 StorageHandler。如果分区已发布,它会抛出一个错误。 |
2 |
public static void setSchema (Configuration conf, HCatSchema schema) throws IOException 为写入分区的数据设置架构。如果未调用此方法,则表架构将默认用于分区。 |
3 |
public RecordWriter <WritableComparable<?>, HCatRecord > getRecordWriter (TaskAttemptContext context)throws IOException, InterruptedException 获取作业的记录编写器。它使用 StorageHandler 的默认 OutputFormat 来获取记录编写器。 |
4 |
public OutputCommitter getOutputCommitter (TaskAttemptContext context) throws IOException, InterruptedException 获取此输出格式的输出提交者。它确保输出正确提交。 |
HCatOutputFormat API 包括以下方法 -
-
setOutput
-
setSchema
-
getTableSchema
HCatOutputFormat 中的第一个调用必须是 setOutput ;其他任何调用都会引发一个异常,表明输出格式未初始化。
通过 setSchema 方法指定待写入数据的架构。必须调用此方法,提供正在写入数据的架构。如果您的数据与表架构具有相同的架构,则可以使用 HCatOutputFormat.getTableSchema() 获取表架构,然后将其传递给 setSchema() 。
Example
以下 MapReduce 程序从一个表中读取数据,它假定第二列(“列 1”)中有整数,并统计找到的每个不同值的实例数。也就是说,它执行了“ select col1, count( from $table group by col1;*”的等价操作。
例如,如果第二列中的值为 {1, 1, 1, 3, 3, 5},那么程序将生成以下值和次数输出 −
1, 3
3, 2
5, 1
我们现在来看一下程序代码 −
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.HCatalog.common.HCatConstants;
import org.apache.HCatalog.data.DefaultHCatRecord;
import org.apache.HCatalog.data.HCatRecord;
import org.apache.HCatalog.data.schema.HCatSchema;
import org.apache.HCatalog.mapreduce.HCatInputFormat;
import org.apache.HCatalog.mapreduce.HCatOutputFormat;
import org.apache.HCatalog.mapreduce.InputJobInfo;
import org.apache.HCatalog.mapreduce.OutputJobInfo;
public class GroupByAge extends Configured implements Tool {
public static class Map extends Mapper<WritableComparable,
HCatRecord, IntWritable, IntWritable> {
int age;
@Override
protected void map(
WritableComparable key, HCatRecord value,
org.apache.hadoop.mapreduce.Mapper<WritableComparable,
HCatRecord, IntWritable, IntWritable>.Context context
)throws IOException, InterruptedException {
age = (Integer) value.get(1);
context.write(new IntWritable(age), new IntWritable(1));
}
}
public static class Reduce extends Reducer<IntWritable, IntWritable,
WritableComparable, HCatRecord> {
@Override
protected void reduce(
IntWritable key, java.lang.Iterable<IntWritable> values,
org.apache.hadoop.mapreduce.Reducer<IntWritable, IntWritable,
WritableComparable, HCatRecord>.Context context
)throws IOException ,InterruptedException {
int sum = 0;
Iterator<IntWritable> iter = values.iterator();
while (iter.hasNext()) {
sum++;
iter.next();
}
HCatRecord record = new DefaultHCatRecord(2);
record.set(0, key.get());
record.set(1, sum);
context.write(null, record);
}
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
args = new GenericOptionsParser(conf, args).getRemainingArgs();
String serverUri = args[0];
String inputTableName = args[1];
String outputTableName = args[2];
String dbName = null;
String principalID = System
.getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL);
if (principalID != null)
conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID);
Job job = new Job(conf, "GroupByAge");
HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null));
// initialize HCatOutputFormat
job.setInputFormatClass(HCatInputFormat.class);
job.setJarByClass(GroupByAge.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(WritableComparable.class);
job.setOutputValueClass(DefaultHCatRecord.class);
HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
HCatSchema s = HCatOutputFormat.getTableSchema(job);
System.err.println("INFO: output schema explicitly set for writing:" + s);
HCatOutputFormat.setSchema(job, s);
job.setOutputFormatClass(HCatOutputFormat.class);
return (job.waitForCompletion(true) ? 0 : 1);
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new GroupByAge(), args);
System.exit(exitCode);
}
}
在编译上述程序之前,您必须先下载一些 jars 并将它们添加到此应用程序的 classpath 。您需要下载所有 Hive jar 和 HCatalog jar(HCatalog-core-0.5.0.jar、hive-metastore-0.10.0.jar、libthrift-0.7.0.jar、hive-exec-0.10.0.jar、libfb303-0.7.0.jar、jdo2-api-2.3-ec.jar、slf4j-api-1.6.1.jar)。
使用以下命令将 jar 文件从 local 复制到 HDFS 并将它们添加到 classpath 。
bin/hadoop fs -copyFromLocal $HCAT_HOME/share/HCatalog/HCatalog-core-0.5.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmp
export LIB_JARS=hdfs:///tmp/HCatalog-core-0.5.0.jar,
hdfs:///tmp/hive-metastore-0.10.0.jar,
hdfs:///tmp/libthrift-0.7.0.jar,
hdfs:///tmp/hive-exec-0.10.0.jar,
hdfs:///tmp/libfb303-0.7.0.jar,
hdfs:///tmp/jdo2-api-2.3-ec.jar,
hdfs:///tmp/slf4j-api-1.6.1.jar
使用以下命令编译并执行给定的程序。
$HADOOP_HOME/bin/hadoop jar GroupByAge tmp/hive
现在,检查输出目录 (hdfs: user/tmp/hive) 以查看输出 (part_0000, part_0001)。
HCatalog - Loader & Storer
HCatLoader 和 HCatStorer API 与 Pig 脚本一起用于读写 HCatalog 管理的表中的数据。这些接口不需要 HCatalog 特定的设置。
最好了解一些 Apache Pig 脚本的知识,才能更好地理解本章。更多参考信息,请参阅我们的 Apache Pig 教程。
HCatloader
HCatLoader 与 Pig 脚本一起用于从 HCatalog 管理的表中读取数据。使用以下语法以使用 HCatloader 将数据加载到 HDFS 中。
A = LOAD 'tablename' USING org.apache.HCatalog.pig.HCatLoader();
您必须用单引号指定表名: LOAD 'tablename' 。如果您正在使用非默认数据库,那么您必须将您的输入指定为 ' dbname.tablename' 。
Hive 元存储允许您在不指定数据库的情况下创建表。如果您以这种方式创建表,那么数据库名称为 'default' ,并且在为 HCatLoader 指定表时不需要该名称。
下表包含 HCatloader 类的重要方法及其描述。
Sr.No. |
Method Name & Description |
1 |
public InputFormat<?,?> getInputFormat()throws IOException 使用 HCatloader 类读取加载数据的输入格式。 |
2 |
public String relativeToAbsolutePath(String location, Path curDir) throws IOException 返回 Absolute path 的字符串格式。 |
3 |
public void setLocation(String location, Job job) throws IOException 设置可执行作业的位置。 |
4 |
public Tuple getNext() throws IOException 从循环中返回当前元组 ( key 和 value )。 |
HCatStorer
HCatStorer 与 Pig 脚本一起用于将数据写入 HCatalog 管理的表中。对于存储操作,使用以下语法。
A = LOAD ...
B = FOREACH A ...
...
...
my_processed_data = ...
STORE my_processed_data INTO 'tablename' USING org.apache.HCatalog.pig.HCatStorer();
您必须用单引号指定表名: LOAD 'tablename' 。在运行您的 Pig 脚本之前,必须创建数据库和表。如果您正在使用非默认数据库,那么您必须将您的输入指定为 'dbname.tablename' 。
Hive 元存储允许您在不指定数据库的情况下创建表。如果您以这种方式创建表,那么数据库名称为 'default' ,您不需要在 store 语句中指定数据库名称。
对于 USING 语句,您可以有一个表示分区键值对的字符串参数。当您写入分区表而分区列不在输出列中时,这是一个强制参数。分区键的值不应加引号。
下表包含 HCatStorer 类的重要方法及其说明。
Sr.No. |
Method Name & Description |
1 |
public OutputFormat getOutputFormat() throws IOException 使用 HCatStorer 类读取存储数据的输出格式。 |
2 |
public void setStoreLocation (String location, Job job) throws IOException 设置执行此 store 应用程序的位置。 |
3 |
public void storeSchema (ResourceSchema schema, String arg1, Job job) throws IOException 存储架构。 |
4 |
public void prepareToWrite (RecordWriter writer) throws IOException 可帮助使用 RecordWriter 将数据写入特定文件。 |
5 |
public void putNext (Tuple tuple) throws IOException 将元组数据写入文件。 |
Running Pig with HCatalog
Pig 不会自动获取 HCatalog jar。若要引入必要的 jar,可使用 Pig 命令中的标志或设置 PIG_CLASSPATH 和 PIG_OPTS 环境变量,如下所示。
若要引入用于处理 HCatalog 的适当 jar,只需包含以下标志 −
pig –useHCatalog <Sample pig scripts file>
Setting the CLASSPATH for Execution
使用以下 CLASSPATH 设置同步 HCatalog 与 Apache Pig。
export HADOOP_HOME = <path_to_hadoop_install>
export HIVE_HOME = <path_to_hive_install>
export HCAT_HOME = <path_to_hcat_install>
export PIG_CLASSPATH = $HCAT_HOME/share/HCatalog/HCatalog-core*.jar:\
$HCAT_HOME/share/HCatalog/HCatalog-pig-adapter*.jar:\
$HIVE_HOME/lib/hive-metastore-*.jar:$HIVE_HOME/lib/libthrift-*.jar:\
$HIVE_HOME/lib/hive-exec-*.jar:$HIVE_HOME/lib/libfb303-*.jar:\
$HIVE_HOME/lib/jdo2-api-*-ec.jar:$HIVE_HOME/conf:$HADOOP_HOME/conf:\
$HIVE_HOME/lib/slf4j-api-*.jar
Example
假设我们在 HDFS 中有一个文件 student_details.txt ,内容如下。
student_details.txt
001, Rajiv, Reddy, 21, 9848022337, Hyderabad
002, siddarth, Battacharya, 22, 9848022338, Kolkata
003, Rajesh, Khanna, 22, 9848022339, Delhi
004, Preethi, Agarwal, 21, 9848022330, Pune
005, Trupthi, Mohanthy, 23, 9848022336, Bhuwaneshwar
006, Archana, Mishra, 23, 9848022335, Chennai
007, Komal, Nayak, 24, 9848022334, trivendram
008, Bharathi, Nambiayar, 24, 9848022333, Chennai
我们还有同一个 HDFS 目录中的一个样例脚本,名为 sample_script.pig 。该文件包含对 student 关系执行操作和转换的语句,如下所示。
student = LOAD 'hdfs://localhost:9000/pig_data/student_details.txt' USING
PigStorage(',') as (id:int, firstname:chararray, lastname:chararray,
phone:chararray, city:chararray);
student_order = ORDER student BY age DESC;
STORE student_order INTO 'student_order_table' USING org.apache.HCatalog.pig.HCatStorer();
student_limit = LIMIT student_order 4;
Dump student_limit;
-
该脚本的第一条语句将以 student_details.txt 命名的文件中数据载入到名为 student 的关系中。
-
该脚本的第二条语句将按照年龄对该关系的元组进行降序排列,并将其存储为 student_order 。
-
第三个语句将已处理数据 student_order 结果存储在名为 student_order_table 的单独表中。
-
脚本的第四个语句将 student_order 的前四个元组存储为 student_limit 。
-
最后,第五个语句将转储关系 student_limit 的内容。
现在让我们按照如下所示执行 sample_script.pig 。
$./pig -useHCatalog hdfs://localhost:9000/pig_data/sample_script.pig
现在,检查输出目录 (hdfs: user/tmp/hive) 以查看输出 (part_0000, part_0001)。