001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.tool;
019
020import java.io.IOException;
021import java.lang.reflect.Constructor;
022import java.lang.reflect.InvocationTargetException;
023import java.lang.reflect.Method;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.mapred.JobConf;
026import org.apache.hadoop.mapred.MiniMRCluster;
027import org.apache.hadoop.mapreduce.Job;
028import org.apache.hadoop.mapreduce.JobContext;
029import org.apache.hadoop.mapreduce.JobID;
030
031/**
032 * This class provides shims for HBase to interact with the Hadoop 1.0.x and the Hadoop 0.23.x
033 * series. NOTE: No testing done against 0.22.x, or 0.21.x.
034 */
035abstract public class MapreduceTestingShim {
036  private static MapreduceTestingShim instance;
037  private static Class[] emptyParam = new Class[] {};
038
039  static {
040    try {
041      // This class exists in hadoop 0.22+ but not in Hadoop 20.x/1.x
042      Class c = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
043      instance = new MapreduceV2Shim();
044    } catch (Exception e) {
045      instance = new MapreduceV1Shim();
046    }
047  }
048
049  abstract public JobContext newJobContext(Configuration jobConf) throws IOException;
050
051  abstract public Job newJob(Configuration conf) throws IOException;
052
053  abstract public JobConf obtainJobConf(MiniMRCluster cluster);
054
055  abstract public String obtainMROutputDirProp();
056
057  public static JobContext createJobContext(Configuration jobConf) throws IOException {
058    return instance.newJobContext(jobConf);
059  }
060
061  public static JobConf getJobConf(MiniMRCluster cluster) {
062    return instance.obtainJobConf(cluster);
063  }
064
065  public static Job createJob(Configuration conf) throws IOException {
066    return instance.newJob(conf);
067  }
068
069  public static String getMROutputDirProp() {
070    return instance.obtainMROutputDirProp();
071  }
072
073  private static class MapreduceV1Shim extends MapreduceTestingShim {
074    @Override
075    public JobContext newJobContext(Configuration jobConf) throws IOException {
076      // Implementing:
077      // return new JobContext(jobConf, new JobID());
078      JobID jobId = new JobID();
079      Constructor<JobContext> c;
080      try {
081        c = JobContext.class.getConstructor(Configuration.class, JobID.class);
082        return c.newInstance(jobConf, jobId);
083      } catch (Exception e) {
084        throw new IllegalStateException(
085          "Failed to instantiate new JobContext(jobConf, new JobID())", e);
086      }
087    }
088
089    @Override
090    public Job newJob(Configuration conf) throws IOException {
091      // Implementing:
092      // return new Job(conf);
093      Constructor<Job> c;
094      try {
095        c = Job.class.getConstructor(Configuration.class);
096        return c.newInstance(conf);
097      } catch (Exception e) {
098        throw new IllegalStateException("Failed to instantiate new Job(conf)", e);
099      }
100    }
101
102    @Override
103    public JobConf obtainJobConf(MiniMRCluster cluster) {
104      if (cluster == null) return null;
105      try {
106        Object runner = cluster.getJobTrackerRunner();
107        Method meth = runner.getClass().getDeclaredMethod("getJobTracker", emptyParam);
108        Object tracker = meth.invoke(runner, new Object[] {});
109        Method m = tracker.getClass().getDeclaredMethod("getConf", emptyParam);
110        return (JobConf) m.invoke(tracker, new Object[] {});
111      } catch (NoSuchMethodException nsme) {
112        return null;
113      } catch (InvocationTargetException ite) {
114        return null;
115      } catch (IllegalAccessException iae) {
116        return null;
117      }
118    }
119
120    @Override
121    public String obtainMROutputDirProp() {
122      return "mapred.output.dir";
123    }
124  };
125
126  private static class MapreduceV2Shim extends MapreduceTestingShim {
127    @Override
128    public JobContext newJobContext(Configuration jobConf) {
129      return newJob(jobConf);
130    }
131
132    @Override
133    public Job newJob(Configuration jobConf) {
134      // Implementing:
135      // return Job.getInstance(jobConf);
136      try {
137        Method m = Job.class.getMethod("getInstance", Configuration.class);
138        return (Job) m.invoke(null, jobConf); // static method, then arg
139      } catch (Exception e) {
140        e.printStackTrace();
141        throw new IllegalStateException("Failed to return from Job.getInstance(jobConf)");
142      }
143    }
144
145    @Override
146    public JobConf obtainJobConf(MiniMRCluster cluster) {
147      try {
148        Method meth = MiniMRCluster.class.getMethod("getJobTrackerConf", emptyParam);
149        return (JobConf) meth.invoke(cluster, new Object[] {});
150      } catch (NoSuchMethodException nsme) {
151        return null;
152      } catch (InvocationTargetException ite) {
153        return null;
154      } catch (IllegalAccessException iae) {
155        return null;
156      }
157    }
158
159    @Override
160    public String obtainMROutputDirProp() {
161      // This is a copy of o.a.h.mapreduce.lib.output.FileOutputFormat.OUTDIR
162      // from Hadoop 0.23.x. If we use the source directly we break the hadoop 1.x compile.
163      return "mapreduce.output.fileoutputformat.outputdir";
164    }
165  };
166
167}