001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import java.io.IOException; 021import java.lang.reflect.Constructor; 022import java.lang.reflect.InvocationTargetException; 023import java.lang.reflect.Method; 024 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.mapred.JobConf; 027import org.apache.hadoop.mapred.MiniMRCluster; 028import org.apache.hadoop.mapreduce.Job; 029import org.apache.hadoop.mapreduce.JobContext; 030import org.apache.hadoop.mapreduce.JobID; 031 032/** 033 * This class provides shims for HBase to interact with the Hadoop 1.0.x and the 034 * Hadoop 0.23.x series. 035 * 036 * NOTE: No testing done against 0.22.x, or 0.21.x. 037 */ 038abstract public class MapreduceTestingShim { 039 private static MapreduceTestingShim instance; 040 private static Class[] emptyParam = new Class[] {}; 041 042 static { 043 try { 044 // This class exists in hadoop 0.22+ but not in Hadoop 20.x/1.x 045 Class c = Class 046 .forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl"); 047 instance = new MapreduceV2Shim(); 048 } catch (Exception e) { 049 instance = new MapreduceV1Shim(); 050 } 051 } 052 053 abstract public JobContext newJobContext(Configuration jobConf) 054 throws IOException; 055 056 abstract public Job newJob(Configuration conf) throws IOException; 057 058 abstract public JobConf obtainJobConf(MiniMRCluster cluster); 059 060 abstract public String obtainMROutputDirProp(); 061 062 public static JobContext createJobContext(Configuration jobConf) 063 throws IOException { 064 return instance.newJobContext(jobConf); 065 } 066 067 public static JobConf getJobConf(MiniMRCluster cluster) { 068 return instance.obtainJobConf(cluster); 069 } 070 071 public static Job createJob(Configuration conf) throws IOException { 072 return instance.newJob(conf); 073 } 074 075 public static String getMROutputDirProp() { 076 return instance.obtainMROutputDirProp(); 077 } 078 079 private static class MapreduceV1Shim extends MapreduceTestingShim { 080 @Override 081 public JobContext newJobContext(Configuration jobConf) throws IOException { 082 // Implementing: 083 // return new JobContext(jobConf, new JobID()); 084 JobID jobId = new JobID(); 085 Constructor<JobContext> c; 086 try { 087 c = JobContext.class.getConstructor(Configuration.class, JobID.class); 088 return c.newInstance(jobConf, jobId); 089 } catch (Exception e) { 090 throw new IllegalStateException( 091 "Failed to instantiate new JobContext(jobConf, new JobID())", e); 092 } 093 } 094 095 @Override 096 public Job newJob(Configuration conf) throws IOException { 097 // Implementing: 098 // return new Job(conf); 099 Constructor<Job> c; 100 try { 101 c = Job.class.getConstructor(Configuration.class); 102 return c.newInstance(conf); 103 } catch (Exception e) { 104 throw new IllegalStateException( 105 "Failed to instantiate new Job(conf)", e); 106 } 107 } 108 109 @Override 110 public JobConf obtainJobConf(MiniMRCluster cluster) { 111 if (cluster == null) return null; 112 try { 113 Object runner = cluster.getJobTrackerRunner(); 114 Method meth = runner.getClass().getDeclaredMethod("getJobTracker", emptyParam); 115 Object tracker = meth.invoke(runner, new Object []{}); 116 Method m = tracker.getClass().getDeclaredMethod("getConf", emptyParam); 117 return (JobConf) m.invoke(tracker, new Object []{}); 118 } catch (NoSuchMethodException nsme) { 119 return null; 120 } catch (InvocationTargetException ite) { 121 return null; 122 } catch (IllegalAccessException iae) { 123 return null; 124 } 125 } 126 127 @Override 128 public String obtainMROutputDirProp() { 129 return "mapred.output.dir"; 130 } 131 }; 132 133 private static class MapreduceV2Shim extends MapreduceTestingShim { 134 @Override 135 public JobContext newJobContext(Configuration jobConf) { 136 return newJob(jobConf); 137 } 138 139 @Override 140 public Job newJob(Configuration jobConf) { 141 // Implementing: 142 // return Job.getInstance(jobConf); 143 try { 144 Method m = Job.class.getMethod("getInstance", Configuration.class); 145 return (Job) m.invoke(null, jobConf); // static method, then arg 146 } catch (Exception e) { 147 e.printStackTrace(); 148 throw new IllegalStateException( 149 "Failed to return from Job.getInstance(jobConf)"); 150 } 151 } 152 153 @Override 154 public JobConf obtainJobConf(MiniMRCluster cluster) { 155 try { 156 Method meth = MiniMRCluster.class.getMethod("getJobTrackerConf", emptyParam); 157 return (JobConf) meth.invoke(cluster, new Object []{}); 158 } catch (NoSuchMethodException nsme) { 159 return null; 160 } catch (InvocationTargetException ite) { 161 return null; 162 } catch (IllegalAccessException iae) { 163 return null; 164 } 165 } 166 167 @Override 168 public String obtainMROutputDirProp() { 169 // This is a copy of o.a.h.mapreduce.lib.output.FileOutputFormat.OUTDIR 170 // from Hadoop 0.23.x. If we use the source directly we break the hadoop 1.x compile. 171 return "mapreduce.output.fileoutputformat.outputdir"; 172 } 173 }; 174 175}