001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.IOException;
021import java.lang.reflect.Constructor;
022import java.lang.reflect.InvocationTargetException;
023import java.lang.reflect.Method;
024
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.mapred.JobConf;
027import org.apache.hadoop.mapred.MiniMRCluster;
028import org.apache.hadoop.mapreduce.Job;
029import org.apache.hadoop.mapreduce.JobContext;
030import org.apache.hadoop.mapreduce.JobID;
031
032/**
033 * This class provides shims for HBase to interact with the Hadoop 1.0.x and the
034 * Hadoop 0.23.x series.
035 * 
036 * NOTE: No testing done against 0.22.x, or 0.21.x.
037 */
038abstract public class MapreduceTestingShim {
039  private static MapreduceTestingShim instance;
040  private static Class[] emptyParam = new Class[] {};
041
042  static {
043    try {
044      // This class exists in hadoop 0.22+ but not in Hadoop 20.x/1.x
045      Class c = Class
046          .forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
047      instance = new MapreduceV2Shim();
048    } catch (Exception e) {
049      instance = new MapreduceV1Shim();
050    }
051  }
052
053  abstract public JobContext newJobContext(Configuration jobConf)
054      throws IOException;
055
056  abstract public Job newJob(Configuration conf) throws IOException;
057  
058  abstract public JobConf obtainJobConf(MiniMRCluster cluster);
059
060  abstract public String obtainMROutputDirProp();
061  
062  public static JobContext createJobContext(Configuration jobConf)
063      throws IOException {
064    return instance.newJobContext(jobConf);
065  }
066  
067  public static JobConf getJobConf(MiniMRCluster cluster) {
068    return instance.obtainJobConf(cluster);
069  }
070
071  public static Job createJob(Configuration conf) throws IOException {
072    return instance.newJob(conf);
073  }
074
075  public static String getMROutputDirProp() {
076    return instance.obtainMROutputDirProp();
077  }
078  
079  private static class MapreduceV1Shim extends MapreduceTestingShim {
080    @Override
081    public JobContext newJobContext(Configuration jobConf) throws IOException {
082      // Implementing:
083      // return new JobContext(jobConf, new JobID());
084      JobID jobId = new JobID();
085      Constructor<JobContext> c;
086      try {
087        c = JobContext.class.getConstructor(Configuration.class, JobID.class);
088        return c.newInstance(jobConf, jobId);
089      } catch (Exception e) {
090        throw new IllegalStateException(
091            "Failed to instantiate new JobContext(jobConf, new JobID())", e);
092      }
093    }
094
095    @Override
096    public Job newJob(Configuration conf) throws IOException {
097      // Implementing:
098      // return new Job(conf);
099      Constructor<Job> c;
100      try {
101        c = Job.class.getConstructor(Configuration.class);
102        return c.newInstance(conf);
103      } catch (Exception e) {
104        throw new IllegalStateException(
105            "Failed to instantiate new Job(conf)", e);
106      }
107    }
108    
109    @Override
110    public JobConf obtainJobConf(MiniMRCluster cluster) {
111      if (cluster == null) return null;
112      try {
113        Object runner = cluster.getJobTrackerRunner();
114        Method meth = runner.getClass().getDeclaredMethod("getJobTracker", emptyParam);
115        Object tracker = meth.invoke(runner, new Object []{});
116        Method m = tracker.getClass().getDeclaredMethod("getConf", emptyParam);
117        return (JobConf) m.invoke(tracker, new Object []{});
118      } catch (NoSuchMethodException nsme) {
119        return null;
120      } catch (InvocationTargetException ite) {
121        return null;
122      } catch (IllegalAccessException iae) {
123        return null;
124      }
125    }
126
127    @Override
128    public String obtainMROutputDirProp() {
129      return "mapred.output.dir";
130    }
131  };
132
133  private static class MapreduceV2Shim extends MapreduceTestingShim {
134    @Override
135    public JobContext newJobContext(Configuration jobConf) {
136      return newJob(jobConf);
137    }
138
139    @Override
140    public Job newJob(Configuration jobConf) {
141      // Implementing:
142      // return Job.getInstance(jobConf);
143      try {
144        Method m = Job.class.getMethod("getInstance", Configuration.class);
145        return (Job) m.invoke(null, jobConf); // static method, then arg
146      } catch (Exception e) {
147        e.printStackTrace();
148        throw new IllegalStateException(
149            "Failed to return from Job.getInstance(jobConf)");
150      }
151    }
152    
153    @Override
154    public JobConf obtainJobConf(MiniMRCluster cluster) {
155      try {
156        Method meth = MiniMRCluster.class.getMethod("getJobTrackerConf", emptyParam);
157        return (JobConf) meth.invoke(cluster, new Object []{});
158      } catch (NoSuchMethodException nsme) {
159        return null;
160      } catch (InvocationTargetException ite) {
161        return null;
162      } catch (IllegalAccessException iae) {
163        return null;
164      }
165    }
166
167    @Override
168    public String obtainMROutputDirProp() {
169      // This is a copy of o.a.h.mapreduce.lib.output.FileOutputFormat.OUTDIR 
170      // from Hadoop 0.23.x.  If we use the source directly we break the hadoop 1.x compile. 
171      return "mapreduce.output.fileoutputformat.outputdir";
172    }
173  };
174
175}