/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Utility for {@link TableMap} and {@link TableReduce}
 */
@InterfaceAudience.Public
@SuppressWarnings({ "rawtypes", "unchecked" })
public class TableMapReduceUtil {
  private static final Logger LOG = LoggerFactory.getLogger(TableMapReduceUtil.class);

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the JobConf.
   * @param table            The table name to read from.
   * @param columns          The columns to scan.
   * @param mapper           The mapper class to use.
   * @param outputKeyClass   The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job              The current job configuration to adjust.
   */
  public static void initTableMapJob(String table, String columns, Class<? extends TableMap> mapper,
    Class<?> outputKeyClass, Class<?> outputValueClass, JobConf job) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job, true,
      TableInputFormat.class);
  }
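  // Illustrative usage sketch for the basic initTableMapJob overload above: configuring a
  // map-only scan job. The mapper class MyTableMap, the table name and the column list are
  // hypothetical placeholders, and imports for Result and JobClient are assumed.
  //
  //   JobConf job = new JobConf(HBaseConfiguration.create(), MyTableMap.class);
  //   job.setJobName("example-table-scan");
  //   TableMapReduceUtil.initTableMapJob("mytable", "info:a info:b", MyTableMap.class,
  //     ImmutableBytesWritable.class, Result.class, job);
  //   job.setNumReduceTasks(0);
  //   JobClient.runJob(job);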
  public static void initTableMapJob(String table, String columns, Class<? extends TableMap> mapper,
    Class<?> outputKeyClass, Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableInputFormat.class);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the JobConf.
   * @param table             The table name to read from.
   * @param columns           The columns to scan.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param inputFormat       The input format class to use.
   */
  public static void initTableMapJob(String table, String columns, Class<? extends TableMap> mapper,
    Class<?> outputKeyClass, Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
    Class<? extends InputFormat> inputFormat) {

    job.setInputFormat(inputFormat);
    job.setMapOutputValueClass(outputValueClass);
    job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    job.setStrings("io.serializations", job.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName());
    FileInputFormat.addInputPaths(job, table);
    job.set(TableInputFormat.COLUMN_LIST, columns);
    if (addDependencyJars) {
      try {
        addDependencyJars(job);
      } catch (IOException e) {
        LOG.error("IOException encountered while adding dependency jars", e);
      }
    }
    try {
      initCredentials(job);
    } catch (IOException ioe) {
      // just spit out the stack trace? really?
      LOG.error("IOException encountered while initializing credentials", ioe);
    }
  }

  /**
   * Sets up the job for reading from one or more table snapshots, with one or more scans per
   * snapshot. It bypasses HBase servers and reads directly from snapshot files.
   * @param snapshotScans     map of snapshot name to scans on that snapshot.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param tmpRestoreDir     a temporary directory to copy the snapshot files into.
   */
  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
    Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    MultiTableSnapshotInputFormat.setInput(job, snapshotScans, tmpRestoreDir);

    job.setInputFormat(MultiTableSnapshotInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    if (addDependencyJars) {
      addDependencyJars(job);
    }

    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }
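  // Illustrative usage sketch for initMultiTableSnapshotMapperJob above. The snapshot name,
  // restore directory and mapper class (MyTableMap) are hypothetical placeholders; imports for
  // HashMap, Collections, Bytes, Result and JobClient are assumed.
  //
  //   JobConf job = new JobConf(HBaseConfiguration.create(), MyTableMap.class);
  //   Map<String, Collection<Scan>> snapshotScans = new HashMap<>();
  //   snapshotScans.put("snapshot1",
  //     Collections.singletonList(new Scan().addFamily(Bytes.toBytes("info"))));
  //   TableMapReduceUtil.initMultiTableSnapshotMapperJob(snapshotScans, MyTableMap.class,
  //     ImmutableBytesWritable.class, Result.class, job, true, new Path("/tmp/snapshot-restore"));
  //   JobClient.runJob(job);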
  /**
   * Sets up the job for reading from a table snapshot. It bypasses HBase servers and reads
   * directly from snapshot files.
   * @param snapshotName      The name of the snapshot (of a table) to read from.
   * @param columns           The columns to scan.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param tmpRestoreDir     a temporary directory to copy the snapshot files into. The current
   *                          user should have write permissions to this directory, and it should
   *                          not be a subdirectory of rootdir. The restore directory can be
   *                          deleted after the job is finished.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapJob(String snapshotName, String columns,
    Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableSnapshotInputFormat.class);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }

  /**
   * Sets up the job for reading from a table snapshot. It bypasses HBase servers and reads
   * directly from snapshot files.
   * @param snapshotName       The name of the snapshot (of a table) to read from.
   * @param columns            The columns to scan.
   * @param mapper             The mapper class to use.
   * @param outputKeyClass     The class of the output key.
   * @param outputValueClass   The class of the output value.
   * @param jobConf            The current job to adjust. Make sure the passed job is carrying all
   *                           necessary HBase configuration.
   * @param addDependencyJars  upload HBase jars and jars for any of the configured job classes via
   *                           the distributed cache (tmpjars).
   * @param tmpRestoreDir      a temporary directory to copy the snapshot files into. The current
   *                           user should have write permissions to this directory, and it should
   *                           not be a subdirectory of rootdir. The restore directory can be
   *                           deleted after the job is finished.
   * @param splitAlgo          algorithm used to split each region into input splits
   * @param numSplitsPerRegion how many input splits to generate per region
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapJob(String snapshotName, String columns,
    Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    JobConf jobConf, boolean addDependencyJars, Path tmpRestoreDir,
    RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion) throws IOException {
    TableSnapshotInputFormat.setInput(jobConf, snapshotName, tmpRestoreDir, splitAlgo,
      numSplitsPerRegion);
    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, jobConf,
      addDependencyJars, TableSnapshotInputFormat.class);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(jobConf);
  }
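  // Illustrative usage sketch for initTableSnapshotMapJob above. The snapshot name, restore
  // directory and mapper class (MyTableMap) are hypothetical placeholders; the restore directory
  // must be writable by the current user and must not live under the HBase rootdir.
  //
  //   JobConf job = new JobConf(HBaseConfiguration.create(), MyTableMap.class);
  //   TableMapReduceUtil.initTableSnapshotMapJob("mysnapshot", "info:a info:b", MyTableMap.class,
  //     ImmutableBytesWritable.class, Result.class, job, true, new Path("/tmp/snapshot-restore"));
  //   JobClient.runJob(job);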
  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table   The output table.
   * @param reducer The reducer class to use.
   * @param job     The current job configuration to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table, Class<? extends TableReduce> reducer,
    JobConf job) throws IOException {
    initTableReduceJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table       The output table.
   * @param reducer     The reducer class to use.
   * @param job         The current job configuration to adjust.
   * @param partitioner Partitioner to use. Pass <code>null</code> to use default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table, Class<? extends TableReduce> reducer,
    JobConf job, Class partitioner) throws IOException {
    initTableReduceJob(table, reducer, job, partitioner, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table             The output table.
   * @param reducer           The reducer class to use.
   * @param job               The current job configuration to adjust.
   * @param partitioner       Partitioner to use. Pass <code>null</code> to use default partitioner.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table, Class<? extends TableReduce> reducer,
    JobConf job, Class partitioner, boolean addDependencyJars) throws IOException {
    job.setOutputFormat(TableOutputFormat.class);
    job.setReducerClass(reducer);
    job.set(TableOutputFormat.OUTPUT_TABLE, table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setStrings("io.serializations", job.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName());
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions = getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    initCredentials(job);
  }

  public static void initCredentials(JobConf job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job);
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      Connection conn = ConnectionFactory.createConnection(job);
      try {
        // login the server principal (if using secure Hadoop)
        User user = userProvider.getCurrent();
        TokenUtil.addTokenForJob(conn, job, user);
      } catch (InterruptedException ie) {
        LOG.error("Interrupted obtaining user authentication token", ie);
        Thread.currentThread().interrupt();
      } finally {
        conn.close();
      }
    }
  }
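  // Illustrative usage sketch for initTableReduceJob above. MyTableReduce (a hypothetical class
  // implementing TableReduce) writes Put instances keyed by ImmutableBytesWritable into the
  // (hypothetical) output table; an import for JobClient is assumed.
  //
  //   JobConf job = new JobConf(HBaseConfiguration.create(), MyTableReduce.class);
  //   TableMapReduceUtil.initTableReduceJob("outputtable", MyTableReduce.class, job);
  //   JobClient.runJob(job);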
  /**
   * Ensures that the given number of reduce tasks for the given job configuration does not exceed
   * the number of regions for the given table.
   * @param table The table to get the region count for.
   * @param job   The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  // Used by tests.
  public static void limitNumReduceTasks(String table, JobConf job) throws IOException {
    int regions = getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
    if (job.getNumReduceTasks() > regions) {
      job.setNumReduceTasks(regions);
    }
  }

  /**
   * Ensures that the given number of map tasks for the given job configuration does not exceed the
   * number of regions for the given table.
   * @param table The table to get the region count for.
   * @param job   The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  // Used by tests.
  public static void limitNumMapTasks(String table, JobConf job) throws IOException {
    int regions = getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
    if (job.getNumMapTasks() > regions) {
      job.setNumMapTasks(regions);
    }
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the number of regions the
   * given table has.
   * @param table The table to get the region count for.
   * @param job   The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, JobConf job) throws IOException {
    job.setNumReduceTasks(getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table)));
  }

  /**
   * Sets the number of map tasks for the given job configuration to the number of regions the
   * given table has.
   * @param table The table to get the region count for.
   * @param job   The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumMapTasks(String table, JobConf job) throws IOException {
    job.setNumMapTasks(getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table)));
  }
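  // Illustrative sketch of the region-count helpers above: after configuring a reduce job, a
  // caller might align parallelism with the layout of the (hypothetical) output table.
  //
  //   TableMapReduceUtil.setNumReduceTasks("outputtable", job);   // one reducer per region
  //   TableMapReduceUtil.limitNumReduceTasks("outputtable", job); // or just cap an existing value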
  /**
   * Sets the number of rows to return and cache with each scanner iteration. Higher caching values
   * will enable faster mapreduce jobs at the expense of requiring more heap to contain the cached
   * rows.
   * @param job       The current job configuration to adjust.
   * @param batchSize The number of rows to return in batch with each scanner iteration.
   */
  public static void setScannerCaching(JobConf job, int batchSize) {
    job.setInt("hbase.client.scanner.caching", batchSize);
  }

  /**
   * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
   */
  public static void addDependencyJars(JobConf job) throws IOException {
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(job,
      job.getMapOutputKeyClass(), job.getMapOutputValueClass(), job.getOutputKeyClass(),
      job.getOutputValueClass(), job.getPartitionerClass(),
      job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
      job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
      job.getCombinerClass());
  }

  private static int getRegionCount(Configuration conf, TableName tableName) throws IOException {
    try (Connection conn = ConnectionFactory.createConnection(conf);
      RegionLocator locator = conn.getRegionLocator(tableName)) {
      return locator.getAllRegionLocations().size();
    }
  }
}