Hcatalog 简明教程
HCatalog - Input Output Format
HCatInputFormat 和 HCatOutputFormat 用于读取 HDFS 的数据,并在处理后使用 MapReduce 作业将结果数据写入 HDFS。我们来详细介绍输入和输出格式接口。
HCatInputFormat
HCatInputFormat 用于与 MapReduce 作业一起,从 HCatalog 管理的表读取数据。HCatInputFormat 暴露出 Hadoop 0.20 MapReduce API,用于读取数据,就像已发表到表中一样。
Sr.No. |
Method Name & Description |
1 |
public static HCatInputFormat setInput(Job job, String dbName, String tableName)throws IOException 为作业设置要使用的输入。它使用给定的输入规范查询信息仓库,并将匹配的分区序列化到 MapReduce 任务的作业配置中。 |
2 |
public static HCatInputFormat setInput(Configuration conf, String dbName, String tableName) throws IOException 为作业设置要使用的输入。它使用给定的输入规范查询信息仓库,并将匹配的分区序列化到 MapReduce 任务的作业配置中。 |
3 |
public HCatInputFormat setFilter(String filter)throws IOException 设置对输入表的筛选器。 |
4 |
public HCatInputFormat setProperties(Properties properties) throws IOException 为输入格式设置属性。 |
HCatInputFormat API 包括以下方法 −
-
setInput
-
setOutputSchema
-
getTableSchema
要使用 HCatInputFormat 读取数据,请首先使用正在读取的表的必要信息实例化一个 InputJobInfo ,然后使用 InputJobInfo 调用 setInput 。
可以使用 setOutputSchema 方法包含 projection schema ,以指定输出字段。如果没有指定架构,则会返回表中的所有列。可以使用 getTableSchema 方法来确定指定输入表表的架构。
HCatOutputFormat
HCatOutputFormat 用于向 HCatalog 管理的表写入数据的 MapReduce 作业。HCatOutputFormat 暴露出 Hadoop 0.20 MapReduce API,用于将数据写入表。当 MapReduce 作业使用 HCatOutputFormat 写入输出时,将使用为表配置的默认 OutputFormat,并在作业完成后将新分区发布到表中。
Sr.No. |
Method Name & Description |
1 |
public static void setOutput (Configuration conf, Credentials credentials, OutputJobInfo outputJobInfo) throws IOException 为作业设置要写入的输出信息。它会查询元数据服务器来查找可用于该表的 StorageHandler。如果分区已发布,它会抛出一个错误。 |
2 |
public static void setSchema (Configuration conf, HCatSchema schema) throws IOException 为写入分区的数据设置架构。如果未调用此方法,则表架构将默认用于分区。 |
3 |
public RecordWriter <WritableComparable<?>, HCatRecord > getRecordWriter (TaskAttemptContext context)throws IOException, InterruptedException 获取作业的记录编写器。它使用 StorageHandler 的默认 OutputFormat 来获取记录编写器。 |
4 |
public OutputCommitter getOutputCommitter (TaskAttemptContext context) throws IOException, InterruptedException 获取此输出格式的输出提交者。它确保输出正确提交。 |
HCatOutputFormat API 包括以下方法 -
-
setOutput
-
setSchema
-
getTableSchema
HCatOutputFormat 中的第一个调用必须是 setOutput ;其他任何调用都会引发一个异常,表明输出格式未初始化。
通过 setSchema 方法指定待写入数据的架构。必须调用此方法,提供正在写入数据的架构。如果您的数据与表架构具有相同的架构,则可以使用 HCatOutputFormat.getTableSchema() 获取表架构,然后将其传递给 setSchema() 。
Example
以下 MapReduce 程序从一个表中读取数据,它假定第二列(“列 1”)中有整数,并统计找到的每个不同值的实例数。也就是说,它执行了“ select col1, count( from $table group by col1;*”的等价操作。
例如,如果第二列中的值为 {1, 1, 1, 3, 3, 5},那么程序将生成以下值和次数输出 −
1, 3
3, 2
5, 1
我们现在来看一下程序代码 −
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.HCatalog.common.HCatConstants;
import org.apache.HCatalog.data.DefaultHCatRecord;
import org.apache.HCatalog.data.HCatRecord;
import org.apache.HCatalog.data.schema.HCatSchema;
import org.apache.HCatalog.mapreduce.HCatInputFormat;
import org.apache.HCatalog.mapreduce.HCatOutputFormat;
import org.apache.HCatalog.mapreduce.InputJobInfo;
import org.apache.HCatalog.mapreduce.OutputJobInfo;
public class GroupByAge extends Configured implements Tool {
public static class Map extends Mapper<WritableComparable,
HCatRecord, IntWritable, IntWritable> {
int age;
@Override
protected void map(
WritableComparable key, HCatRecord value,
org.apache.hadoop.mapreduce.Mapper<WritableComparable,
HCatRecord, IntWritable, IntWritable>.Context context
)throws IOException, InterruptedException {
age = (Integer) value.get(1);
context.write(new IntWritable(age), new IntWritable(1));
}
}
public static class Reduce extends Reducer<IntWritable, IntWritable,
WritableComparable, HCatRecord> {
@Override
protected void reduce(
IntWritable key, java.lang.Iterable<IntWritable> values,
org.apache.hadoop.mapreduce.Reducer<IntWritable, IntWritable,
WritableComparable, HCatRecord>.Context context
)throws IOException ,InterruptedException {
int sum = 0;
Iterator<IntWritable> iter = values.iterator();
while (iter.hasNext()) {
sum++;
iter.next();
}
HCatRecord record = new DefaultHCatRecord(2);
record.set(0, key.get());
record.set(1, sum);
context.write(null, record);
}
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
args = new GenericOptionsParser(conf, args).getRemainingArgs();
String serverUri = args[0];
String inputTableName = args[1];
String outputTableName = args[2];
String dbName = null;
String principalID = System
.getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL);
if (principalID != null)
conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID);
Job job = new Job(conf, "GroupByAge");
HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null));
// initialize HCatOutputFormat
job.setInputFormatClass(HCatInputFormat.class);
job.setJarByClass(GroupByAge.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(WritableComparable.class);
job.setOutputValueClass(DefaultHCatRecord.class);
HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
HCatSchema s = HCatOutputFormat.getTableSchema(job);
System.err.println("INFO: output schema explicitly set for writing:" + s);
HCatOutputFormat.setSchema(job, s);
job.setOutputFormatClass(HCatOutputFormat.class);
return (job.waitForCompletion(true) ? 0 : 1);
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new GroupByAge(), args);
System.exit(exitCode);
}
}
在编译上述程序之前,您必须先下载一些 jars 并将它们添加到此应用程序的 classpath 。您需要下载所有 Hive jar 和 HCatalog jar(HCatalog-core-0.5.0.jar、hive-metastore-0.10.0.jar、libthrift-0.7.0.jar、hive-exec-0.10.0.jar、libfb303-0.7.0.jar、jdo2-api-2.3-ec.jar、slf4j-api-1.6.1.jar)。
使用以下命令将 jar 文件从 local 复制到 HDFS 并将它们添加到 classpath 。
bin/hadoop fs -copyFromLocal $HCAT_HOME/share/HCatalog/HCatalog-core-0.5.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmp
export LIB_JARS=hdfs:///tmp/HCatalog-core-0.5.0.jar,
hdfs:///tmp/hive-metastore-0.10.0.jar,
hdfs:///tmp/libthrift-0.7.0.jar,
hdfs:///tmp/hive-exec-0.10.0.jar,
hdfs:///tmp/libfb303-0.7.0.jar,
hdfs:///tmp/jdo2-api-2.3-ec.jar,
hdfs:///tmp/slf4j-api-1.6.1.jar
使用以下命令编译并执行给定的程序。
$HADOOP_HOME/bin/hadoop jar GroupByAge tmp/hive
现在,检查输出目录 (hdfs: user/tmp/hive) 以查看输出 (part_0000, part_0001)。