HCatalog Concise Tutorial
HCatalog - Reader Writer
HCatalog contains a data transfer API for parallel input and output without using MapReduce. This API uses a basic storage abstraction of tables and rows to read data from a Hadoop cluster and write data into it.
The Data Transfer API contains three main classes −
- HCatReader − Reads data from a Hadoop cluster.
- HCatWriter − Writes data into a Hadoop cluster.
- DataTransferFactory − Generates reader and writer instances.
This API is suited to a master-slave node setup. The sections below discuss HCatReader and HCatWriter in more detail.
HCatReader
HCatReader is an abstract class internal to HCatalog. It abstracts away the complexities of the underlying system from which the records are retrieved.
Sr.No. | Method Name & Description
1 | public abstract ReaderContext prepareRead() throws HCatException − This should be called at the master node to obtain a ReaderContext, which should then be serialized and sent to the slave nodes.
2 | public abstract Iterator<HCatRecord> read() throws HCatException − This should be called at the slave nodes to read HCatRecords.
3 | public Configuration getConf() − Returns the Configuration object.
The HCatReader class is used to read data from HDFS. Reading is a two-step process: the first step occurs on the master node of an external system, and the second step is carried out in parallel on multiple slave nodes.
Reads are done on a ReadEntity. Before you start to read, you need to define the ReadEntity to read from, which is done through ReadEntity.Builder. You can specify a database name, table name, partition, and filter string. For example −
ReadEntity.Builder builder = new ReadEntity.Builder();
ReadEntity entity = builder.withDatabase("mydb").withTable("mytbl").build();
The above code snippet defines a ReadEntity object ("entity") for the table mytbl in the database mydb, which can be used to read all the rows of this table. Note that this table must exist in HCatalog before the operation starts.
After defining a ReadEntity, you obtain an instance of HCatReader using the ReadEntity and the cluster configuration −
HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
The next step is to obtain a ReaderContext from the reader as follows −
ReaderContext cntxt = reader.prepareRead();
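The two-step read described above can be illustrated without a cluster. The following is only a minimal sketch using plain JDK classes: DemoReaderContext, prepareRead, and readAll are hypothetical stand-ins invented for this example, not HCatalog classes, and the "slaves" are simulated with threads in a single JVM.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration of the two-step read; none of these types belong to HCatalog.
final class TwoStepReadSketch {

    // Plays the role of ReaderContext: records how the rows were divided into splits.
    static final class DemoReaderContext {
        final List<List<String>> splits;

        DemoReaderContext(List<List<String>> splits) {
            this.splits = splits;
        }

        int numSplits() {
            return splits.size();
        }
    }

    // Step 1 (master): partition the rows into splits and return the context.
    static DemoReaderContext prepareRead(List<String> rows, int splitSize) {
        List<List<String>> splits = new ArrayList<>();
        for (int start = 0; start < rows.size(); start += splitSize) {
            int end = Math.min(start + splitSize, rows.size());
            splits.add(new ArrayList<>(rows.subList(start, end)));
        }
        return new DemoReaderContext(splits);
    }

    // Step 2 (slaves): read every split in parallel, one task per split.
    static List<String> readAll(final DemoReaderContext context) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, context.numSplits()));
        try {
            List<Future<List<String>>> pending = new ArrayList<>();
            for (int i = 0; i < context.numSplits(); i++) {
                final int slaveNum = i;
                Callable<List<String>> task = () -> context.splits.get(slaveNum);
                pending.add(pool.submit(task));
            }
            List<String> result = new ArrayList<>();
            for (Future<List<String>> future : pending) {
                result.addAll(future.get()); // gathered in split order
            }
            return result;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("a simulated slave failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        DemoReaderContext context = prepareRead(Arrays.asList("r1", "r2", "r3", "r4", "r5"), 2);
        System.out.println(context.numSplits() + " splits, rows read: " + readAll(context));
    }
}
```

In a real deployment the context would be serialized on the master and shipped to separate slave processes rather than shared in memory.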
HCatWriter
HCatWriter is an abstraction internal to HCatalog that facilitates writing to HCatalog from external systems. Do not try to instantiate it directly; use DataTransferFactory instead.
Sr.No. | Method Name & Description
1 | public abstract WriterContext prepareWrite() throws HCatException − The external system should invoke this method exactly once from the master node. It returns a WriterContext, which should be serialized and sent to the slave nodes to construct an HCatWriter there.
2 | public abstract void write(Iterator<HCatRecord> recordItr) throws HCatException − This method should be used at the slave nodes to perform writes. The recordItr is an iterator over the collection of records to be written into HCatalog.
3 | public abstract void abort(WriterContext cntxt) throws HCatException − This method should be called at the master node. Its primary purpose is to clean up in case of failures.
4 | public abstract void commit(WriterContext cntxt) throws HCatException − This method should be called at the master node. Its purpose is to commit the metadata.
Similar to reading, writing is also a two-step process: the first step occurs on the master node, and the second step then occurs in parallel on the slave nodes.
Writes are done on a WriteEntity, which can be constructed in a fashion similar to a ReadEntity −
WriteEntity.Builder builder = new WriteEntity.Builder();
WriteEntity entity = builder.withDatabase("mydb").withTable("mytbl").build();
The above code creates a WriteEntity object, entity, which can be used to write into the table mytbl in the database mydb.
After creating a WriteEntity, the next step is to obtain an HCatWriter and, from it, a WriterContext −
HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
WriterContext info = writer.prepareWrite();
All of the above steps occur on the master node. The master node then serializes the WriterContext object and makes it available to all the slaves.
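The serialization hand-off can be sketched with standard Java object streams. This is an illustrative sketch only: DemoContext is a hypothetical stand-in for WriterContext, and an in-memory byte array is assumed in place of whatever transport (file, socket, message queue) carries the bytes to the slaves.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class ContextRoundTrip {

    // Hypothetical stand-in for WriterContext; the real class carries HCatalog state.
    static final class DemoContext implements Serializable {
        private static final long serialVersionUID = 1L;
        final String database;
        final String table;

        DemoContext(String database, String table) {
            this.database = database;
            this.table = table;
        }
    }

    // Master side: turn the context into bytes that can be shipped to the slaves.
    static byte[] serialize(Serializable context) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(context);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Slave side: rebuild the context from the received bytes.
    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DemoContext sent = new DemoContext("mydb", "mytbl");
        DemoContext received = (DemoContext) deserialize(serialize(sent));
        System.out.println(received.database + "." + received.table);
    }
}
```

Each slave receives its own deserialized copy, so nothing in the context should rely on object identity being shared with the master.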
On the slave nodes, you need to obtain an HCatWriter using the WriterContext as follows −
HCatWriter writer = DataTransferFactory.getHCatWriter(context);
The writer then takes an iterator as the argument to its write method −
writer.write(hCatRecordItr);
The writer then steps through this iterator in a loop, calling hasNext() and next(), and writes out all the records the iterator yields.
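To make the iterator contract concrete, here is a small sketch of a record source and of the loop a writer might run over it. It assumes a plain List<Object> as a stand-in for HCatRecord; RowIterator and drain are hypothetical names for this example, and drain only imitates the consumption loop, it does not reflect HCatalog's actual write path.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

final class RecordIteratorSketch {

    // Yields `count` two-column rows; a List<Object> stands in for HCatRecord.
    static final class RowIterator implements Iterator<List<Object>> {
        private final int count;
        private int next = 1;

        RowIterator(int count) {
            this.count = count;
        }

        @Override
        public boolean hasNext() {
            // No side effects, so calling it repeatedly never skips a row.
            return next <= count;
        }

        @Override
        public List<Object> next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            List<Object> row = Arrays.<Object>asList("Row #: " + next, next);
            next++;
            return row;
        }
    }

    // Imitates the writer's loop: pull rows until the iterator is exhausted.
    static List<List<Object>> drain(Iterator<List<Object>> records) {
        List<List<Object>> written = new ArrayList<>();
        while (records.hasNext()) {
            written.add(records.next());
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(drain(new RowIterator(3)));
    }
}
```

Keeping hasNext() free of side effects matters because a consumer is allowed to call it any number of times between calls to next().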
The TestReaderWriter.java file is used to test the HCatReader and HCatWriter classes. The following program demonstrates how to use the HCatReader and HCatWriter APIs to write records into a table and subsequently read them back.
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
import org.apache.hive.hcatalog.data.transfer.HCatReader;
import org.apache.hive.hcatalog.data.transfer.HCatWriter;
import org.apache.hive.hcatalog.data.transfer.ReadEntity;
import org.apache.hive.hcatalog.data.transfer.ReaderContext;
import org.apache.hive.hcatalog.data.transfer.WriteEntity;
import org.apache.hive.hcatalog.data.transfer.WriterContext;
import org.apache.hive.hcatalog.mapreduce.HCatBaseTest;
import org.junit.Assert;
import org.junit.Test;

public class TestReaderWriter extends HCatBaseTest {

    @Test
    public void test() throws MetaException, CommandNeedRetryException,
            IOException, ClassNotFoundException {
        driver.run("drop table mytbl");
        driver.run("create table mytbl (a string, b int)");

        // Copy the Hive configuration into a plain map for the transfer API.
        Iterator<Entry<String, String>> itr = hiveConf.iterator();
        Map<String, String> map = new HashMap<String, String>();
        while (itr.hasNext()) {
            Entry<String, String> kv = itr.next();
            map.put(kv.getKey(), kv.getValue());
        }

        WriterContext cntxt = runsInMaster(map);
        File writeCntxtFile = File.createTempFile("hcat-write", "temp");
        writeCntxtFile.deleteOnExit();

        // Serialize context.
        ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(writeCntxtFile));
        oos.writeObject(cntxt);
        oos.flush();
        oos.close();

        // Now, deserialize it.
        ObjectInputStream ois = new ObjectInputStream(new FileInputStream(writeCntxtFile));
        cntxt = (WriterContext) ois.readObject();
        ois.close();

        runsInSlave(cntxt);
        commit(map, true, cntxt);

        // Repeat the same serialize/deserialize hand-off for the read path.
        ReaderContext readCntxt = runsInMaster(map, false);
        File readCntxtFile = File.createTempFile("hcat-read", "temp");
        readCntxtFile.deleteOnExit();
        oos = new ObjectOutputStream(new FileOutputStream(readCntxtFile));
        oos.writeObject(readCntxt);
        oos.flush();
        oos.close();

        ois = new ObjectInputStream(new FileInputStream(readCntxtFile));
        readCntxt = (ReaderContext) ois.readObject();
        ois.close();

        for (int i = 0; i < readCntxt.numSplits(); i++) {
            runsInSlave(readCntxt, i);
        }
    }

    private WriterContext runsInMaster(Map<String, String> config) throws HCatException {
        WriteEntity.Builder builder = new WriteEntity.Builder();
        WriteEntity entity = builder.withTable("mytbl").build();
        HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
        WriterContext info = writer.prepareWrite();
        return info;
    }

    private ReaderContext runsInMaster(Map<String, String> config,
            boolean bogus) throws HCatException {
        ReadEntity entity = new ReadEntity.Builder().withTable("mytbl").build();
        HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
        ReaderContext cntxt = reader.prepareRead();
        return cntxt;
    }

    private void runsInSlave(ReaderContext cntxt, int slaveNum) throws HCatException {
        HCatReader reader = DataTransferFactory.getHCatReader(cntxt, slaveNum);
        Iterator<HCatRecord> itr = reader.read();
        int i = 1;
        while (itr.hasNext()) {
            HCatRecord read = itr.next();
            HCatRecord written = getRecord(i++);
            // Argh, HCatRecord doesn't implement equals()
            Assert.assertTrue("Read: " + read.get(0) + "Written: " + written.get(0),
                    written.get(0).equals(read.get(0)));
            Assert.assertTrue("Read: " + read.get(1) + "Written: " + written.get(1),
                    written.get(1).equals(read.get(1)));
            Assert.assertEquals(2, read.size());
        }
        //Assert.assertFalse(itr.hasNext());
    }

    private void runsInSlave(WriterContext context) throws HCatException {
        HCatWriter writer = DataTransferFactory.getHCatWriter(context);
        writer.write(new HCatRecordItr());
    }

    private void commit(Map<String, String> config, boolean status,
            WriterContext context) throws IOException {
        WriteEntity.Builder builder = new WriteEntity.Builder();
        WriteEntity entity = builder.withTable("mytbl").build();
        HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
        if (status) {
            writer.commit(context);
        } else {
            writer.abort(context);
        }
    }

    private static HCatRecord getRecord(int i) {
        List<Object> list = new ArrayList<Object>(2);
        list.add("Row #: " + i);
        list.add(i);
        return new DefaultHCatRecord(list);
    }

    private static class HCatRecordItr implements Iterator<HCatRecord> {
        // Number of the last record handed out; records are numbered 1..100.
        int i = 0;

        @Override
        public boolean hasNext() {
            // Side-effect free, so repeated calls do not skip records.
            return i < 100;
        }

        @Override
        public HCatRecord next() {
            return getRecord(++i);
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }
}
The above program writes records into the table mytbl and then reads them back from HDFS in the form of records, checking each record against what was written.
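The commit-or-abort decision that the master makes once the slaves have finished can be sketched in isolation. This is a hedged illustration only: DemoWriter, RecordingWriter, and finish are hypothetical names, the slave writes are modeled as plain Runnable tasks run one after another, and the policy shown (abort on the first failure, commit otherwise) is an assumption about how a caller might use the two methods rather than something HCatalog enforces.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CommitOrAbortSketch {

    // Hypothetical stand-in for the master-side commit/abort calls of a writer.
    interface DemoWriter {
        void commit();

        void abort();
    }

    // Records which call was made so the outcome can be inspected.
    static final class RecordingWriter implements DemoWriter {
        final List<String> calls = new ArrayList<>();

        @Override
        public void commit() {
            calls.add("commit");
        }

        @Override
        public void abort() {
            calls.add("abort");
        }
    }

    // Runs every slave write; commits only if all succeed, otherwise aborts.
    static boolean finish(DemoWriter writer, List<Runnable> slaveWrites) {
        try {
            for (Runnable write : slaveWrites) {
                write.run();
            }
        } catch (RuntimeException e) {
            writer.abort(); // clean up partial output after a failure
            return false;
        }
        writer.commit(); // publish the output once every write has succeeded
        return true;
    }

    public static void main(String[] args) {
        RecordingWriter writer = new RecordingWriter();
        List<Runnable> writes = Arrays.<Runnable>asList(() -> { }, () -> { });
        System.out.println(finish(writer, writes) + " " + writer.calls);
    }
}
```

The point of the sketch is that exactly one of commit or abort is called, and always on the master, regardless of how many slaves took part.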