Hcatalog 简明教程

HCatalog - Input Output Format

HCatInputFormatHCatOutputFormat 用于读取 HDFS 的数据,并在处理后使用 MapReduce 作业将结果数据写入 HDFS。我们来详细介绍输入和输出格式接口。

HCatInputFormat

HCatInputFormat 用于与 MapReduce 作业一起,从 HCatalog 管理的表读取数据。HCatInputFormat 暴露出 Hadoop 0.20 MapReduce API,用于读取数据,就像已发表到表中一样。

Sr.No.

Method Name & Description

1

public static HCatInputFormat setInput(Job job, String dbName, String tableName)throws IOException 为作业设置要使用的输入。它使用给定的输入规范查询信息仓库,并将匹配的分区序列化到 MapReduce 任务的作业配置中。

2

public static HCatInputFormat setInput(Configuration conf, String dbName, String tableName) throws IOException 为作业设置要使用的输入。它使用给定的输入规范查询信息仓库,并将匹配的分区序列化到 MapReduce 任务的作业配置中。

3

public HCatInputFormat setFilter(String filter)throws IOException 设置对输入表的筛选器。

4

public HCatInputFormat setProperties(Properties properties) throws IOException 为输入格式设置属性。

HCatInputFormat API 包括以下方法 −

  1. setInput

  2. setOutputSchema

  3. getTableSchema

要使用 HCatInputFormat 读取数据,请首先使用正在读取的表的必要信息实例化一个 InputJobInfo ,然后使用 InputJobInfo 调用 setInput

可以使用 setOutputSchema 方法包含 projection schema ,以指定输出字段。如果没有指定架构,则会返回表中的所有列。可以使用 getTableSchema 方法来确定指定输入表表的架构。

HCatOutputFormat

HCatOutputFormat 用于向 HCatalog 管理的表写入数据的 MapReduce 作业。HCatOutputFormat 暴露出 Hadoop 0.20 MapReduce API,用于将数据写入表。当 MapReduce 作业使用 HCatOutputFormat 写入输出时,将使用为表配置的默认 OutputFormat,并在作业完成后将新分区发布到表中。

Sr.No.

Method Name & Description

1

public static void setOutput (Configuration conf, Credentials credentials, OutputJobInfo outputJobInfo) throws IOException 为作业设置要写入的输出信息。它会查询元数据服务器来查找可用于该表的 StorageHandler。如果分区已发布,它会抛出一个错误。

2

public static void setSchema (Configuration conf, HCatSchema schema) throws IOException 为写入分区的数据设置架构。如果未调用此方法,则表架构将默认用于分区。

3

public RecordWriter <WritableComparable<?>, HCatRecord > getRecordWriter (TaskAttemptContext context)throws IOException, InterruptedException 获取作业的记录编写器。它使用 StorageHandler 的默认 OutputFormat 来获取记录编写器。

4

public OutputCommitter getOutputCommitter (TaskAttemptContext context) throws IOException, InterruptedException 获取此输出格式的输出提交者。它确保输出正确提交。

HCatOutputFormat API 包括以下方法 -

  1. setOutput

  2. setSchema

  3. getTableSchema

HCatOutputFormat 中的第一个调用必须是 setOutput ;其他任何调用都会引发一个异常,表明输出格式未初始化。

通过 setSchema 方法指定待写入数据的架构。必须调用此方法,提供正在写入数据的架构。如果您的数据与表架构具有相同的架构,则可以使用 HCatOutputFormat.getTableSchema() 获取表架构,然后将其传递给 setSchema()

Example

以下 MapReduce 程序从一个表中读取数据,它假定第二列(“列 1”)中有整数,并统计找到的每个不同值的实例数。也就是说,它执行了“ select col1, count( from $table group by col1;*”的等价操作。

例如,如果第二列中的值为 {1, 1, 1, 3, 3, 5},那么程序将生成以下值和次数输出 −

1, 3
3, 2
5, 1

我们现在来看一下程序代码 −

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.HCatalog.common.HCatConstants;
import org.apache.HCatalog.data.DefaultHCatRecord;
import org.apache.HCatalog.data.HCatRecord;
import org.apache.HCatalog.data.schema.HCatSchema;

import org.apache.HCatalog.mapreduce.HCatInputFormat;
import org.apache.HCatalog.mapreduce.HCatOutputFormat;
import org.apache.HCatalog.mapreduce.InputJobInfo;
import org.apache.HCatalog.mapreduce.OutputJobInfo;

public class GroupByAge extends Configured implements Tool {

   public static class Map extends Mapper<WritableComparable,
      HCatRecord, IntWritable, IntWritable> {
      int age;

      @Override
      protected void map(
         WritableComparable key, HCatRecord value,
         org.apache.hadoop.mapreduce.Mapper<WritableComparable,
         HCatRecord, IntWritable, IntWritable>.Context context
      )throws IOException, InterruptedException {
         age = (Integer) value.get(1);
         context.write(new IntWritable(age), new IntWritable(1));
      }
   }

   public static class Reduce extends Reducer<IntWritable, IntWritable,
      WritableComparable, HCatRecord> {
      @Override
      protected void reduce(
         IntWritable key, java.lang.Iterable<IntWritable> values,
         org.apache.hadoop.mapreduce.Reducer<IntWritable, IntWritable,
         WritableComparable, HCatRecord>.Context context
      )throws IOException ,InterruptedException {
         int sum = 0;
         Iterator<IntWritable> iter = values.iterator();

         while (iter.hasNext()) {
            sum++;
            iter.next();
         }

         HCatRecord record = new DefaultHCatRecord(2);
         record.set(0, key.get());
         record.set(1, sum);
         context.write(null, record);
      }
   }

   public int run(String[] args) throws Exception {
      Configuration conf = getConf();
      args = new GenericOptionsParser(conf, args).getRemainingArgs();

      String serverUri = args[0];
      String inputTableName = args[1];
      String outputTableName = args[2];
      String dbName = null;
      String principalID = System

      .getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL);
      if (principalID != null)
      conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID);
      Job job = new Job(conf, "GroupByAge");
      HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null));

      // initialize HCatOutputFormat
      job.setInputFormatClass(HCatInputFormat.class);
      job.setJarByClass(GroupByAge.class);
      job.setMapperClass(Map.class);
      job.setReducerClass(Reduce.class);

      job.setMapOutputKeyClass(IntWritable.class);
      job.setMapOutputValueClass(IntWritable.class);
      job.setOutputKeyClass(WritableComparable.class);
      job.setOutputValueClass(DefaultHCatRecord.class);

      HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
      HCatSchema s = HCatOutputFormat.getTableSchema(job);
      System.err.println("INFO: output schema explicitly set for writing:" + s);
      HCatOutputFormat.setSchema(job, s);
      job.setOutputFormatClass(HCatOutputFormat.class);
      return (job.waitForCompletion(true) ? 0 : 1);
   }

   public static void main(String[] args) throws Exception {
      int exitCode = ToolRunner.run(new GroupByAge(), args);
      System.exit(exitCode);
   }
}

在编译上述程序之前,您必须先下载一些 jars 并将它们添加到此应用程序的 classpath 。您需要下载所有 Hive jar 和 HCatalog jar(HCatalog-core-0.5.0.jar、hive-metastore-0.10.0.jar、libthrift-0.7.0.jar、hive-exec-0.10.0.jar、libfb303-0.7.0.jar、jdo2-api-2.3-ec.jar、slf4j-api-1.6.1.jar)。

使用以下命令将 jar 文件从 local 复制到 HDFS 并将它们添加到 classpath

bin/hadoop fs -copyFromLocal $HCAT_HOME/share/HCatalog/HCatalog-core-0.5.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmp

export LIB_JARS=hdfs:///tmp/HCatalog-core-0.5.0.jar,
hdfs:///tmp/hive-metastore-0.10.0.jar,
hdfs:///tmp/libthrift-0.7.0.jar,
hdfs:///tmp/hive-exec-0.10.0.jar,
hdfs:///tmp/libfb303-0.7.0.jar,
hdfs:///tmp/jdo2-api-2.3-ec.jar,
hdfs:///tmp/slf4j-api-1.6.1.jar

使用以下命令编译并执行给定的程序。

$HADOOP_HOME/bin/hadoop jar GroupByAge tmp/hive

现在,检查输出目录 (hdfs: user/tmp/hive) 以查看输出 (part_0000, part_0001)。